MQ 面试宝典
目录
- 第一题:介绍一下 ActiveMQ、RocketMQ、RabbitMQ,几个有什么区别
- 第二题:各个MQ的高可用是如何实现的
- 第三题:ActiveMQ、RocketMQ、RabbitMQ几个是如何保证消息的可靠性传输的,如果消息丢失了怎么处理
- 第四题:ActiveMQ、RocketMQ、RabbitMQ分别是如何发布消息的
- 第五题:如何保证消息的顺序性?RocketMQ的顺序消息是如何实现的?
- 第六题:如何解决消息重复消费问题?如何保证消息的幂等性?
- 第七题:RocketMQ的事务消息是如何实现的?两阶段提交在MQ中如何应用?
- 第八题:MQ支持哪些消息过滤方式?Tag过滤和SQL过滤有什么区别?
- 第九题:如何优化MQ的性能?有哪些调优参数?
- 第十题:如何处理消息堆积问题?如何快速恢复消费?
第一题:介绍一下 ActiveMQ、RocketMQ、RabbitMQ,几个有什么区别
消息队列(MQ)是分布式系统中重要的中间件,用于解耦、异步处理、削峰填谷等场景。
1. ActiveMQ
- 定义:Apache ActiveMQ是开源的基于JMS规范的消息中间件
- 协议支持:支持多种协议(OpenWire、STOMP、AMQP、MQTT、REST等)
- 核心特性:持久化(KahaDB、JDBC、LevelDB)、事务支持、消息确认、集群模式
- 适用场景:传统企业级应用、跨语言集成、学习研究
2. RocketMQ
- 定义:Apache RocketMQ是阿里巴巴开源的分布式消息中间件
- 设计理念:专为分布式系统设计,高性能、高可靠
- 核心特性:顺序消息、事务消息、消息过滤(Tag/SQL92)、消息回溯、定时消息
- 架构组件:NameServer(注册中心)、Broker(存储转发)、Producer、Consumer
- 适用场景:电商系统、金融系统、大数据处理、高并发场景
3. RabbitMQ
- 定义:RabbitMQ是基于AMQP协议的开源消息中间件
- 开发语言:使用Erlang语言开发
- 核心特性:灵活路由(Direct、Topic、Fanout、Headers Exchange)、消息确认、集群支持、插件丰富
- 适用场景:微服务架构、任务队列、日志收集、事件驱动
4. 三者对比
性能对比
特性 | ActiveMQ | RocketMQ | RabbitMQ |
---|---|---|---|
吞吐量 | 中等 | 很高 | 中等 |
延迟 | 中等 | 很低 | 低 |
并发支持 | 中等 | 很高 | 中等 |
功能对比
特性 | ActiveMQ | RocketMQ | RabbitMQ |
---|---|---|---|
消息顺序 | 不支持 | 支持 | 支持 |
事务消息 | 支持 | 支持 | 支持 |
消息过滤 | 支持 | 支持 | 支持 |
消息回溯 | 不支持 | 支持 | 不支持 |
选型建议
- 高并发场景:选择RocketMQ,性能最高,适合电商、金融等核心业务
- 微服务架构:选择RabbitMQ,路由灵活,插件丰富,适合服务解耦
- 传统企业应用:选择ActiveMQ,JMS标准,学习成本低,适合中小规模
零拷贝技术详解
传统数据拷贝过程:
应用程序 → 内核缓冲区 → Socket缓冲区 → 网卡↓ ↓ ↓数据拷贝 数据拷贝 数据拷贝
零拷贝优化过程:
应用程序 → 内核缓冲区 → 网卡(直接传输)↓ ↓数据拷贝 无拷贝
RocketMQ中的零拷贝应用:
- CommitLog顺序写入:使用mmap映射文件到内存
- 消息消费:使用sendfile直接传输文件内容
- 性能提升:CPU使用率降低30-50%,网络吞吐量提升2-3倍
第二题:各个MQ的高可用是如何实现的
1. ActiveMQ 高可用实现
Master-Slave模式
- 共享存储模式:多个ActiveMQ实例共享同一个存储
- 工作原理:只有一个实例处于Active状态,其他实例处于Standby状态
- 故障转移:当Active实例故障时,Standby实例自动接管服务
配置示例
spring:activemq:broker-url: failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)
2. RocketMQ 高可用实现
NameServer集群
- 注册中心:NameServer作为注册中心,管理Broker信息
- 集群部署:多个NameServer实例组成集群
- 数据同步:NameServer之间数据最终一致
Broker主从架构
- Master-Slave结构:每个Broker组包含一个Master和多个Slave
- 数据同步:Master将消息同步到Slave
- 读写分离:Master处理写请求,Slave处理读请求
配置示例
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0 # 0表示Master,大于0表示Slave
3. RabbitMQ 高可用实现
镜像队列(Mirrored Queues)
- 队列复制:队列内容在多个节点间镜像复制
- 主从结构:一个主节点,多个从节点
- 故障转移:主节点故障时,从节点自动提升为主节点
集群模式
- 节点对等:所有节点地位平等,可以独立运行
- 元数据同步:队列、交换器、绑定关系等元数据在集群间同步
配置示例
# 镜像队列策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
4. 高可用技术对比
架构模式对比
MQ产品 | 主要架构 | 数据同步方式 | 故障转移机制 |
---|---|---|---|
ActiveMQ | Master-Slave/集群 | 共享存储/网络复制 | 自动切换 |
RocketMQ | NameServer+Broker集群 | 主从同步 | 自动主从切换 |
RabbitMQ | 对等集群+镜像队列 | 镜像复制 | 自动节点提升 |
故障恢复时间对比
MQ产品 | 故障检测时间 | 故障转移时间 | 服务可用性 |
---|---|---|---|
ActiveMQ | 30-60秒 | 10-30秒 | 99.9% |
RocketMQ | 10-30秒 | 5-15秒 | 99.99% |
RabbitMQ | 30-60秒 | 10-30秒 | 99.9% |
第三题:ActiveMQ、RocketMQ、RabbitMQ几个是如何保证消息的可靠性传输的,如果消息丢失了怎么处理
1. 消息丢失的可能场景
生产者端消息丢失
- 网络异常、Broker故障、发送失败、配置错误
消息队列端消息丢失
- 存储故障、内存溢出、配置问题、集群故障
消费者端消息丢失
- 消费异常、确认丢失、重复消费、超时处理
2. 生产者端可靠性保障
ActiveMQ生产端可靠性
// 消息确认机制
public void sendReliableMessage() throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = factory.createConnection();Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);Queue queue = session.createQueue("reliable.queue");MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久化消息TextMessage message = session.createTextMessage("ActiveMQ Reliable Message");producer.send(message);
}
RocketMQ生产端可靠性
// 同步发送确认
public void sendReliableMessage() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));SendResult result = producer.send(msg); // 同步发送,等待确认if (result.getSendStatus() == SendStatus.SEND_OK) {System.out.println("消息发送成功");}
}
RabbitMQ生产端可靠性
// 发布确认机制
public void sendReliableMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect(); // 启用发布确认channel.basicPublish("exchange_name", "routing.key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));if (channel.waitForConfirms()) {System.out.println("消息发送成功");}
}
3. 消息队列端可靠性保障
消息持久化对比
MQ产品 | 消息持久化 | 队列持久化 | 主从同步 |
---|---|---|---|
ActiveMQ | 支持 | 支持 | 支持 |
RocketMQ | 支持 | 支持 | 支持 |
RabbitMQ | 支持 | 支持 | 支持 |
主从同步配置
brokerRole=SYNC_MASTER # 同步主节点
flushDiskType=SYNC_FLUSH # 同步刷盘
4. 消费者端可靠性保障
手动确认机制
// RabbitMQ手动确认示例
public void consumeWithManualAck() throws Exception {Channel channel = connection.createChannel();channel.basicQos(1); // 设置预取数量为1boolean autoAck = false; // 手动确认模式Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {try {String message = new String(body, "UTF-8");processMessage(message); // 处理业务逻辑channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认} catch (Exception e) {channel.basicNack(envelope.getDeliveryTag(), false, false); // 拒绝消息}}};channel.basicConsume("queue_name", autoAck, consumer);
}
幂等性处理
// 幂等性处理示例
public class IdempotentConsumer {private Set<String> processedMessages = ConcurrentHashMap.newKeySet();public void consumeWithIdempotency(String messageId, String message) {if (processedMessages.contains(messageId)) {System.out.println("消息已处理,跳过: " + messageId);return;}try {processBusinessLogic(message);processedMessages.add(messageId); // 记录已处理的消息ID} catch (Exception e) {throw e;}}
}
5. 重试机制对比
ActiveMQ重试机制:客户端重试,指数退避算法
RocketMQ重试机制:自动重试,指数退避算法
RabbitMQ重试机制:发布确认,手动重试
6. 消息丢失检测和处理
监控告警机制
// 消息丢失监控
public class MessageLossMonitor {private AtomicLong sentCount = new AtomicLong(0);private AtomicLong confirmedCount = new AtomicLong(0);public void checkMessageLoss() {long sent = sentCount.get();long confirmed = confirmedCount.get();long lost = sent - confirmed;double lossRate = (double) lost / sent;if (lossRate > 0.01) { // 丢失率超过1%System.out.println("警告:消息丢失率过高: " + (lossRate * 100) + "%");}}
}
第四题:ActiveMQ、RocketMQ、RabbitMQ分别是如何发布消息的
1. ActiveMQ 消息发布机制
JMS消息模型
- 点对点模型(P2P):一对一消息传递,使用Queue
- 发布/订阅模型(Pub/Sub):一对多消息传递,使用Topic
P2P消息发布
// ActiveMQ点对点消息发布
public void sendQueueMessage() throws JMSException {ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = factory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue("test.queue");MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.PERSISTENT);TextMessage message = session.createTextMessage("Hello ActiveMQ Queue");producer.send(message);
}
Pub/Sub消息发布
// ActiveMQ发布/订阅消息发布
public void sendTopicMessage() throws JMSException {ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = factory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("test.topic");MessageProducer producer = session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);TextMessage message = session.createTextMessage("Hello ActiveMQ Topic");producer.send(message);
}
2. RocketMQ 消息发布机制
Topic和Tag机制
- Topic:消息主题,用于消息分类
- Tag:消息标签,用于消息过滤
三种发送方式
// 1. 同步消息发布
public void sendSyncMessage() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));SendResult result = producer.send(message); // 同步发送System.out.println("同步消息发送结果: " + result.getSendStatus());
}// 2. 异步消息发布
public void sendAsyncMessage() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("TopicTest", "TagB", "Hello RocketMQ Async".getBytes("UTF-8"));producer.send(message, new SendCallback() { // 异步发送@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步消息发送成功: " + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.println("异步消息发送失败: " + e.getMessage());}});
}// 3. 单向消息发布
public void sendOnewayMessage() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("TopicTest", "TagC", "Hello RocketMQ Oneway".getBytes("UTF-8"));producer.sendOneway(message); // 单向发送,不关心结果
}
3. RabbitMQ 消息发布机制
Exchange路由机制
- Direct Exchange:直接路由,精确匹配routing key
- Topic Exchange:主题路由,支持通配符匹配
- Fanout Exchange:广播路由,忽略routing key
- Headers Exchange:头部路由,基于消息头匹配
Direct Exchange消息发布
// RabbitMQ Direct Exchange消息发布
public void sendDirectMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct_exchange", "direct", true);channel.queueDeclare("direct_queue", true, false, false, null);channel.queueBind("direct_queue", "direct_exchange", "direct.routing.key");String message = "Hello RabbitMQ Direct";channel.basicPublish("direct_exchange", "direct.routing.key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
}
Topic Exchange消息发布
// RabbitMQ Topic Exchange消息发布
public void sendTopicMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("topic_exchange", "topic", true);String queueName = "topic_queue";channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到Exchange,使用routing key模式channel.queueBind(queueName, "topic_exchange", "user.order.*"); // 匹配 user.order.xxxchannel.queueBind(queueName, "topic_exchange", "user.*.created"); // 匹配 user.xxx.createdString[] routingKeys = {"user.order.created", "user.payment.created"};for (String routingKey : routingKeys) {String message = "Hello RabbitMQ Topic - " + routingKey;channel.basicPublish("topic_exchange", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));}
}
Fanout Exchange消息发布
// RabbitMQ Fanout Exchange消息发布
public void sendFanoutMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout_exchange", "fanout", true);String message = "Hello RabbitMQ Fanout";channel.basicPublish("fanout_exchange", "", // Routing Key(Fanout忽略)MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
}
4. 消息发布方式对比
发布模式对比
MQ产品 | 发布模式 | 路由机制 | 消息确认 | 事务支持 |
---|---|---|---|---|
ActiveMQ | Queue/Topic | JMS标准 | 支持 | 支持 |
RocketMQ | Topic/Tag | 主题过滤 | 支持 | 支持 |
RabbitMQ | Exchange | 灵活路由 | 支持 | 支持 |
性能特点对比
MQ产品 | 同步发送 | 异步发送 | 批量发送 | 单向发送 |
---|---|---|---|---|
ActiveMQ | 支持 | 支持 | 支持 | 支持 |
RocketMQ | 支持 | 支持 | 支持 | 支持 |
RabbitMQ | 支持 | 支持 | 支持 | 不支持 |
5. 消息发布最佳实践
性能优化策略
- 连接池:使用连接池减少连接开销
- 批量操作:批量发送和接收消息
- 异步处理:使用异步发送提高吞吐量
- 压缩传输:对消息进行压缩减少网络传输
监控指标
- 发送速率:每秒发送的消息数量
- 发送延迟:消息发送的平均延迟
- 成功率:消息发送的成功率
- 错误率:消息发送的错误率
第五题:如何保证消息的顺序性?RocketMQ的顺序消息是如何实现的?
消息顺序性是分布式消息系统中一个重要且复杂的问题,涉及消息的发送顺序、存储顺序和消费顺序。不同MQ产品对顺序性的支持程度不同,理解顺序消息的实现原理对设计高可靠的消息系统至关重要。
1. 消息顺序性的基本概念
什么是消息顺序性?
- 发送顺序:生产者发送消息的顺序
- 存储顺序:消息在队列中的存储顺序
- 消费顺序:消费者接收消息的顺序
顺序性的重要性
- 业务逻辑:某些业务场景需要保证消息的处理顺序
- 数据一致性:避免因消息乱序导致的数据不一致
- 用户体验:保证用户操作的逻辑顺序
2. RocketMQ顺序消息实现
RocketMQ顺序消息类型
- 分区顺序消息:同一分区内消息有序,不同分区间无序
- 全局顺序消息:所有消息严格按发送顺序消费
分区顺序消息实现
// 分区顺序消息发送
public class OrderedMessageProducer {public void sendOrderedMessage() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();// 2. 发送顺序消息for (int i = 0; i < 10; i++) {String messageBody = "Ordered Message " + i;Message message = new Message("ordered_topic", "tag", messageBody.getBytes("UTF-8"));// 3. 设置消息队列选择器,保证同一订单的消息发送到同一队列SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 根据业务键选择队列,保证同一业务的消息在同一队列String orderId = (String) arg;int index = Math.abs(orderId.hashCode()) % mqs.size();return mqs.get(index);}}, "ORDER_001"); // 业务键:订单IDSystem.out.println("发送结果: " + sendResult);}producer.shutdown();}
}// 分区顺序消息消费
public class OrderedMessageConsumer {public void consumeOrderedMessage() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 2. 订阅主题consumer.subscribe("ordered_topic", "*");// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages,ConsumeOrderlyContext context) {for (MessageExt message : messages) {try {// 处理消息String body = new String(message.getBody(), "UTF-8");System.out.println("消费顺序消息: " + body + ", 队列ID: " + message.getQueueId() + ", 消息ID: " + message.getMsgId());// 模拟业务处理processOrderedMessage(message);} catch (Exception e) {System.out.println("消息处理失败: " + e.getMessage());// 顺序消息消费失败,返回SUSPEND_CURRENT_QUEUE_A_MOMENTreturn ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}});// 4. 启动消费者consumer.start();}private void processOrderedMessage(MessageExt message) {// 模拟业务处理逻辑System.out.println("处理订单消息: " + new String(message.getBody(), "UTF-8"));}
}
全局顺序消息实现
// 全局顺序消息实现
public class GlobalOrderedMessage {public void setupGlobalOrderedMessage() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("global_ordered_producer");producer.setNamesrvAddr("localhost:9876");producer.start();// 2. 发送全局顺序消息(所有消息发送到同一个队列)for (int i = 0; i < 100; i++) {String messageBody = "Global Ordered Message " + i;Message message = new Message("global_ordered_topic", "tag", messageBody.getBytes("UTF-8"));// 3. 使用固定的队列选择器,所有消息发送到队列0SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 所有消息发送到第一个队列,保证全局顺序return mqs.get(0);}}, null);System.out.println("发送全局顺序消息: " + sendResult);}producer.shutdown();}
}
3. 其他MQ的顺序消息实现
RabbitMQ顺序消息实现
// RabbitMQ顺序消息实现
public class RabbitMQOrderedMessage {public void setupRabbitMQOrderedMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 声明队列(单队列保证顺序)channel.queueDeclare("ordered_queue", true, false, false, null);// 2. 发送顺序消息for (int i = 0; i < 10; i++) {String message = "Ordered Message " + i;channel.basicPublish("", "ordered_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));}// 3. 消费顺序消息channel.basicQos(1); // 设置QoS为1,确保一次只处理一条消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {try {String message = new String(body, "UTF-8");System.out.println("消费顺序消息: " + message);// 处理消息processMessage(message);// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}}};channel.basicConsume("ordered_queue", false, consumer);}private void processMessage(String message) {System.out.println("处理消息: " + message);}
}
ActiveMQ顺序消息实现
// ActiveMQ顺序消息实现
public class ActiveMQOrderedMessage {public void setupActiveMQOrderedMessage() throws Exception {// 1. 创建连接ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 2. 创建队列Queue queue = session.createQueue("ordered.queue");// 3. 创建生产者MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 4. 发送顺序消息for (int i = 0; i < 10; i++) {TextMessage message = session.createTextMessage("Ordered Message " + i);producer.send(message);}// 5. 创建消费者MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;System.out.println("消费顺序消息: " + textMessage.getText());}} catch (Exception e) {e.printStackTrace();}}});}
}
4. 顺序消息的实现原理
RocketMQ顺序消息原理
- 队列选择:通过MessageQueueSelector选择特定队列
- 单队列消费:同一队列内的消息按顺序消费
- 锁机制:消费时对队列加锁,保证单线程消费
- 重试机制:消费失败时暂停当前队列,避免乱序
顺序消息的关键点
- 队列数量:分区顺序消息需要多个队列,全局顺序消息只需要一个队列
- 消费线程:顺序消费使用单线程,并发消费使用多线程
- 失败处理:消费失败时不能跳过消息,需要暂停队列
- 性能影响:顺序消费会降低并发性能
5. 顺序消息的最佳实践
设计原则
- 业务分析:分析业务是否真正需要顺序性
- 分区设计:合理设计分区键,避免热点问题
- 性能平衡:在顺序性和性能之间找到平衡点
- 监控告警:监控顺序消息的消费进度和延迟
注意事项
- 性能影响:顺序消费会降低并发性能
- 故障处理:单个消息失败会影响后续消息消费
- 扩容限制:全局顺序消息无法水平扩容
- 业务设计:尽量通过业务设计避免对顺序性的依赖
监控指标
- 顺序消息消费延迟
- 顺序消息消费TPS
- 顺序消息失败率
- 队列消费进度
第六题:如何解决消息重复消费问题?如何保证消息的幂等性?
消息重复消费是分布式消息系统中不可避免的问题,而幂等性是解决重复消费问题的核心方案。理解重复消费的原因、幂等性的设计原则和实现方法,对构建高可靠的消息系统至关重要。
1. 消息重复消费的基本概念
什么是消息重复消费?
- 网络重传:网络异常导致消息重复发送
- 消费重试:消费失败后重新消费同一条消息
- 负载均衡:多个消费者同时消费同一条消息
- 系统故障:系统故障恢复后重复处理消息
重复消费的影响
- 数据不一致:重复处理导致数据状态错误
- 业务逻辑错误:重复执行业务操作
- 资源浪费:重复处理消耗系统资源
- 用户体验差:用户看到重复的操作结果
2. 幂等性的基本概念
什么是幂等性?
- 数学定义:f(f(x)) = f(x),多次执行结果相同
- 业务定义:同一操作执行多次,结果与执行一次相同
- 系统定义:系统对同一请求的多次处理结果一致
幂等性的重要性
- 数据一致性:保证数据状态的一致性
- 系统可靠性:提高系统的容错能力
- 用户体验:避免重复操作带来的困扰
- 业务正确性:确保业务逻辑的正确执行
3. 幂等性设计原则
幂等性设计原则
- 唯一标识:为每个操作分配唯一标识
- 状态检查:执行前检查操作状态
- 原子操作:保证检查和操作的原子性
- 状态更新:及时更新操作状态
幂等性实现方式
- 数据库唯一约束:利用数据库唯一性约束
- 分布式锁:使用分布式锁保证原子性
- 状态机:通过状态机控制操作流程
- 去重表:维护操作记录表
4. 消息幂等性实现方案
方案1:数据库唯一约束
// 基于数据库唯一约束的幂等性实现
public class DatabaseIdempotentService {@Autowiredprivate OrderMapper orderMapper;public void processOrderMessage(OrderMessage message) {try {// 1. 尝试插入订单记录(利用唯一约束)Order order = new Order();order.setOrderId(message.getOrderId());order.setUserId(message.getUserId());order.setAmount(message.getAmount());order.setStatus("PROCESSING");order.setCreateTime(new Date());orderMapper.insert(order);// 2. 执行业务逻辑processOrderBusiness(order);// 3. 更新订单状态order.setStatus("COMPLETED");orderMapper.updateById(order);} catch (DuplicateKeyException e) {// 订单已存在,直接返回(幂等性保证)System.out.println("订单已存在,跳过处理: " + message.getOrderId());}}private void processOrderBusiness(Order order) {// 模拟业务处理System.out.println("处理订单业务: " + order.getOrderId());}
}
方案2:Redis分布式锁
// 基于Redis分布式锁的幂等性实现
public class RedisIdempotentService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void processOrderMessage(OrderMessage message) {String lockKey = "order_lock:" + message.getOrderId();String lockValue = UUID.randomUUID().toString();try {// 1. 尝试获取分布式锁Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, Duration.ofMinutes(5));if (!lockAcquired) {System.out.println("订单正在处理中,跳过: " + message.getOrderId());return;}// 2. 检查订单是否已处理String processedKey = "order_processed:" + message.getOrderId();if (redisTemplate.hasKey(processedKey)) {System.out.println("订单已处理,跳过: " + message.getOrderId());return;}// 3. 执行业务逻辑processOrderBusiness(message);// 4. 标记订单已处理redisTemplate.opsForValue().set(processedKey, "1", Duration.ofHours(24));} finally {// 5. 释放分布式锁releaseLock(lockKey, lockValue);}}private void releaseLock(String lockKey, String lockValue) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) else return 0 end";redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockKey), lockValue);}private void processOrderBusiness(OrderMessage message) {// 模拟业务处理System.out.println("处理订单业务: " + message.getOrderId());}
}
方案3:消息去重表
// 基于消息去重表的幂等性实现
public class MessageDeduplicationService {@Autowiredprivate MessageRecordMapper messageRecordMapper;public void processOrderMessage(OrderMessage message) {String messageId = message.getMessageId();try {// 1. 检查消息是否已处理MessageRecord existingRecord = messageRecordMapper.selectByMessageId(messageId);if (existingRecord != null) {System.out.println("消息已处理,跳过: " + messageId);return;}// 2. 插入消息记录(利用唯一约束)MessageRecord record = new MessageRecord();record.setMessageId(messageId);record.setStatus("PROCESSING");record.setCreateTime(new Date());messageRecordMapper.insert(record);// 3. 执行业务逻辑processOrderBusiness(message);// 4. 更新消息状态record.setStatus("COMPLETED");record.setUpdateTime(new Date());messageRecordMapper.updateById(record);} catch (DuplicateKeyException e) {System.out.println("消息重复,跳过处理: " + messageId);}}private void processOrderBusiness(OrderMessage message) {// 模拟业务处理System.out.println("处理订单业务: " + message.getOrderId());}
}
方案4:业务状态机
// 基于业务状态机的幂等性实现
public class StateMachineIdempotentService {@Autowiredprivate OrderMapper orderMapper;public void processOrderMessage(OrderMessage message) {String orderId = message.getOrderId();// 1. 查询订单当前状态Order order = orderMapper.selectByOrderId(orderId);if (order == null) {// 订单不存在,创建新订单createNewOrder(message);} else {// 订单存在,检查状态String currentStatus = order.getStatus();switch (currentStatus) {case "PENDING":// 待处理状态,可以处理processOrderBusiness(order);break;case "PROCESSING":// 处理中状态,跳过System.out.println("订单处理中,跳过: " + orderId);break;case "COMPLETED":// 已完成状态,跳过System.out.println("订单已完成,跳过: " + orderId);break;case "FAILED":// 失败状态,可以重试retryOrderBusiness(order);break;default:System.out.println("未知订单状态: " + currentStatus);}}}private void createNewOrder(OrderMessage message) {Order order = new Order();order.setOrderId(message.getOrderId());order.setUserId(message.getUserId());order.setAmount(message.getAmount());order.setStatus("PENDING");order.setCreateTime(new Date());orderMapper.insert(order);processOrderBusiness(order);}private void processOrderBusiness(Order order) {try {// 更新状态为处理中order.setStatus("PROCESSING");orderMapper.updateById(order);// 模拟业务处理System.out.println("处理订单业务: " + order.getOrderId());// 更新状态为已完成order.setStatus("COMPLETED");orderMapper.updateById(order);} catch (Exception e) {// 更新状态为失败order.setStatus("FAILED");orderMapper.updateById(order);throw e;}}private void retryOrderBusiness(Order order) {System.out.println("重试订单业务: " + order.getOrderId());processOrderBusiness(order);}
}
5. 不同MQ的幂等性处理
RocketMQ幂等性处理
// RocketMQ幂等性处理
public class RocketMQIdempotentConsumer {public void consumeMessage() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("idempotent_consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("order_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {try {// 1. 获取消息ID作为幂等键String messageId = message.getMsgId();// 2. 检查消息是否已处理if (isMessageProcessed(messageId)) {System.out.println("消息已处理,跳过: " + messageId);continue;}// 3. 处理消息processMessage(message);// 4. 标记消息已处理markMessageProcessed(messageId);} catch (Exception e) {System.out.println("消息处理失败: " + e.getMessage());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}private boolean isMessageProcessed(String messageId) {// 检查消息是否已处理return redisTemplate.hasKey("message_processed:" + messageId);}private void markMessageProcessed(String messageId) {// 标记消息已处理redisTemplate.opsForValue().set("message_processed:" + messageId, "1", Duration.ofHours(24));}private void processMessage(MessageExt message) {// 处理消息业务逻辑System.out.println("处理消息: " + new String(message.getBody(), "UTF-8"));}
}
RabbitMQ幂等性处理
// RabbitMQ幂等性处理
public class RabbitMQIdempotentConsumer {public void consumeMessage() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("order_queue", true, false, false, null);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {try {// 1. 获取消息IDString messageId = properties.getMessageId();// 2. 检查消息是否已处理if (isMessageProcessed(messageId)) {System.out.println("消息已处理,跳过: " + messageId);channel.basicAck(envelope.getDeliveryTag(), false);return;}// 3. 处理消息processMessage(new String(body, "UTF-8"));// 4. 标记消息已处理markMessageProcessed(messageId);// 5. 确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {System.out.println("消息处理失败: " + e.getMessage());// 拒绝消息,不重新入队try {channel.basicNack(envelope.getDeliveryTag(), false, false);} catch (Exception ex) {ex.printStackTrace();}}}};channel.basicConsume("order_queue", false, consumer);}private boolean isMessageProcessed(String messageId) {return redisTemplate.hasKey("message_processed:" + messageId);}private void markMessageProcessed(String messageId) {redisTemplate.opsForValue().set("message_processed:" + messageId, "1", Duration.ofHours(24));}private void processMessage(String messageBody) {System.out.println("处理消息: " + messageBody);}
}
6. 幂等性最佳实践
设计原则
- 业务分析:分析业务是否天然幂等
- 幂等键设计:选择合适的幂等键
- 状态管理:合理设计状态流转
- 异常处理:处理幂等性检查异常
注意事项
- 性能影响:幂等性检查会增加性能开销
- 存储成本:需要存储幂等性记录
- 清理策略:定期清理过期的幂等性记录
- 分布式一致性:保证幂等性检查的原子性
监控指标
- 重复消息数量
- 幂等性检查耗时
- 幂等性检查成功率
- 消息处理延迟
第七题:RocketMQ的事务消息是如何实现的?两阶段提交在MQ中如何应用?
事务消息是分布式消息系统中的高级特性,用于解决分布式事务问题。RocketMQ的事务消息基于两阶段提交协议实现,能够保证本地事务和消息发送的一致性。理解事务消息的实现原理对设计分布式事务系统至关重要。
1. 事务消息的基本概念
什么是事务消息?
- 本地事务:业务系统中的数据库操作
- 消息发送:向消息队列发送消息
- 事务一致性:保证本地事务和消息发送的一致性
事务消息的应用场景
- 订单支付:支付成功后发送订单消息
- 库存扣减:扣减库存后发送库存变更消息
- 用户注册:注册成功后发送欢迎消息
- 数据同步:数据变更后发送同步消息
事务消息的挑战
- 网络异常:网络故障导致消息发送失败
- 系统故障:系统崩溃导致事务状态不一致
- 并发问题:高并发下的状态管理
- 性能影响:事务消息的性能开销
2. RocketMQ事务消息实现原理
两阶段提交流程
第一阶段:发送半消息 → 执行本地事务 → 提交/回滚
第二阶段:根据本地事务结果 → 提交/回滚消息
事务消息状态
- PREPARED:半消息状态,等待本地事务结果
- COMMIT:本地事务成功,消息可被消费
- ROLLBACK:本地事务失败,消息被丢弃
- UNKNOWN:本地事务状态未知,需要回查
3. RocketMQ事务消息实现
事务消息发送
// RocketMQ事务消息发送
public class TransactionMessageProducer {public void sendTransactionMessage() throws Exception {// 1. 创建事务消息生产者TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");producer.setNamesrvAddr("localhost:9876");// 2. 设置事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务boolean success = executeLocalBusiness(msg, arg);if (success) {System.out.println("本地事务执行成功,提交消息");return LocalTransactionState.COMMIT_MESSAGE;} else {System.out.println("本地事务执行失败,回滚消息");return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {System.out.println("本地事务执行异常,回查消息");return LocalTransactionState.UNKNOW;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 事务回查逻辑return checkLocalTransactionStatus(msg);}});// 3. 启动生产者producer.start();// 4. 发送事务消息for (int i = 0; i < 10; i++) {String messageBody = "Transaction Message " + i;Message message = new Message("transaction_topic", "tag", messageBody.getBytes("UTF-8"));// 设置消息属性message.putUserProperty("orderId", "ORDER_" + i);message.putUserProperty("userId", "USER_" + i);// 发送事务消息TransactionSendResult result = producer.sendMessageInTransaction(message, "ORDER_" + i);System.out.println("事务消息发送结果: " + result);}// 5. 关闭生产者producer.shutdown();}private boolean executeLocalBusiness(Message msg, Object arg) {try {// 模拟本地事务:创建订单String orderId = (String) arg;System.out.println("执行本地事务:创建订单 " + orderId);// 模拟数据库操作createOrder(orderId);// 模拟业务逻辑Thread.sleep(100);return true; // 事务成功} catch (Exception e) {System.out.println("本地事务执行失败: " + e.getMessage());return false; // 事务失败}}private LocalTransactionState checkLocalTransactionStatus(MessageExt msg) {try {// 根据消息内容查询本地事务状态String orderId = msg.getUserProperty("orderId");System.out.println("回查本地事务状态: " + orderId);// 查询数据库中的订单状态OrderStatus status = queryOrderStatus(orderId);switch (status) {case CREATED:return LocalTransactionState.COMMIT_MESSAGE;case FAILED:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.UNKNOW;}} catch (Exception e) {System.out.println("事务回查异常: " + e.getMessage());return LocalTransactionState.UNKNOW;}}private void createOrder(String orderId) {// 模拟创建订单System.out.println("创建订单: " + orderId);}private OrderStatus queryOrderStatus(String orderId) {// 模拟查询订单状态return OrderStatus.CREATED;}
}
事务消息消费
// 事务消息消费
public class TransactionMessageConsumer {public void consumeTransactionMessage() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 2. 订阅主题consumer.subscribe("transaction_topic", "*");// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {try {// 处理事务消息processTransactionMessage(message);} catch (Exception e) {System.out.println("事务消息处理失败: " + e.getMessage());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 4. 启动消费者consumer.start();}private void processTransactionMessage(MessageExt message) {String messageBody = new String(message.getBody(), "UTF-8");String orderId = message.getUserProperty("orderId");String userId = message.getUserProperty("userId");System.out.println("处理事务消息:");System.out.println("消息内容: " + messageBody);System.out.println("订单ID: " + orderId);System.out.println("用户ID: " + userId);// 处理业务逻辑processOrderMessage(orderId, userId);}private void processOrderMessage(String orderId, String userId) {// 模拟处理订单消息System.out.println("处理订单消息: " + orderId + ", 用户: " + userId);}
}
4. 事务消息的配置和优化
事务消息配置
// 事务消息配置
public class TransactionMessageConfig {public void configureTransactionMessage() throws Exception {TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");producer.setNamesrvAddr("localhost:9876");// 1. 设置事务监听器producer.setTransactionListener(new CustomTransactionListener());// 2. 设置线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("transaction-thread-" + System.currentTimeMillis());return thread;}});producer.setExecutorService(executorService);// 3. 设置事务回查超时时间producer.setCheckThreadPoolMinSize(1);producer.setCheckThreadPoolMaxSize(1);producer.setCheckRequestHoldMax(2000);// 4. 启动生产者producer.start();}
}
事务消息优化
// 事务消息优化
public class OptimizedTransactionMessage {private final Map<String, LocalTransactionState> transactionStates = new ConcurrentHashMap<>();public void sendOptimizedTransactionMessage() throws Exception {TransactionMQProducer producer = new TransactionMQProducer("optimized_transaction_producer");producer.setNamesrvAddr("localhost:9876");producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String transactionId = msg.getTransactionId();try {// 1. 执行本地事务boolean success = executeLocalBusiness(msg, arg);// 2. 记录事务状态LocalTransactionState state = success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;transactionStates.put(transactionId, state);return state;} catch (Exception e) {transactionStates.put(transactionId, LocalTransactionState.UNKNOW);return LocalTransactionState.UNKNOW;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String transactionId = msg.getTransactionId();// 1. 从缓存中获取事务状态LocalTransactionState cachedState = transactionStates.get(transactionId);if (cachedState != null) {return cachedState;}// 2. 从数据库查询事务状态return queryTransactionStatusFromDatabase(transactionId);}});producer.start();}private boolean executeLocalBusiness(Message msg, Object arg) {// 执行本地事务return true;}private LocalTransactionState queryTransactionStatusFromDatabase(String transactionId) {// 从数据库查询事务状态return LocalTransactionState.COMMIT_MESSAGE;}
}
5. 事务消息的最佳实践
设计原则
- 事务边界:明确事务的边界和范围
- 异常处理:合理处理事务异常和回查
- 性能优化:优化事务消息的性能
- 监控告警:监控事务消息的状态
注意事项
- 事务回查:实现可靠的事务回查逻辑
- 性能影响:事务消息会影响性能
- 状态管理:合理管理事务状态
- 异常恢复:处理系统异常和恢复
监控指标
- 事务消息发送成功率
- 事务消息消费延迟
- 事务回查次数
- 事务消息失败率
6. 事务消息与其他方案的对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
事务消息 | 最终一致性、性能好 | 实现复杂、有延迟 | 异步场景 |
本地消息表 | 实现简单、可靠性高 | 性能较差、维护复杂 | 同步场景 |
Saga模式 | 灵活性高、可补偿 | 实现复杂、状态管理难 | 长事务 |
TCC模式 | 强一致性、性能好 | 实现复杂、业务侵入 | 短事务 |
第八题:MQ支持哪些消息过滤方式?Tag过滤和SQL过滤有什么区别?
消息过滤是消息中间件的重要功能,用于根据特定条件筛选消息,提高消息处理的效率和精确性。不同MQ产品支持不同的过滤方式,理解各种过滤机制的原理和适用场景对设计高效的消息系统至关重要。
1. 消息过滤的基本概念
什么是消息过滤?
- 生产者过滤:生产者在发送时进行过滤
- Broker过滤:消息代理在存储时进行过滤
- 消费者过滤:消费者在接收时进行过滤
消息过滤的优势
- 提高效率:减少不必要的消息传输和处理
- 精确投递:只处理感兴趣的消息
- 资源节约:节省网络带宽和存储空间
- 业务解耦:支持更灵活的业务逻辑
消息过滤的挑战
- 性能影响:过滤操作可能影响性能
- 存储开销:过滤条件需要额外存储
- 复杂度增加:增加系统复杂度
- 维护成本:需要维护过滤规则
2. RocketMQ消息过滤
RocketMQ过滤方式
- Tag过滤:基于消息标签的简单过滤
- SQL过滤:基于消息属性的复杂过滤
- 类过滤:基于自定义过滤器的过滤
Tag过滤实现
// RocketMQ Tag过滤
public class RocketMQTagFilter {public void sendTagMessage() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("tag_filter_producer");producer.setNamesrvAddr("localhost:9876");producer.start();// 2. 发送不同Tag的消息String[] tags = {"TAG_A", "TAG_B", "TAG_C"};for (int i = 0; i < 10; i++) {String tag = tags[i % tags.length];String messageBody = "Tag Message " + i + " with " + tag;Message message = new Message("tag_filter_topic", tag, messageBody.getBytes("UTF-8"));// 设置消息属性message.putUserProperty("orderId", "ORDER_" + i);message.putUserProperty("userId", "USER_" + i);message.putUserProperty("amount", String.valueOf(100 + i));SendResult result = producer.send(message);System.out.println("发送Tag消息: " + tag + ", 结果: " + result);}producer.shutdown();}public void consumeTagMessage() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag_filter_consumer");consumer.setNamesrvAddr("localhost:9876");// 2. 订阅主题,只消费TAG_A和TAG_B的消息consumer.subscribe("tag_filter_topic", "TAG_A || TAG_B");// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {String tag = message.getTags();String body = new String(message.getBody(), "UTF-8");System.out.println("消费Tag消息: " + tag + ", 内容: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
SQL过滤实现
// RocketMQ SQL过滤
public class RocketMQSQLFilter {public void sendSQLMessage() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("sql_filter_producer");producer.setNamesrvAddr("localhost:9876");producer.start();// 2. 发送带有属性的消息for (int i = 0; i < 20; i++) {String messageBody = "SQL Filter Message " + i;Message message = new Message("sql_filter_topic", "TAG", messageBody.getBytes("UTF-8"));// 设置消息属性message.putUserProperty("orderId", "ORDER_" + i);message.putUserProperty("userId", "USER_" + i);message.putUserProperty("amount", String.valueOf(100 + i * 10));message.putUserProperty("region", i % 2 == 0 ? "NORTH" : "SOUTH");message.putUserProperty("category", i % 3 == 0 ? "ELECTRONICS" : "CLOTHING");SendResult result = producer.send(message);System.out.println("发送SQL消息: " + result);}producer.shutdown();}public void consumeSQLMessage() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_filter_consumer");consumer.setNamesrvAddr("localhost:9876");// 2. 订阅主题,使用SQL过滤// 过滤条件:金额大于150且地区为NORTH的消息String sqlFilter = "amount > 150 AND region = 'NORTH'";consumer.subscribe("sql_filter_topic", MessageSelector.bySql(sqlFilter));// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {String body = new String(message.getBody(), "UTF-8");String orderId = message.getUserProperty("orderId");String amount = message.getUserProperty("amount");String region = message.getUserProperty("region");System.out.println("消费SQL过滤消息:");System.out.println("内容: " + body);System.out.println("订单ID: " + orderId);System.out.println("金额: " + amount);System.out.println("地区: " + region);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
类过滤实现
// RocketMQ 类过滤
public class RocketMQClassFilter {public void sendClassMessage() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("class_filter_producer");producer.setNamesrvAddr("localhost:9876");producer.start();// 2. 发送消息for (int i = 0; i < 10; i++) {String messageBody = "Class Filter Message " + i;Message message = new Message("class_filter_topic", "TAG", messageBody.getBytes("UTF-8"));// 设置消息属性message.putUserProperty("orderId", "ORDER_" + i);message.putUserProperty("priority", String.valueOf(i % 3));SendResult result = producer.send(message);System.out.println("发送类过滤消息: " + result);}producer.shutdown();}public void consumeClassMessage() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("class_filter_consumer");consumer.setNamesrvAddr("localhost:9876");// 2. 订阅主题,使用类过滤consumer.subscribe("class_filter_topic", MessageSelector.byMessageFilter(new MessageFilter() {@Overridepublic boolean match(MessageExt msg) {// 自定义过滤逻辑:只处理优先级为0的消息String priority = msg.getUserProperty("priority");return "0".equals(priority);}}));// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {String body = new String(message.getBody(), "UTF-8");String priority = message.getUserProperty("priority");System.out.println("消费类过滤消息: " + body + ", 优先级: " + priority);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
3. RabbitMQ消息过滤
RabbitMQ过滤方式
- Exchange路由:通过Exchange类型进行路由过滤
- 队列绑定:通过队列绑定规则进行过滤
- 消息属性:基于消息头属性进行过滤
Exchange路由过滤
// RabbitMQ Exchange路由过滤
public class RabbitMQExchangeFilter {public void setupExchangeFilter() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 声明Topic Exchangechannel.exchangeDeclare("filter_exchange", "topic", true);// 2. 声明队列channel.queueDeclare("high_priority_queue", true, false, false, null);channel.queueDeclare("low_priority_queue", true, false, false, null);channel.queueDeclare("urgent_queue", true, false, false, null);// 3. 绑定队列到Exchange,使用路由键过滤channel.queueBind("high_priority_queue", "filter_exchange", "order.high.*");channel.queueBind("low_priority_queue", "filter_exchange", "order.low.*");channel.queueBind("urgent_queue", "filter_exchange", "order.urgent.*");// 4. 发送消息String[] routingKeys = {"order.high.payment", "order.low.inventory", "order.urgent.refund"};for (int i = 0; i < 10; i++) {String routingKey = routingKeys[i % routingKeys.length];String message = "Filtered Message " + i + " with " + routingKey;// 设置消息属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().priority(i % 3).messageId("MSG_" + i).build();channel.basicPublish("filter_exchange", routingKey, props, message.getBytes("UTF-8"));System.out.println("发送过滤消息: " + routingKey + ", 内容: " + message);}// 5. 消费消息consumeFilteredMessages(channel);connection.close();}private void consumeFilteredMessages(Channel channel) throws Exception {// 消费高优先级队列channel.basicConsume("high_priority_queue", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {String message = new String(body, "UTF-8");System.out.println("高优先级队列消费: " + message);}});// 消费低优先级队列channel.basicConsume("low_priority_queue", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {String message = new String(body, "UTF-8");System.out.println("低优先级队列消费: " + message);}});// 消费紧急队列channel.basicConsume("urgent_queue", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {String message = new String(body, "UTF-8");System.out.println("紧急队列消费: " + message);}});}
}
消息头属性过滤
// RabbitMQ 消息头属性过滤
public class RabbitMQHeaderFilter {public void setupHeaderFilter() throws Exception {ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 声明Headers Exchangechannel.exchangeDeclare("header_filter_exchange", "headers", true);// 2. 声明队列channel.queueDeclare("vip_queue", true, false, false, null);channel.queueDeclare("normal_queue", true, false, false, null);// 3. 绑定队列到Exchange,使用消息头过滤Map<String, Object> vipArgs = new HashMap<>();vipArgs.put("x-match", "all"); // 所有条件都必须匹配vipArgs.put("userType", "VIP");vipArgs.put("region", "NORTH");channel.queueBind("vip_queue", "header_filter_exchange", "", vipArgs);Map<String, Object> normalArgs = new HashMap<>();normalArgs.put("x-match", "any"); // 任意条件匹配即可normalArgs.put("userType", "NORMAL");normalArgs.put("region", "SOUTH");channel.queueBind("normal_queue", "header_filter_exchange", "", normalArgs);// 4. 发送消息sendHeaderMessages(channel);// 5. 消费消息consumeHeaderMessages(channel);connection.close();}private void sendHeaderMessages(Channel channel) throws Exception {// 发送VIP用户消息Map<String, Object> vipHeaders = new HashMap<>();vipHeaders.put("userType", "VIP");vipHeaders.put("region", "NORTH");vipHeaders.put("priority", "HIGH");AMQP.BasicProperties vipProps = new AMQP.BasicProperties.Builder().headers(vipHeaders).build();String vipMessage = "VIP User Message";channel.basicPublish("header_filter_exchange", "", vipProps, vipMessage.getBytes("UTF-8"));System.out.println("发送VIP消息: " + vipMessage);// 发送普通用户消息Map<String, Object> normalHeaders = new HashMap<>();normalHeaders.put("userType", "NORMAL");normalHeaders.put("region", "SOUTH");normalHeaders.put("priority", "LOW");AMQP.BasicProperties normalProps = new AMQP.BasicProperties.Builder().headers(normalHeaders).build();String normalMessage = "Normal User Message";channel.basicPublish("header_filter_exchange", "", normalProps, normalMessage.getBytes("UTF-8"));System.out.println("发送普通消息: " + normalMessage);}private void consumeHeaderMessages(Channel channel) throws Exception {// 消费VIP队列channel.basicConsume("vip_queue", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {String message = new String(body, "UTF-8");System.out.println("VIP队列消费: " + message);}});// 消费普通队列channel.basicConsume("normal_queue", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {String message = new String(body, "UTF-8");System.out.println("普通队列消费: " + message);}});}
}
4. 消息过滤性能对比
过滤方式性能对比
过滤方式 | 性能 | 灵活性 | 复杂度 | 适用场景 |
---|---|---|---|---|
Tag过滤 | 高 | 低 | 低 | 简单分类 |
SQL过滤 | 中 | 高 | 中 | 复杂条件 |
类过滤 | 低 | 最高 | 高 | 自定义逻辑 |
Exchange路由 | 高 | 中 | 中 | 路由分发 |
消息头过滤 | 中 | 高 | 中 | 属性匹配 |
性能优化建议
- 合理选择过滤方式:根据业务需求选择合适的过滤方式
- 避免复杂过滤:避免过于复杂的过滤条件
- 缓存过滤结果:缓存常用的过滤结果
- 监控过滤性能:监控过滤操作的性能指标
5. 消息过滤最佳实践
设计原则
- 业务需求:根据业务需求设计过滤规则
- 性能考虑:平衡过滤功能和性能
- 维护性:保持过滤规则的简单和可维护
- 扩展性:支持过滤规则的动态调整
注意事项
- 过滤顺序:注意过滤操作的执行顺序
- 异常处理:处理过滤过程中的异常
- 资源管理:合理管理过滤相关的资源
- 监控告警:监控过滤操作的执行情况
监控指标
- 过滤消息数量
- 过滤操作耗时
- 过滤成功率
- 过滤规则命中率
第九题:如何优化MQ的性能?有哪些调优参数?
MQ性能优化是生产环境中的关键问题,涉及生产者性能、Broker性能、消费者性能和网络性能等多个方面。理解性能瓶颈、调优参数和优化策略对构建高性能的消息系统至关重要。
1. MQ性能优化的基本概念
性能优化的目标
- 吞吐量:提高消息处理的吞吐量
- 延迟:降低消息处理的延迟
- 资源利用率:提高系统资源利用率
- 稳定性:保证系统稳定运行
性能瓶颈分析
- 网络瓶颈:网络带宽和延迟限制
- 磁盘瓶颈:磁盘IO性能限制
- 内存瓶颈:内存容量和GC影响
- CPU瓶颈:CPU计算能力限制
性能优化原则
- 测量优先:先测量再优化
- 瓶颈识别:识别真正的性能瓶颈
- 逐步优化:逐步进行优化调整
- 监控验证:持续监控优化效果
2. RocketMQ性能优化
生产者性能优化
// RocketMQ生产者性能优化
public class OptimizedRocketMQProducer {public void setupOptimizedProducer() throws Exception {// 1. 创建生产者DefaultMQProducer producer = new DefaultMQProducer("optimized_producer_group");producer.setNamesrvAddr("localhost:9876");// 2. 性能优化配置producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小4MBproducer.setSendMsgTimeout(3000); // 发送超时时间3秒producer.setRetryTimesWhenSendFailed(2); // 发送失败重试次数producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数// 3. 批量发送优化producer.setBatchSize(1000); // 批量发送大小producer.setBatchTimeout(100); // 批量发送超时时间// 4. 网络优化producer.setClientIP("192.168.1.100"); // 设置客户端IPproducer.setInstanceName("ProducerInstance_" + System.currentTimeMillis());// 5. 启动生产者producer.start();// 6. 批量发送消息sendBatchMessages(producer);producer.shutdown();}private void sendBatchMessages(DefaultMQProducer producer) throws Exception {List<Message> messages = new ArrayList<>();// 准备批量消息for (int i = 0; i < 1000; i++) {String messageBody = "Batch Message " + i;Message message = new Message("optimized_topic", "TAG", messageBody.getBytes("UTF-8"));messages.add(message);}// 批量发送SendResult result = producer.send(messages);System.out.println("批量发送结果: " + result);}
}
消费者性能优化
// RocketMQ消费者性能优化
public class OptimizedRocketMQConsumer {public void setupOptimizedConsumer() throws Exception {// 1. 创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 2. 性能优化配置consumer.setConsumeThreadMin(20); // 最小消费线程数consumer.setConsumeThreadMax(64); // 最大消费线程数consumer.setConsumeMessageBatchMaxSize(32); // 批量消费消息数量consumer.setPullBatchSize(32); // 批量拉取消息数量consumer.setPullInterval(0); // 拉取间隔,0表示不间隔consumer.setPullThresholdForQueue(1000); // 队列拉取阈值consumer.setPullThresholdSizeForQueue(100); // 队列拉取大小阈值consumer.setPullThresholdForTopic(10000); // 主题拉取阈值consumer.setPullThresholdSizeForTopic(1000); // 主题拉取大小阈值// 3. 消费模式优化consumer.setMessageModel(MessageModel.CLUSTERING); // 集群消费模式consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从最新位置消费// 4. 订阅主题consumer.subscribe("optimized_topic", "*");// 5. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {// 批量处理消息processBatchMessages(messages);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 6. 启动消费者consumer.start();}private void processBatchMessages(List<MessageExt> messages) {// 批量处理消息,提高处理效率for (MessageExt message : messages) {String body = new String(message.getBody(), "UTF-8");System.out.println("处理消息: " + body);}}
}
Broker性能优化配置
# RocketMQ Broker性能优化配置
# 文件配置:broker.conf# 1. 存储优化
storePathRootDir=/opt/rocketmq/store
storePathCommitLog=/opt/rocketmq/store/commitlog
storePathConsumeQueue=/opt/rocketmq/store/consumequeue
storePathIndex=/opt/rocketmq/store/index# 2. 内存优化
mappedFileSizeCommitLog=1073741824 # 1GB
mappedFileSizeConsumeQueue=300000 # 300KB
mappedFileSizeIndex=40000000 # 40MB# 3. 刷盘优化
flushDiskType=ASYNC_FLUSH # 异步刷盘
flushCommitLogLeastPages=4 # 最少刷盘页数
flushCommitLogThoroughInterval=10000 # 强制刷盘间隔# 4. 网络优化
listenPort=10911 # 监听端口
haListenPort=10912 # HA监听端口
sendMessageThreadPoolNums=16 # 发送消息线程池大小
pullMessageThreadPoolNums=20 # 拉取消息线程池大小# 5. 性能优化
maxMessageSize=4194304 # 最大消息大小4MB
maxTransferBytesOnMessageInMemory=262144 # 内存中最大传输字节数
maxTransferCountOnMessageInMemory=32 # 内存中最大传输消息数
maxTransferBytesOnMessageInDisk=65536 # 磁盘中最大传输字节数
maxTransferCountOnMessageInDisk=8 # 磁盘中最大传输消息数# 6. 清理优化
deleteWhen=04 # 删除时间
fileReservedTime=72 # 文件保留时间72小时
3. RabbitMQ性能优化
RabbitMQ性能优化配置
// RabbitMQ性能优化
public class OptimizedRabbitMQ {public void setupOptimizedRabbitMQ() throws Exception {// 1. 连接工厂优化ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");// 2. 连接池优化factory.setRequestedHeartBeat(60); // 心跳间隔60秒factory.setConnectionTimeout(30000); // 连接超时30秒factory.setNetworkRecoveryInterval(5000); // 网络恢复间隔5秒factory.setAutomaticRecoveryEnabled(true); // 启用自动恢复factory.setTopologyRecoveryEnabled(true); // 启用拓扑恢复// 3. 连接池配置factory.setChannelMax(200); // 最大通道数factory.setFrameMax(131072); // 最大帧大小128KBfactory.setRequestedChannelMax(100); // 请求的最大通道数// 4. 创建连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 5. 通道优化channel.basicQos(100); // 设置QoS,一次最多处理100条消息channel.confirmSelect(); // 启用确认模式// 6. 声明队列和ExchangesetupOptimizedQueues(channel);// 7. 发送优化消息sendOptimizedMessages(channel);connection.close();}private void setupOptimizedQueues(Channel channel) throws Exception {// 1. 声明Exchangechannel.exchangeDeclare("optimized_exchange", "direct", true, false, null);// 2. 声明队列,设置优化参数Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-max-length", 10000); // 队列最大长度queueArgs.put("x-message-ttl", 3600000); // 消息TTL 1小时queueArgs.put("x-max-priority", 10); // 最大优先级channel.queueDeclare("optimized_queue", true, false, false, queueArgs);channel.queueBind("optimized_queue", "optimized_exchange", "optimized.routing.key");}private void sendOptimizedMessages(Channel channel) throws Exception {// 1. 批量发送消息for (int i = 0; i < 1000; i++) {String message = "Optimized Message " + i;// 2. 设置消息属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.priority(i % 10) // 设置优先级.messageId("MSG_" + i).timestamp(new Date()).build();// 3. 发送消息channel.basicPublish("optimized_exchange", "optimized.routing.key", props, message.getBytes("UTF-8"));}// 4. 等待确认channel.waitForConfirms(5000);System.out.println("所有消息发送完成并确认");}
}
RabbitMQ服务器优化配置
# RabbitMQ服务器优化配置
# 文件:/etc/rabbitmq/rabbitmq.conf# 1. 内存优化
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.5
disk_free_limit.relative = 2.0# 2. 网络优化
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true
tcp_listen_options.keepalive = true
tcp_listen_options.exit_on_close = false# 3. 连接优化
channel_max = 200
frame_max = 131072
heartbeat = 60# 4. 队列优化
queue_master_locator = min-masters
queue_index_embed_msgs_below = 4096# 5. 性能优化
collect_statistics_interval = 5000
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
4. 性能监控和调优
性能监控指标
// MQ性能监控
public class MQPerformanceMonitor {public void monitorRocketMQPerformance() throws Exception {// 1. 创建管理工具DefaultMQAdminExt adminExt = new DefaultMQAdminExt();adminExt.setNamesrvAddr("localhost:9876");adminExt.start();try {// 2. 监控生产者性能monitorProducerPerformance(adminExt);// 3. 监控消费者性能monitorConsumerPerformance(adminExt);// 4. 监控Broker性能monitorBrokerPerformance(adminExt);} finally {adminExt.shutdown();}}private void monitorProducerPerformance(DefaultMQAdminExt adminExt) throws Exception {// 监控生产者统计信息ProducerConnection producerConnection = adminExt.examineProducerConnectionInfo("producer_group", "test_topic");if (producerConnection != null) {System.out.println("生产者连接数: " + producerConnection.getConnectionSet().size());}}private void monitorConsumerPerformance(DefaultMQAdminExt adminExt) throws Exception {// 监控消费者统计信息ConsumeStats consumeStats = adminExt.examineConsumeStats("consumer_group");if (consumeStats != null) {System.out.println("消费TPS: " + consumeStats.getConsumeTps());System.out.println("消费延迟: " + consumeStats.getConsumeLatency());System.out.println("消费进度: " + consumeStats.getOffsetTable());}}private void monitorBrokerPerformance(DefaultMQAdminExt adminExt) throws Exception {// 监控Broker统计信息BrokerStatsData brokerStats = adminExt.viewBrokerStatsData("broker_name", "broker_stats");if (brokerStats != null) {System.out.println("Broker统计信息: " + brokerStats.getStatsTable());}}
}
性能调优参数
组件 | 参数 | 说明 | 推荐值 |
---|---|---|---|
生产者 | sendMsgTimeout | 发送超时时间 | 3000ms |
生产者 | retryTimesWhenSendFailed | 发送失败重试次数 | 2-3次 |
生产者 | compressMsgBodyOverHowmuch | 消息压缩阈值 | 4KB |
消费者 | consumeThreadMin | 最小消费线程数 | 20 |
消费者 | consumeThreadMax | 最大消费线程数 | 64 |
消费者 | consumeMessageBatchMaxSize | 批量消费大小 | 32 |
Broker | mappedFileSizeCommitLog | CommitLog文件大小 | 1GB |
Broker | flushDiskType | 刷盘方式 | ASYNC_FLUSH |
5. 性能优化最佳实践
优化策略
- 批量操作:使用批量发送和消费
- 异步处理:使用异步发送和处理
- 连接池:使用连接池管理连接
- 压缩优化:对消息进行压缩
- 缓存优化:合理使用缓存
注意事项
- 内存管理:注意内存使用和GC影响
- 网络优化:优化网络配置和参数
- 磁盘优化:选择合适的存储和刷盘策略
- 监控告警:建立完善的监控体系
监控指标
- 消息发送TPS
- 消息消费TPS
- 消息处理延迟
- 系统资源使用率
- 错误率和重试率
第十题:如何处理消息堆积问题?如何快速恢复消费?
消息堆积是生产环境中的常见故障,指消息的生产速度超过消费速度,导致消息在队列中大量积压。理解堆积原因、处理策略和恢复方案对保障系统稳定运行至关重要。
1. 消息堆积的基本概念
什么是消息堆积?
- 生产速度 > 消费速度:消息生产过快,消费跟不上
- 消费者故障:消费者宕机或处理异常
- 消费能力不足:消费者数量或处理能力不够
- 业务逻辑问题:消费逻辑存在性能瓶颈
消息堆积的影响
- 内存压力:大量消息占用内存
- 磁盘压力:消息持久化占用磁盘空间
- 处理延迟:消息处理延迟增加
- 系统不稳定:可能导致系统崩溃
堆积严重程度判断
- 轻微堆积:堆积量 < 1万条,延迟 < 1分钟
- 中等堆积:堆积量 1-10万条,延迟 1-10分钟
- 严重堆积:堆积量 > 10万条,延迟 > 10分钟
- 紧急堆积:堆积量 > 100万条,延迟 > 1小时
2. 消息堆积的原因分析
生产者原因
- 突发流量:业务高峰期流量激增
- 批量发送:大量消息批量发送
- 重试机制:消息重试导致重复发送
- 配置不当:生产者配置不合理
消费者原因
- 消费能力不足:消费者数量或线程数不够
- 处理逻辑复杂:消费逻辑存在性能瓶颈
- 外部依赖:依赖外部服务响应慢
- 资源不足:CPU、内存、网络资源不足
系统原因
- 网络问题:网络延迟或丢包
- 磁盘IO:磁盘IO性能瓶颈
- GC影响:频繁GC影响处理性能
- 锁竞争:线程锁竞争影响并发
3. 消息堆积的处理策略
紧急处理策略
// 消息堆积紧急处理
public class MessageBacklogEmergencyHandler {public void handleEmergencyBacklog() throws Exception {// 1. 立即扩容消费者scaleUpConsumers();// 2. 临时降级处理enableTemporaryDegradation();// 3. 分流处理enableMessageShunting();// 4. 监控告警setupEmergencyMonitoring();}private void scaleUpConsumers() {System.out.println("紧急扩容:增加消费者实例和线程数");}private void enableTemporaryDegradation() {System.out.println("临时降级:关闭非核心功能,简化处理逻辑");}private void enableMessageShunting() {System.out.println("消息分流:分发到多个队列并行处理");}private void setupEmergencyMonitoring() {System.out.println("紧急监控:实时监控堆积情况,设置告警阈值");}
}
消费者扩容策略
// 消费者扩容处理
public class ConsumerScalingHandler {public void scaleUpConsumers() throws Exception {// 创建多个消费者实例for (int i = 0; i < 5; i++) {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("scaled_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 设置消费线程数consumer.setConsumeThreadMin(20);consumer.setConsumeThreadMax(64);// 设置批量消费consumer.setConsumeMessageBatchMaxSize(32);consumer.setPullBatchSize(32);// 订阅主题consumer.subscribe("backlog_topic", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,ConsumeConcurrentlyContext context) {// 快速处理消息processMessagesQuickly(messages);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("启动消费者实例: " + i);}}private void processMessagesQuickly(List<MessageExt> messages) {// 快速处理消息,减少处理时间for (MessageExt message : messages) {String body = new String(message.getBody(), "UTF-8");System.out.println("快速处理消息: " + body);// 简化处理逻辑,只做核心处理// 非核心逻辑可以异步处理或跳过}}
}
消息分流处理
// 消息分流处理
public class MessageShuntingHandler {public void handleMessageShunting() throws Exception {// 创建多个队列来分流消息String[] queueNames = {"backlog_queue_1", "backlog_queue_2", "backlog_queue_3", "backlog_queue_4", "backlog_queue_5"};ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明Exchangechannel.exchangeDeclare("backlog_exchange", "direct", true);for (String queueName : queueNames) {// 声明队列channel.queueDeclare(queueName, true, false, false, null);// 绑定到Exchangechannel.queueBind(queueName, "backlog_exchange", queueName);System.out.println("创建队列: " + queueName);}// 发送消息到不同队列for (int i = 0; i < 1000; i++) {String message = "Shunted Message " + i;String routingKey = "backlog_queue_" + (i % 5 + 1);channel.basicPublish("backlog_exchange", routingKey, null, message.getBytes("UTF-8"));}System.out.println("消息分流完成");connection.close();}
}
4. 消息堆积的预防措施
预防策略
// 消息堆积预防
public class MessageBacklogPrevention {public void setupPreventionMeasures() {// 1. 容量规划System.out.println("容量规划: 评估业务峰值,计算所需资源,预留缓冲容量");// 2. 监控告警System.out.println("监控告警: 实时监控队列长度,监控消费延迟,设置告警阈值");// 3. 自动扩容System.out.println("自动扩容: 基于队列长度扩容,自动增加消费者,自动调整线程数");// 4. 限流保护System.out.println("限流保护: 生产者限流,消费者限流,动态调整限流,熔断保护");}
}
5. 消息堆积的恢复方案
快速恢复方案
// 消息堆积快速恢复
public class MessageBacklogRecovery {public void quickRecovery() throws Exception {// 1. 评估堆积情况System.out.println("评估堆积情况: 检查队列长度,分析堆积原因,评估影响范围");// 2. 选择恢复策略System.out.println("选择恢复策略: 轻微堆积增加消费者,中等堆积扩容+分流,严重堆积紧急处理+降级");// 3. 执行恢复操作System.out.println("执行恢复操作: 立即扩容,消息分流,降级处理,清理积压");// 4. 验证恢复效果System.out.println("验证恢复效果: 监控队列长度变化,检查消费速度,验证处理延迟");}
}
6. 消息堆积最佳实践
处理原则
- 快速响应:发现堆积立即处理
- 分级处理:根据严重程度选择策略
- 预防为主:建立预防机制
- 持续监控:实时监控堆积情况
注意事项
- 数据安全:处理堆积时保证数据安全
- 业务影响:评估对业务的影响
- 资源消耗:注意扩容的资源消耗
- 恢复时间:控制恢复时间在可接受范围内
监控指标
- 队列长度
- 消费延迟
- 消费TPS
- 系统资源使用率
- 错误率和重试率
关键知识点总结
- 基本概念:ActiveMQ基于JMS,RocketMQ专为分布式设计,RabbitMQ基于AMQP
- 性能特点:RocketMQ性能最高,RabbitMQ功能最灵活,ActiveMQ最传统
- 可靠性保证:生产者确认、消息队列持久化、消费者手动确认
- 高可用架构:Master-Slave、集群、镜像队列等不同高可用架构
- 发布机制:不同MQ采用不同的消息发布模式和机制
- 最佳实践:监控告警、定期检查、故障演练、容量规划