RocketMq消费者动态订阅topic
一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。
代码示例:
// 初始化消费者,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");//指定RocketMq服务器地址consumer.setNamesrvAddr("localhost:9876");//每次拉取最多拉取消息10条 consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅topic下的说有tagconsumer.subscribe("topic_test", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(msg->{log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));})return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();
简单了解一下注册监听器:
消费消息的方法consumeMessage有两个参数
- List<MessageExt> msgs:消费一次性从消息队列里面拉取的消息列表,可通过setConsumeMessageBatchMaxSize()方法来配置每次最多拉取消息条数
- ConsumeConcurrentlyContext context: 是 RocketMQ 并发消费上下文对象,它包含了当前消息消费的一些上下文信息和控制参数。
返回ConsumeConcurrentlyStatus类型:
- CONSUME_SUCCESS:用于确定同一批消息全部消费成功
- RECONSUME_LATER:如果消息消费失败,同一批的所有消息将重新被消费,直到达到最大重试次数。如果同一批的单条消息消费失败返回了RECONSUME_LATER,那么同一批的其他消息即使被成功消费了也会被重新消费。
现在有这么一种情况:生产者的topic可以由用户动态配置,消费者具体消费哪些topic下的消息是不知道的,当消费者正在运行时可以实现动态订阅topic。
实现方式:
@Slf4j
@Component
@Getter
public class DynamicMonitoringEvent {private DefaultMQPushConsumer consumer;private final List<String> topics = new CopyOnWriteArrayList<>();@PostConstructprivate void init(){getInstance();start();}public DefaultMQPushConsumer getInstance(){log.info("初始化消费者");if (consumer == null){DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer(group);dconsumer.setNamesrvAddr(nameServer);dconsumer.setConsumeMessageBatchMaxSize(10);dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);dconsumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));//处理业务逻辑}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer = dconsumer;}return consumer;}public void start(){try {consumer.start();} catch (MQClientException e) {e.printStackTrace();}}// 动态设定topic的订阅// 注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖public void setConsumer(String topic) {try {if (!StringUtils.hasText(topic) || topics.contains(topic)){return;}topics.add(topic);// 动态添加topic监听consumer.subscribe(topic, "*");//也可以重新注册监听器// dconsumer.registerMessageListener(new MessageListenerConcurrently() {// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,// ConsumeConcurrentlyContext context) {// for (MessageExt msg : msgs){// log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());// log.info("--body:"+new String(msg.getBody()));// }// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }log.info("动态订阅topic:{} 成功", topic);} catch (MQClientException e) {log.error("动态订阅topic:{} 失败", topic, e);}}
}
在外部调用setConsumer方法实现动态订阅topic
@Autowired
private DynamicMonitoringEvent dynamicMonitoringEvent;public void setDynamicTopic() {Set<String> topics = //获取topic列表for (String topic : topics){dynamicMonitoringEvent.setConsumer(topic);}}
注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖,所以我们需要做一下判断,防止重复订阅同一个topic,如果注册了新的监听器,会覆盖之前的监听器,一个消费者只能又一个监听器