当前位置: 首页 > news >正文

RocketMQ常见问题梳理

MQ常见问题深度剖析:消息不丢失、顺序性、幂等性与积压处理

本文基于RocketMQ核心原理,结合Kafka/RabbitMQ对比,深入分析MQ四大核心问题解决方案


一、消息不丢失保障机制

消息丢失风险点

  1. 跨网络传输:生产者→Broker、Broker→消费者、主从同步
  2. Broker缓存机制:PageCache异步刷盘导致数据未持久化
  3. 极端故障:整个MQ集群宕机

生产者保证方案

1. 发送确认机制
// RocketMQ同步发送(强安全)
SendResult result = producer.send(msg, 20*1000); // Kafka Future获取(同步效果)
RecordMetadata metadata = producer.send(record).get();// RabbitMQ Publisher Confirms
channel.addConfirmListener(ackCallback, nackCallback);
2. RocketMQ事务消息(双保险)
Commit
Rollback
发送半消息
执行本地事务
事务状态?
提交消息
丢弃消息

设计本质:通过多次网络确认+本地事务绑定,解决业务操作与消息发送的一致性


Broker存储保证

刷盘策略对比
MQ同步刷盘异步刷盘
RocketMQflushDiskType=SYNC_FLUSH默认策略
实际10ms间隔刷盘(非实时)
Kafkalog.flush.interval.messages=1默认Long.MAX
RabbitMQ经典队列不支持实时刷盘流队列依赖OS刷盘

关键结论:任何MQ都无法100%保证断电不丢消息,需结合生产者确认使用


主从同步策略

  1. RocketMQ普通集群

    • 角色模式:ASYNC_MASTER/SYNC_MASTER/SLAVE
    • 故障处理:Master宕机后Slave不切换,等待恢复避免数据丢失
  2. Kafka机制差异

    • Leader切换后,旧Leader未同步数据直接丢弃(可用性优先)
  3. Dledger集群(推荐)

    • 基于Raft协议实现多数派写入
    • 网络分区时可能丢失未确认消息

消费者保障

核心原则:同步处理+消费确认

// 错误示范(异步处理导致丢失)
consumer.registerMessageListener((msgs, context) -> {new Thread(() -> processMsg(msgs)).start(); // 异步线程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 立即确认
});

正确做法:业务处理完成后再提交ACK/Offset


极端场景:全集群宕机

降级方案

  1. 生产者写入本地缓存(DB/文件)
  2. 独立线程异步重试投递
  3. MQ恢复后优先处理缓存消息

二、消息顺序性保障

局部有序实现原理

组件生产者保证消费者保证
RocketMQ相同Hash写入同一MessageQueue单线程消费队列(并发控制)
Kafka指定Partition key单Partition单线程消费
RabbitMQ绑定同一队列单消费者消费单队列

全局有序代价:设置Topic/Partition=1,严重牺牲吞吐量(不推荐)


三、消息幂等性保障

重复消费场景

  1. 生产者重试导致重复发送
  2. 消费者ACK丢失触发重投

解决方案

生产者端
MQ幂等机制
RocketMQMessageID去重(不适用于事务消息)
Kafka开启idempotence:
- PID+SequenceNumber
Broker维护<PID,Partition>序列号
消费者端
// 业务层幂等处理示例
ConcurrentHashMap<String, Boolean> processedMsgIds = new ConcurrentHashMap<>();void processMessage(Message msg) {String bizId = msg.getKeys(); // 业务唯一标识if(processedMsgIds.putIfAbsent(bizId, true) != null) {return; // 已处理则跳过}// 核心业务逻辑...
}

最佳实践:优先使用业务唯一标识(如订单ID)而非MessageID


四、消息积压处理

积压根源

  • 消费者吞吐量不足:处理逻辑复杂或资源不足
  • 设计缺陷:生产者速率 >> 消费者能力

应急方案

1. 消费者扩容
MQ扩容限制优化手段
RocketMQConsumer数 ≤ MessageQueue数增加队列数(需重建Topic)
RabbitMQClassic Queue无明确限制调整Qos权重
2. 建立临时Topic
积压Topic
新建临时Topic
迁移未消费消息
启动只读消费者组快速消费
结果写入DB/缓存
3. 死信队列兜底
  • RocketMQ默认保留72小时
  • 需手动订阅处理:%DLQ%ConsumerGroupName

五、课程核心总结

  1. 设计哲学差异

    • RocketMQ:金融级数据安全(阿里系)
    • Kafka:高吞吐日志处理(LinkedIn)
    • RabbitMQ:灵活消息路由(传统企业)
  2. 方案选择本质

    业务场景
    技术选型
    参数调优
    监控迭代

不存在完美解决方案,只有最适合业务场景的平衡点设计

http://www.dtcms.com/a/297181.html

相关文章:

  • 三、Spark 运行环境部署:全面掌握四种核心模式
  • 【内网穿透】使用FRP实现内网与公网Linux/Ubuntu服务器穿透项目部署多项目穿透方案
  • vue使用xlsx库导出excel
  • 编程语言Java——核心技术篇(三)异常处理详解
  • 字符串 “asdasjkfkasgfgshaahsfaf” 经过哈夫曼编码之后存储比特数是多少?
  • [实战] 用1 PPS 驯服本地恒温晶振(OCXO/TCXO)
  • 医疗AI跨机构建模实施总结:基于 Flower 联邦学习与差分隐私的实践指南
  • ESP32学习笔记_Components(1)——使用LED Strip组件点亮LED灯带
  • 迷宫生成与寻路可视化
  • 广州 VR 安全用电技术:工作原理、特性及优势探析​
  • 天通卫星赋能三防智能平板:AORO P1100打造全域通信新范式
  • 【数据结构与算法】数据结构初阶:详解二叉树(六)——二叉树应用:二叉树选择题
  • 【数据库】探索DBeaver:一款强大的免费开源数据库管理工具
  • 医疗数据挖掘Python机器学习案例
  • PAT 甲级题目讲解:1008《Elevator》
  • Agent领域,近年来的前沿研究方向:多智能体协作、认知启发架构、伦理安全、边缘计算集成
  • Modbus RTU转Profinet网关与涡街液体流量计:工业自动化数据传输的完美协同
  • 【橘子分布式】gRPC(番外篇-拦截器)
  • 关闭chrome自带的跨域限制,简化本地开发
  • XORIndex:朝鲜不断发展的供应链恶意软件再次瞄准 npm 生态系统
  • 《基于电阻抗断层扫描(EIT)驱动的肌肉骨骼模型表征人体手臂动态意图用于人机交互》论文解读
  • Linux: network: wireshark: esp attempt to detec null-encrypted esp payloads
  • chrome使用Modheader插件让浏览器直接预览图片,不下载
  • 算法思维进阶 力扣 62.不同路径 暴力搜索 记忆化搜索 DFS 动态规划 C++详细算法解析 每日一题
  • kafka如何保证数据不丢失
  • 机器学习中knn的详细知识点
  • Linux725 磁盘阵列RAID0 RAID1
  • OneCode3.0 Gallery 组件前后端映射机制:从注解配置到前端渲染的完整链路
  • 应用代码解释
  • 从零开始的云计算生活——番外6,使用zabbix对中间件监控