RabbitMQ—基础篇
MQ介绍
-
应用场景
- 异步解耦:用于上下游模块通信,是更彻底的一种异步任务
- 流量削峰:短时间内高并发场景下,将大量的请求置于MQ中,防止流量冲击
- 数据收集:生产者只需将数据发送到 MQ 主题(Topic)或队列(Queue),消费者按需订阅数据
-
MQ分类
-
ActiveMQ:单机万级吞吐量,消息丢失概率低;但现在已经基本不进行维护
-
Kafka:为大数据而生的消息中间件,适合处理大量数据
-
RocketMQ:单机吞吐量十万级,支持分布式,消息不会丢失;适合金融互联网领域
-
RabbitMQ:万级吞吐量,性能好时效低;适合中小规模场景
-
-
MQ协议:MQ运行时遵循的传输协议
- AMQP:特性是 分布式事务支持、消息的持久化支持、高性能和高可靠的消息处理优势 ,RabbitMQ采用的就是此协议
- MQTT:特性是 轻量、结构简单、传输快、不支持事务、没有持久化设计
- Kafka:特性是 结构简单、解析速度快、无事务支持、有持久化设计
-
MQ协议和HTTP协议
- MQ协议是长连接,注重传输高性能;HTTP协议是短连接,注重资源轻量化
- MQ协议采用二进制帧或固定格式消息,传输效率较高;HTTP协议采用请求/响应模式,其安全性要求降低了传输效率
RabbitMQ运行原理
-
核心组件
- 生产者/消费者:MQ的上下游模块,一般为相关微服务
- 代理服务器(Broker):一个Broker就是一个RabbitMQ节点,集群部署即多Broker,所有的Vhost共同组成了Broker
- 虚拟主机(Vhost):相当于一个小型RabbitMQ,一个Vhost由交换机、绑定、队列等组件组成,Vhost之间相互独立
- 交换机(Exchange):接收来自生产者的消息,根据Binding将消息转发给相应队列
- 绑定(Binding):指定交换机将消息路由到哪些队列,不同交换机类型有不同的绑定规则
- 队列(Queue):队列需要和交换机绑定,是真正存储消息的组件,本质是一个消息缓存区
-
运行流程
- 生产者发出消息请求,获得连接中的一个Channel,进而连接到指定Vhost中的交换机
- 消息到达交换机后,根据消息中的routingKey和消息队列匹配(Binding),从而将消息分配到指定队列中
- 当消费者订阅队列后,RabbitMQ 会主动将消息推送给消费者
-
设计思想
- Broker:可以认为Broker就是RabbitMQ服务,多Broker可以用于集群部署
- Vhost:用于逻辑隔离、权限控制和资源管理,尤其是在复杂系统中的安全、维护和性能问题
- Channel连接:解决实现并发安全和连接复用,多个Channel共享同一个
Connection
,减少网络开销并保证线程安全 - Exchang:下游模块只需关注自己的队列,交换机根据业务需求转发到相应队列
┌──────────────┐ ┌──────────────┐│ Producer │ │ Consumer ││ ┌─────────┐ │ │ ┌─────────┐ ││ │ Channel │ │ │ │ Channel │ ││ └─────────┘ │ │ └─────────┘ │└───────╋──────┘ └───────╋──────┘│ │┌────────────╋───────────────────────────╋────────────────────────────┐ │ │ Broker(RabbitMQ Server) │ ││ ┌───────╋───────────────────────────╋─────────┐ ┌─────────┐ │ │ │ ▼ Vhost1 ▼ │ │ Vhost2 │ │ │ │ ┌────────┐ ┌───────┐ ┌────────────────┐│ │ Vhsot3 │ │ │ │ │Exchange│-->│Binding│-->│Queue1,Queue2...││ │ Vhost4 │ │ │ │ └────────┘ └───────┘ └────────────────┘│ │ ... │ │ │ └─────────────────────────────────────────────┘ └─────────┘ │ └─────────────────────────────────────────────────────────────────────┘
消息确认机制
- 目的:MQ实现了生产者和消费者之间的数据通信,其中每个步骤都可能导致数据丢失
生产者 ---(步骤1)---> 交换机 ---(步骤2)---> 队列 (步骤3:持久化)---(步骤4)---> 消费者
生产者确认机制
-
目的:保证生产者成功发送消息到达RabbitMQ
- RabbitMQ重连机制:生产者端未成功连接到RabbitMQ,如果客户端连接失败会自动重试直至超时报错
- RabbitMQ确认机制:生产者成功发送了消息,但RabbitMQ处理消息失败时,会被检测到并处理
-
生产者确认机制:支持以下两种机制
- Confirm机制:生产者发送消息后,交换机接收到消息即返回
ACK
,否则返回NACK
- Return机制:生产者发送消息后,交换机正确路由队列中返回
ACK
,否则返回NACK
- 工作中不建议使用Return机制,可以用死信队列处理失败:路由失败后会过期,自动转到死信队列
- Confirm机制:生产者发送消息后,交换机接收到消息即返回
-
失败处理
requeue=true
:生产者收到NACK
时,会自动重发消息直至成功requeue=false
:丢弃消息
-
配置方法:需通过客户端代码或配置文件实现,见下文
消费者确认机制
-
目的:保证消费者成功消费了消息
- 消费者成功消费消息后,返回
ACK
给RabbitMQ - 消费者消费失败后,返回
NACK
给RabbitMQ
- 消费者成功消费消息后,返回
-
失败处理
requeue=true
:消息重新回到原队列,消费者会再次收到requeue=false
:消息被丢弃或进入死信队列- 消息未确认:消息处于挂起状态,直至消息处理完毕、TTL过期或者消费者端连接断开
-
配置方法:需通过客户端代码或配置文件实现,见下文
数据持久化
-
目的:备份数据,支持以下两种刷盘方式
- 异步刷盘(默认):RabbitMQ将数据写入内存中后即返回
ACK
给生产者,后续会异步持久化到硬盘中 - 同步刷盘:RabbitMQ将数据写入内存并成功持久化到硬盘后才返回
ACK
给生产者
- 异步刷盘(默认):RabbitMQ将数据写入内存中后即返回
-
配置方法
- 创建队列/交换机时,选择
durable
属性即可开启组件持久化 - 发送消息时,设置消息属性
delivery_mode=2
即可开启消息持久化 - 通过修改broker的配置文件实现同步刷盘/异步刷盘
- 创建队列/交换机时,选择
-
惰性队列:RabbitMQ提供了惰性队列功能,惰性队列会强制持久化消息,无论是否已配置持久化策略
- 创建队列时,加参数
x-queue-mode = lazy
即可 - RabbitMQ3.12版本后,所有队列默认都是惰性队列
- 消费消息时消息才会从硬盘加载到内存中,即内存中只保存最近的消息
- 创建队列时,加参数
死信队列
-
目的:消息丢失的兜底方案,当消息异常时可以放入死信队列,后续单独处理异常信息
- 消息消费失败
- 消息堆积,即队列中的消息数量超过
x-max-length
- 消息TTL过期(消息重新投递不会重置TTL)
-
配置方法
- 死信交换机就是Direct交换机,死信队列就是普通队列
- 绑定死信交换机和业务队列
service_queue
,如下配置业务队列:
x-dead-letter-exchange
:死信交换机名称
x-dead-letter-routing-key
:死信路由键,死信交换机可以绑定多个队列,可以根据死信路由键路由到指定死信队列中
其他参数:0(如x-message-ttl
设置消息过期时间)按需配置
硬盘↑|内存↑|
生产者 ---> 交换机 ---> 队列 ---> 消费者↑ │ │ ↑ |└────ACK────┘ | └───ACK───┘↓ 死信交换机 ---> 死信队列 ---> 死信处理器
工作模式
-
RabbitMQ对于常见的业务场景,提供了多种工作模式,这些工作模式封装了相应的功能
-
配置方法
- 调试时可以直接通过图形化控制台快速设置
- 开发阶段需要通过客户端代码或配置文件实现
工作队列模式
-
说明:一个消息队列,多个消费者;多个消费者轮询消费此消息队列且不会重复消费
-
应用场景:耗时任务的分布式处理,如订单处理、数据同步、文件上传等
-
控制台模拟
- 创建队列,名称为
work_queue
,Routing Key
设置为队列名字 - 不显式绑定交换机,此时会默认绑定到
AMQP default
交换机上 - 向
AMQP default
发送消息,Routing Key
为work_queue
,观察simple_queue
是否收到消息 - 多消费者同时消费消息,可以在消费者端设置公平分发,即处理完当前消息并发送
ACK
后,才会接收新消息
- 创建队列,名称为
生产者 ---> 隐式交换机AMQP default ---> 单消息队列 ---> 消费者1,消费者2,..
广播模式
-
说明:一个交换机,多个队列,多个消费者;交换机将消息转发给多个队列,一个消费者对应一个队列
-
应用场景:通知系统、实时数据同步、日志分发等
-
控制台模拟
- 创建交换机,类型为
fanout
,或者直接使用内置的amq.fanout
交换机 - 创建多个队列,绑定到该交换机上,忽略
Routing Key
- 向交换机发送消息,观察所有的队列是否都收到消息
- 创建交换机,类型为
┌── (广播) ---> 队列1 ---> 消费者1,消费者2,..
生产者 ---> fanout交换机 |── (广播) ---> 队列2 ---> 消费者1,消费者2,..└── (广播) ---> 队列3 ---> 消费者1,消费者2,.....
路由模式
-
说明:一个交换机,多个队列,多个消费者;交换机将消息根据
Routing Key
转发给指定队列,一个消费者对应一个队列 -
应用场景:较于广播模式,路由模式按特定规则分发消息的场景,如日志分级处理、多租户系统消息路由等
-
控制台模拟
- 创建交换机,类型为
direct
,或者直接使用内置的amq.direct
- 创建多个队列,绑定到该交换机上,不同的队列
Routing Key
互异 - 向交换机发送多条带有不同
Routing Key
的消息,观察匹配的队列是否收到消息
- 创建交换机,类型为
┌── (Routing Key精确匹配) ---> 队列1 ---> 消费者,...
生产者 ---> direct交换机 |── (Routing Key精确匹配) ---> 队列2 ---> 消费者,...└── (Routing Key精确匹配) ---> 队列3 ---> 消费者,......
主题模式
-
说明:一个交换机,多个队列,多组消费者;`一个队列可以被多个消费者轮询,相当于 路由模式+工作模式
-
应用场景:较于路由模式,主题模式的
Routing Key
带有通配符,可以模糊匹配队列,适合模糊匹配的复杂路由场景 -
控制台模拟
- 创建交换机,类型为
topic
,或者直接使用内置的amq.topic
- 创建多个队列,绑定到该交换机上,设置相应的
Routing Key
- 向交换机发送多条带有不同
Routing Key
的消息,观察匹配的队列是否收到消息
- 创建交换机,类型为
-
通配符使用以下两种符号:
*
(星号):匹配 单个单词(以.
分隔)#
(井号):匹配 零个或多个单词(包括空单词)
┌── (Routing Key模糊匹配) ---> 队列1 ---> 消费者,...
生产者 ---> topic交换机 |── (Routing Key模糊匹配) ---> 队列2 ---> 消费者,...└── (Routing Key模糊匹配) ---> 队列3 ---> 消费者,......
头模式
-
说明:RabbitMQ的消息支持传递
Headers
参数(键值对形式),头模式即采用headers
匹配- 至少两个参数:一个参数必须是
x-match = all或any
,其他参数为自定义的key-value
对,例如type=1
- 如果
x-match = all
,消息中的所有headers必须与绑定中的参数完全匹配 - 如果
x-match = any
,消息中的headers中只要有一个与绑定中的参数匹配即可 - 如果匹配成功,消息将被路由到绑定的队列中;如果匹配失败,消息将被丢弃或发送到死信队列
- 至少两个参数:一个参数必须是
-
应用场景:需要基于消息元数据(如 JSON、XML 格式消息)路由的场景
-
控制台模拟
- 创建交换机,类型为
headers
,或者直接使用内置的amq.headers
- 创建多个队列,绑定到该交换机上,设置相应的
Arguments
- 向交换机发送多条带有不同
headers
的消息,观察匹配的队列是否收到消息
- 创建交换机,类型为
┌── (headers匹配) ---> 队列1 ---> 消费者,...
生产者 ---> headers交换机 |── (headers匹配) ---> 队列2 ---> 消费者,...└── (headers匹配) ---> 队列3 ---> 消费者,......
Java客户端
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
快速配置
spring:rabbitmq:# 连接配置host: your.rabbitmq.server.com # RabbitMQ服务器地址port: 5672 # 默认端口5672username: your_username # 用户名password: your_password # 密码virtual-host: / # 虚拟主机,默认为/# 连接超时和心跳设置connection-timeout: 5000 # 连接超时时间(毫秒)requested-heartbeat: 60 # 心跳间隔(秒)# 缓存配置cache:channel:size: 25 # 通道缓存大小connection:mode: channel # 连接缓存模式(channel|connection)size: 1 # 连接缓存大小# 发布者配置publisher-confirm-type: correlated # 发布者确认类型(none|simple|correlated)publisher-returns: true # 是否启用返回机制 # 模板配置template:retry:enabled: true # 是否启用重试initial-interval: 1000ms # 初始重试间隔max-attempts: 3 # 最大重试次数multiplier: 1.0 # 重试间隔乘数mandatory: true # 是否强制消息确认receive-timeout: 1000ms # 接收超时时间# 监听器配置listener:simple:acknowledge-mode: auto # 确认模式(auto|manual|none)auto-startup: true # 是否自动启动监听器concurrency: 1 # 最小消费者数量max-concurrency: 1 # 最大消费者数量prefetch: 250 # 预取消息数量,防止消息堆积default-requeue-rejected: true # 是否重新排队被拒绝的消息retry:enabled: true # 是否启用监听器重试initial-interval: 1000ms # 初始重试间隔max-attempts: 3 # 最大重试次数multiplier: 1.0 # 重试间隔乘数stateless: true # 是否无状态重试# 声明队列queues:- name: my-queuedurable: trueexclusive: falseauto-delete: falsearguments:x-queue-type: quorum # 仲裁队列# 声明交换机exchanges:- name: my-exchangetype: directdurable: trueauto-delete: false# 声明绑定bindings:- queue: my-queueexchange: my-exchangerouting-key: my.routing.key
高级配置
- 连接配置
@Configuration
public class RabbitMQConfig {// 配置连接工厂,用于配置rabbitmq连接@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/");// 可选配置connectionFactory.setConnectionTimeout(5000); // 连接超时connectionFactory.setRequestedHeartBeat(60); // 心跳间隔connectionFactory.setPublisherConfirms(true); // 开启生产者comfirm机制// connectionFactory.setPublisherReturns(true); // 开启生产者return机制return connectionFactory;}// 配置RabbitTemplate,用于发送和接收消息@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());// 配置生产者comfirm机制rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {if (ack) {log.info("消息 {} 发送成功", correlation.getId());} else {log.error("消息 {} 发送失败: {}", correlation.getId(), cause);}});// 配置生产者return机制// rabbitTemplate.setReturnsCallback(returned -> {// System.out.println("消息无法路由到队列: " + returned.getMessage());// });// rabbitTemplate.setMandatory(true); // 设置mandatory标志为true,确保消息无法路由时触发returnsCallback// 配置重试规则(哪些异常需要重试)RetryTemplate retryTemplate = new RetryTemplate();Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();retryableExceptions.put(org.springframework.amqp.AmqpIOException.class, true); // 网络异常retryableExceptions.put(org.springframework.amqp.AmqpConnectException.class, true); // 连接异常retryableExceptions.put(org.springframework.amqp.UncategorizedAmqpException.class, true); // 其他 AMQP 异常SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions); // 最多重试 3 次retryTemplate.setRetryPolicy(retryPolicy);// 配置退避策略(重试间隔)ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(1000); // 初始间隔 1sbackOffPolicy.setMultiplier(2.0); // 间隔倍数(1s -> 2s -> 4s)backOffPolicy.setMaxInterval(10000); // 最大间隔 10sretryTemplate.setBackOffPolicy(backOffPolicy); // 设置 RetryTemplaterabbitTemplate.setRetryTemplate(retryTemplate());return rabbitTemplate;}// 配置RabbitAdmin,用于管理RabbitMQ的拓扑结构(如交换机、队列、绑定关系)@Beanpublic AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}
}
- 配置组件:如果组件已存在,则不会覆盖创建,但如果同名组件参数不一致会报错
@Configuration
public class RabbitMQDeclarablesConfig {// ========== 交换机和队列 ==========@Beanpublic DirectExchange directExchange() { // 创建交换机return ExchangeBuilder // 优先使用建造者模式创建.directExchange("direct.exchange") // 交换机名称.durable(true) // 是否持久化// .autoDelete() // 默认自动删除功能为false,如果显式加了autoDelete()就是true.build();// 构造函数创建,可以支持所有属性和扩展参数// return new DirectExchange("direct.exchange", true, false); // return new TopicExchange("topic.exchange", true, false);// return new FanoutExchange("fanout.exchange", true, false);}@Beanpublic Queue directQueue() { // 创建队列,并绑定死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换器args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键args.put("x-message-ttl", 10000); // 可选:设置消息TTL,10秒后过期成为死信args.put("x-max-length", 10); // 可选:设置队列最大长度,超过10条消息后,新消息成为死信return new Queue("direct.queue", true, false, false, args);/*public Queue(String name, // 队列名称boolean durable, // 是否持久化boolean exclusive, // 是否排他(仅限当前连接)boolean autoDelete, // 是否自动删除Map<String, Object> arguments // 额外参数(如死信队列、TTL 等))*/}@Beanpublic Binding bindingDirectQueue() { // 绑定Direct交换机和业务队列return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key"); // 主题交换机可以含有通配符,例如direct.#}// ========== 死信交换机(DLX)和死信队列 ==========@Beanpublic DirectExchange dlxExchange() { // 死信交换机return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true); // 死信队列}@Beanpublic Binding bindingDLX() { // 绑定死信交换机和死信队列return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); // 死信路由键}// ========== Headers 交换机配置 ==========@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers.exchange", true, false);}@Beanpublic Queue headersQueue() {return new Queue("headers.queue", true);}@Beanpublic Binding bindingHeadersQueue(Queue headersQueue, HeadersExchange headersExchange) {return BindingBuilder.bind(headersQueue).to(headersExchange).whereAll("key1", "key2").match("value1", "value2");}
}
- 配置监听器工厂:
@RabbitListener
标注的每一个Bean都会据此创建配置
@Configuration
@Slf4j
public class RabbitMQListenerConfig {private final ConnectionFactory connectionFactory;public RabbitMQListenerConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}// 简单消息监听容器工厂配置@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3); // 初始消费者数量factory.setMaxConcurrentConsumers(10); // 最大消费者数量factory.setPrefetchCount(50); // 消费者一次性处理未确认消息的最大数量factory.setDefaultRequeueRejected(false); // 不重新排队被拒绝的消息// 配置消息确认模式:框架自动处理|手动确认|消息异常直接丢弃factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // AUTO | MANUAL | NONEfactory.setBatchListener(true); // 启用批量监听factory.setBatchSize(10); // 每批处理10条消息return factory;}
}
生产者发送消息
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {// 如果入参是对象,RabbitTemplate会默认进行jdk序列化,建议先手动Json序列化后再发送ObjectMapper objectMapper = new ObjectMapper();String json = objectMapper.writeValueAsString(new Book("西游记", "吴承恩"));rabbitTemplate.convertAndSend("direct.exchange", // 交换机名称"routing.key", // Routing KeyjsonMessage,msg -> {// 设置消息 TTL(单位:毫秒),如果同时设置了消息级 TTL 和队列级 TTL,以两者中较小的值为准msg.getMessageProperties().setExpiration(String.valueOf(5000));return msg;});}public void sendCompleteMessage(String payload) {// 创建消息并设置 HeaderMessage message = MessageBuilder.withBody(payload.getBytes()).setHeader("messageId", UUID.randomUUID().toString()) // 唯一ID.setHeader("timestamp", System.currentTimeMillis()) // 时间戳.setHeader("type", "order_created") // 业务类型.build();// 发送消息rabbitTemplate.send("exchange_name", "routing_key", message);}
}
消费者监听
-
手动ACK说明
channel.basicNack(deliveryTag(消息标识符),multiple(是否批量处理),requeue(true会无限重试,false会进入死信));
- 单条消息处理:
channel.basicReject(deliveryTag(消息标识符), requeue(true会无限重试,false会进入死信));
-
@RabbitListener
标注的监听器Bean默认使用多线程异步处理消息- Spring AMQP 会为每个监听容器创建一个线程池(线程数量由
PrefetchCount
决定) - 每个消息由线程池中的空闲线程处理,无需等待上一条消息处理完成
- 并发消费:如果线程池有足够线程,可以同时处理多条消息
- Spring AMQP 会为每个监听容器创建一个线程池(线程数量由
@Component
public class OrderConsumer {@RabbitListener(queues = "order.queue") // 声明监听队列// @RabbitListener(queues = {"order.queue", "payment.queue"}) // 可以监听多个队列public void handleOrderWithManualAck(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("Received: " + message);channel.basicAck(deliveryTag, false); // 手动ACK} catch (Exception e) {channel.basicNack(deliveryTag, false, false); // 进入死信队列}}@RabbitListener(queues = "order.queue") // 工作队列模式下,可以多个消费者监听同一个队列public void handleBatch(List<String> messages, // 需要监听容器开启了批量功能,入参使用List接受Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { // 一批消息对应一个标识符try {for (String message : messages) {System.out.println("Received: " + message);}channel.basicAck(deliveryTag, true); // 批量确认} catch (Exception e) {channel.basicNack(deliveryTag, true, false); // 进入死信队列}}
}
消费异常处理
-
对于核心业务,建议加入重试机制
- 如果未达到最大重试次数 →
NACK(requeue=true)
(继续重试) - 如果达到最大次数 →
NACK(requeue=false)
或Reject(requeue=false)
,消息进入死信队列
- 如果未达到最大重试次数 →
-
方法1:使用Spring Retry拦截器,直接重试,失败后手动入死信
- 此方法保证消息顺序一致
- 放弃重试后会自动进入对应死信,或者直接丢弃
factory.setAdviceChain
也可以实现重试,但配置后不可动态调整且Spring Retry更灵活
@Configuration
@Slf4j
public class RabbitMQListenerConfig {private final ConnectionFactory connectionFactory;public RabbitMQListenerConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}// 简单消息监听容器工厂配置@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3); // 初始消费者数量factory.setMaxConcurrentConsumers(10); // 最大消费者数量factory.setPrefetchCount(50); // 预取消息数量factory.setDefaultRequeueRejected(false); // 不重新排队被拒绝的消息// 配置消息确认模式:框架自动处理|手动确认|消息异常直接丢弃factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // AUTO | MANUAL | NONEfactory.setBatchListener(true); // 启用批量监听factory.setBatchSize(10); // 每批处理10条消息// 配置重试模板(RetryTemplate)RetryTemplate retryTemplate = new RetryTemplate();// 设置重试策略(最多重试3次)Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();retryableExceptions.put(Exception.class, true); // 所有异常均触发重试SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);retryTemplate.setRetryPolicy(retryPolicy);// 设置退避策略(指数退避:首次2s,后续翻倍,最大10s)ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(2000);backOffPolicy.setMultiplier(2);backOffPolicy.setMaxInterval(10000);retryTemplate.setBackOffPolicy(backOffPolicy);factory.setRetryTemplate(retryTemplate);// 也可以快速配置重试策略
// factory.setAdviceChain(
// RetryInterceptorBuilder.stateless()
// .maxAttempts(3)
// .backOffOptions(1000, 2.0, 10000)
// .recoverer((message, throwable) ->
// log.error("消息重试失败:{},已转入死信队列:{}",
// throwable.getMessage(),
// new String(message.getBody()))
// )
// .build()
// );return factory;}
}
-
方法2:手动实现重试逻辑,消费异常后重新入队,失败后进入死信队列
- 此方法可能会打乱消息顺序,但性能高
- 此方法可以对于不同的队列进行自定义处理
@Component
@Slf4j
public class RabbitMQConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = "direct.queue") // 声明监听队列public void handleOrderWithManualAck(Message message, Channel channel) throws IOException {try {System.out.println(new String(message.getBody()));throw new RuntimeException();// 业务处理成功// channel.basicAck(deliveryTag, false);} catch (Exception e) {int retryCount = Optional.ofNullable((Integer) message.getMessageProperties().getHeaders().get("x-retry-count")).orElse(0);if (retryCount < 3) {message.getMessageProperties().setHeader("x-retry-count", retryCount + 1);System.out.println("重试中");// 拒绝当前消息(不重新入队)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);// 手动重新发布消息(确保 Header 被持久化)rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(),message);} else {// 重试耗尽,拒绝消息(如果配置了DLX会路由到死信队列)System.out.println("重试失败");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}}
}
RabbitMQ应用
异步解耦
- 目的:将耗时操作或不需要立即返回结果的操作通过MQ异步执行,提高系统的响应速度和吞吐量
- 举例:用户抢购完成后,异步操作创建订单,发送通知等
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate RabbitTemplate rabbitTemplate;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {Long userId = ThreadLocalDto.threadLocal.get().getUserId();RMap<String, String> goodMap = redissonClient.getMap("good:" + goodId); //商品信息bucket// 1.不在抢购期内直接失败Long begin = Long.parseLong(goodMap.get("beginTime"));Long end = Long.parseLong(goodMap.get("endTime"));if (LocalDateTime.ofInstant(Instant.ofEpochMilli(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 分布式锁:用商品ID作为锁的KeyRLock lock = redissonClient.getLock("lock:good:" + goodId);try{// 2.尝试拿redisson分布式锁boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (!isLock) { //重试后最终拿锁失败,返回错误信息return Result.fail("系统繁忙,请重试");}// 3.拿锁成功,开始尝试扣减库存if (Long.parseLong(goodMap.get("stoke")) <= 0){return Result.fail("没有库存");}goodMap.addAndGet("stoke", -1); // 4.抢购成功,使用MQ异步创建临时订单 OrderDto orderDto = OrderDto.builder().goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();rabbitTemplate.convertAndSend("direct.order", "direct.order", JSON.toJSONString(orderDto));return Result.ok();}finally{if(isLock){// 5.释放锁lock.unlock();}} }
}
@Component
@Slf4j
public class OrderConsumer {@Autowiredprivate RedissonClient redissonClient;@RabbitListener(queues = "direct.order") // 声明监听队列public void handleOrderWithManualAck(String order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {OrderDto orderDto = JSON.parseObject(order, OrderDto.class);Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idorderDto.setOrderId(orderId);redissonClient.getBucket("order:"+orderId).set(JSON.toJSONString(orderDto)); //存入订单信息channel.basicAck(deliveryTag, false); // 手动ACK} catch (Exception e) {log.error("Processing failed: " + e.getMessage());channel.basicReject(deliveryTag, false); // 拒绝单条消息(不重试,直接进入死信队列)}}
}
流量削峰
- 目的:短时间高并发访问时限流
- 原理:先将请求发给MQ,消费者再处理请求,通过配置消费者并发数量控制流量
- 配置交换机和队列
@Configuration
public class RabbitMQConfig {// 创建队列@Beanpublic Queue peakCuttingQueue() {return QueueBuilder.durable("peak.cutting.queue").build();}// 创建直连交换机@Beanpublic DirectExchange peakCuttingExchange() {return new DirectExchange("peak.cutting.exchange");}// 绑定队列和交换机@Beanpublic Binding bindingPeakCutting() {return BindingBuilder.bind(peakCuttingQueue()).to(peakCuttingExchange()).with("peak.cutting.routing.key");}
}
Controller
转发请求
@RestController
@RequestMapping("/api/traffic")
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMapping("/peak")public String handlePeakRequest(@RequestBody String payload) {// 将请求发送到削峰队列String messageId = UUID.randomUUID().toString();CorrelationData corrData = new CorrelationData(messageId);Message message = MessageBuilder.withBody(payload.getBytes()).setHeader("x-message-id", messageId).build();rabbitTemplate.send("exchange", "routingKey", message, corrData);return Result.ok();}
}
- 异步处理请求
@Component
public class MessageConsumer {/*** 监听削峰队列* @param message 消息内容* @param channel 通道* @param deliveryTag 消息标签*/@RabbitListener(queues = "peak.cutting.queue")public void processPeakCuttingMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("开始处理消息: " + message);// 模拟业务处理时间(实际业务中替换为你的业务逻辑)Thread.sleep(1000);// 手动确认消息channel.basicAck(deliveryTag, false);System.out.println("消息处理完成: " + message);} catch (InterruptedException | IOException e) {try {// 处理失败,拒绝消息并重新入队(可以根据需要调整)channel.basicNack(deliveryTag, false, true);System.err.println("消息处理失败,已重新入队: " + message);} catch (IOException ex) {System.err.println("消息确认失败: " + ex.getMessage());}}}
}
消费幂等
-
目的:保证消费者不会重复消费消息
- 生产者可能会因为网络等原因重复发消息
- 如果消费者未 ACK 消息且连接断开(如进程崩溃、网络中断),RabbitMQ 会重新投递给其他消费者*
-
原理
- 在redis中记录消息状态,消费消息前先查询消息状态:消费中|未消费|已消费
- 可以根据数据库的一些特性(例如集合唯一性,MySQL唯一索引)保证幂等性
@Component
@Slf4j
public class RabbitMQConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "my.queue")public void handleMessage(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (messageId == null) {// 如果没有唯一ID,拒绝消息并记录日志channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);return;}// 使用 Redis 检查是否已处理Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent("msg:processed:" + messageId, // Redis Key"1", // Value(无意义)24, // TTL(小时),避免长期占用内存TimeUnit.HOURS);if (Boolean.TRUE.equals(isProcessed)) {try {// 首次处理:执行业务逻辑doBusiness(message);// 成功则ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,删除Redis记录(可选)redisTemplate.delete("msg:processed:" + messageId);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}} else {// 已处理过,直接ACK(避免重复入队)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}
延迟消息
-
目的:消息投递给RabbitMQ后,指定时间段后再进行消费
-
方案
- 利用TTL-死信队列实现(推荐):死信队列中的消息是到期消息,消费端关注死信队列即可
- 使用RabbitMQ的延迟队列插件:暂存在交换机的内部存储,等暂存时间过了再将此消息发送给消费者
- 使用定时任务功能:循环查询
headers
中的时间参数,校验过期就消费
死信队列实现
-
配置方法
- 对业务队列绑定死信队列
- 设置队列级别TTL,或者发送消息时设置消息级别TTL(以较小TTL为准)
- 消费者监听死信队列消费
-
绑定死信队列
@Configuration
public class RabbitMQDeclarablesConfig {@Beanpublic DirectExchange directExchange() { // 创建交换机return ExchangeBuilder // 优先使用建造者模式创建.directExchange("direct.exchange") // 交换机名称.durable(true) // 是否持久化.build();}@Beanpublic Queue directQueue() { // 创建队列,并绑定死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换器args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键args.put("x-message-ttl", 10000); // 可选:设置消息TTL,10秒后过期成为死信args.put("x-max-length", 10); // 可选:设置队列最大长度,超过10条消息后,新消息成为死信return new Queue("direct.queue", true, false, false, args);}@Beanpublic Binding bindingDirectQueue() { // 绑定Direct交换机和业务队列return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key"); // 主题交换机可以含有通配符,例如direct.#}@Beanpublic DirectExchange dlxExchange() { // 死信交换机return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true); // 死信队列}@Beanpublic Binding bindingDLX() { // 绑定死信交换机和死信队列return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); // 死信路由键}
}
- 发送延迟消息
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendWithMessageTTL(String message, int ttl) {rabbitTemplate.convertAndSend("direct.exchange", "direct.routing.key", message, msg -> {msg.getMessageProperties().setExpiration(String.valueOf(ttl));return msg;});}
}
延迟插件实现
-
rabbitmq_delayed_message_exchange
插件版本与RabbitMQ大版本一致即可: rabbitmq/rabbitmq-delayed-message-exchange -
配置方法
- 将插件放入挂载目录:rabbitmq-plugins映射的目录下
- 进入容器执行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启RabbitMQ:
docker restart rabbitmq
-
声明延迟队列
@Configuration
public class DelayedQueueConfig {// 声明延迟交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定底层交换机类型为 directreturn new CustomExchange("delayed.exchange", // 交换机名称"x-delayed-message", // 交换机类型(延迟消息交换机)true, // 是否持久化false, // 是否自动删除args // 交换机参数);}// 声明队列@Beanpublic Queue delayedQueue() {return new Queue("delayed.queue", true);}// 绑定队列与交换机@Beanpublic Binding bindingDelayedQueue() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed.routing.key").noargs();}
}
- 发送延迟消息
@RestController
public class DelayedMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDelayed")public String sendDelayedMessage(@RequestParam String message, @RequestParam long delayTime) {rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setDelay(delayTime); // 设置延迟时间(毫秒)return msg;});return "Delayed message sent: " + message + " (delay: " + delayTime + "ms)";}
}
RPC通信
-
RabbitMQ并没有直接提供RPC模式的功能,需要开发者手动实现的一种通信模式
- 直接让客户端和服务端进行通信虽然简单直接,但在某些场景下会面临扩展性、解耦、容错性等方面的限制
- RabbitMQ实现RPC模式可以解决上述问题,客户端发送请求并等待响应,服务端处理请求后返回结果
-
模拟步骤
- 创建至少两个队列,
rpc_queue
用于接收请求,callback_queue
用于接收响应,建议附correlation_id
为客户端唯一标识,方便多客户端时互相辨别 - 客户端通过交换机匹配
correlation_id
发消息给rpc_queue
- 服务端消费指定服务端
rpc_queue
中的消息,处理完毕后将响应发给callback_queue
- 创建至少两个队列,
---> 微服务1 ---> direct交换机 ---> rpc_queue队列 ---> 微服务2 ---> direct交换机 ---> callback_queue队列 ---> 下游模块
日志收集
- 自定义 Logback Appender
@Component
public class RabbitMQLogAppender extends AppenderBase<ILoggingEvent> {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overrideprotected void append(ILoggingEvent event) {// 将日志转换为 JSON 或字符串String logMessage = String.format("[%s] %s - %s",event.getLevel(),event.getLoggerName(),event.getFormattedMessage());// 发送到 RabbitMQrabbitTemplate.convertAndSend(RabbitMQLogConfig.LOG_EXCHANGE,"", // Fanout 交换机不需要 routingKeylogMessage);}
}
- 配置 Logback:logback-spring.xml
<configuration><appender name="RABBITMQ" class="com.example.RabbitMQLogAppender"/><root level="INFO"><appender-ref ref="RABBITMQ"/></root>
</configuration>
- 消费者监听队列
@Service
public class LogConsumerService {@Autowiredprivate SystemLogRepository logRepository;@RabbitListener(queues = RabbitMQLogConfig.LOG_QUEUE)public void receiveLog(String logMessage) {// 解析日志(简单示例,实际可用正则或JSON解析)String[] parts = logMessage.split(" - ");if (parts.length >= 2) {String level = parts[0].substring(1, parts[0].indexOf(']'));String loggerName = parts[0].substring(parts[0].indexOf(']') + 2);String message = parts[1];// 存储到数据库SystemLog log = new SystemLog();log.setLevel(level);log.setLoggerName(loggerName);log.setMessage(message);log.setTimestamp(LocalDateTime.now());logRepository.save(log);}}
}