rabbitMq内容整理
一、RabbitMQ基础概念
RabbitMQ是一个开源的、实现AMQP(高级消息队列协议)的消息代理中间件,负责应用间异步消息传递、解耦、削峰、异步处理任务等。
核心概念有:
生产者(Producer):发送消息的程序。
消费者(Consumer):接收并处理消息的程序。
Broker:消息服务器,RabbitMQ服务端程序。
Exchange(交换机):负责根据规则分发消息到队列。
Queue(队列):存放消息的地方。
Binding(绑定):Exchange和Queue之间的关系,包含路由规则。
Routing Key(路由键):决定消息如何路由到队列。
二、RabbitMQ的消息模型
RabbitMQ支持以下Exchange类型:
交换机类型 | 工作方式 | 使用场景示例 |
---|---|---|
direct | 根据消息routing key精确匹配分发消息 | 精准消息发送,如支付通知、订单确认 |
fanout | 忽略routing key广播到所有绑定的队列 | 广播场景,如通知系统内所有服务刷新缓存 |
topic | 根据routing key模式匹配分发消息 | 模糊匹配场景,如日志收集、监控数据分发 |
headers | 根据消息头部匹配规则 | 极少使用,功能类似topic,但不关注routing key |
常用场景举例:
direct模式
一个exchange,一个queue绑定routingKey为
user.register
,生产者发送时携带user.register
路由键,消息被精确发送到该队列。
fanout模式
一个exchange绑定多个queue,发送消息时所有队列都能接收,适合发布订阅广播场景。
topic模式
可以使用通配符
*
匹配单个单词,#
匹配0或多个单词,例如:log.error
-> 精确匹配log.*
匹配log.error
、log.info
log.#
匹配log.error.http
三、RabbitMQ重要特性与工作中常用知识点
1、消息可靠性(必掌握)
RabbitMQ实现消息可靠传递的机制:
生产者端确认(Publisher Confirm)
RabbitMQ确认生产者的消息已成功到达Broker。
设置
channel.confirmSelect()
开启确认模式。
Broker端持久化
Exchange、Queue、Message都需要开启持久化(durable=True),消息会落盘。
注意:消息可靠性会影响性能。
消费者端ACK机制
消费者明确告知消息消费完成,Broker才会删除消息。
自动ACK和手动ACK(推荐)两种模式:
自动ACK (
autoAck=true
),消费异常时可能导致消息丢失。手动ACK (
channel.basicAck(deliveryTag, false)
),保障数据可靠性。
2、消息幂等性(必掌握)
防止重复消息消费引发的数据一致性问题。
一般做法:
使用消息唯一ID(UUID)去重;
利用Redis或数据库的唯一键做去重校验。
3、消息死信队列(DLX)
用于处理未被正常消费的消息(超过重试次数、消息过期、队列满了)。
DLX机制:
创建单独死信交换机和队列;
将业务队列绑定到死信交换机,消息异常或超时后自动进入死信队列。
4、消息延迟队列
利用TTL(消息存活时间)和DLX实现延迟队列功能:
设置消息或队列TTL,消息过期进入DLX。
常用场景:
订单超时未支付自动取消;
延迟发送通知。
5、消息优先级队列
设置队列为priority模式:
Map<String, Object> args = new HashMap<>(); args.put("x-max-priority", 10); channel.queueDeclare(queue, durable, exclusive, autoDelete, args);
消息发送时指定优先级:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().priority(5).build();
6、流控与消息积压问题处理(必掌握)
消费端限流(QoS)
java
复制编辑
channel.basicQos(10); // 一次最多处理10条未确认消息
设置合理的消费者数量,防止消息堆积。
监控队列长度和消费速率,及时预警处理。
四、RabbitMQ集群与高可用(常用)
镜像队列(Mirror Queue):
同步队列数据到多个节点,提高可用性,防止单点故障。
集群模式(Cluster):
主节点维护元数据,其他节点从节点复制数据。
Federation(联合)模式:
异地多数据中心间消息复制。
五、RabbitMQ性能优化(常用)
使用长连接,避免频繁开关。
批量发送消息,减少网络IO。
调整消息持久化策略,非关键数据可关闭。
合理配置消费并发数和线程池。
六、工作中常见问题与解决方法(重点)
问题 | 常见原因 | 解决办法 |
---|---|---|
消息重复消费 | 消费端未正确ACK、消息超时重投 | 保证幂等、正确ACK |
消息丢失 | Broker未持久化、生产者未确认、未ACK | 确认模式、开启持久化 |
队列消息堆积 | 消费速率慢、消息大量堆积 | 限流、增加消费者数量 |
连接数过多 | 没用连接池、频繁创建连接 | 连接池、长连接 |
消息乱序 | RabbitMQ默认先进先出,但并不完全保证顺序 | 业务逻辑自行排序或用Kafka |
七、监控管理(必掌握)
使用RabbitMQ Management Plugin
默认端口
15672
;查看队列、连接、交换机、消息堆积、消费者情况;
可手动删除消息、重置连接。
Prometheus + Grafana 监控方案:
深入监控队列深度、消息消费速率、连接数、资源消耗等。
分界线
-------------------------------------------------------------------------------------------------------------
一、为什么选择 RabbitMQ 而不是其他中间件(如 Kafka、RocketMQ)?
面试官一般想了解你对常用MQ的理解差异,考察你技术选型能力。
功能点/场景 | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|
协议 | AMQP 标准 | 自定义协议 | 自定义协议 |
消息可靠性 | 非常高(事务、Confirm、ACK) | 较高 | 高 |
吞吐量 | 中(万级) | 极高(百万级) | 高(十万级) |
延迟消息 | 原生TTL+DLX支持 | 不支持原生延迟 | 原生支持 |
消息顺序性 | 不严格保证 | 分区内严格保证 | 严格保证 |
场景偏好 | 高可靠事务性消息、业务解耦 | 日志、埋点、大数据传输 | 电商场景 |
使用难度 | 简单、易管理 | 较难(生态复杂) | 较容易(偏Java) |
举例场景:
RabbitMQ适合高可靠业务通知场景,如订单支付结果通知;
Kafka适合日志传输和大数据处理场景;
RocketMQ更适合电商领域,有严格顺序消息需求。
二、如何保证RabbitMQ的消息可靠性、幂等性?
1. 消息可靠性(生产者→MQ→消费者):
生产者:
开启confirm确认机制:
channel.confirmSelect(); // 开启确认模式
channel.basicPublish(exchange, routingKey, null, message.getBytes());
if (channel.waitForConfirms()) {System.out.println("消息投递成功");
} else {System.out.println("消息投递失败,需要重试或记录");
}
MQ服务器:
设置队列和Exchange持久化:
channel.exchangeDeclare("exchange", "direct", true);
channel.queueDeclare("queue", true, false, false, null);
channel.queueBind("queue", "exchange", "routingKey");
消息持久化发送:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化消息.build();
channel.basicPublish("exchange", "routingKey", props, message.getBytes());
消费者:
使用手动ACK机制(关键):
channel.basicConsume("queue", false, (consumerTag, delivery) -> {try {// 处理消息System.out.println("收到消息: " + new String(delivery.getBody()));// 手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 拒绝消息并重回队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
}, consumerTag -> {});
2. 消息幂等性(防止消息重复消费):
幂等性就是同一消息多次处理结果不变。
常见实现方式:
① 数据库唯一索引去重:
-- 使用订单号做唯一索引,消费前insert去重 CREATE UNIQUE INDEX order_unique ON order_table(order_id);
② Redis去重(推荐):
String msgId = message.getMsgId(); // 消息唯一标识
if (redis.setnx("msg:"+msgId, "1") == 1) {redis.expire("msg:"+msgId, 3600); // 设置过期时间1小时processMessage(message);
} else {// 消息重复,直接丢弃
}
三、RabbitMQ消费端如何限流?
消费者限流主要依靠basicQos:
// 同一时间最多允许处理10条消息(未确认状态) channel.basicQos(10);
解释:
RabbitMQ不会一次性推送超过设定值的消息给消费者,未ACK的消息数量达到10时,队列暂停向该消费者发送新消息。
四、RabbitMQ消息堆积如何处理?
常见处理办法:
增加消费者数量或并发数;
消费者限流合理设置;
临时创建新队列和消费者分流;
严重时,考虑清理无用消息。
优化方案举例:
增加消费者(并发处理):
// 启动多个consumer实例(独立进程或线程) for (int i = 0; i < 10; i++) { new Thread(new ConsumerRunnable()).start(); }
五、如何使用RabbitMQ实现延迟消息功能?
延迟消息通过TTL(消息生存时间)+ DLX(死信队列)实现。
实现步骤(详细示例):
① 定义正常队列与死信队列
// 死信交换机和队列
channel.exchangeDeclare("dlx_exchange", "direct", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "timeout");// 正常业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "timeout");
args.put("x-message-ttl", 60000); // 队列TTL 60秒channel.queueDeclare("order_queue", true, false, false, args);
② 发送消息到order_queue
,过期进入死信队列:
channel.basicPublish("", "order_queue", null, message.getBytes());
③ 消费死信队列,实现延迟处理:
channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {System.out.println("延迟消息:" + new String(delivery.getBody()));
}, consumerTag -> {});
六、RabbitMQ的确认机制与死信队列机制怎么用?
确认机制前文(第二节)已详细解释。
死信队列适用于消息无法被消费(超时、异常):
关键场景:
消息TTL过期;
队列满了;
消费者拒绝消息(basicNack)。
实际使用:
常用于异常消息降级处理、失败重试,防止无限重试导致队列拥堵。
七、RabbitMQ集群有哪些方式?如何实现高可用?
1. 集群类型:
普通模式(集群元数据同步,无队列同步)
镜像队列模式(常用,高可用)
Federation模式(跨数据中心场景)
2. 镜像队列实现高可用(推荐):
通过策略实现自动镜像队列:
shell
复制编辑
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
解释:
^
表示所有队列;ha-mode:all
表示复制到所有节点。