当前位置: 首页 > news >正文

动态订阅kafka mq实现(消费者组动态上下线)

和上篇文章 动态订阅rocket mq实现(消费者组动态上下线) 目的一致,直接上代码

    /**
     * Kafka topic container集合
     */
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();

	public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {
	/*
		BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置
		包含topic、sever、client,自定义即可
	*/
        ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        if (consumerFactory == null) {
            return;
        }
        factory.setConsumerFactory(consumerFactory);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());
        //设置为false,解决client后自动加-0的问题
        container.setAlwaysClientIdSuffix(false);
        container.setupMessageListener((MessageListener<String, String>) record -> {
           //TODO:你的消费逻辑,record即为消息体
                }
            } catch (IllegalArgumentException e) {
                log.error("registerKafkaListeners JSON解析失败", e);
            } catch (NullPointerException e) {
                log.error("registerKafkaListeners 消息为空或部分字段缺失", e);
            } catch (Exception e) {
                log.error("registerKafkaListeners 注册异常", e);
            }
        });
        container.start();
        topics.put(binlogPortDatabaseConfig.getTopic(), container);
    }


    public void factoryDel(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);
        if (!topic.isEmpty()) {
            container.stop();
            topics.remove(topic);
        }
    }

    public ConsumerFactory<String, String> createConsumerFactory() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /*你的kafka server*/);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, /*你的kafka client*/);
        if (SystemEnvUtil.isTest()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);
        }
        if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG,/*你的group id*/);
        }
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(100));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));

        Map<String, Object> configMap = new java.util.HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            configMap.put((String) entry.getKey(), entry.getValue());
        }
        return new DefaultKafkaConsumerFactory<>(configMap);
    }



http://www.dtcms.com/a/32147.html

相关文章:

  • 代码随想录-训练营-day35
  • 基于ffmpeg+openGL ES实现的视频编辑工具-添加转场(九)
  • C语言进阶习题【3】(7预处理)——写一个宏计算结构体变量相对于首地址的偏移
  • 先进制造aps专题三十 用免费生产排程软件isuperaps进行长期生产计划制定
  • 计算机图形学:实验环境配置
  • 基于Matlab实现串口实时显示波形GUI界面(源码)
  • Linux 驱动入门(6)—— IRDA(红外遥控模块)驱动
  • 代码随想录算法训练营day40(补0208)
  • “死”循环(查漏补缺)
  • 055 SpringCache
  • cs106x-lecture14(Autumn 2017)-SPL实现
  • 【Java进阶学习 第五篇】JDK8、9中的接口新特性
  • ARM Cortex-M3 技术解析:核寄存器R1-R15介绍及使用
  • 第五章:工程化实践 - 第三节 - Tailwind CSS 大型项目最佳实践
  • kafka+spring cloud stream 发送接收消息
  • 华为OD机试真题-相对开音节-OD统一考试(E卷)
  • Meterpreter之getsystem命令提权原理详解
  • Zotero 快速参考文献导出(特定期刊引用)
  • 区块链相关方法-波士顿矩阵 (BCG Matrix)
  • Codes 开源免费研发项目管理平台 2025年第一个大版本3.0.0 版本发布及创新的轻IPD实现
  • 在LangFlow中集成OpenAI Compatible API类型的大语言模型
  • 不同类型的网站选择不同的服务器,那么应该怎么选择服务器呢?
  • STM32-心知天气项目
  • python包重要修改
  • 如何把windows机器作为SSH客户端免密登录
  • Markdown使用方法文字版解读
  • 数据表的存储过程和函数介绍
  • OpenBMC:BmcWeb app.run
  • tortoiseGit的使用和上传拉取
  • 使用docker开发镜像编译