当前位置: 首页 > news >正文

北京联通网站备案php 网站模板

北京联通网站备案,php 网站模板,wordpress标签搜索引擎,建设厅网站装修合同模板一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。代码示例:// 初始化消费者,指定消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer("my_consumer_gro…

一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。

代码示例:

        // 初始化消费者,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");//指定RocketMq服务器地址consumer.setNamesrvAddr("localhost:9876");//每次拉取最多拉取消息10条 consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅topic下的说有tagconsumer.subscribe("topic_test", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(msg->{log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));})return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();

简单了解一下注册监听器:

消费消息的方法consumeMessage有两个参数

  1. List<MessageExt> msgs:消费一次性从消息队列里面拉取的消息列表,可通过setConsumeMessageBatchMaxSize()方法来配置每次最多拉取消息条数
  2. ConsumeConcurrentlyContext context: 是 RocketMQ 并发消费上下文对象,它包含了当前消息消费的一些上下文信息和控制参数。

返回ConsumeConcurrentlyStatus类型:

  1. CONSUME_SUCCESS:用于确定同一批消息全部消费成功
  2. RECONSUME_LATER:如果消息消费失败,同一批的所有消息将重新被消费,直到达到最大重试次数。如果同一批的单条消息消费失败返回了RECONSUME_LATER,那么同一批的其他消息即使被成功消费了也会被重新消费。

现在有这么一种情况:生产者的topic可以由用户动态配置,消费者具体消费哪些topic下的消息是不知道的,当消费者正在运行时可以实现动态订阅topic。

实现方式:

@Slf4j
@Component
@Getter
public class DynamicMonitoringEvent {private DefaultMQPushConsumer consumer;private final List<String> topics = new CopyOnWriteArrayList<>();@PostConstructprivate void init(){getInstance();start();}public  DefaultMQPushConsumer getInstance(){log.info("初始化消费者");if (consumer == null){DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer(group);dconsumer.setNamesrvAddr(nameServer);dconsumer.setConsumeMessageBatchMaxSize(10);dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);dconsumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));//处理业务逻辑}return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer = dconsumer;}return consumer;}public void start(){try {consumer.start();} catch (MQClientException e) {e.printStackTrace();}}// 动态设定topic的订阅// 注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖public void setConsumer(String topic) {try {if (!StringUtils.hasText(topic) || topics.contains(topic)){return;}topics.add(topic);// 动态添加topic监听consumer.subscribe(topic, "*");//也可以重新注册监听器//  dconsumer.registerMessageListener(new MessageListenerConcurrently() {//     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,//                                                     ConsumeConcurrentlyContext context) {//         for (MessageExt msg : msgs){//                 log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());//                 log.info("--body:"+new String(msg.getBody()));//         }//         return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }log.info("动态订阅topic:{} 成功", topic);} catch (MQClientException e) {log.error("动态订阅topic:{} 失败", topic, e);}}
}

在外部调用setConsumer方法实现动态订阅topic

@Autowired
private DynamicMonitoringEvent dynamicMonitoringEvent;public void setDynamicTopic() {Set<String> topics = //获取topic列表for (String topic : topics){dynamicMonitoringEvent.setConsumer(topic);}}

注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖,所以我们需要做一下判断,防止重复订阅同一个topic,如果注册了新的监听器,会覆盖之前的监听器,一个消费者只能又一个监听器

http://www.dtcms.com/a/420157.html

相关文章:

  • 清河做网站哪里好齐河网站建设
  • CodeBuddy CLI工具深度测评:从零到一实现鸿蒙游戏开发实践
  • 网站前后台模板服务专业的网站建设服务
  • 资讯网站策划怎么写龙岗公司网站
  • fr后缀网站在线做logo
  • 茶楼 网站免费网站建设域名
  • NeRF+3DGS——提升渲染质量与压缩模型参数
  • Appcelerator打包ipa有哪些优势
  • 五华建设银行网站宁波seo网络推广优化价格
  • 网站目录结构说明上海网络建站模板
  • 佛山网站制作流程推广网站实例
  • 阿里云服务器做网站djangowordpress后台500错误
  • Nginx proxy_pass 末尾斜杠(/)
  • 【MySQL】图书管理系统
  • 1.简述网站建设流程网站内页做友链
  • 做淘宝客要自己的网站建设网站的简单编程语言
  • 数据结构 01 线性表
  • 为什么有的网站打不开WordPress京东自动转链插件
  • MySQL——数据库基础与库的操作
  • 网站建站上市公司国外论文类网站有哪些方面
  • 网站建设有哪些分工分建筑网站、
  • asp网站改php网站方法wordpress禁用修正版
  • 堆 动态内存 超级玛丽demo7
  • 空壳网站查询WordPress下拉菜单栏
  • 《高并发架构实战课》学习笔记
  • 网站备案 人工审核平面设计需要用到的软件
  • 网站301跳转怎么做的安阳市网站建设
  • 参考资料:Linux系统U盘拔出识别慢问题
  • 银川公司网站建设广州万安建设监理有限公司网站
  • 专业做鞋子网站苏州网站建设2万起