当前位置: 首页 > news >正文

RocketMq消费者动态订阅topic

一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。

代码示例:

        // 初始化消费者,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");//指定RocketMq服务器地址consumer.setNamesrvAddr("localhost:9876");//每次拉取最多拉取消息10条 consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅topic下的说有tagconsumer.subscribe("topic_test", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(msg->{log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));})return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();

简单了解一下注册监听器:

消费消息的方法consumeMessage有两个参数

  1. List<MessageExt> msgs:消费一次性从消息队列里面拉取的消息列表,可通过setConsumeMessageBatchMaxSize()方法来配置每次最多拉取消息条数
  2. ConsumeConcurrentlyContext context: 是 RocketMQ 并发消费上下文对象,它包含了当前消息消费的一些上下文信息和控制参数。

返回ConsumeConcurrentlyStatus类型:

  1. CONSUME_SUCCESS:用于确定同一批消息全部消费成功
  2. RECONSUME_LATER:如果消息消费失败,同一批的所有消息将重新被消费,直到达到最大重试次数。如果同一批的单条消息消费失败返回了RECONSUME_LATER,那么同一批的其他消息即使被成功消费了也会被重新消费。

现在有这么一种情况:生产者的topic可以由用户动态配置,消费者具体消费哪些topic下的消息是不知道的,当消费者正在运行时可以实现动态订阅topic。

实现方式:

@Slf4j
@Component
@Getter
public class DynamicMonitoringEvent {private DefaultMQPushConsumer consumer;private final List<String> topics = new CopyOnWriteArrayList<>();@PostConstructprivate void init(){getInstance();start();}public  DefaultMQPushConsumer getInstance(){log.info("初始化消费者");if (consumer == null){DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer(group);dconsumer.setNamesrvAddr(nameServer);dconsumer.setConsumeMessageBatchMaxSize(10);dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);dconsumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));//处理业务逻辑}return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer = dconsumer;}return consumer;}public void start(){try {consumer.start();} catch (MQClientException e) {e.printStackTrace();}}// 动态设定topic的订阅// 注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖public void setConsumer(String topic) {try {if (!StringUtils.hasText(topic) || topics.contains(topic)){return;}topics.add(topic);// 动态添加topic监听consumer.subscribe(topic, "*");//也可以重新注册监听器//  dconsumer.registerMessageListener(new MessageListenerConcurrently() {//     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,//                                                     ConsumeConcurrentlyContext context) {//         for (MessageExt msg : msgs){//                 log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());//                 log.info("--body:"+new String(msg.getBody()));//         }//         return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }log.info("动态订阅topic:{} 成功", topic);} catch (MQClientException e) {log.error("动态订阅topic:{} 失败", topic, e);}}
}

在外部调用setConsumer方法实现动态订阅topic

@Autowired
private DynamicMonitoringEvent dynamicMonitoringEvent;public void setDynamicTopic() {Set<String> topics = //获取topic列表for (String topic : topics){dynamicMonitoringEvent.setConsumer(topic);}}

注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖,所以我们需要做一下判断,防止重复订阅同一个topic,如果注册了新的监听器,会覆盖之前的监听器,一个消费者只能又一个监听器

http://www.dtcms.com/a/339975.html

相关文章:

  • 【PyTorch项目实战】OpenNMT本地机器翻译框架 —— 支持本地部署和自定义训练
  • 千里马招标网站的核心技术分析
  • qwen2.5vl(1): 环境安装及运行
  • 二维图像处理(完整版2)
  • iOS安全和逆向系列教程 第22篇:iOS应用网络安全与通信保护
  • 自学python第10天
  • 路由器最大传输速率测试
  • VS Code 终端完全指南
  • Pandas中数据清理、连接数据以及合并多个数据集的方法
  • 仿新浪微博 typecho 主题模板源码
  • 1. AutoSAR 技术学习
  • Spring AOP核心原理与实战指南
  • 任务十一 搜索页面开发
  • Incredibuild 新增 Unity 支持:击破构建时间过长的痛点
  • AutoSarAP状态管理的状态机能否理解成C++的类?
  • 电视系统:开启视听新时代
  • 一个多功能的文件分享工具--zdir手动部署教程
  • 垂直领域大模型构建:法律行业“类ChatGPT”系统的训练与落地
  • el-table合并单元格
  • 接口自动化测试大全(python+pytest+allure)
  • Angular极速入门
  • 【CUDA教程--3】通过简单的矩阵运算入门CUDA
  • cursor+mcp-clickhouse进行数据分析
  • Spring循环依赖源码调试详解,用两级缓存代替三级缓存
  • JB4-9-任务调度
  • 网络通信基础:从数据链路层到传输层
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘paramiko’问题
  • Leetcode 3652. Best Time to Buy and Sell Stock using Strategy
  • 【20250819】mathtype的使用
  • Sklearn 机器学习 房价预估 计算房价和特征值的相关性