当前位置: 首页 > news >正文

北京JAVA基础面试30天打卡08

RocketMQ、RabbitMQ与Kafka对比及常见问题解决方案

一、概述

消息队列(Message Queue, MQ)是企业IT系统内部通信的核心手段,用于提升性能、实现系统解耦和流量削峰。它具有低耦合、可靠投递、广播、流量控制、最终一致性等功能,是异步RPC的主要手段之一。当前主流消息中间件包括ActiveMQ、RabbitMQ、Kafka和RocketMQ等。本文对比RocketMQ、RabbitMQ和Kafka,并总结常见问题的解决方案。

Kafka是一种分布式流处理平台,最初由LinkedIn开发,现由Apache维护,专注于高吞吐量、持久化和实时数据流处理,广泛用于大数据和日志聚合场景。


二、特性对比

1. RocketMQ

RocketMQ是阿里巴巴自主研发的分布式消息中间件,具有高性能、高可靠性和高可扩展性,广泛应用于高并发场景。

  • NameServer:轻量级服务协调与治理组件,负责记录和维护Topic、Broker信息,监控Broker运行状态。NameServer几乎无状态,可集群部署,节点间无信息同步,类似注册中心。
  • Broker:消息服务器,提供核心消息服务。每个Broker与NameServer集群中的所有节点保持长连接,定时注册Topic信息。
  • Producer:消息生产者,负责生成消息并发送至Broker。
  • Consumer:消息消费者,从Broker获取消息并处理业务逻辑。

2. RabbitMQ

RabbitMQ是基于AMQP协议的开源消息中间件,注重灵活的路由机制和易用性,适合中小型企业或复杂路由场景。

  • Exchange:交换机,根据路由规则将消息转发到对应队列。
  • Broker:消息服务器,提供核心消息服务。
  • Channel:基于TCP连接的虚拟连接,用于消息传输。
  • Routing Key:生产者发送消息时携带的键,指定消息路由规则。
  • Binding Key:绑定Exchange与Queue时指定的键,Routing Key与Binding Key匹配时,消息被路由到对应Queue。

3. Kafka

Kafka是分布式、持久化的消息系统,设计为高吞吐量的日志流处理平台,支持分区和副本机制,适合大数据管道和实时分析。

  • Topic:消息主题,可分为多个分区(Partition),每个分区是一个有序的日志序列,支持并行消费。
  • Broker:Kafka服务器节点,负责存储和转发消息。多个Broker组成集群。
  • Producer:消息生产者,将消息发送到指定Topic的分区,支持分区策略(如轮询、键哈希)。
  • Consumer:消息消费者,通过Consumer Group实现负载均衡,每个消费者订阅Topic并消费分区消息。
  • Zookeeper/KRaft:早期依赖Zookeeper进行元数据管理和选举,新版使用KRaft(Kafka Raft)模式实现内置共识,无需Zookeeper。

三、常见问题及解决方案

1. 重复消费

问题描述:消费者消费消息后需发送确认消息(ACK)给消息队列,通知消息已被消费。若确认消息因网络故障等原因未送达,消息队列可能重复分发消息给其他消费者。

解决方案

  • 保证消息幂等性

    :确保消息多次消费不影响结果。常见方法:

    • 使用唯一消息ID,消费者检查是否已处理。
    • 数据库操作使用唯一约束或版本号控制。
  • RocketMQ

    • 消费者在业务逻辑处理完成后发送ACK,确保消息被正确消费。
    • 使用事务性消息或本地事务状态表,防止重复消费影响业务。
  • RabbitMQ

    • 采用手动确认模式(Manual ACK),处理消息成功后再回复确认。
    • 消费者通过检查消息的唯一标识(如Message ID)避免重复处理。
  • Kafka

    • 消费者管理Offset(消费偏移量),手动提交Offset(disable auto-commit)。
    • 如果消费者崩溃未提交Offset,重启后从上次Offset消费,可能重复;通过幂等操作或Exactly-Once语义(启用idempotence)处理。
    • Consumer Group中,Rebalance可能导致重复消费,使用唯一ID或状态存储(如数据库)确保幂等。

2. 数据丢失

问题描述:消息可能在生产者、消息队列或消费者端丢失,导致业务异常。

RocketMQ
  • 生产者丢数据
    • 使用同步发送(send()),同步感知发送结果,失败可重试(默认重试3次)。
    • 失败消息存储在CommitLog中,支持后续重试。
  • 消息队列丢数据
    • 消息持久化到CommitLog,即使Broker宕机后重启,未消费消息可恢复。
    • 支持同步刷盘(确保消息写入磁盘)和异步刷盘(高性能但可能丢失少量数据)。
    • Broker集群支持1主N从,同步复制确保主节点磁盘故障不丢失消息,异步复制性能更高但有毫秒级延迟。
  • 消费者丢数据
    • 完全消费成功后发送ACK。
    • 维护持久化的Offset记录消费进度,防止因故障丢失消费状态。
RabbitMQ
  • 生产者丢数据
    • 使用事务模式(支持回滚)或Confirm模式(ACK确认),确保生产者可靠发送。
  • 消息队列丢数据
    • 开启消息持久化,消息写入磁盘后通知生产者ACK。
    • 配合Confirm机制,确保消息持久化到磁盘。
  • 消费者丢数据
    • 禁用自动确认模式,改为手动确认(Manual ACK),确保消息处理成功后再确认。
    • 消费者维护消费状态,避免因故障重复消费或丢失。
Kafka
  • 生产者丢数据
    • 配置acks参数:acks=0(不确认,高性能可能丢失)、acks=1(Leader确认)、acks=all(所有副本确认,确保不丢失)。
    • 启用重试和幂等生产者(enable.idempotence=true),防止重复发送。
  • 消息队列丢数据
    • 消息持久化到日志文件(Log Segments),支持配置保留策略(时间或大小)。
    • 通过Replication Factor(副本因子)设置分区副本数,Leader-Follower机制确保高可用;min.insync.replicas配置最小同步副本数。
    • Broker宕机时,副本可选举新Leader,消息不丢失(视副本配置)。
  • 消费者丢数据
    • 禁用自动提交Offset(enable.auto.commit=false),手动提交确保处理成功。
    • 如果自动提交,处理中崩溃可能丢失消息;使用Exactly-Once语义结合事务处理。

3. 消费顺序

问题描述:某些业务场景要求消息按顺序消费,但分布式系统或多线程消费可能导致乱序。

解决方案

  • 单线程消费:保证队列内消息按顺序处理,但可能影响性能。
  • 消息编号:为消息分配序列号,消费者根据编号判断顺序。
  • Queue有序性
    • 消息队列内部数据天然有序。
    • 消费者端通过单线程消费或内存队列排序,确保顺序处理。
  • RocketMQ
    • 使用顺序消息(Sequential Message),将相关消息发送到同一分区,保证分区内顺序。
    • 消费者单线程拉取并处理分区消息。
  • RabbitMQ
    • 利用Queue的FIFO特性,单线程消费确保顺序。
    • 多线程消费时,消费者内部维护内存队列进行排序。
  • Kafka
    • 分区内消息严格有序(append-only日志),但多分区无全局顺序。
    • 对于顺序需求,将相关消息发送到同一分区(基于键哈希)。
    • 消费者组中,每个分区分配给单一消费者,确保分区内顺序;多线程消费需消费者内部协调。

4. 高可用

问题描述:消息队列需保证高可用,防止单点故障导致服务不可用。

RocketMQ
  • 多Master模式
    • 配置简单,性能最高。
    • 单机宕机或重启期间,该机器未消费消息不可用,影响实时性。
    • 可能有少量消息丢失(视配置)。
  • 多Master多Slave异步模式
    • 每Master配一个Slave,消息写入Master,异步复制到Slave。
    • 性能接近多Master,实时性高,主备切换对应用透明。
    • Master宕机或磁盘损坏可能丢失少量消息(毫秒级延迟)。
  • 多Master多Slave同步模式
    • 每Master配一个Slave,消息同步写入主备,成功后返回。
    • 服务和数据可用性高,但性能略低于异步模式。
    • 主节点宕机后,备节点无法自动切换为主,需人工干预。
RabbitMQ
  • 普通集群模式
    • 多台机器运行RabbitMQ实例,Queue仅存储在一个实例上,其他实例同步元数据。
    • 消费时,若连接到非Queue所在实例,会从Queue所在实例拉取数据。
    • 若Queue所在实例宕机,需等待其恢复(持久化消息不丢失),影响实时性。
  • 镜像集群模式
    • Queue的元数据和消息同步到多个实例,写入消息时自动同步到所有实例的Queue。
    • 优点:高可用,单节点宕机不影响服务。
    • 缺点:
      • 数据同步导致性能开销大。
      • 无法线性扩容,因每个节点存储全量数据,单节点容量受限。
Kafka
  • 分布式Broker集群
    • 通过分区和副本机制实现高可用,每个分区有多个副本分布在不同Broker。
    • Leader选举由Controller(基于Zookeeper或KRaft)管理,Broker宕机时自动切换到Follower副本。
    • 支持水平扩展,添加Broker可重新分配分区,提高吞吐量。
  • 优点:高吞吐(百万级TPS),数据持久化强,适合大规模数据流。
  • 缺点:配置复杂,依赖外部协调(如Zookeeper,KRaft缓解);实时性不如RabbitMQ,但延迟低(毫秒级)。

四、总结

  • RocketMQ:适合高并发交易场景,强调性能和分布式架构,NameServer和Broker设计支持大规模集群。数据丢失防护完善,适合对实时性要求高的场景,但在同步模式下主备切换需人工干预。
  • RabbitMQ:基于AMQP协议,灵活的路由机制适合复杂路由场景,易用性强。但镜像集群性能开销大,扩展性受限,适合中小规模应用。
  • Kafka:专注于高吞吐量和数据流处理,分区机制支持并行消费和扩展,适合日志、大数据管道。但不原生支持复杂路由,顺序消费限于分区内,配置较复杂。
  • 常见问题解决方案
    • 重复消费:三者均通过幂等性和手动确认/提交Offset解决。
    • 数据丢失:RocketMQ和Kafka通过主从/副本复制,RabbitMQ通过持久化和Confirm。
    • 消费顺序:利用分区/队列有序性,结合单线程或键分区。
    • 高可用:Kafka的分布式副本最强扩展性,RabbitMQ镜像集群数据一致性高,RocketMQ平衡性能与可靠性。

市场上几大消息队列对比如下:

对比项RabbitMQActiveMQRocketMQKafka
公司RabbitApache阿里Apache
语言ErlangJavaJavaScala & Java
协议支持AMQPOpenWire、STOMP、REST、XMPP、AMQP自定义自定义协议,社区封装了 HTTP 协议支持
客户端支持语言官方支持 Erlang、Java、Ruby 等,社区产出多种 API,几乎支持所有语言Java、C、C++、Python、PHP、Perl、.NET 等Java、C++(不成熟)官方支持 Java,社区产出多种 API,如 PHP、Python 等
单机吞吐量万级(约 3 万)万级(约 4 万)十万级(约 10 万)十万级(约 10 万)
消息延迟微秒级毫秒级毫秒级毫秒以内
可用性高,基于主从架构实现可用性高,基于主从架构实现可用性非常高,分布式架构非常高,分布式架构,一个数据多副本
消息可靠性-有较低概率丢失数据经过参数优化配置,可做到零丢失经过参数配置,可做到零丢失
功能支持基于 Erlang 开发,并发性能极强,性能极好,延时低MQ 领域功能极其完备MQ 功能较为完备,分布式扩展性好功能较为简单,主要支持基本 MQ 功能
优势Erlang 开发,性能极好、延时低,吞吐量万级,功能完备,管理界面优秀,社区活跃,互联网公司使用多成熟稳定,功能强大,业内大量应用接口简单易用,阿里出品,吞吐量大,分布式扩展方便,社区活跃,支持大规模 Topic 和复杂业务场景,可定制开发超高吞吐量,毫秒级延时,极高可用性和可靠性,分布式扩展方便
劣势吞吐量较低,Erlang 开发不易定制,集群动态扩展麻烦偶尔有低概率消息丢失,社区活跃度不高不遵循 JMS 规范,系统迁移需改大量代码,存在被替代风险可能发生消息重复消费
应用各类场景均有使用主要用于解耦和异步,较少用于大规模吞吐适用于大规模吞吐、复杂业务场景大数据实时计算、日志采集等场景的业界标准

选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:

RabbitMQ:

优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
缺点:性能和吞吐量不太理想,不易进行二次开发
RocketMQ:

优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
缺点:兼容性上不是太好
Kafka:

优点:拥有强大的性能及吞吐量,兼容性很好
缺点:由于“攒一波再处理”导致延迟比较高

RocketMQ专栏

1. 推模式(Push)与拉模式(Pull)的区别与实现
推模式:RocketMQ 的 PushConsumer 实际基于长轮询(Long Polling)实现,Broker 收到请求后若队列无消息,会挂起请求并在新消息到达时立即响应。
拉模式:消费者主动拉取,需自行控制频率(如 DefaultLitePullConsumer),适用于需精准控制消费速率的场景。
对比:
推模式实时性高,但需 Broker 维护连接状态,可能因消费能力不足导致积压。
拉模式灵活性高,但需处理消息延迟与空轮询问题。

2. 如何保证消息顺序性?
生产者:通过 MessageQueueSelector 将同一业务 ID 的消息发送至固定队列(如哈希取模)。
消费者:使用 MessageListenerOrderly 监听器,锁定队列并单线程消费。
源码关键点:RebalanceLockManager 管理队列锁,确保同一队列仅被一个线程消费。

3. 事务消息的实现机制
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
应用场景:跨系统分布式事务(如订单创建与库存扣减)。

4. 消息积压的解决方案
临时扩容:增加 Consumer 实例或线程数,提升消费能力。
批量消费:调整 consumeMessageBatchMaxSize 参数,一次处理多条消息。
跳过非关键消息:若允许部分消息丢失,可重置消费位点(resetOffsetByTime)。
异步处理:将耗时操作(如 DB 写入)异步化,减少消费阻塞。

5. 消息的存储结构是怎样的?CommitLog 和 ConsumeQueue 的关系?
CommitLog 存储原始消息,ConsumeQueue 存储逻辑队列的偏移量,通过偏移量快速定位消息。

6. Consumer 的负载均衡策略是什么?
平均分配、一致性 Hash 等,通过 RebalanceService 定时调整队列分配。

7. 如何实现消息的精准一次投递?
RocketMQ 不保证,需业务端结合事务消息 + 幂等性实现。

8. Broker 的刷盘机制如何选择?
高可靠性场景用 SYNC_FLUSH,高性能场景用 ASYNC_FLUSH。

9. NameServer 宕机后,Producer 和 Consumer 还能工作吗?
可以,客户端会缓存路由信息,但无法感知新 Broker 或 Topic 变化。

10. 性能调优
Broker 参数:
sendMessageThreadPoolNums:发送线程数。
pullMessageThreadPoolNums:拉取线程数。
零拷贝技术:通过 MappedFile 内存映射文件减少数据拷贝。

11. Broker 如何处理拉取请求?
长轮询机制:Consumer 拉取请求无消息时,Broker 挂起请求(默认 30s),新消息到达后立即响应。
源码关键点:PullRequestHoldService 管理挂起请求,通过 checkHoldRequest 周期性检查消息到达。

12. RocketMQ 消息存储结构:CommitLog 与 ConsumeQueue 的关系
CommitLog:所有 Topic 的消息按顺序追加写入,文件名格式为 {文件起始偏移量}.log,固定大小 1GB(可配置)。
ConsumeQueue:逻辑队列索引,存储消息在 CommitLog 中的偏移量、大小、Tag HashCode,文件名格式为 {Topic}/{QueueId}/{ConsumeQueueOffset}。
关系:消费者通过 ConsumeQueue 快速定位 CommitLog 中的消息,实现高效检索。

13. 主从同步机制(SYNC/ASYNC)的区别与选型
SYNC_MASTER:
生产者收到 Slave 写入成功 ACK 后才返回,保证数据强一致。
适用场景:金融交易、资金扣减。
ASYNC_MASTER:
主节点写入成功即返回,Slave 异步复制,性能更高。
适用场景:日志传输、允许短暂不一致。

14. 消息重试与死信队列(DLQ)机制
重试队列:消费失败的消息进入重试队列(命名格式:%RETRY%{ConsumerGroup}),按延迟等级(1s, 5s, 10s…)重试。
死信队列:重试 16 次后仍失败,消息进入死信队列(%DLQ%{ConsumerGroup}),需人工处理。
配置参数:maxReconsumeTimes(默认 16 次)。

15. 如何实现消息轨迹(Trace)追踪?
开启方式:Broker 配置 traceTopicEnable=true,Producer/Consumer 设置 enableMsgTrace=true。
原理:消息发送/消费时,额外生成轨迹数据写入内部 Topic RMQ_SYS_TRACE_TOPIC。
查询工具:RocketMQ Console 或自定义消费者订阅轨迹 Topic。

16. Rebalance 机制如何工作?
触发条件:Consumer 数量变化、Broker 上下线、Topic 路由变更。
流程:
客户端定时向 Broker 发送心跳,上报 Consumer Group 信息。
Broker 通过 RebalanceService 计算队列分配策略(平均分配、一致性 Hash)。
Consumer 根据新分配结果调整拉取队列。
源码入口:RebalanceImpl#rebalanceByTopic。

17. RocketMQ 5.0 新特性(如 Proxy 模式)
Proxy 模式:解耦 Broker 与客户端协议,支持多语言客户端(如 HTTP/gRPC),增强云原生兼容性。
事务增强:支持 TCC 模式,提供更灵活的事务解决方案。
轻量级 SDK:简化客户端依赖,提升启动速度。
三、高级特性与源码原理

18. 零拷贝技术
RocketMQ:使用 mmap 内存映射文件,减少用户态与内核态数据拷贝。
Kafka:采用 sendfile 系统调用,实现更高吞吐但灵活性较低。

19. DLedger 高可用机制
基于 Raft 协议实现主从选举,主节点故障时自动切换,保障数据一致性。

20. 消息过滤
Tag 过滤:Broker 端过滤,减少网络传输。
SQL 过滤:需开启 enablePropertyFilter=true,支持复杂条件匹配。

21. 事务消息实现细节
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
源码分析:TransactionMQProducer 处理本地事务回调,TransactionalMessageService 管理事务状态。

22. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。

23. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。

24. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。

25. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。

26. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。

27. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。

场景设计题

1 .设计一个高并发秒杀系统,如何利用 RocketMQ 优化?

    流量削峰:将秒杀请求写入 RocketMQ 队列,异步处理订单创建与库存扣减。顺序消息:使用哈希选择器将同一用户请求路由到固定队列,避免超卖。事务消息:扣减库存与生成订单通过事务消息保证最终一致性。动态扩容:根据监控指标(如堆积消息数)自动扩容 Consumer,快速消化积压

2 . 设计一个秒杀系统,如何用 RocketMQ 解决超卖问题?

    消息队列削峰填谷 + 数据库乐观锁 + 事务消息保证最终库存一致。

3 . 如何实现分布式事务(订单扣库存+生成订单)?

    事务消息:半消息预扣库存,本地事务生成订单,失败则回滚库存。

4.如何设计一个异地多活消息队列系统?

    跨城同步:Broker 集群分机房部署,通过 Async replication 同步消息。单元化路由:Producer 根据用户 ID 哈希选择本地机房 Broker,减少跨城延迟。容灾切换:监控机房状态,自动切换消息路由至可用机房。

5.消息丢失的可能原因与解决方案

    生产者丢失:原因:异步发送未处理 SendCallback 异常。解决:使用同步发送 + 重试机制。Broker 丢失:原因:异步刷盘时宕机,PageCache 数据未落盘。解决:SYNC_FLUSH 刷盘 + 主从同步。消费者丢失:原因:消费成功但 Offset 未提交。解决:先处理业务逻辑,再手动提交 Offset。、
Demo实操
  1. 引入依赖
<dependencies><!-- RocketMQ 客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2. 配置文件 application.yml

yamlrocketmq:namesrvAddr: 127.0.0.1:9876producer:group: test-producer-groupconsumer:group: test-consumer-grouptopic: TestTopic

3. 生产者配置类

javapackage com.example.demo.config;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.group}")private String producerGroup;@Beanpublic DefaultMQProducer mqProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.start();System.out.println("🚀 RocketMQ Producer 启动成功");return producer;}
}

4. 生产者发送接口

javapackage com.example.demo.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate DefaultMQProducer producer;@Value("${rocketmq.topic}")private String topic;@PostMapping("/send")public String sendMessage(@RequestParam String msg) throws Exception {Message message = new Message(topic, msg.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);return "发送成功: " + result;}
}

5. 消费者配置类

java复制编辑
package com.example.demo.config;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;
import java.util.List;@Configuration
public class RocketMQConsumerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.topic}")private String topic;@Beanpublic DefaultMQPushConsumer mqConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println("📩 收到消息: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("✅ RocketMQ Consumer 启动成功");return consumer;}
}

最后有兴趣可以尝试自动重试,TraceId 追踪, 异步发送, 批量发送

http://www.dtcms.com/a/328102.html

相关文章:

  • C++动态代理技术详解:实现原理与应用场景
  • Java静态代理和动态代理
  • Linux驱动开发probe字符设备的完整创建流程
  • 【游戏优化笔记】开发中如何减少建筑和树木等环境元素的资源消耗?
  • 【RHCE】自动化备份全网服务器数据平台
  • 36-综合案例开发-2
  • Chrome插件开发【manifest.json】
  • 【传奇开心果系列】Flet框架桌面程序组件Custom Ribbon自定义组件模板
  • Class34锚框
  • 分享单位开通固定公网IP,不需要找运营商申请
  • sqli-libs通关教程(41-50)
  • lesson36:MySQL从入门到精通:全面掌握数据库操作与核心原理
  • Elasticsearch JS 客户端子客户端(Child Client)实践指南
  • DAY38作业(补)
  • CTO如何通过录音转写和音频降噪,提升企业远程协作效率?
  • Secure 第四天作业
  • Linux环境部署RocketMQ
  • C++算法·排序
  • 第六十四章:AI的“觅食”之路:数据采集器设计与多源数据获取
  • DL-FWI 的三项主要任务: 网络构建, 数据生成, 训练控制
  • 跑腿APP开发未来趋势:同城O2O系统源码在智能调度与个性化中的进化
  • Spring Boot项目中调用第三方接口
  • HCIP项目之OSPF综合实验
  • Flux.1系列模型解析--Kontext
  • 8月12号打卡
  • 【Leetcode hot 100】560.和为K的子数组
  • 无人机航拍数据集|第13期 无人机城市斑马线目标检测YOLO数据集963张yolov11/yolov8/yolov5可训练
  • 为什么304不锈钢仍会生锈?
  • Ubuntu20.06环境下安装VS Code及中文设置方法
  • CSRF 攻击