分布式专题——12 RabbitMQ之应用开发
1 RabbitMQ基础编程模型
- RabbitMQ 提供了很多种主流编程语言的客户端支持,这里只分析 Java 语言的客户端。
1.1 Maven依赖
-
要使用 RabbitMQ 的 Java 客户端,首先需要在 Maven 项目中添加对应的依赖。下面是 RabbitMQ Java 客户端的 Maven 坐标:
amqp-client
是 RabbitMQ 对 AMQP(高级消息队列协议)的 Java 实现库,版本为5.21.0
;- 添加这个依赖后,项目就能使用 RabbitMQ 相关的类和方法来进行消息队列的操作了;
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version> </dependency>
1.2 基础编程模型
1.2.1 创建连接,获取 Channel
// 首先创建 ConnectionFactory 对象,它是用于创建与 RabbitMQ 服务器连接的工厂类
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 服务器的主机名、端口、用户名、密码以及虚拟主机等连接参数
factory.setHost(HOST_NAME);
factory.setPort(HOST_PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
// 创建一个与 RabbitMQ 服务器的连接 Connection
Connection connection = factory.newConnection();
// 从连接中创建一个 Channel,后续的 RabbitMQ 操作(如声明交换机、队列、发送和接收消息等)主要通过 Channel 来进行
Channel channel = connection.createChannel();
- 在 RabbitMQ 的客户端使用中,一般创建一个
Channel
就足够了,因为只要这个Channel
不被关闭,就能够一直被重复使用,这样可以减少资源的创建和销毁开销,提高效率; - 当需要创建多个
Channel
时,要留意可能出现的冲突情况; - 在调用
createChannel
方法创建Channel
时,可以传入一个int
类型的参数channelNumber
;- 这个
channelNumber
会成为该Channel
的唯一标识; - RabbitMQ 会通过检查这个
channelNumber
来防止重复:- 如果对应的
channelNumber
还没有被用来创建过Channel
,那么就会成功创建一个新的Channel
; - 但如果该
channelNumber
已经被用来创建过Channel
了,此时调用createChannel
方法就会返回null
,以此避免出现Channel
标识冲突的问题。
- 如果对应的
- 这个
1.2.2 声明 Exchange
-
声明交换机:
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;
exchange
是交换机的名称;type
是交换机的类型,比如direct
、fanout
、topic
等,不同类型的交换机有不同的消息路由规则;durable
表示交换机是否持久化,若为true
,则服务器重启后交换机仍然存在;autoDelete
表示当交换机不再被使用时是否自动删除;arguments
是一些额外的参数,用于配置交换机的其他属性;
-
如果 Broker(RabbitMQ 服务器)上没有对应的交换机,RabbitMQ 会自动创建;如果已经存在,声明时的参数要与 Broker 上的保持一致,否则会报错;
-
不同版本的 RabbitMQ,参数是有变动的,最好都以管理控制台上的为准:
1.2.3 声明 queue
-
声明队列:
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
queue
是队列的名称;durable
表示队列是否持久化,持久化的队列在服务器重启后不会丢失;exclusive
表示队列是否是排他的,若为true
,则只有创建它的连接可以访问,连接关闭后队列会被删除;autoDelete
表示当队列不再被使用时是否自动删除;arguments
是队列的额外参数,可用于配置队列的一些特殊属性,比如声明Quorum
队列或Stream
队列时需要通过arguments
来指定队列类型等;
-
同样,若 Broker 上没有该队列,RabbitMQ 会创建;若已存在,声明参数需与 Broker 上的一致;
-
队列的多数参数以管理控制台中的为准,像之前版本存在的
AutoDelete
选项,在当前版本已被移除,且下方参数也和老版本有差异;Durability
用于表示队列的持久化特性。若选择Durable
,队列及其中的消息在服务器重启后不会丢失;若选择Transient
,则队列和消息不会持久化,服务器重启后相关信息就会消失,不过这种情况下消息读写效率会比较高;- 与交换机(Exchange)不同,队列在管理控制台上有
Type
参数,该参数未在 API 中直接体现,这是由于历史版本原因以及不同类型队列实现方式不同。例如Quorum
和Stream
类型的队列,本身就没有Durability
参数,因为它们的消息默认必须持久化,后续详细参数也会有很大区别;
-
队列是有多种类型的,客户端 API 中默认只能声明
Classic
类型的对列。如果需要声明其他类型的对列,需要通过arguments
参数来区分:-
Classic 队列:客户端 API 中默认就只能声明
Classic
类型队列; -
Quorum 队列:
Map<String,Object> params = new HashMap<>(); params.put("x-queue-type","quorum"); // 需要在 arguments 中传入参数 x-queue-type,参数值设定为 quorum // 并且对于 Quorum 类型队列,durable 参数必须设为 true,exclusive 参数必须设为 false,否则会报错 channel.queueDeclare(QUEUE_NAME, true, false, false, params);
-
Stream 队列:
Map<String,Object> params = new HashMap<>(); params.put("x-queue-type","stream"); params.put("x-max-length-bytes", 20_000_000_000L); // 日志文件的最大字节数: 20 GB params.put("x-stream-max-segment-size-bytes", 100_000_000); // 每一个日志文件的最大大小: 100 MB // x-queue-type 参数要设置为 stream // durable 参数必须为 true,exclusive 必须为 false channel.queueDeclare(QUEUE_NAME, true, false, false, params);
x-max-length-bytes
表示日志文件的最大字节数,x-stream-max-segment-size-bytes
每一个日志文件的最大大小。这两个是可选参数,通常为了防止Stream
日志无限制累计,都会配合Stream
队列一起声明;- 不过
Stream
类型的队列不能像前两种队列一样使用,比如声明消费者后,从控制台直接发消息,消费者端接收不到,具体原因后续会详细分析;
-
-
实际项目中使用最多的是 RabbitMQ 的
Classic
经典队列,但从 RabbitMQ 官网能看到,目前 RabbitMQ 更推荐使用Quorum
队列,而Stream
队列目前企业使用还比较少。
1.2.4 绑定队列和交换机
-
绑定队列和交换机:
channel.queueBind(String queue, String exchange, String routingKey)
queue
是要绑定的队列名称;exchange
是要绑定的交换机名称;routingKey
是路由键,它决定了消息从交换机路由到队列的规则,不同类型的交换机对路由键的使用方式不同;
-
有了绑定关系后,交换机才能将接收到的消息路由到对应的队列中。如果 Broker 上已存在该绑定,声明时的参数要与 Broker 上的一致。
1.2.5 Producer发送消息
-
生产者根据应用场景发送消息到管道:
channel.basicPublish(String exchange, String routingKey, BasicProperties props, message.getBytes("UTF-8")) ;
-
exchange
是消息要发送到的交换机名称,如果不需要特定交换机,可传空字符串; -
routingKey
是路由键,用于消息路由; -
props
是消息的一些属性,比如消息的持久化模式、优先级等;-
以上这些配置项,可以用 RabbitMQ 中提供的一个 Builder 对象来构建:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 设置消息的投递模式,这里使用了 PERSISTENT_TEXT_PLAIN 对应的投递模式,表示消息会持久化存储,服务器重启后消息不会丢失 builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()); // 设置消息的优先级 builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()); // 还可以通过 builder.headers(headers) 来设置消息的头部(Headers)部分,传入自定义的参数值。AMQP.BasicProperties prop = builder.build();
-
MessageProperties.PERSISTENT_TEXT_PLAIN
是 RabbitMQ 提供的持久化消息的默认配置,而 RabbitMQ 中消息是否持久化不光取决于消息,还取决于Queue。通常为了保证消息安全,会将 Queue 和消息同时声明为持久化;
-
-
message
是消息的内容,通过getBytes("UTF-8")
方法转换为字节数组作为消息体。
-
1.2.6 Consumer消费消息
-
Consumer 消费消息主要有两种模式:
-
Push 模式:
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
- Consumer 会等待 RabbitMQ 服务器将消息推过来再消费,通常会启动一个一直挂起的线程来等待消息;
autoAck
表示是否自动确认消息,若为true
,则消息被消费者接收到后会自动向 RabbitMQ 确认,RabbitMQ 会将该消息从队列中移除;若为false
,则需要消费者手动确认;
-
Pull 模式:
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
- Consumer 主动到 RabbitMQ 服务器上去拉取消息进行消费;
-
-
实际开发中建议使用 Push 模式,这样消息处理更及时,也不会给服务器带来重复查询的压力。
1.2.7 关闭通道与连接,释放资源
// 关闭通道
channel.close();
// 关闭连接
conection.clouse();
- 用完之后主动释放资源,若不主动释放,大部分情况下 RabbitMQ 过一段时间也会释放,但会额外消耗系统资源。
1.3 关于消息监听与回溯
-
在消费信息时,RabbitMQ 的
Channel
提供了重载的basicConsume
方法:用于消费队列中的消息,通过传入不同的回调函数,在消息投递、消费取消、消费者关闭信号等场景下执行相应逻辑;String basicConsume(String queue, // 要消费的队列名称DeliverCallback deliverCallback, // 消息投递时的回调,当队列中有消息可消费时,该回调会被触发CancelCallback cancelCallback, // 消费被取消时的回调,比如队列被删除等情况会触发ConsumerShutdownSignalCallback shutdownSignalCallback // 消费者关闭信号的回调,当消费者因某些原因关闭时触发)
- 这些回调是 RabbitMQ 在
Consumer
中预留的业务扩展点,在学习时可能感觉作用不大,但实际开发中,若前期没考虑到,后续增加消费端功能(如把consumerTag
、deliveryTag
作为消息编号保存等)会很困难,所以很有学习必要;
- 这些回调是 RabbitMQ 在
-
我们也可以自定义一个回调消费者:
public class CallbackConsumer {// 定义交换机名称常量private static final String EXCHANGE_NAME="callbackExchange";private static final String ALTER_EXCHANGE_NAME="alterExchange";// 定义队列名称常量private static final String QUEUE_NAME = "callbackQueue";public static void main(String[] args) throws Exception {// 获取RabbitMQ连接Connection connection = RabbitMQUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 创建参数映射,用于设置主交换机的备用交换机Map<String,Object> params = new HashMap<>();// 设置备用交换机参数,当消息无法路由到主交换机的队列时,会被转发到备用交换机params.put("alternate-exchange", ALTER_EXCHANGE_NAME);// 声明主交换机:direct类型,持久化,不自动删除,使用备用交换机参数channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, params);// 声明备用交换机:direct类型,持久化,不自动删除,无额外参数channel.exchangeDeclare(ALTER_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);// 声明队列:持久化,非排他,不自动删除,无额外参数channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 将队列绑定到主交换机,使用路由键"key1"// 这意味着只有路由键为"key1"的消息才会被路由到这个队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");// 开始消费队列中的消息,设置三个回调函数处理不同场景channel.basicConsume(QUEUE_NAME, // 消息传递回调:当成功接收到消息时触发new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {// 获取消息的投递标签(唯一标识)long deliveryTag = message.getEnvelope().getDeliveryTag();// 获取消息的相关ID(通常用于请求-响应模式)String correlationId = message.getProperties().getCorrelationId();// 打印接收到的消息详情System.out.println("received message consumerTag: " + consumerTag + "; message: " + new String(message.getBody()) +"; deliveryTag: " + deliveryTag +"; correlationId: " + correlationId);// 手动确认消息已处理,false表示只确认当前消息channel.basicAck(deliveryTag, false);}},// 取消回调:当消费者被取消时触发(例如队列被删除)new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("canceled message consumerTag: " + consumerTag + "; ");}},// 关闭信号回调:当通道或连接关闭时触发new ConsumerShutdownSignalCallback() {@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {System.out.println("consumer shutdown message consumerTag: " + consumerTag + "; Exception: " + sig);}});} }
-
然后往队列里多次发送消息后,会得到类似如下的输出:
received message consumerTag: amq.ctag-64_uNZDp66NSkiuTfu; message: messagedeliveryTag: 1 received message consumerTag: amq.ctag-64_uNZDp66NSkiuTfu; message: messagedeliveryTag: 2 received message consumerTag: amq.ctag-64_uNZDp66NSkiuTfu; message: messagedeliveryTag: 3 received message consumerTag: amq.ctag-64_uNZDp66NSkiuTfu; message: messagedeliveryTag: 4
-
这些是
DeliverCallback
回调反馈的结果,从中可以看到consumerTag
(消费者标签)和deliveryTag
(投递标签)等信息consumerTag
是 RabbitMQ 为消费者分配的唯一标识,用于区分不同的消费者。比如:amq.ctag - 64_uNZDp66NSkiuTfu
;deliveryTag
是 RabbitMQ 为每个投递到消费者的消息分配的唯一标识,用于消费者确认消息等操作,每个消息在投递时都会有一个递增的deliveryTag
。比如:1
、2
、3
、4
这些数字;
-
它们都是 RabbitMQ 服务器分配的内部参数,后续若要拓展消费端处理逻辑(如根据这些标签做消息编号等),这些信息很有帮助。
2 RabbitMQ常用的消息场景
-
虽然 RabbitMQ 的客户端 API 本身使用起来相对简单(比如创建连接、声明交换机 / 队列、发送 / 接收消息等基础操作有明确的方法调用),但在复杂业务场景中(如高并发、消息可靠性保障、复杂路由需求等),要真正用好 RabbitMQ 并发挥其价值,需要深入理解其原理和特性,这部分是学习的重点和难点。而理解业务场景是掌握这部分内容的关键 —— 只有结合具体业务需求,才能合理设计交换机类型、队列策略、消息属性等,避免为了用技术而用技术;
-
具体参见:RabbitMQ Tutorials | RabbitMQ,包含 7 种典型使用场景,这些场景覆盖了 RabbitMQ 的核心功能;
- 其中第 6 部分介绍了如何用 RabbitMQ 实现 RPC(远程过程调用),但实际开发中通常不会选择这种方式。原因在于:
- MQ 的核心优势是异步通信、解耦上下游服务,而 RPC 更强调同步调用的即时性和可靠性,两者设计目标存在差异;
- 用 MQ 实现 RPC 会增加复杂度(如处理超时、重试、结果关联等),不如直接使用专门的 RPC 框架(如 gRPC、Dubbo 等)高效可靠,因此后续不会重点分享该场景;
- 其中第 6 部分介绍了如何用 RabbitMQ 实现 RPC(远程过程调用),但实际开发中通常不会选择这种方式。原因在于:
-
下面除了官方教程中的典型场景,还会补充一种相对小众的路由机制 ——Header 路由。Header 交换机不依赖路由键(routing key),而是通过消息属性中的 headers 键值对与队列绑定的 headers 进行匹配来路由消息,适用于一些需要基于多字段匹配的特殊场景,虽然使用频率不高,但了解其机制有助于拓宽对 RabbitMQ 路由能力的认知。
2.1 “Hello World!”
-
示意图:
-
P
代表生产者(Producer),负责发送消息;中间的“hello”是队列(Queue),用于存储消息;C
代表消费者(Consumer),负责接收并消费队列中的消息; -
流程:生产者
P
直接发送消息到指定的队列“hello”,不需要借助交换机(Exchange)进行路由等复杂规则处理,消费者C
直接从该队列中获取消息进行消费;
-
-
生产者(Producer)代码
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
:声明队列QUEUE_NAME
:队列的名称;- 第一个
false
:表示队列不持久化(durable
为false
),即 RabbitMQ 服务器重启后,该队列会消失; - 第二个
false
:表示队列不是排他的(exclusive
为false
),即多个连接可以访问该队列; - 第三个
false
:表示队列不自动删除(autoDelete
为false
),即队列在不再被使用时不会自动删除; null
:表示队列没有额外的参数;
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"))
:发送消息到队列- 第一个参数
""
:表示使用默认的交换机(默认交换机是直连交换机,会根据路由键将消息路由到对应的队列); QUEUE_NAME
:路由键,这里因为使用默认交换机且要将消息发送到指定队列,所以路由键设为队列名称;null
:表示消息的属性为null
,即使用默认属性;message.getBytes("UTF-8")
:将消息内容转换为 UTF-8 编码的字节数组,作为消息体发送;
- 第一个参数
-
-
消费者(Consumer)代码
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- 同样是声明队列,参数含义与生产者中声明队列的参数一致;
- 这一步是为了确保:消费者在消费前,队列是存在的(如果队列不存在,会创建该队列。如果已存在,此声明操作无实质影响,只是确认队列的存在)。
2.2 Work Queues(工作队列)
-
Work queues(工作队列)是 RabbitMQ 最基础且常用的工作机制:
- 生产者(Producer,图中
P
)发送消息到队列(Queue),多个消费者(Consumer,图中C₁
、C₂
等)同时从该队列消费消息;
- 生产者(Producer,图中
-
生产者(Producer)代码
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
-
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
:声明队列TASK_QUEUE_NAME
:队列名称;true
:表示队列持久化(durable
为true
),即 RabbitMQ 服务器重启后,该队列依然存在。因为任务通常很重要,不能因消息中间件服务问题而被耽误,所以要设置队列持久化;
-
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
:发送消息到队列TASK_QUEUE_NAME
:路由键,将消息发送到指定队列。MessageProperties.PERSISTENT_TEXT_PLAIN
:设置消息为持久化,确保即使 RabbitMQ 服务器重启,消息也不会丢失(结合队列持久化,能最大程度保证消息不丢失)。message.getBytes("UTF-8")
:将消息内容转换为 UTF - 8 编码的字节数组作为消息体。
-
-
消费者(Consumer)代码
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicQos(1); channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
-
channel.basicQos(1)
:设置消费者的预取计数(prefetchCount
)为 1。这表示该消费者每次只从队列中预取 1 条消息,在该消息被确认(ack
)之前,不会再从队列中获取新的消息。这样可以实现消息的公平分发,避免某个消费者因处理能力不足而堆积大量消息,而其他消费者却处于空闲状态; -
channel.basicConsume(TASK_QUEUE_NAME, false, consumer)
:开始消费队列中的消息TASK_QUEUE_NAME
:要消费的队列名称;false
:表示关闭自动应答(autoAck
为false
),需要消费者手动向 RabbitMQ 发送应答(ack
),告知消息已被成功消费;consumer
:消费者的回调对象,用于处理接收到的消息;
-
-
注意:
-
Consumer 对每个消息必须应答
- 消费者端每消费完一条消息,需要给服务端发送一个
ack
应答(可以是手动应答,也可以是自动应答,上面代码中是手动应答); - 如果消费者一直没有给服务端应答,服务端会不断地将这条消息重复投递,这会持续消耗系统资源,这种未被正确应答且不断重复投递的消息被称为“Poison Message(毒消息)”,这是使用 RabbitMQ 时很容易犯的错误;
- 消费者端每消费完一条消息,需要给服务端发送一个
-
RabbitMQ 并不完全保证消息安全
- 关键消息不能因为服务出现问题而被忽略。如果想要保证消息不丢失,在 RabbitMQ 中,需要同时将队列和消息的
durable
属性都设置成true
(队列持久化保证队列不丢失,消息持久化保证消息内容不丢失); - 但官方也明确说明,就算把队列和消息都设置为
durable
,RabbitMQ 也并不能保证消息完全不丢失。因为 RabbitMQ 对持久化的消息,会先写入到PageCache
缓存中,而不是直接写入磁盘。缓存中的数据断电就会丢失,只有磁盘中的数据断电才不会丢失。消息从PageCache
写入到磁盘,需要进行一次刷盘操作,RabbitMQ 并不会对每个消息都执行刷盘操作,而是会有一定的间隔。因此,如果出现服务器异常断电,RabbitMQ 在这个层面是可能造成消息丢失的。不过数据刷盘是所有应用程序都要面临的问题,在 RabbitMQ 中可使用后面会介绍的Publisher Confirms
机制来应对这种情况;
- 关键消息不能因为服务出现问题而被忽略。如果想要保证消息不丢失,在 RabbitMQ 中,需要同时将队列和消息的
-
消息在多个 Consumer 之间的分发
- RabbitMQ 默认采用的是“fair dispatch(公平分发)”,也叫“round - robin(轮询)”模式,即把消息在所有消费者中轮流发送。这种方式没有考虑消息处理的复杂度以及消费者的处理能力;
- 改进后的方案是,消费者可以向服务器声明一个
prefetchCount
(通过channel.basicQos(prefetchCount)
方法),表示当前这个消费者最多可以同时处理几条消息(消息已经发送,但未收到消费者的basicAck
)。如果超过了这个消费者节点的能力值,就不再往这个消费者发布消息; - 不过官方也指出这种模式还是存在问题,消息有可能全部阻塞,所有消费者节点都超过了
prefetchCount
值,那消息就阻塞在服务器上,这时需要及时发现问题并采取措施,比如增加消费者节点或者采用其他策略。
-
2.3:Publish/Subscribe(订阅/发布)
-
Publish/Subscribe(发布/订阅)机制是对前面的 Work queues(工作队列)模式的补充,进一步解耦了生产者(Producer)和消费者(Consumer);
- 生产者只负责发送消息到交换机(Exchange),而消息进入哪个队列,由交换机来分配;
- 在这种机制中,使用的是类型为
fanout
的交换机,它会将生产者发送的消息同时转发到所有与它绑定的队列中,然后由不同的消费者从各自绑定的队列中消费消息;
-
示意图:
P
是生产者,X
是类型为fanout
的交换机,amq.gen - RQ6...
和amq.gen - As8...
是两个队列,C₁
、C₂
是两个消费者;- 流程为:生产者
P
发送消息到交换机X
,X
会将消息同时发送到与之绑定的两个队列中,然后消费者C₁
从amq.gen - RQ6...
队列消费消息,消费者C₂
从amq.gen - As8...
队列消费消息;
-
生产者(Producer)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
-
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
:声明交换机EXCHANGE_NAME
:交换机的名称;"fanout"
:指定交换机的类型为fanout
,fanout
类型的交换机的特点是会将接收到的消息广播到所有与之绑定的队列;
-
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"))
:发送消息到交换机EXCHANGE_NAME
:要发送到的交换机名称;- 第二个参数
""
:路由键,对于fanout
类型的交换机,路由键没有实际作用,因为它会将消息发送到所有绑定的队列,所以这里设为空字符串; null
:消息的属性为null
,使用默认属性;message.getBytes("UTF-8")
:将消息内容转换为 UTF - 8 编码的字节数组作为消息体;
-
-
绑定(binding)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");
-
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
:同样是声明类型为fanout
的交换机,确保交换机存在; -
String queueName = channel.queueDeclare().getQueue()
:声明一个临时队列(不指定队列名称,由 RabbitMQ 自动生成队列名称),并获取该队列的名称; -
channel.queueBind(queueName, EXCHANGE_NAME, "")
:将临时队列queueName
绑定到交换机EXCHANGE_NAME
上;queueName
:要绑定的队列名称;EXCHANGE_NAME
:要绑定到的交换机名称;""
:路由键,对于fanout
类型的交换机,路由键无实际意义,设为空字符串即可;
-
-
关键之处在于类型为
fanout
的交换机,这种类型的交换机只负责往所有已绑定的队列上发送消息,不管队列的路由键等其他因素,实现了消息的广播式分发,从而支持发布/订阅模式,让多个消费者可以同时接收到同一份消息的副本。
2.4 Routing(基于内容的路由)
-
Routing(基于内容的路由)机制使用类型为
direct
的交换机;- 与 Publish/Subscribe(发布/订阅)模式中
fanout
交换机广播消息到所有绑定队列不同,direct
交换机通过路由键(routingKey
)来决定消息发送到哪些队列; - 它会根据消息的
routingKey
,将不同类别的消息分发到不同的队列,从而实现更精准的消息路由;
- 与 Publish/Subscribe(发布/订阅)模式中
-
示意图:
-
P
是生产者,direct
是类型为direct
的交换机,amq.gen - S9b...
和amq.gen - Ag1...
是两个队列,C₁
、C₂
是两个消费者; -
生产者发送消息时指定不同的
routingKey
(如error
、info
、warn
),direct
交换机根据routingKey
将消息路由到对应的队列:-
带有
error
路由键的消息会被发送到amq.gen - S9b...
队列,然后由消费者C₁
消费; -
带有
info
、warn
、error
路由键的消息会被发送到amq.gen - Ag1...
队列,然后由消费者C₂
消费;
-
-
-
生产者(Producer)
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
:声明类型为direct
的交换机EXCHANGE_NAME
:交换机名称;"direct"
:指定交换机类型为direct
;
-
绑定(Bindings)代码
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueBind(queueName, EXCHANGE_NAME, routingKey1); channel.queueBind(queueName, EXCHANGE_NAME, routingKey2); channel.basicConsume(queueName, true, consumer);
-
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
:声明类型为direct
的交换机,确保交换机存在; -
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1)
和channel.queueBind(queueName, EXCHANGE_NAME, routingKey2)
:- 将队列
queueName
与交换机EXCHANGE_NAME
进行绑定,同时指定路由键routingKey1
和routingKey2
; - 这表示该队列会接收来自交换机
EXCHANGE_NAME
且路由键为routingKey1
或routingKey2
的消息;
- 将队列
-
channel.basicConsume(queueName, true, consumer)
:开始从队列queueName
消费消息,true
表示开启自动应答(autoAck
为true
),消息被消费者接收后会自动向 RabbitMQ 发送应答,告知消息已被成功消费;
-
-
在 Routing 机制中,关键是
direct
类型的交换机以及路由键routingKey
:生产者发送消息时指定routingKey
,交换机根据routingKey
将消息转发到对应的队列,然后由消费者从队列中消费消息,实现了基于内容(通过routingKey
体现)的精准路由,相比fanout
交换机的广播式分发,更加灵活和有针对性。
2.5 Topics(基于话题的路由)
-
Topics(基于话题的路由)机制使用类型为
topic
的交换机,它是在 Routing(基于内容的路由)模式的基础上,对路由键(routingKey
)进行了模糊匹配,从而实现更灵活的消息路由; -
模糊匹配规则。在
topic
类型的交换机中,路由键是由多个单词组成的,单词之间用.
隔开。其中:-
*
:代表一个具体的单词; -
#
:代表 0 个或多个单词;
-
-
示意图:
-
P
是生产者,X
是类型为topic
的交换机,Q₁
、Q₂
是两个队列,C₁
、C₂
是两个消费者; -
队列
Q₁
绑定的路由键是*.orange.*
,表示会接收路由键为类似a.orange.b
这样格式(中间单词为orange
,前后各有一个单词)的消息; -
队列
Q₂
绑定的路由键是*.*.rabbit
和lazy.#
:*.*.rabbit
表示会接收路由键为类似a.b.rabbit
这样格式(第三个单词为rabbit
,前两个为任意单个单词)的消息;lazy.#
表示会接收路由键以lazy.
开头,后面跟着 0 个或多个单词的消息,比如lazy
、lazy.a
、lazy.a.b
等;
-
生产者发送消息时指定不同的路由键,
topic
交换机根据模糊匹配规则将消息路由到对应的队列,然后由消费者从队列中消费消息;
-
-
生产者(Producer)
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
channel.exchangeDeclare(EXCHANGE_NAME, "topic")
:声明类型为topic
的交换机EXCHANGE_NAME
:交换机名称;"topic"
:指定交换机类型为topic
;
-
绑定(Bindings)
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(queueName, EXCHANGE_NAME, routingKey1); channel.queueBind(queueName, EXCHANGE_NAME, routingKey2); channel.basicConsume(queueName, true, consumer);
-
channel.exchangeDeclare(EXCHANGE_NAME, "topic")
:声明类型为topic
的交换机,确保交换机存在; -
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1)
和channel.queueBind(queueName, EXCHANGE_NAME, routingKey2)
:- 将队列
queueName
与交换机EXCHANGE_NAME
进行绑定,同时指定路由键routingKey1
和routingKey2
; - 这些路由键需要符合模糊匹配规则,用于接收对应模式的消息;
- 将队列
-
channel.basicConsume(queueName, true, consumer)
:开始从队列queueName
消费消息,true
表示开启自动应答(autoAck
为true
),消息被消费者接收后会自动向 RabbitMQ 发送应答,告知消息已被成功消费;
-
-
Topics 机制的关键在于
topic
类型的交换机以及其对路由键的模糊匹配规则(*
匹配单个单词,#
匹配 0 个或多个单词)。生产者发送消息时携带符合规则的路由键,交换机根据路由键的模糊匹配结果,将消息转发到对应的队列,消费者再从队列中消费消息,实现了更灵活、更具扩展性的消息路由,适用于需要基于多个维度或模糊条件进行消息分类和分发的场景。
2.6 Publisher Confirms(发送者消息确认)
-
RabbitMQ 以往的机制能保证消息发送到 MQ 后推送给消费者且不丢失,但生产者发送消息是否成功没有保障。因为生产者发送消息的基础 API(
Producer.basicPublish
方法)没有返回值,应用无法知道一次消息发送是否成功,这在业务上易造成消息丢失。Publisher Confirms 机制就是为生产者提供确认机制,保证消息发送过程成功; -
发送者确认模式默认不开启,若需要开启,需手动在
Channel
中声明,使用下面这个方法开启:channel.confirmSelect()
-
三种确认策略
-
发布单条消息:发布一条消息就确认一条消息;
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000); }
-
channel.waitForConfirmsOrDie(5_000)
方法会在Channel
端等待 RabbitMQ 给出响应,表明消息已正确发送到 RabbitMQ 服务端; -
注意:该方法会同步阻塞
Channel
,在等待确认期间,Channel
不能继续发送消息,会明显降低集群的发送速度(吞吐量)。官方说明Channel
底层是异步工作的,会阻塞Channel
然后异步等待服务端发送确认消息才解除阻塞,但使用时可当作同步工具。如果到了超时时间(这里是 5000 毫秒)还没收到服务端确认,就会抛出异常。通常处理该异常的方式是记录错误日志或者尝试重发消息,但重发时要注意不要让程序陷入死循环;
-
-
发送批量消息:为缓解单条确认对吞吐量的影响,采取发送一批消息后一起确认的方式;
int batchSize = 100; int outstandingMessageCount = 0; long start = System.nanoTime(); for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;} } if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(2_000); }
- 优点与问题:这种方式可以稍微缓解发送者确认模式对吞吐量的影响,但存在缺陷,当确认出现异常时,发送者只能知道这一批消息出问题了,无法确认具体是哪一条消息出了问题,所以需要额外机制对每一条发送出的消息进行处理;
-
异步确认消息:生产者在
Channel
中注册监听器来对消息进行确认;channel.addConfirmListener(confirmCallback var1, ConfirmCallback var2);
- 监听器说明:需要注册两个
ConfirmCallback
,一个用于处理成功的情况,一个用于处理失败的情况。ConfirmCallback
是个监听接口,里面只有一个方法void handle(long sequenceNumber, boolean multiple) throws IOException;
,其中:sequenceNumber
:是唯一的序列号,代表唯一的消息。在 RabbitMQ 中,消息本身是二进制数组,默认没有序列号,Producer
通过int sequenceNumber = channel.getNextPublishSeqNo();
生成全局递增的序列号,分配给新发送的第一条消息,应用程序需要自己将序列号与消息对应起来;multiple
:是布尔类型参数,如果为false
,表示这次只确认了当前一条消息;如果为true
,表示 RabbitMQ 这次确认了一批消息,在sequenceNumber
之前的所有消息都已确认完成;
- 监听器说明:需要注册两个
-
-
这三种确认机制都能提升
Producer
发送消息的安全性,通常情况下,第三种异步确认机制的性能最好。实际上,在当前版本中,Publisher
不仅可以确认消息是否到了Exchange
,还可以确认消息是否从Exchange
成功路由到Queue
;- 在
Channel
中可以添加一个ReturnListener
,这个ReturnListener
会监控到一部分发送成功了,但无法被Consumer
消费到的消息; - 对于这些无法被路由的消息,还可以在
Exchange
中添加alternate-exchange
属性,指向另一个Exchange
,将这些消息转到另外的Exchange
进行兜底处理。
- 在
2.7 Headers(头部路由机制)
-
Headers
路由是一种忽略routingKey
的路由方式,通过消息的headers
(键值对)进行路由。在实际中使用较少,但在某些特殊业务场景下很有用。与direct
、fanout
、topic
等以routingKey
为关键进行路由的交换机不同,Headers
类型的交换机不依赖routingKey
,而是依据消息和队列绑定中定义的headers
键值对匹配来路由消息; -
Headers
路由的匹配方式有两种:-
all
:表示需要所有的键值对都满足才会匹配成功,消息才会被路由到对应的队列; -
any
:表示只要满足其中一个键值对就可以匹配成功;
-
-
消费者端(Consumer)
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-match", "any"); // x-match:特定的参数。all表示必须全部匹配才算成功。any表示只要匹配一个就算成功。 headers.put("loglevel", "info"); headers.put("buslevel", "product"); headers.put("syslevel", "admin");Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); String queueName = channel.queueDeclare("ReceiverHeader", true, false, false, null).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, routingKey, headers);
-
首先创建
headers
键值对,设置x-match
为any
,表示只要headers
中loglevel
、buslevel
、syslevel
有一个与生产者发送消息的headers
对应键值匹配,消息就会被路由到该队列; -
然后通过
RabbitMQUtil
获取连接和创建通道,声明HEADERS
类型的交换机,声明队列并将队列与交换机绑定,绑定同时指定了headers
用于路由匹配;
-
-
生产者端(Producer,
EmitLogHeader
类)public class EmitLogHeader {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception{// 虽然Header Exchange不依赖routing key进行路由,但API仍需要提供routing key参数// 这里可以将其用于传递其他业务信息,消费者仍然可以接收到这个值String routingKey = "ourTestRoutingKey";// 创建消息头信息的Map,这些头信息将用于消息路由的匹配Map<String, Object> headers = new HashMap<>();headers.put("loglevel", "error");headers.put("buslevel", "product"); // 匹配成功headers.put("syslevel", "admin"); // 匹配成功// 要发送的消息内容String message = "LOG INFO asdfasdf";// 获取RabbitMQ连接和通道Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();// 声明一个Header类型的交换机// Header Exchange根据消息的header属性进行路由匹配,而不是routing keychannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);// 构建消息属性,设置持久化等参数AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 设置消息为持久化(服务器重启后不会丢失)builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());// 设置消息优先级builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());// 添加自定义header信息,这些信息将用于路由决策builder.headers(headers);// 发布消息到交换机// 虽然提供了routingKey,但Header Exchange会忽略它,只使用header信息进行路由channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));// 关闭通道和连接channel.close();connection.close();} }
-
Headers
交换机的性能相对比较低,因此官方并不建议大规模使用这种交换机,也没有把它列入基础的示例当中。
3 SpringBoot集成RabbitMQ
-
引入依赖:SpringBoot 官方集成了 RabbitMQ,只需引入下面代码中的这个核心 Maven 依赖即可使用 RabbitMQ。不过要特别注意,不同版本的 SpringBoot,后续的配置方式可能会有变化;
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置关键参数:基础的运行环境参数以及生产者的一些默认属性配置都集中在
application.properties
配置文件中,所有配置都以spring.rabbitmq
开头;- 若想了解详细的配置信息,可以参考
RabbitProperties
类的源码,其中有各个字段的简单说明; - 如果需要更详细的配置资料,可前往 GitHub 仓库(GitHub - spring-projects/spring-amqp: Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ)查看;
- 若想了解详细的配置信息,可以参考
-
声明 Exchange、Queue 和 Binding:所有的交换机(Exchange)、队列(Queue)、绑定(Binding)的配置,都需要以对象的方式声明;
- 默认情况下,这些业务对象一经声明,应用就会自动到 RabbitMQ 上创建对应的业务对象,同时也可以配置成绑定已有的业务对象;
- 业务对象的具体声明方式可参考示例工程,详细的属性声明同样可查看 GitHub 仓库;
-
使用 RabbitmqTemplate 对象发送消息:生产者的所有属性已经在
application.properties
配置文件中进行配置,项目启动时,会在 Spring 容器中初始化一个RabbitmqTemplate
对象,之后所有的发送消息操作都通过这个对象来进行; -
使用
@RabbitListener
注解声明消费者:消费者都是通过@RabbitListener
注解来声明的;- 在
@RabbitMQListener
注解中包含了很多对队列进行定制的属性,大部分属性都有默认值,例如ackMode
默认是null
,表示自动应答; - 在日常开发过程中,通常会简化业务模型,消费者只需要绑定队列消费即可;
- 在
-
使用 SpringBoot 框架集成 RabbitMQ 后,开发过程能得到很大简化,使用起来并不难,对照示例就能很快上手。但需要理解的是,SpringBoot 集成后的 RabbitMQ 很多概念虽然能和原生 API 对应上,但这些模型中间都做了转换(比如
Message
就不是原生 RabbitMQ 中的消息了)。所以使用 SpringBoot 框架时,尤其需要加深对 RabbitMQ 原生 API 的理解,这样才能深入理解各种看似简单但实际有很多细节的对象声明方式,做到以不变应万变。