当前位置: 首页 > news >正文

SpringBoot整合RocketMQ与客户端注意事项

SpringBoot整合RocketMQ

引入依赖(5.3.0比较稳定)

    <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies>

配置文件

rocketmq.name-server=192.168.234.141:9876
#配了发送者,才会初始化RocketMQTemplate或者不配置,自己根据业务注入不同的RocketMQTemplate
rocketmq.producer.group=springBootGroup
rocketmq.producer.send-message-timeout=10000
rocketmq.#如果这里不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup#rocketmq.pull-consumer.group=
#rocketmq.pull-consumer.topic=

消费者注册到spring容器

@Component
//@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}

生产者消息在需要生产消息的地方注入生产者调用方法

@Component
public class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Resourceprivate DefaultMQProducer defaultMQProducer;public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);//pull消息需要配置rocketmq.consumer.topic和rocketmq.consumer.group参数//this.rocketMQTemplate.receive();//事务消息//this.rocketMQTemplate.sendMessageInTransaction}}

事务监听
2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。

//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
//@RocketMQTransactionListener(rocketMQTemplateBeanName = "ExtRocketMQTemplate")
@RocketMQTransactionListener()
public class MyTransactionImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {}
}

客户端注意事项

消息的ID,Key和Tag

  • MessageId是RocketMQ内部给每条消息分配的唯⼀索引
  • key是Message中的补充信息
    针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能。
  • 通过Tag进⾏消息过滤性能⾮常⾼

最佳实践

⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤:message.setTags(“TagA”)。

Kafka的⼀⼤问题是Topic过多,会造成Partition⽂件过多,影响性能。⽽RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多,还是会加⼤RocketMQ的元数据维护的性能消耗。所以,在使⽤时,还是需要对Topic进⾏合理的分配。使⽤Tag区分消息时,尽量直接使⽤Tag过滤,不要使⽤复杂的SQL过滤。因为消息过滤机制虽然可以减少⽹络IO,但是毕竟会加⼤Broker端的消息处理压⼒。所以,消息过滤的逻辑,还是越简单越好。

消费者端进行幂等控制

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多⼀次:每条消息最多只会被消费⼀次
  • at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
  • exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次

消息幂等的必要性

在互联⽹应⽤中,尤其在⽹络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复
    当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 投递时消息重复
    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。 为了保证消息⾄少被消费⼀次,消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

而要处理这个问题,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进行传递。

关注错误消息重试

多关注重试队列,可以及时了解消费者端的运⾏情况。这个队列中出现了⼤量的消息,就意味着消费者的运行出现了问题,要及时跟踪进行干预
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
在这里插入图片描述

这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer⾥的消息监听器返回状态改为RECONSUME_LATER测试⼀下。

如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列。

手动处理死信队列

当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。而如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的⼀种特殊队列:死信队列。

意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,⼀般需要人工去查看死信队列中的消息,对错误原因进⾏排查。然后对死信消息进行处理,⽐如转发到正常的Topic重新进⾏消费,或者丢弃。

死信队列的名称是%DLQ%+ConsumGroup

死信队列的特征:

  • ⼀个死信队列对应⼀个ConsumGroup,而不是对应某个消费者实例。
  • 如果⼀个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • ⼀个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

默认创建出来的死信队列,他⾥⾯的消息是⽆法读取的,在控制台和消费者中都⽆法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要⼿动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台操作)。

相关文章:

  • 项目课题——基于NB-IoT的智能水表设计
  • PPHGNetV2源代码解析
  • Python训练营打卡 Day46
  • C# 日志管理功能代码
  • 浅谈python如何做接口自动化
  • Qt生成日志与以及捕获崩溃文件(mingw64位,winDbg)————附带详细解说
  • 第4天:RNN应用(心脏病预测)
  • python实战:如何对word文档的格式进行定制化排版
  • 每日八股文6.6
  • 多模态+空间智能:考拉悠然以AI+智慧灯杆,点亮城市治理新方式
  • 达梦DB操作记录
  • Splash动态渲染技术全解析:从基础到企业级应用(2025最新版)
  • 学习日记-day23-6.6
  • Linux LVM与磁盘配额
  • MySQL基本操作(续)
  • BEV和OCC学习-5:数据预处理流程
  • 【更新至2024年】2003-2024年高铁线路信息数据
  • Maven相关问题:jna版本与ES冲突 + aop失效
  • VTK|8.2.0升级到9.4.2遇到的问题及解决方法
  • 基于nacos2.5.1的MCP服务端微服务项目开发环境配置简介
  • 英语网站建设/上海单个关键词优化
  • 建设银行网站怎样查询贷款信息吗/常用的网络推广方法有
  • 建设网站是什么职位/亚马逊跨境电商
  • 如何做旅游网站的思维导图/软文范文200字
  • 做网站接广告赚钱么/搜索引擎优化seo优惠
  • 2015百度推广网站遭到攻击/软文是指什么