当前位置: 首页 > news >正文

RabbitMQ工作模式

上篇文章:

RabbitMQ工作流程https://blog.csdn.net/sniper_fandc/article/details/149310780?fromshare=blogdetail&sharetype=blogdetail&sharerId=149310780&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link

目录

1 基本概念

1.1 关于RoutingKey和Binding Key

1.2 交换机类型

2 Simple(简单模式)

2.1 模式介绍

2.2 使用案例

3 Work Queue(工作队列)

3.1 模式介绍

3.2 使用案例

4 Publish/Subscribe(发布/订阅模式)

4.1 模式介绍

4.2 使用案例

5 Routing(路由模式)

5.1 模式介绍

5.2 使用案例

6 Topics(通配符模式)

6.1 模式介绍

6.2 使用案例

7 RPC(RPC模式)

7.1 模式介绍

7.2 使用案例

8 Publisher Confirms(发布确认模式)

8.1 模式介绍

8.2 使用案例

9 SpringBoot实现RabbitMQ常用的四种工作模式(工作队列、发布订阅、路由、通配符)

9.1 引入依赖

9.2 配置文件

9.3 具体实现

9.3.1 Work Queue(工作队列)

9.3.2 Publish/Subscribe(发布订阅)

9.3.3 Routing(路由)

9.3.4 Topics(通配符)

9.3.5 消息内容为对象

10 RabbitMQ的消息传递模式


1 基本概念

1.1 关于RoutingKey和Binding Key

        在介绍工作模式前,首先介绍两个概念:

        RoutingKey:路由键,由生产者生产消息时指定的字符串,交换机会根据路由键来匹配绑定键,匹配成功则将该消息路由到绑定的队列。

        Binding Key:绑定键,RabbitMQ中交换机和队列的绑定关系

        上述两个键实际上决定了两种工作模式:路由模式和通配符模式。由于路由键通常和绑定键进行匹配,大多数情况两个值是一样的(除了通配符模式外)因此RabbitMQ官方有时也会将这两个键混淆为RoutingKey,遇到这个名字时需要根据上下文分辨出说的是哪种键。

1.2 交换机类型

        RabbitMQ的工作模式主要是根据交换机类型来划分的。交换机类型由AMQP协议定义,有6种:Fanout广播、Direct定向、Topic通配符、headers类型、System和自定义,其中RabbitMQ用到了前4种

        Fanout广播:交换机将消息分发给所有绑定的队列(对应发布订阅模式)。

        Direct定向:交换机根据RoutingKey将消息分发给与Binding Key一样的队列(对应路由模式)。

        Topic通配符:交换机根据RoutingKey将消息分发给与Binding Key(此时该值可以为通配符)匹配的队列(对应通配符模式)。

        headers类型:不依赖RoutingKey来进行匹配,而是根据消息中的headers属性匹配队列,性能很差,实际中几乎不用。

2 Simple(简单模式)

2.1 模式介绍

        P是生产者Producer,C是消费者Consumer。

        特点:一个生产者,一个消费者,消息只被消费一次。又称为点对点模式。

        适用场景:消息只被单个消费者处理的场景。

2.2 使用案例

        简单模式在RabbitMQ介绍已经用代码演示过了这里不再演示。

3 Work Queue(工作队列)

3.1 模式介绍

        特点:一个生产者,多个消费者,消息只被消费一次,多个消费者共同处理这些消息。

        适用场景:集群环境中做异步处理。比如订票系统,订票成功消息发送到RabbitMQ中,由多个短信服务共同处理这些消息。

        注意:简单模式和工作队列模式在RabbitMQ中并不是没有用到交换机,而是对于交换机的功能仅仅就是简单转发而已。

3.2 使用案例

        一个生产者,两个消费者,实际代码和简单模式一样,只是启动了两个消费者而已。需要注意的是必须先启动消费者再启动生产者,否则先启动生产者再启动第一个消费者后,第二个消费者还未来得及启动第一个消费者就将消息全部消费了。

// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 通过channel发送消息到队列中for (int i = 0; i < 10; i++) {String msg = "Hello RabbitMQ" + i;//使用的是内置交换机,使用内置交换机时,routingKey要和队列名称⼀样,才可以路由到对应的队列上去channel.basicPublish("", RabbitMQConnection.QUEUE, null, msg.getBytes());}//7.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.QUEUE, true, consumer);//等待回调函数执行完毕之后(防止消息没有接收完毕就立即执行关闭),再关闭资源//        TimeUnit.SECONDS.sleep(5);//7.释放资源,消费者相当于是一个监听程序,不需要关闭资源//注意:顺序不可改变(要么只关闭连接,通道也会关闭;但是如果先关闭连接,通道已经关闭了再调用通道就报错)//         channel.close();//         connection.close();}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5. 声明队列//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.QUEUE, true, false, false, null);//6. 接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.QUEUE, true, consumer);//等待回调函数执行完毕之后(防止消息没有接收完毕就立即执行关闭),再关闭资源//        TimeUnit.SECONDS.sleep(5);//7.释放资源,消费者相当于是一个监听程序,不需要关闭资源//注意:顺序不可改变(要么只关闭连接,通道也会关闭;但是如果先关闭连接,通道已经关闭了再调用通道就报错)//         channel.close();//         connection.close();}}

4 Publish/Subscribe(发布/订阅模式)

4.1 模式介绍

        特点:一个生产者,多个消费者,Fanout广播类型交换机,交换机将每条消息均复制多份广播给所有绑定的队列,每个消费者收到的消息都一样。

        适用场景:消息需要被所有消费者都接收,比如实时通知或广播消息。还是订单系统,订单下单成功后,消息会被广播给不同的服务:物流服务、用户通知服务、店铺服务、支付服务等等。

4.2 使用案例

        一个生产者,两个消费者,第一个消费者订阅队列1,第二消费者订阅队列2:

// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、广播类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)//RoutingKey为""表示交换机收到任何消息都会路由给队列channel.queueBind(RabbitMQConnection.FANOUT_QUEUE1, RabbitMQConnection.FANOUT_EXCHANGE, "");channel.queueBind(RabbitMQConnection.FANOUT_QUEUE2, RabbitMQConnection.FANOUT_EXCHANGE, "");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg = "Hello RabbitMQ";channel.basicPublish(RabbitMQConnection.FANOUT_EXCHANGE, "", null, msg.getBytes());//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.FANOUT_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.FANOUT_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.FANOUT_QUEUE2, true, consumer);}}

        在管理界面可以查看交换机和队列的绑定关系:

5 Routing(路由模式)

5.1 模式介绍

        特点:一个生产者,多个消费者,Direct定向类型交换机,交换机根据路由键RoutingKey匹配Binding Key,把消息路由给指定队列。

        假设RoutingKey=a,则消息会被路由给Q1和Q2;假设RoutingKey=b,则消息只会被路由给Q2。

        适用场景:根据特定规则分发消息的场景。比如日志持久化场景,日志有不同级别Error、Warning、Info、Debug等等,根据不同级别的日志分发到不同队列,再由不同文件服务持久化到不同的文件中。

5.2 使用案例

        按照上图来构建绑定关系:

// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、定向类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)channel.queueBind(RabbitMQConnection.DIRECT_QUEUE1, RabbitMQConnection.DIRECT_EXCHANGE, "a");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "a");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "b");channel.queueBind(RabbitMQConnection.DIRECT_QUEUE2, RabbitMQConnection.DIRECT_EXCHANGE, "c");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg_a = "Hello RabbitMQ RoutingKey = a";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "Hello RabbitMQ RoutingKey = b";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "Hello RabbitMQ RoutingKey = c";channel.basicPublish(RabbitMQConnection.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.DIRECT_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.DIRECT_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.DIRECT_QUEUE2, true, consumer);}}

        交换机和队列绑定关系如下:

6 Topics(通配符模式)

6.1 模式介绍

        特点:一个生产者,多个消费者,Topics通配符类型交换机,交换机根据路由键RoutingKey匹配满足要求的Binding Key,把消息路由给所有满足的队列。

        通配符:*表示匹配一个单词(单词可以有多个字符),.来进行分隔,#表示匹配多个单词。

        假设RoutingKey=a.a.a,则消息会被路由给Q1;假设RoutingKey=a.a.b,则消息会被路由给Q2。

        适用场景:灵活匹配和过滤消息的场景。

6.2 使用案例

        按照上图构建交换机和队列绑定关系:

// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2// 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange"; //通配符交换机public static final String TOPIC_QUEUE1 = "topic.queue1"; //通配符交换机绑定队列1public static final String TOPIC_QUEUE2 = "topic.queue2"; //通配符交换机绑定队列2}
// 生产者public class RabbitProducer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明交换机(交换机名称、定向类型、是否持久化)channel.exchangeDeclare(RabbitMQConnection.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//6.声明队列channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE2, true, false, false, null);//7.交换机绑定队列(队列、交换机、RoutingKey)channel.queueBind(RabbitMQConnection.TOPIC_QUEUE1, RabbitMQConnection.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(RabbitMQConnection.TOPIC_QUEUE2, RabbitMQConnection.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(RabbitMQConnection.TOPIC_QUEUE2, RabbitMQConnection.TOPIC_EXCHANGE, "c.#");//8. 通过channel发送消息到队列中(交换机、RoutingKey、属性、消息内容)String msg_a = "Hello RabbitMQ RoutingKey = apple.a.dog";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "apple.a.dog", null, msg_a.getBytes());//转发到Q1String msg_b = "Hello RabbitMQ RoutingKey = apple.a.b";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "apple.a.b", null, msg_b.getBytes());//转发到Q1和Q2String msg_c = "Hello RabbitMQ RoutingKey = c.apple.dog";channel.basicPublish(RabbitMQConnection.TOPIC_EXCHANGE, "c.apple.dog", null, msg_c.getBytes());//转发到Q2//9.释放资源System.out.println("消息发送成功");channel.close();connection.close();}}
// 消费者1public class RabbitmqConsumer {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE1, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.TOPIC_QUEUE1, true, consumer);}}
// 消费者2public class RabbitmqConsumer2 {public static void main(String[] args) throws Exception {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列(消费者不需要声明交换机)//如果没有队列,会自动创建,如果有,则不创建channel.queueDeclare(RabbitMQConnection.TOPIC_QUEUE2, true, false, false, null);//6.接收消息,并消费// DefaultConsumer是Consumer的接口实现DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(RabbitMQConnection.TOPIC_QUEUE2, true, consumer);}}

        交换机和队列绑定关系如下:

7 RPC(RPC模式)

7.1 模式介绍

        特点:实现远程通信,没有生产者和消费者,只有客户端Client和服务端Server,由RabbitMQ作为消息中间件提供两个队列RPC和Reply实现请求和响应的通信。request携带reply_to(指定响应应该放到哪个队列)和correlation_id(请求和响应关联id,指明队列中响应是对哪个请求回复的)两个属性,而reply携带correlation_id属性来表示该响应关联的是哪个请求。

        理解:RPC模式实际上类似HTTP协议,但是好处是通过在应用层工作,即使不太了解网络协议,也可以实现客户端和服务器的远程通信(并且这种方式是异步的)。

        适用场景:远程通信的高可用性和流量监控的场景。

7.2 使用案例

        当发送请求的时候,Client相当于生产者,Server相当于消费者;当返回响应时,Server相当于生产者,Client相当于消费者。

// RabbitMQ的连接配置public class RabbitMQConnection {public static final String HOST = "192.168.217.150"; //ippublic static final int PORT = 5672; //端口号public static final String VIRTUALHOST = "testVirtual"; //虚拟机名称,默认为/public static final String USERNAME = "admin"; //user,默认guestpublic static final String PASSWORD = "admin"; //password,默认guestpublic static final String QUEUE = "queue1"; //队列// 发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange"; //广播交换机public static final String FANOUT_QUEUE1 = "fanout.queue1"; //广播交换机绑定队列1public static final String FANOUT_QUEUE2 = "fanout.queue2"; //广播交换机绑定队列2// 路由模式public static final String DIRECT_EXCHANGE = "direct.exchange"; //定向交换机public static final String DIRECT_QUEUE1 = "direct.queue1"; //定向交换机绑定队列1public static final String DIRECT_QUEUE2 = "direct.queue2"; //定向交换机绑定队列2// 通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange"; //通配符交换机public static final String TOPIC_QUEUE1 = "topic.queue1"; //通配符交换机绑定队列1public static final String TOPIC_QUEUE2 = "topic.queue2"; //通配符交换机绑定队列2// RPC模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; //请求队列public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue"; //响应队列}
//客户端public class RpcClient {public static void main(String[] args) throws Exception{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.不声明交换机(使用默认的)//6.声明队列channel.queueDeclare(RabbitMQConnection.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(RabbitMQConnection.RPC_RESPONSE_QUEUE, true, false, false, null);//7.发送请求:通过channel发送消息到请求队列中(交换机、RoutingKey(这里指明发送的队列)、属性、消息内容)String msg = "Hello Server!I am Client";//使用建造者模式创建BasicProperties对象//correlationId()设置消息的唯一ID(用UUID生成)String correlationID = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(RabbitMQConnection.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", RabbitMQConnection.RPC_REQUEST_QUEUE, properties, msg.getBytes());System.out.println("消息发送成功");//8.接收响应//使用阻塞队列,保证同步关系,防止还未接收到响应代码就执行结束final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String responseMsg = new String(body);System.out.println("接收到的响应是:" + responseMsg);if(correlationID.equals(properties.getCorrelationId())){//如果correlationID值一样,则把该消息加入到阻塞队列中blockingQueue.offer(responseMsg);}}};channel.basicConsume(RabbitMQConnection.RPC_RESPONSE_QUEUE,true, consumer);String result = blockingQueue.take();System.out.println("[RPC Client 接收响应:]" + result);}}
//服务器public class RpcServer {public static void main(String[] args) throws Exception{//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建channel通道Channel channel = connection.createChannel();//5.声明队列channel.queueDeclare(RabbitMQConnection.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(RabbitMQConnection.RPC_RESPONSE_QUEUE, true, false, false, null);//6.接收响应//设置同时最多只能获取一条消息(否则队列中消息很多分不清哪个是自己需要的)channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("[RPC Server 接收请求:]" + request);//7.发送响应String response = "Hi Client!Welcome to Server";//需要在响应中添加correlationIdAMQP.BasicProperties  basicProperties= new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), basicProperties, response.getBytes());//消费者接收到消息后可以自动确认和手动确认,如果选择不自动确认,需要使用basicAck()手动确认//手动确认会生成确认的唯一ID,envelope中获取,basicAck(确认唯一ID,是否批量确认)channel.basicAck(envelope.getDeliveryTag(), false);}};//(订阅队列,是否自动确认,处理行为)channel.basicConsume(RabbitMQConnection.RPC_REQUEST_QUEUE, false, consumer);}}

8 Publisher Confirms(发布确认模式)

8.1 模式介绍

        消息丢失三种情况:

        1.生产者发送给RabbitMQ过程中丢了;(发布确认模式解决)

        2.RabbitMQ因为自身内部原因导致消息丢失;(RabbitMQ持久化)

        3.RabbitMQ发送给消费者,由于消费者没有处理好消息而RabbitMQ内部已经将处理失败的消息丢失。(手动确认basicAck())

        特点:RabbitMQ提供的确保消息可靠传输到RabbitMQ服务器的机制(情况(1)),生产者(通过channel.confirmSelect()设置confirm模式,发送的每条消息都有id)发送消息给RabbitMQ,RabbitMQ收到后会异步地给生产者返回ACK(包含消息id)。

        策略:

        1.Publishing Messages Individually(单独确认):生产者每发送一条消息都等待RabbitMQ的一次确认后再发送消息;

        2.Publishing Messages in Batches(批量确认):生产者每发送一批消息,等待这批消息的确认;

        3.Handling Publisher Confirms Asynchronously(异步确认):提供回调方法,RabbitMQ发送一条或一批消息的确认后由回调方法通知生产者进行处理。

        适用场景:对数据安全性要求较高的场景。

        注意:生产者与RabbitMQ通过TCP建立连接,TCP就有ACK机制,为什么还需要RabbitMQ的发布确认模式?TCP的ACK机制工作在传输层只能确认数据是否被传输到接收缓冲区,但是从接收缓冲区到交换机再到队列(如果有持久化还需要持久化到硬盘),这段处理消息也可能会传输也可能失败或丢失,这是应用层范围,TCP协议无感知。而发布确认模式工作在应用层,其设计目的就是保证从消息发送到消息入队列、持久化到硬盘的成功或失败是可以被感知的,从而确保消息可靠传输。

8.2 使用案例

public class Confirm {public static final Integer MESSAGE_COUNT = 10000;public static Connection getConnection() throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost(RabbitMQConnection.HOST);factory.setPort(RabbitMQConnection.PORT);factory.setVirtualHost(RabbitMQConnection.VIRTUALHOST);factory.setUsername(RabbitMQConnection.USERNAME);factory.setPassword(RabbitMQConnection.PASSWORD);//3.创建连接Connectionreturn factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//(1)Publishing Messages Individually(单独确认)publishingMessagesIndividually();//(2)Publishing Messages in Batches(批量确认)publishingMessagesInBatches();//(3)Handling Publisher Confirms Asynchronously(异步确认)handlingPublisherConfirmsAsynchronously();}
    private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4.添加确认监听器//用有序集合存储还未确认的消息IDSortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {if(b){//如果是批量确认,获取确认ID并清除集合中ID值以前的所有ID//headSet()返回小于参数的集合confirmSeqNo.headSet(l+1).clear();}else{//如果不是批量确认,则清除集合中ID值confirmSeqNo.remove(l);}}@Overridepublic void handleNack(long l, boolean b) throws IOException {if(b){//如果是批量确认,获取确认ID并清除集合中ID值以前的所有ID//headSet()返回小于参数的集合confirmSeqNo.headSet(l+1).clear();}else{//如果不是批量确认,则清除集合中ID值confirmSeqNo.remove(l);}//对于Nack(RabbitMQ内部问题导致消息丢失),通常需要进行消息重发,具体什么策略看业务}});//5.发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;//获取消息IDlong seqNo = channel.getNextPublishSeqNo();channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());//发送后把消息ID放入有序集合等待异步确认confirmSeqNo.add(seqNo);}//异步等待确认,直到所有的ID都确认完毕while(!confirmSeqNo.isEmpty()){//减少CPU空转浪费Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4.发送消息long start = System.currentTimeMillis();int batch = 200;int count = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());count++;//每200条进行一次确认if(count == batch){//等待确认(超时时间5s)channel.waitForConfirmsOrDie(5000);count = 0;}}//确认剩余还未确认的消息if(count > 0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try(Connection connection = getConnection()) {//1.开启通道Channel channel = connection.createChannel();//2.为通道设置确认模式channel.confirmSelect();//3.声明队列channel.queueDeclare(RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4.发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "message" + i;channel.basicPublish("", RabbitMQConnection.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认(超时时间5s)channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略: %d 条消息发送完毕, 共计等待 %d ms \n",MESSAGE_COUNT,(end - start));}}}

        可以发现,消息量很大的情况下,异步确认最快,单独确认最慢。这是由于单独确认每发一条消息,生产者就要阻塞等待RabbitMQ服务器返回的确认,浪费时间。而批量确认阻塞等待的次数少了,用时也就减少了。异步确认最大化的利用CPU和时间等资源,让生产者可以同时进行消息发送和确认等待的过程。

9 SpringBoot实现RabbitMQ常用的四种工作模式(工作队列、发布订阅、路由、通配符)

9.1 引入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

9.2 配置文件

#配置方式1,配置RabbitMQ的基本信息#spring:#  rabbitmq:#    host: 192.168.217.150#    port: 5672#    username: admin#    password: admin#    virtual-host: testVirtual#配置方式2,格式:amqp://username:password@Ip:port/virtual-hostspring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtual

9.3 具体实现

9.3.1 Work Queue(工作队列)

        相关配置的通用代码(队列名称、声明队列),在SpringBoot中,队列声明均是以对象形式(Bean)存在,这样方便Spring进行管理:

public class RabbitMQConnection {public static final String WORK_QUEUE = "work_queue";}
@Configurationpublic class RabbitMQConfig {@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(RabbitMQConnection.WORK_QUEUE).build();}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("work")public String work(){//内置交换机RoutingKey和队列名称一致rabbitTemplate.convertAndSend("", RabbitMQConnection.WORK_QUEUE,"Hello SpringBoot RabbitMQ");return "发送成功";}}

        @RabbitListener注解注释到方法就表示消费者(也可以注释到类)。注解需要带上队列名作为参数,表示消费者订阅的队列。

        该注解注释的方法可以填写三种类型的参数:

        Message(org.springframework.amqp.core.Message):消息内容+消息属性(消息ID、队列信息等等)。

        String:消息内容。

        Channel(com.rabbitmq.client.Channel):RabbitMQ通道对象,可以用于更高级操作比如手动确认消息。

        消费者代码(实际中一个消费者写一个类,这里方便演示才写到一起):

@Componentpublic class WorkListener {//消费者1@RabbitListener(queues = RabbitMQConnection.WORK_QUEUE)public void queueListener1(Message message){System.out.println("listener 1["+RabbitMQConnection.WORK_QUEUE+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.WORK_QUEUE)public void queueListener2(Message message){System.out.println("listener 2["+RabbitMQConnection.WORK_QUEUE+"]收到消息:" + message);}}

        具体的Message信息如下:listener 1[work_queue]收到消息:(Body:'Hello SpringBoot RabbitMQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work_queue, deliveryTag=1, consumerTag=amq.ctag-NdS51xpxo5Z5D-r6pWwYvA, consumerQueue=work_queue])

        deliveryTag表示消息ID,一个通道channel的消息ID是递增的,不同通道的消息ID独立。

9.3.2 Publish/Subscribe(发布订阅)

        相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):

public class RabbitMQConnection {public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";public static final String FANOUT_EXCHANGE = "fanout.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(RabbitMQConnection.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(RabbitMQConnection.FANOUT_QUEUE1).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(RabbitMQConnection.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueueBinding1")public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("fanoutQueueBinding2")public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}}

        在绑定队列和交换机的时候,方法形参是队列名和交换机名,而Spring加载Bean的时候会自动注入参数,按照形参名查找Bean的名称,因此如果不加@Qualifier注解指定形参对应的Bean,Spring就会找不到Bean而报错。

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("fanout")public String fanout(){rabbitTemplate.convertAndSend(RabbitMQConnection.FANOUT_EXCHANGE, "","Hello SpringBoot RabbitMQ");return "发送成功";}}

        消费者代码:

@Componentpublic class FanoutListener {//消费者1@RabbitListener(queues = RabbitMQConnection.FANOUT_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.FANOUT_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.FANOUT_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.FANOUT_QUEUE2+"]收到消息:" + message);}}

9.3.3 Routing(路由)

        相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):

public class RabbitMQConnection {public static final String ROUTING_QUEUE1 = "routing.queue1";public static final String ROUTING_QUEUE2 = "routing.queue2";public static final String DIRECT_EXCHANGE = "direct.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("routingQueue1")public Queue routingQueue1(){return QueueBuilder.durable(RabbitMQConnection.ROUTING_QUEUE1).build();}@Bean("routingQueue2")public Queue routingQueue2(){return QueueBuilder.durable(RabbitMQConnection.ROUTING_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.DIRECT_EXCHANGE).durable(true).build();}@Bean("routingQueueBinding1")public Binding routingQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("apple");}@Bean("routingQueueBinding2")public Binding routingQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("dog");}@Bean("routingQueueBinding3")public Binding routingQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("routingQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("apple");}}

        生产者代码如下:

@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("routing")public String routing(String routingKey){rabbitTemplate.convertAndSend(RabbitMQConnection.DIRECT_EXCHANGE, routingKey,"Hello SpringBoot RabbitMQ routingKey="+routingKey);return "发送成功";}}

        消费者代码如下:

@Componentpublic class RoutingListener {//消费者1@RabbitListener(queues = RabbitMQConnection.ROUTING_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.ROUTING_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.ROUTING_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.ROUTING_QUEUE2+"]收到消息:" + message);}}

        依次发送routingKey为apple和dog的两条消息,结果如下:

9.3.4 Topics(通配符)

        相关配置的通用代码(队列名称、交换机名称、声明队列和交换机):

public class RabbitMQConnection {public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";public static final String TOPIC_EXCHANGE = "topic.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(RabbitMQConnection.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(RabbitMQConnection.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(RabbitMQConnection.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.a.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.b");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("c.#");}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("topic")public String topic(String routingKey){rabbitTemplate.convertAndSend(RabbitMQConnection.TOPIC_EXCHANGE, routingKey,"Hello SpringBoot RabbitMQ routingKey="+routingKey);return "发送成功";}}

        消费者代码:

@Componentpublic class TopicListener {//消费者1@RabbitListener(queues = RabbitMQConnection.TOPIC_QUEUE1)public void queueListener1(String message){System.out.println("listener 1["+RabbitMQConnection.TOPIC_QUEUE1+"]收到消息:" + message);}//消费者2@RabbitListener(queues = RabbitMQConnection.TOPIC_QUEUE2)public void queueListener2(String message){System.out.println("listener 2["+RabbitMQConnection.TOPIC_QUEUE2+"]收到消息:" + message);}}

        依次发送routingKey为apple.a.dog、apple.a.b和c.apple.dog三条消息,结果如下:

        注意:如果把@RabbitListener注解放到类上,那类中可能存在多种消息类型的消费者方法(比如Message、String甚至可能是对象),此时@RabbitListener就不知道需要哪种方法。此时需要在方法上添加@RabbitHandle注解,这样当监听的队列有消息时,就可以根据消息类型来匹配对应的处理方法。

9.3.5 消息内容为对象

        当消息内容为对象时,由于RabbitMQ只能接受序列化的内容,因此需要对对象进行序列化再传输。RabbitMQ推荐JSON格式,使用Jackson2JsonMessageConverter(spring的amqp包提供)作为JSON序列化的转化器,把它设置到RabbitTemplate中,此时传输的对象就会自动进行JSON序列化。

@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jackson2JsonMessageConverter()); // 设置消息转换器return template;}

        如果生产者使用了JSON来序列化对象,那么消费者拿到对象就要使用JSON反序列化才能解析该对象,因此上述代码不仅要放到生产者所在的服务里,也要放到消费者所在的服务里。

10 RabbitMQ的消息传递模式

        RabbitMQ支持两种消息传递模式:推模式和拉模式,主要是以推模式工作的。

        推模式是指RabbitMQ主动把消息推送给消费者,只要队列有消息,会全部推送给订阅队列的消费者(SpringBoot中使用channel.basicConsume()来订阅队列)。因此推模式适合对数据实时性要求较高的场景。

        拉模式是指消费者主动从队列中获取消息并消费(SpringBoot中使用channel.basicGet()来拉取消息),一次拉取一条消息消费完再拉取消息。消费者可以按照自身处理消息的速度来消费,因此拉模式适合流量控制、需要大规模计算资源处理消息的场景。

下篇文章:

http://www.dtcms.com/a/283103.html

相关文章:

  • Python类中魔术方法(Magic Methods)完全指南:从入门到精通
  • 分布式系统高可用性设计 - 监控与日志系统
  • 风电箱变、风机、升压站等场景在线监测:助力电力系统稳定可靠运行
  • [论文阅读] 人工智能 + 软件工程 | 用交互式可视化革新软件文档:Helveg工具的设计与改进
  • 21、鸿蒙Harmony Next开发:组件导航(Navigation)
  • 0系统与软件工程-标准体系
  • 【多线程的常见使用场景】
  • 工业自动化中EtherCAT转Profinet网关的速度控制模式配置与优化
  • 破壳萌图鉴(宝可梦) 2.1.2311052226/界面简洁流畅,没有广告
  • Optional:orElse 和 orElseGet 的底层逻辑,决定了它们的本质区别
  • 大模型呼叫系统选型指南:以云蝠智能VoiceAgent为核心的企业升级路径
  • Linux 下安装DM8数据库详细教程
  • Linux下保存Docker镜像文件至本地及启动
  • CSS:transition语法
  • Linux 定时器应用示例(修正版)
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十八课——图像高斯滤波金字塔的实现
  • RCV在电力大数据平台中的集成与标准化建设:推动数据资产价值释放的关键途径
  • 每日钉钉API探索:getAuthCode实现免登授权
  • STM32超声波模块
  • 基于Matlab改进大津法和Gabor滤波的织物缺陷检测系统
  • Java-数构链表
  • 聚合配送与传统配送平台的差异:从运营模式到市场价值
  • XXE漏洞3-通过 XXE 漏洞实现文件读取及端口探测
  • 开源Agent平台Dify源码剖析系列(四)核心模块core/agent之CotAgentRunner
  • SMTPman,发送邮件服务器smtp的功能详解!
  • 统计功效是什么?
  • ST17H36 蓝牙Soc开发(4)—— 外设应用1
  • mac电脑无法阅读runc源码
  • 【网易云-header】
  • HarmonyOS从入门到精通:自定义组件开发指南(九):组件复合与组合模式探秘