RabbitMQ工作模式
上篇文章:
RabbitMQ工作流程https://blog.csdn.net/sniper_fandc/article/details/149310780?fromshare=blogdetail&sharetype=blogdetail&sharerId=149310780&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link
目录
1 基本概念
1.1 关于RoutingKey和Binding Key
1.2 交换机类型
2 Simple(简单模式)
2.1 模式介绍
2.2 使用案例
3 Work Queue(工作队列)
3.1 模式介绍
3.2 使用案例
4 Publish/Subscribe(发布/订阅模式)
4.1 模式介绍
4.2 使用案例
5 Routing(路由模式)
5.1 模式介绍
5.2 使用案例
6 Topics(通配符模式)
6.1 模式介绍
6.2 使用案例
7 RPC(RPC模式)
7.1 模式介绍
7.2 使用案例
8 Publisher Confirms(发布确认模式)
8.1 模式介绍
8.2 使用案例
9 SpringBoot实现RabbitMQ常用的四种工作模式(工作队列、发布订阅、路由、通配符)
9.1 引入依赖
9.2 配置文件
9.3 具体实现
9.3.1 Work Queue(工作队列)
9.3.2 Publish/Subscribe(发布订阅)
9.3.3 Routing(路由)
9.3.4 Topics(通配符)
9.3.5 消息内容为对象
10 RabbitMQ的消息传递模式
1 基本概念
1.1 关于RoutingKey和Binding Key
在介绍工作模式前,首先介绍两个概念:
RoutingKey:路由键,由生产者生产消息时指定的字符串,交换机会根据路由键来匹配绑定键,匹配成功则将该消息路由到绑定的队列。
Binding Key:绑定键,RabbitMQ中交换机和队列的绑定关系。
上述两个键实际上决定了两种工作模式:路由模式和通配符模式。由于路由键通常和绑定键进行匹配,大多数情况两个值是一样的(除了通配符模式外)因此RabbitMQ官方有时也会将这两个键混淆为RoutingKey,遇到这个名字时需要根据上下文分辨出说的是哪种键。
1.2 交换机类型
RabbitMQ的工作模式主要是根据交换机类型来划分的。交换机类型由AMQP协议定义,有6种:Fanout广播、Direct定向、Topic通配符、headers类型、System和自定义,其中RabbitMQ用到了前4种:
Fanout广播:交换机将消息分发给所有绑定的队列(对应发布订阅模式)。
Direct定向:交换机根据RoutingKey将消息分发给与Binding Key一样的队列(对应路由模式)。
Topic通配符:交换机根据RoutingKey将消息分发给与Binding Key(此时该值可以为通配符)匹配的队列(对应通配符模式)。
headers类型:不依赖RoutingKey来进行匹配,而是根据消息中的headers属性匹配队列,性能很差,实际中几乎不用。
2 Simple(简单模式)
2.1 模式介绍
P是生产者Producer,C是消费者Consumer。
特点:一个生产者,一个消费者,消息只被消费一次。又称为点对点模式。
适用场景:消息只被单个消费者处理的场景。
2.2 使用案例
简单模式在RabbitMQ介绍已经用代码演示过了这里不再演示。
3 Work Queue(工作队列)
3.1 模式介绍
特点:一个生产者,多个消费者,消息只被消费一次,多个消费者共同处理这些消息。
适用场景:集群环境中做异步处理。比如订票系统,订票成功消息发送到RabbitMQ中,由多个短信服务共同处理这些消息。
注意:简单模式和工作队列模式在RabbitMQ中并不是没有用到交换机,而是对于交换机的功能仅仅就是简单转发而已。
3.2 使用案例
一个生产者,两个消费者,实际代码和简单模式一样,只是启动了两个消费者而已。需要注意的是必须先启动消费者再启动生产者,否则先启动生产者再启动第一个消费者后,第二个消费者还未来得及启动第一个消费者就将消息全部消费了。
// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 通过channel发送消息到队列中for (int i = 0; i < 10; i++) {String msg = "Hello RabbitMQ" + i;//使用的是内置交换机,使用内置交换机时,routingKey要和队列名称⼀样,才可以路由到对应的队列上去channel.basicPublish("", RabbitMQConnection.QUEUE, null, msg.getBytes());}//7.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.QUEUE, true, consumer);//等待回调函数执行完毕之后(防止消息没有接收完毕就立即执行关闭),再关闭资源// TimeUnit.SECONDS.sleep(5);//7.释放资源,消费者相当于是一个监听程序,不需要关闭资源//注意:顺序不可改变(要么只关闭连接,通道也会关闭;但是如果先关闭连接,通道已经关闭了再调用通道就报错)// channel.close();// connection.close();}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.QUEUE, true, consumer);//等待回调函数执行完毕之后(防止消息没有接收完毕就立即执行关闭),再关闭资源// TimeUnit.SECONDS.sleep(5);//7.释放资源,消费者相当于是一个监听程序,不需要关闭资源//注意:顺序不可改变(要么只关闭连接,通道也会关闭;但是如果先关闭连接,通道已经关闭了再调用通道就报错)// channel.close();// connection.close();}}
4 Publish/Subscribe(发布/订阅模式)
4.1 模式介绍
特点:一个生产者,多个消费者,Fanout广播类型交换机,交换机将每条消息均复制多份广播给所有绑定的队列,每个消费者收到的消息都一样。
适用场景:消息需要被所有消费者都接收,比如实时通知或广播消息。还是订单系统,订单下单成功后,消息会被广播给不同的服务:物流服务、用户通知服务、店铺服务、支付服务等等。
4.2 使用案例
一个生产者,两个消费者,第一个消费者订阅队列1,第二消费者订阅队列2:
// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、广播类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)//RoutingKey为""表示交换机收到任何消息都会路由给队列channel.queueBind(RabbitMQConnection.FANOUT_QUEUE1, RabbitMQConnection.FANOUT_EXCHANGE, "");channel.queueBind(RabbitMQConnection.FANOUT_QUEUE2, RabbitMQConnection.FANOUT_EXCHANGE, "");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg = "Hello RabbitMQ";channel.basicPublish(RabbitMQConnection.FANOUT_EXCHANGE, "", null, msg.getBytes());//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.FANOUT_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.FANOUT_QUEUE2, true, consumer);}}
在管理界面可以查看交换机和队列的绑定关系:
5 Routing(路由模式)
5.1 模式介绍
特点:一个生产者,多个消费者,Direct定向类型交换机,交换机根据路由键RoutingKey匹配Binding Key,把消息路由给指定队列。
假设RoutingKey=a,则消息会被路由给Q1和Q2;假设RoutingKey=b,则消息只会被路由给Q2。
适用场景:根据特定规则分发消息的场景。比如日志持久化场景,日志有不同级别Error、Warning、Info、Debug等等,根据不同级别的日志分发到不同队列,再由不同文件服务持久化到不同的文件中。
5.2 使用案例
按照上图来构建绑定关系:
// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、定向类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)channel.queueBind(RabbitMQConnection.DIRECT_QUEUE1, RabbitMQConnection.DIRECT_EXCHANGE, "a");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "a");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "b");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "c");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg_a = "Hello RabbitMQ RoutingKey = a";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "Hello RabbitMQ RoutingKey = b";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "Hello RabbitMQ RoutingKey = c";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.DIRECT_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.DIRECT_QUEUE2, true, consumer);}}
交换机和队列绑定关系如下:
6 Topics(通配符模式)
6.1 模式介绍
特点:一个生产者,多个消费者,Topics通配符类型交换机,交换机根据路由键RoutingKey匹配满足要求的Binding Key,把消息路由给所有满足的队列。
通配符:*表示匹配一个单词(单词可以有多个字符),.来进行分隔,#表示匹配多个单词。
假设RoutingKey=a.a.a,则消息会被路由给Q1;假设RoutingKey=a.a.b,则消息会被路由给Q2。
适用场景:灵活匹配和过滤消息的场景。
6.2 使用案例
按照上图构建交换机和队列绑定关系:
// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2// 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange"; //通配符交换机public static final String TOPIC_QUEUE1 = "topic.queue1"; //通配符交换机绑定队列1public static final String TOPIC_QUEUE2 = "topic.queue2"; //通配符交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、定向类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)channel.queueBind(RabbitMQConnection.TOPIC_QUEUE1, RabbitMQConnection.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(RabbitMQConnection.TOPIC_QUEUE2, RabbitMQConnection.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(RabbitMQConnection.TOPIC_QUEUE2, RabbitMQConnection.TOPIC_EXCHANGE, "c.#");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg_a = "Hello RabbitMQ RoutingKey = apple.a.dog";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "apple.a.dog", null, msg_a.getBytes());//转发到Q1String msg_b = "Hello RabbitMQ RoutingKey = apple.a.b";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "apple.a.b", null, msg_b.getBytes());//转发到Q1和Q2String msg_c = "Hello RabbitMQ RoutingKey = c.apple.dog";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "c.apple.dog", null, msg_c.getBytes());//转发到Q2//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.TOPIC_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.TOPIC_QUEUE2, true, consumer);}}
交换机和队列绑定关系如下:
7 RPC(RPC模式)
7.1 模式介绍
特点:实现远程通信,没有生产者和消费者,只有客户端Client和服务端Server,由RabbitMQ作为消息中间件提供两个队列RPC和Reply实现请求和响应的通信。request携带reply_to(指定响应应该放到哪个队列)和correlation_id(请求和响应关联id,指明队列中响应是对哪个请求回复的)两个属性,而reply携带correlation_id属性来表示该响应关联的是哪个请求。
理解:RPC模式实际上类似HTTP协议,但是好处是通过在应用层工作,即使不太了解网络协议,也可以实现客户端和服务器的远程通信(并且这种方式是异步的)。
适用场景:远程通信的高可用性和流量监控的场景。
7.2 使用案例
当发送请求的时候,Client相当于生产者,Server相当于消费者;当返回响应时,Server相当于生产者,Client相当于消费者。
// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2// 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange"; //通配符交换机public static final String TOPIC_QUEUE1 = "topic.queue1"; //通配符交换机绑定队列1public static final String TOPIC_QUEUE2 = "topic.queue2"; //通配符交换机绑定队列2// RPC模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; //请求队列public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue"; //响应队列}
//客户端public class RpcClient {public static void main(String[] args) throws Exception{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.不声明交换机(使用默认的)//6.声明队列channel.queueDeclare(RabbitMQConnection.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(RabbitMQConnection.RPC_RESPONSE_QUEUE, true, false, false, null);//7.发送请求:通过channel发送消息到请求队列中(交换机、RoutingKey(这里指明发送的队列)、属性、消息内容)String msg = "Hello Server!I am Client";//使用建造者模式创建BasicProperties对象//correlationId()设置消息的唯一ID(用UUID生成)String correlationID = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(RabbitMQConnection.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", RabbitMQConnection.RPC_REQUEST_QUEUE, properties, msg.getBytes());System.out.println("消息发送成功");//8.接收响应//使用阻塞队列,保证同步关系,防止还未接收到响应代码就执行结束final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String responseMsg = new String(body);System.out.println("接收到的响应是:" + responseMsg);if(correlationID.equals(properties.getCorrelationId())){//如果correlationID值一样,则把该消息加入到阻塞队列中blockingQueue.offer(responseMsg);}}};channel.basicConsume(RabbitMQConnection.RPC_RESPONSE_QUEUE,true, consumer);String result = blockingQueue.take();System.out.println("[RPC Client 接收响应:]" + result);}}
//服务器public class RpcServer {public static void main(String[] args) throws Exception{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列channel.queueDeclare(RabbitMQConnection.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(RabbitMQConnection.RPC_RESPONSE_QUEUE, true, false, false, null);//6.接收响应//设置同时最多只能获取一条消息(否则队列中消息很多分不清哪个是自己需要的)channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("[RPC Server 接收请求:]" + request);//7.发送响应String response = "Hi Client!Welcome to Server";//需要在响应中添加correlationIdAMQP.BasicProperties basicProperties= new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), basicProperties, response.getBytes());//消费者接收到消息后可以自动确认和手动确认,如果选择不自动确认,需要使用basicAck()手动确认//手动确认会生成确认的唯一ID,envelope中获取,basicAck(确认唯一ID,是否批量确认)channel.basicAck(envelope.getDeliveryTag(), false);}};//(订阅队列,是否自动确认,处理行为)channel.basicConsume(RabbitMQConnection.RPC_REQUEST_QUEUE, false, consumer);}}
8 Publisher Confirms(发布确认模式)
8.1 模式介绍
消息丢失三种情况:
1.生产者发送给RabbitMQ过程中丢了;(发布确认模式解决)
2.RabbitMQ因为自身内部原因导致消息丢失;(RabbitMQ持久化)
3.RabbitMQ发送给消费者,由于消费者没有处理好消息而RabbitMQ内部已经将处理失败的消息丢失。(手动确认basicAck())
特点:RabbitMQ提供的确保消息可靠传输到RabbitMQ服务器的机制(情况(1)),生产者(通过channel.confirmSelect()设置confirm模式,发送的每条消息都有id)发送消息给RabbitMQ,RabbitMQ收到后会异步地给生产者返回ACK(包含消息id)。
策略:
1.Publishing Messages Individually(单独确认):生产者每发送一条消息都等待RabbitMQ的一次确认后再发送消息;
2.Publishing Messages in Batches(批量确认):生产者每发送一批消息,等待这批消息的确认;
3.Handling Publisher Confirms Asynchronously(异步确认):提供回调方法,RabbitMQ发送一条或一批消息的确认后由回调方法通知生产者进行处理。
适用场景:对数据安全性要求较高的场景。
注意:生产者与RabbitMQ通过TCP建立连接,TCP就有ACK机制,为什么还需要RabbitMQ的发布确认模式?TCP的ACK机制工作在传输层,只能确认数据是否被传输到接收缓冲区,但是从接收缓冲区到交换机再到队列(如果有持久化还需要持久化到硬盘),这段处理消息也可能会传输也可能失败或丢失,这是应用层范围,TCP协议无感知。而发布确认模式工作在应用层,其设计目的就是保证从消息发送到消息入队列、持久化到硬盘的成功或失败是可以被感知的,从而确保消息可靠传输。
8.2 使用案例
public class Confirm {public static final Integer MESSAGE_COUNT = 10000;public static Connection getConnection() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接Connectionreturn factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//(1)Publishing Messages Individually(单独确认)publishingMessagesIndividually();//(2)Publishing Messages in Batches(批量确认)publishingMessagesInBatches();//(3)Handling Publisher Confirms Asynchronously(异步确认)handlingPublisherConfirmsAsynchronously();}
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4.添加确认监听器//用有序集合存储还未确认的消息IDSortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {if(b){//如果是批量确认,获取确认ID并清除集合中ID值以前的所有ID//headSet()返回小于参数的集合confirmSeqNo.headSet(l+1).clear();}else{//如果不是批量确认,则清除集合中ID值confirmSeqNo.remove(l);}}@Overridepublic void handleNack(long l, boolean b) throws IOException {if(b){//如果是批量确认,获取确认ID并清除集合中ID值以前的所有ID//headSet()返回小于参数的集合confirmSeqNo.headSet(l+1).clear();}else{//如果不是批量确认,则清除集合中ID值confirmSeqNo.remove(l);}//对于Nack(RabbitMQ内部问题导致消息丢失),通常需要进行消息重发,具体什么策略看业务}});//5.发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;//获取消息IDlong seqNo = channel.getNextPublishSeqNo();channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());//发送后把消息ID放入有序集合等待异步确认confirmSeqNo.add(seqNo);}//异步等待确认,直到所有的ID都确认完毕while(!confirmSeqNo.isEmpty()){//减少CPU空转浪费Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4.发送消息long start = System.currentTimeMillis();int batch = 200;int count = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());count++;//每200条进行一次确认if(count == batch){//等待确认(超时时间5s)channel.waitForConfirmsOrDie(5000);count = 0;}}//确认剩余还未确认的消息if(count > 0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4.发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认(超时时间5s)channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}}
可以发现,消息量很大的情况下,异步确认最快,单独确认最慢。这是由于单独确认每发一条消息,生产者就要阻塞等待RabbitMQ服务器返回的确认,浪费时间。而批量确认阻塞等待的次数少了,用时也就减少了。异步确认最大化的利用CPU和时间等资源,让生产者可以同时进行消息发送和确认等待的过程。
9 SpringBoot实现RabbitMQ常用的四种工作模式(工作队列、发布订阅、路由、通配符)
9.1 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
9.2 配置文件
#配置方式1,配置RabbitMQ的基本信息#spring:# rabbitmq:# host: 192.168.217.150# port: 5672# username: admin# password: admin# virtual-host: testVirtual#配置方式2,格式:amqp://username:password@Ip:port/virtual-hostspring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtual
9.3 具体实现
9.3.1 Work Queue(工作队列)
相关配置的通用代码(队列名称、声明队列),在SpringBoot中,队列声明均是以对象形式(Bean)存在,这样方便Spring进行管理:
public class RabbitMQConnection {public static final String WORK_QUEUE = "work_queue";}
@Configurationpublic class RabbitMQConfig {@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(RabbitMQConnection.WORK_QUEUE).build();}}
生产者代码:
@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("work")public String work(){//内置交换机RoutingKey和队列名称一致rabbitTemplate.convertAndSend("", RabbitMQConnection.WORK_QUEUE,"Hello SpringBoot RabbitMQ");return "发送成功";}}
@RabbitListener注解注释到方法就表示消费者(也可以注释到类)。注解需要带上队列名作为参数,表示消费者订阅的队列。
该注解注释的方法可以填写三种类型的参数:
Message(org.springframework.amqp.core.Message):消息内容+消息属性(消息ID、队列信息等等)。
String:消息内容。
Channel(com.rabbitmq.client.Channel):RabbitMQ通道对象,可以用于更高级操作比如手动确认消息。
消费者代码(实际中一个消费者写一个类,这里方便演示才写到一起):
@Componentpublic class WorkListener {//消费者1@RabbitListener(queues = RabbitMQConnection.WORK_QUEUE)public void queueListener1(Message message){System.out.println("listener 1["+RabbitMQConnection.WORK_QUEUE+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.WORK_QUEUE)public void queueListener2(Message message){System.out.println("listener 2["+RabbitMQConnection.WORK_QUEUE+"]收到消息:" + message);}}
具体的Message信息如下:listener 1[work_queue]收到消息:(Body:'Hello SpringBoot RabbitMQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work_queue, deliveryTag=1, consumerTag=amq.ctag-NdS51xpxo5Z5D-r6pWwYvA, consumerQueue=work_queue])
deliveryTag表示消息ID,一个通道channel的消息ID是递增的,不同通道的消息ID独立。
9.3.2 Publish/Subscribe(发布订阅)
相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):
public class RabbitMQConnection {public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";public static final String FANOUT_EXCHANGE = "fanout.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(RabbitMQConnection.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(RabbitMQConnection.FANOUT_QUEUE1).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(RabbitMQConnection.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueueBinding1")public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("fanoutQueueBinding2")public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}}
在绑定队列和交换机的时候,方法形参是队列名和交换机名,而Spring加载Bean的时候会自动注入参数,按照形参名查找Bean的名称,因此如果不加@Qualifier注解指定形参对应的Bean,Spring就会找不到Bean而报错。
生产者代码:
@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("fanout")public String fanout(){rabbitTemplate.convertAndSend(RabbitMQConnection.FANOUT_EXCHANGE, "","Hello SpringBoot RabbitMQ");return "发送成功";}}
消费者代码:
@Componentpublic class FanoutListener {//消费者1@RabbitListener(queues = RabbitMQConnection.FANOUT_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.FANOUT_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.FANOUT_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.FANOUT_QUEUE2+"]收到消息:" + message);}}
9.3.3 Routing(路由)
相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):
public class RabbitMQConnection {public static final String ROUTING_QUEUE1 = "routing.queue1";public static final String ROUTING_QUEUE2 = "routing.queue2";public static final String DIRECT_EXCHANGE = "direct.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("routingQueue1")public Queue routingQueue1(){return QueueBuilder.durable(RabbitMQConnection.ROUTING_QUEUE1).build();}@Bean("routingQueue2")public Queue routingQueue2(){return QueueBuilder.durable(RabbitMQConnection.ROUTING_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.DIRECT_EXCHANGE).durable(true).build();}@Bean("routingQueueBinding1")public Binding routingQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("apple");}@Bean("routingQueueBinding2")public Binding routingQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("dog");}@Bean("routingQueueBinding3")public Binding routingQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("apple");}}
生产者代码如下:
@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("routing")public String routing(String routingKey){rabbitTemplate.convertAndSend(RabbitMQConnection.DIRECT_EXCHANGE, routingKey,"Hello SpringBoot RabbitMQ routingKey="+routingKey);return "发送成功";}}
消费者代码如下:
@Componentpublic class RoutingListener {//消费者1@RabbitListener(queues = RabbitMQConnection.ROUTING_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.ROUTING_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.ROUTING_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.ROUTING_QUEUE2+"]收到消息:" + message);}}
依次发送routingKey为apple和dog的两条消息,结果如下:
9.3.4 Topics(通配符)
相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):
public class RabbitMQConnection {public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";public static final String TOPIC_EXCHANGE = "topic.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(RabbitMQConnection.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(RabbitMQConnection.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(RabbitMQConnection.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.a.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.b");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("c.#");}}
生产者代码:
@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("topic")public String topic(String routingKey){rabbitTemplate.convertAndSend(RabbitMQConnection.TOPIC_EXCHANGE, routingKey,"Hello SpringBoot RabbitMQ routingKey="+routingKey);return "发送成功";}}
消费者代码:
@Componentpublic class TopicListener {//消费者1@RabbitListener(queues = RabbitMQConnection.TOPIC_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.TOPIC_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.TOPIC_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.TOPIC_QUEUE2+"]收到消息:" + message);}}
依次发送routingKey为apple.a.dog、apple.a.b和c.apple.dog三条消息,结果如下:
注意:如果把@RabbitListener注解放到类上,那类中可能存在多种消息类型的消费者方法(比如Message、String甚至可能是对象),此时@RabbitListener就不知道需要哪种方法。此时需要在方法上添加@RabbitHandle注解,这样当监听的队列有消息时,就可以根据消息类型来匹配对应的处理方法。
9.3.5 消息内容为对象
当消息内容为对象时,由于RabbitMQ只能接受序列化的内容,因此需要对对象进行序列化再传输。RabbitMQ推荐JSON格式,使用Jackson2JsonMessageConverter(spring的amqp包提供)作为JSON序列化的转化器,把它设置到RabbitTemplate中,此时传输的对象就会自动进行JSON序列化。
@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jackson2JsonMessageConverter()); // 设置消息转换器return template;}
如果生产者使用了JSON来序列化对象,那么消费者拿到对象就要使用JSON反序列化才能解析该对象,因此上述代码不仅要放到生产者所在的服务里,也要放到消费者所在的服务里。
10 RabbitMQ的消息传递模式
RabbitMQ支持两种消息传递模式:推模式和拉模式,主要是以推模式工作的。
推模式是指RabbitMQ主动把消息推送给消费者,只要队列有消息,会全部推送给订阅队列的消费者(SpringBoot中使用channel.basicConsume()来订阅队列)。因此推模式适合对数据实时性要求较高的场景。
拉模式是指消费者主动从队列中获取消息并消费(SpringBoot中使用channel.basicGet()来拉取消息),一次拉取一条消息消费完再拉取消息。消费者可以按照自身处理消息的速度来消费,因此拉模式适合流量控制、需要大规模计算资源处理消息的场景。
下篇文章: