【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式
文章目录
- 工作队列模式
- 引入依赖
- 配置
- 声明
- 生产者代码
- 消费者代码
- 发布/订阅模式
- 引入依赖
- 声明
- 生产者代码
- 发送消息
- 消费者代码
- 运行程序
- 路由模式
- 声明
- 生产者代码
- 消费者代码
- 运行程序
- 通配符模式
- 声明
- 生产者代码
- 消费者代码
- 运行程序
工作队列模式
引入依赖
我们在创建 SpringBoot
项目的时候,选上这两个依赖即可
或者在依赖中加入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
将配置文件后缀改成 yml
之后,进行配置
#配置 RabbitMQ 的基本信息
spring:rabbitmq: host: 127.0.0.1 #RabbitMQ 服务器的地址 port: 15673 #RabbitMQ的TCP协议的端口号,而不是管理平台的端口号。默认为5672 username: guest password: guest virtual-host: coding #默认为 /
或者这样写
spring:rabbitmq:addresses: amqp://guest:guest@127.0.0.1:5672/coding
- 格式为:
amqp://username:password@ip:port/virtual-host
声明
注意引入的是这个包
package org.example.rabbitmq.config; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 声明一个队列,来自第三方包,就是一个对象 @Bean("workQueue") public Queue workQueue(){ return QueueBuilder.durable(Constants.WORK_QUEUE).build(); }
}
生产者代码
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/work") public String work() { // 使用内置交换机的话,RoutingKey 和队列名称一致 rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work..."); return "发送成功"; }
}
- 在运行程序之后,队列不会被立马创建出来
- 需要发送消息之后才会被创建
消费者代码
消费者是通过实现一个监听类,来监听有没有消息
- 采用一个注解——
@RabbitListener
@RabbitListener
是Spring
框架中用于监听RabbitMQ
队列的注解,通过使用这个注解,可以定义一个方法,以便从RabbitMQ
队列中接收消息。
- 该注解支持多种参数类型,这些参数类型代表了从
RabbitMQ
接收到的消息和相关信息- 以下是一些常用的参数类型:
String
:返回消息的内容Message
(org.spring.framework.ampq.core.Message
):Spring AMPQ
的Message
类,返回原始的消息体以及消息的属性,如消息ID
,内容,队列信息等Channel
(com.rabbitmq.client.Channel
):RabbitMQ
的通道对象,可以用于进行高级的操作,如手动确认消息
package org.example.rabbitmq.listener; import org.apache.logging.log4j.message.Message;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class WorkListener { @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener1(Message message) { System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener2(String message) { System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message); }
}
发布/订阅模式
在发布/订阅模式中,多了一个 Exchange
角色。Exchange
常见有三种类型,分别代表不同的路由规则
Fanout
: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe
模式)Direct
: 定向,把消息交给符合指定Routing Key
的队列(Routing
模式)Topic
: 通配符,把消息交给符合Routing pattern
(路由模式) 的队列(Topics
模式)
引入依赖
我们在创建 SpringBoot
项目的时候,选上这两个依赖即可
或者在依赖中加入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
声明
package org.example.rabbitmq.config; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { /** * 二、发布/订阅模式 * 声明队列、声明交换机、声明队列和交换机的绑定 * @return */ @Bean("fanoutQueue1") // @Bean注解:交给Spring进行管理, 括号里面是指定名称 public Queue fanoutQueue1() { return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build(); } @Bean("fanoutQueue2") public Queue fanoutQueue2() { return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build(); } @Bean("fanoutExchange") // 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange public FanoutExchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build(); } @Bean("fanoutQueueBinding1") public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean("fanoutQueueBinding2") public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); }
}
生产者代码
- 声明队列
- 声明交换机
- 声明交换机和队列的绑定
- 发送消息
发送消息
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/fanout") public String fanout() { rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout..."); return "发送成功"; }
}
消费者代码
package org.example.rabbitmq.listener; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class FanoutListener { @RabbitListener(queues = Constants.FANOUT_QUEUE1) public void queueListener1(String message) { System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.FANOUT_QUEUE2) public void queueListener2(String message) { System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message); }
}
运行程序
- 运行项目,调用接口发送消息
- http://127.0.0.1:8080/producer/fanout
- 监听类收到消息,并打印
路由模式
交换机类型为 Direct
时,会把消息交给符合指定 Routing Key
的队列
- 队列和交换机的绑定,不是任意的绑定了,而是要制定一个
RoutingKey
(路由key
) - 消息的发送方在向
Exchange
发送消息时,也需要指定消息的RoutingKey
Exchange
也不再把消息交给每一个绑定的key
,而是根据消息的RoutingKey
进行判断,只有队列的RoutingKey
和消息的RoutingKey
完全一致,才会接收消息
声明
按照这个图片,进行绑定
/** * 三、 路由模式 * 声明队列、声明交换机、声明队列和交换机的绑定 * @return */
@Bean("directQueue1")
public Queue directQueue1(){ return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
} @Bean("directQueue2")
public Queue directQueue2(){ return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
} @Bean("directExchange")
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange
public DirectExchange directExchange() { return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
} @Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("a");
} @Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("a");
} @Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("b");
} @Bean("directQueueBinding4")
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("c");
}
生产者代码
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; /** * 三、路由模式 * @param routingKey * @return */ @RequestMapping("/direct/{routingKey}") //从路径中拿到这个routingKey public String direct(@PathVariable("routingKey") String routingKey) { rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey); return "发送成功"; }
}
消费者代码
package org.example.rabbitmq.listener; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class DirectListener { @RabbitListener(queues = Constants.DIRECT_QUEUE1) public void queueListener1(String message) { System.out.println("队列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.DIRECT_QUEUE2) public void queueListener2(String message) { System.out.println("队列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message); }
}
运行程序
-
运行项目
-
调用接口发送
routingKey
为a
的消息- http://127.0.0.1:8080/producer/direct/a
- 观察后端日志,队列 1 和 2 都收到消息
-
调用接口发送
routingKey
为b
的消息- http://127.0.0.1:8080/producer/direct/b
- 观察后端日志,队列 2 收到消息
-
调用接口发送
routingKey
为c
的消息- http://127.0.0.1:8080/producer/direct/c
- 观察后端日志,队列 2 收到消息
通配符模式
Topics
和 Routing
模式的区别是:
topics
模式使用的交换机类型为topic
(Routing
模式使用的是direct
)topic
类型的交换机在匹配规则上进行了扩展,Binding Key
支持通配符匹配
*
表示一个单词#
表示多个单词
声明
/** * 四、通配符模式 * 声明队列、声明交换机、声明队列和交换机的绑定 * @return */
@Bean("topicQueue1")
public Queue topicQueue1(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
} @Bean("topicQueue2")
public Queue topicQueue2(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
} @Bean("topicExchange")
public TopicExchange topicExchange() { return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
} @Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");
} @Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");
} @Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");
}
生产者代码
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; /** * 四、通配符模式 * @param routingKey * @return */ @RequestMapping("/topic/{routingKey}") public String topic(@PathVariable("routingKey") String routingKey) { rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey); return "发送成功"; }
}
消费者代码
运行程序
-
运行程序
-
调用接口发送
routingKey
为qqq.a.b
的消息- http://127.0.0.1:8080/producer/topic/qqq.a.b
- 观察后端日志,队列 1 和队列 2 均收到消息
-
调用接口发送
routingKey
为c.abc.fff
的消息- http://127.0.0.1:8080/producer/topic/c.abc.fff
- 观察后端日志,队列 2 收到信息
-
调用接口发送
routingKey
为g.h.j
的消息- http://127.0.0.1:8080/producer/topic/g.h.j
- 观察后端日志,没有队列收到消息