当前位置: 首页 > news >正文

kafka动态监听主题

简单版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class DynamicKafkaListenerService {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener((MessageListener<String, String>) record -> {
            System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }
}

手动ack版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;

@Service
public class DynamicKafkaListenerService {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置消息监听器为 AcknowledgingMessageListener
        containerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                // 模拟消息处理逻辑
                // 处理完成后手动确认消息
                if (ack != null) {
                    ack.acknowledge();
                }
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }
}

批量处理版本 

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置批量消息监听器
        containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                }
                // 模拟消息处理逻辑
                // 处理完成后手动批量确认消息
                ack.acknowledge();
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
    }

可关闭版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class DynamicKafkaListenerService {


    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    // 用于保存每个主题对应的监听器容器
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();

    /**
     * 开启一个监听
     */
    public void registerListener(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        // 设置手动确认模式
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置批量消息监听器
        containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                }
                // 模拟消息处理逻辑
                // 处理完成后手动批量确认消息
                ack.acknowledge();
            } catch (Exception e) {
                // 处理异常情况,例如记录日志或重试等
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
        // 将监听器容器保存到 map 中
        containerMap.put(topic, container);
    }

    /**
     * 关闭一个监听
     */
    public void stopListener(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);
        if (container != null && container.isRunning()) {
            container.stop();
            // 从 map 中移除已停止的监听器容器
            containerMap.remove(topic);
        }
    }
}

调用添加监听

    /**
     * 配置详情
     */
    @GetMapping("/getModelZdyConfInfo")
    public String getModelZdyConfInfo(String topic) {
        dynamicKafkaListenerService.registerListener(topic);
        return "添加" + topic + "监听成功";
    }

http://www.dtcms.com/a/19688.html

相关文章:

  • Flutter PIP 插件 ---- iOS Video Call
  • w211医疗报销系统的设计与实现
  • YOLOv5 目标检测优化:降低误检与漏检
  • 编程考古-TurboPascal中Turbo到底是什么
  • 车载OS简介
  • binance python
  • PVE 磁盘管理详解:从 Windows 到 Linux 的思维转换(文末附资源)
  • 力扣动态规划-31【算法学习day.125】
  • Python VsCode DeepSeek接入
  • Qt MainWindow
  • java集合框架之Map系列
  • 华为IPD简介
  • LeetCode 232: 用栈实现队列
  • w210基于Springboot开发的精简博客系统的设计与实现
  • windows10本地的JMeter+Influxdb+Grafana压测性能测试,【亲测,避坑】
  • 梅花易数【邵雍】起卦方法
  • OpenMetadata 获取 MySQL 数据库表血缘关系详解
  • 【kafka系列】broker
  • DeepSeek官方推荐的AI集成系统
  • Windows安装Rust环境(详细教程)
  • 解读 Flink Source 接口重构后的 KafkaSource
  • AcWing——61. 最长不含重复字符的子字符串
  • 基于AIOHTTP、Websocket和Vue3一步步实现web部署平台,无延迟控制台输出,接近原生SSH连接
  • 刷题记录(回顾)HOT100 二叉树-10: ​199. 二叉树的右视图
  • 【仪器仪表专题】案例:示波器控制通道开关SCPI命令不同的原因
  • 使用verilog 实现cordic 算法 ---- 向量模式
  • 【java】方法--拷贝数组
  • Hami项目开发笔记
  • 1.从零开始学会Vue--{{基础指令}}
  • 浅介绍redis特性