当前位置: 首页 > news >正文

RocketMQ消息模型

消息确认机制

思考:

对于生产者:1.我的消息怎么才是发送成功的呢?

对于消费者:2.如何控制自己的消费进度?

消息的生产端采用消息确认加多次重试机制保障消息正常发送到RocketMQ

生产者三种发送方式:

1.单一发送 sendOneway

生产者只负责发送,是否成功不做任何处理,效率高,适合允许消息丢失的场景。例如日志

public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();}
}
2.同步发送send

同步模式下生产者往broker发送消息后,会阻塞当前线程,等待broker返回相应结果,等保证消息的绝对安全,例如失败了可以重试,但是效率确实太低。

注:失败重发消息的时候,最好要带上唯一的系统标识,这样在消费者端,才能自行做幂等判断。也就是用具有业务含义的OrderID这样的字段来判断消息有没有被重复处理

 SendResult sendResult = producer.send(msg);public enum SendStatus {SEND_OK,
//:消息写入主 broker 成功,但同步刷盘超时。FLUSH_DISK_TIMEOUT,
//含义:消息写入主 broker 成功,但在设定时间内没有等到从 broker 的确认(即主从同步超时)。FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE;private SendStatus() {}
}
3.异步发送

异步发送机制下,生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。

onSuccess和onException。当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。

他的弊端:

1.触发onException并不表示就一定是发送失败了,默认超时时间是3秒,会因为网络问题,Broker处理满了造成超时问题,可以通过producer.setSendMsgTimeout();来设置时间。

2.收到生产者producer.shutdown的影响,SendCallback还是需要附属于生产者的主线程才能执行。

   producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});

Consumer采用状态确认机制保证消费者一定能正确处理消息

返回的是一个枚举值,如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

   consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

具体细节:

1.重试次数限制(默认16)+死信队列(超过重试上限进入这里,需要人工处理或删除)。

2.为了防止阻塞正常的消息,会给消费失败的消费者创建一个专属的 %RETRY% 重试 Topic。

3.RocketMQ中消费者组都是订阅主题和消费逻辑相同的备份,所以当消息重试时,Broker只往消费者组中随意一个实例推送即可,这也是重试能够正确运行的基础,MQDefaultMQConsumer中并没有要求消费者组不能重复,也就是说可以实现一些,订阅主题和逻辑完全不同的消费服务,共同组成一个消费者组,这种情况不会报错,但是消息处理逻辑无法保证,需要注意。

4.消费是否成功有枚举值判断,因此建议设置为同步方式处理逻辑。

消费者组也可以自行指定起始消费点位

建立新的consumer,通过下面的方法,确认从何时起开始消费。

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumerTimestamp("20131223171201"); //如果设定的时间,需要加时间参数public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET, //从对列的最后一条消息开始消费CONSUME_FROM_FIRST_OFFSET, //从对列的第一条消息开始消费CONSUME_FROM_TIMESTAMP; //从某一个时间点开始重新消费
}

注:如果消费者设置了 CONSUME_FROM_LAST_OFFSET,此时 该队列的最小 offset 是 0,最大 offset 不为 0(说明这个队列已经有消息了),那么是能够拉到旧消息。

广播消息

广播模型和集群模型是rocketMQ的消费者端处理消息的两种模式;

集群模式下一个消息只能被多个消费者实例 共同 处理一次。

广播模式下,一个消息,会推送给所有的消费者实例处理,不再关心消费者组。

核心代码(默认集群模式):

​ 启动多个消费者,广播模式下,这些消费者都会消费一次消息。
consumer.setMessageModel(MessageModel.BROADCASTING);

实现思路:

集群模式下,broker会为每一个consumerGroup分配一个offset自己管理,当consumer消费时根据offset获取数据,这样就可以保证一个消息只能被consumerGroup消费一次。广播模式下这个offset在consumer端管理,这样Broker推送消息时,就不在管理ConsumerGroup,只要Consumer来拉取消息,就返回对应的消息。

注意点:

Broker不维护消费进度,意味着如果失败,也无法进行重试。

Consumer端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。如果Offset丢了,Consuer依然可以拉取消息。

​ 比如生产者发送了1~10号消息。消费者当消费到第6个时宕机了。当他重启时,Broker端已经把第10个消息都推送完成了。如果消费者端维护好了自己的Offset,那么他就可以在服务重启时,重新向Broker申请6号到10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运行,但是6到10号消息就无法再申请了。后续这个消费者就只能获取10号以后的消息。

过滤消息

场景:同一个topic有多种不同消息,我们希望消耗某些消息。

1.Tag

生产者发送消息时,增加Tag属性,效率很快

 String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("TagFilterTest",tags[i %& tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}

消费者拉自己感兴趣的

consumer.subscribe("TagFilterTest", "TagA");
2.SQL过滤

由于Tag只能写一写简单的过滤因此有了SQL过滤

生产者发送加上             
msg.putUserProperty("a", String.valueOf(i));消费者sql过滤consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));

注:使用SQL进行过滤,需要在Broker端,将参数enablePropertyFilter设置成true。这个参数默认是false。

实现思路:

​实际上,Tags和用户自定义的属性,都是随着消息一起传递的,所以,消费者端是可以拿到消息的Tags和自定义属性;

        consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
这两行System.out.println(msg.getTags());System.out.println(msg.getProperties());}System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

注意点:

1.消息过滤,其实在Broker端和在Consumer端都可以做。Consumer端也可以自行获取用户属性,不感兴趣的消息,直接返回不成功的状态,跳过该消息就行了。但是RocketMQ会在Broker端完成过滤条件的判断,只将Consumer感兴趣的消息推送给Consumer。这样的好处是减少了不必要的网络IO,但是缺点是加大了服务端的压力。不过在RocketMQ的良好设计下,更建议使用消息过滤机制。

​ 2.Consumer不感兴趣的消息并不表示直接丢弃。通常是需要在同一个消费者组,定制另外的消费者实例,消费那些剩下的消息。但是,如果一直没有另外的Consumer,那么,Broker端还是会推进Offset

3.使用Tag过滤时,如果希望匹配多个Tag,可以使用两个竖线(||)连接多个Tag值。另外,也可以使用星号(*)匹配所有。

​ 4.使用SQL顾虑时,SQL语句是按照SQL92标准来执行的。SQL语句中支持一些常见的基本操作:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

顺序消息机制

MessageQueueSelector和MessageListenerOrderly

生产者:

for (int i = 0; i < 10; i++) {int orderId = i;for(int j = 0 ; j <= 5 ; j ++){Message msg =new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id & mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}}

消费者

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for(MessageExt msg:msgs){System.out.println("收到消息内容 "+new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

示意图: 

生产者和消费者共同配置实现的

1.生产者只有将一批有顺序要求的消息,放到同一个MesasgeQueue上,通过MessageQueue的FIFO特性保证这一批消息的顺序。(其他的方式为轮询)

2.消费者需要实现MessageListenerOrderly接口,实际上在服务端,处理MessageListenerOrderly时,会给一个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下一个MessageQueue的消息。

局部有序,尽可能将效率分散开;

消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序,如果出现问题建议 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT等待一会;

延迟消息

Rocket特色功能

当前版本RocketMQ提供了两种实现延迟消息的机制,一种是指定固定的延迟级别,一种是指定消息发送时间。

 // 指定固定的延迟级别Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));message.setDelayTimeLevel(3); //10秒之后发送// 指定消息发送时间Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L); //指定10秒之后的时间点

延迟级别:

实现思路:

对于指定固定延迟级别的延迟消息,RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这里每个对列就对应了一种延迟级别。然后每次扫描这18个队列里的消息,进行延迟操作就可以了。

另外指定时间点的延迟消息,RocketMQ是通过时间轮算法实现的

批量消息

 		List<Message> messages = new ArrayList<>(MESSAGE_COUNT);for (int i = 0; i < MESSAGE_COUNT; i++) {messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes(StandardCharsets.UTF_8)));}//split the large batch into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s", sendResult);}

同一批的topic必须相同,大小不要太大,尽量不超过1M。

事物消息

ACL权限控制

1.配置topic的prem属性,可以设置权限; 2 禁读禁订阅 4:仅订阅 6:全部

2.在broker.conf中打开acl的标志:aclEnable=true。然后就可以用他提供的plain_acl.yml来进行权限配置了。并且这个配置文件是热加载

更加可力度的对用户进行控制

http://www.dtcms.com/a/279485.html

相关文章:

  • 选择一个系统作为主数据源的优势与考量
  • Java-ThreadLocal
  • 微信131~140
  • Linux连接跟踪Conntrack:原理、应用与内核实现
  • OSPF高级特性之GR
  • echarts应用到swiper 轮播图中,每次鼠标一点击图表所在slide,图表就会消失
  • LSV负载均衡
  • PostgreSQL ExecInitIndexScan 函数解析
  • k8s-高级调度(二)
  • 如何使用Cisco DevNet提供的免费ACI学习实验室(Learning Labs)?(Grok3 回答)
  • PostgreSQL 16 Administration Cookbook 读书笔记:第6章 Security
  • DLL 文件 OSError: [WinError 1401] 应用程序无法启动问题解决
  • 七、深度学习——RNN
  • HTTPS 协议原理
  • ZYNQ双核通信终极指南:FreeRTOS移植+OpenAMP双核通信+固化实战
  • 一文明白AI、AIGC、LLM、GPT、Agent、workFlow、MCP、RAG概念与关系
  • 浏览器防录屏是怎样提高视频安全性?
  • 现有医疗AI记忆、规划与工具使用的创新路径分析
  • 【Linux网络】多路转接poll、epoll
  • vue3 JavaScript 获取 el-table 单元格 赋红色外框
  • mac上用datagrip连接es
  • MFC/C++语言怎么比较CString类型最后一个字符
  • K8S的平台核心架构思想[面向抽象编程]
  • LVS(Linux Virtual Server)集群技术详解
  • linux 内核: 访问当前进程的 task_struct
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 架构搭建
  • C++-linux 6.makefile和cmake
  • 深入掌握Performance面板与LCP/FCP指标优化指南
  • 学习笔记——农作物遥感识别与大范围农作物类别制图的若干关键问题
  • 计算两个经纬度之间的距离(JavaScript 实现)