RabbitMQ消息队列实战指南
RabbitMQ 是什么?
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并传递给消费者,在这个过程中,根据路由规则进行消息的路由、缓存和持久化。
AMQP,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件而设计的。基于此协议的客户端与消息中间件可以传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。RabbitMQ就是通过Erlang语言实现的一种消息中间件。
具备下面的核心功能:
- 异步通信:允许应用程序通过消息队列解耦,生产者发送消息后无需等待消费者处理。
- 消息路由:通过灵活的交换器(Exchange)机制,支持多种消息分发模式。
- 可靠性保障:提供消息持久化、确认机制(ACK)和重试策略,确保消息不丢失。
- 负载均衡:通过轮询或权重分配方式将消息分发给多个消费者,提升系统吞吐量。
为什么需要消息队列?
在分布式系统中,直接调用(如 HTTP 请求)可能导致以下问题:
- 耦合性高:服务之间依赖性强,一个服务故障可能引发雪崩效应。
- 性能瓶颈:同步调用会阻塞线程,影响系统响应速度。
- 扩展困难:高并发场景下难以动态调整消费者数量。
消息队列通过异步通信和缓冲机制解决了这些问题:生产者发送消息到队列后即可返回,消费者按自身能力处理消息。即使消费者暂时不可用,消息仍能存储在队列中,避免数据丢失。
核心概念 :
名称 | 说明 |
Producer | 生产者,发送消息的一方 |
Consumer | 消费者,接收消息的一方 |
Queue | 队列,存储消息的缓冲区 |
Exchange | 交换机,负责转发消息到队列 |
Routing Key | 路由键,决定消息如何路由 |
Binding | 绑定,连接交换机与队列的规则 |
Message | 消息,最终传输的数据 |
工作模型 :
模型类型(英文) | 中文名称 | 简介 |
Simple | 简单队列模型 | 一个生产者对应一个队列和一个消费者,最基础的模型,适合入门学习或简单通信。 |
Work Queue | 工作队列模型 | 一个生产者将任务发送到队列,由多个消费者竞争消费,常用于任务分发和后台处理。 |
Publish/Subscribe | 发布/订阅模型 | 通过 fanout 类型交换机,生产者发送的消息会广播到所有绑定的队列,适合通知、广播类场景。 |
Routing | 路由模型 | 使用 direct 类型交换机,生产者根据路由键将消息精确投递到指定队列,适合日志分级处理等场景。 |
Topics | 主题(通配路由)模型 | 使用 topic 类型交换机,支持模糊匹配路由键(如 |
RPC 模式 | 远程调用模型 | 实现远程服务调用,生产者发送请求并等待消费者返回响应,适合系统之间的异步调用场景。 |
Docker安装RabbitMQ
1、拉取RabbitMQ镜像
命令:
docker pull rabbitmq
2、启动
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
3、进入容器内部
docker exec -it rabbit /bin/bash
4、安装插件
rabbitmq-plugins enable rabbitmq_management
5、查看插件情况
rabbitmq-plugins list
6、访问RabbitMQ
http://192.168.142.3:15672/
账号:guest
密码:guest
RabbitMQ安装方式解压安装
1、下载RabbitMQ
Installing RabbitMQ | RabbitMQ
2、下载Erlang
RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,需要是安装 Erlang
Erlang
和RabbitMQ
版本对照:https://www.rabbitmq.com/which-erlang.html
这里安装最新版本3.8.14的RabbitMQ
,对应的Erlang
版本推荐23.x
下载地址:el/7/erlang-23.2.7-2.el7.x86_64.rpm - rabbitmq/erlang · packagecloud
3、将下载好的文件上传到服务器
# 创建文件
mkdir -p /opt/rabbitmq
将安装包上传到/opt/rabbitmq
4、安装Erlang
cd /opt/rabbitmq
# 解压
rpm -Uvh erlang-23.2.7-2.el7.x86_64.rpm# 安装
yum install -y erlang
如果yum无法使用:
使用国内镜像源(适用于中国用户)
- 备份原有 repo 文件:
sudo mkdir /etc/yum.repos.d/backup
sudo mv /etc/yum.repos.d/CentOS-* /etc/yum.repos.d/backup/
- 下载阿里云镜像源:
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
- 清理并重建缓存:
sudo yum clean all
sudo yum makecache
安装完成后输入如下指令查看版本号输入两次ctrl+c退出
5、安装RabbitMQ
在RabiitMQ
安装过程中需要依赖socat
插件,首先安装该插件
yum install -y socat
解压安装RabbitMQ
的安装包
# 解压
rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm
# 安装
yum install -y rabbitmq-server
6、启动RabbitMQ服务
# 启动rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq状态
systemctl status rabbitmq-server# 其他命令
# 设置rabbitmq服务开机自启动
systemctl enable rabbitmq-server
# 关闭rabbitmq服务
systemctl stop rabbitmq-server
# 重启rabbitmq服务
systemctl restart rabbitmq-server
7、RabbitMQWeb管理界面及授权操作
systemctl stop firewalld
# 打开RabbitMQWeb管理界面插件
rabbitmq-plugins enable rabbitmq_management
打开浏览器,访问服务器公网ip:15672
http://192.168.142.131:15672/
rabbitmq
有一个默认的账号密码guest
添加远程用户
# 添加用户
rabbitmqctl add_user 用户名 密码
rabbitmqctl add_user admin 123456
# 设置用户角色,分配操作权限
rabbitmqctl set_user_tags 用户名 角色
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"#其他指令
# 修改密码
rabbitmqctl change_ password 用户名 新密码
# 删除用户
rabbitmqctl delete_user 用户名
# 查看用户清单
rabbitmqctl list_users
角色有四种:
administrator
:可以登录控制台、查看所有信息、并对rabbitmq进行管理monToring
:监控者;登录控制台,查看所有信息policymaker
:策略制定者;登录控制台指定策略managment
:普通管理员;登录控制
创建用户admin
,密码123456
,设置administrator
角色,赋予所有权限
然后访问 http://192.168.142.131:15672/ 用户名:admin 密码:123456
8、延时队列插件安装
Community Plugins | RabbitMQ(RabbitMQ是什么版本的,下载的插件就得是什么版本的)
将插件上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后重启rabbitMQ
SpringBoot如何使用RabbitMQ
1、创建SpringBoot项目引入依赖
<!-- Spring Boot Starter for RabbitMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在 application.yml 中添加 RabbitMQ 配置
spring:# RabbitMQ 相关配置rabbitmq:# RabbitMQ 服务器地址host: 192.168.142.131# RabbitMQ 服务器端口 客户端应用程序(生产者/消费者)通过这个端口与 RabbitMQ 服务器交互port: 5672# RabbitMQ 用户名username: admin# RabbitMQ 密码password: 123456# RabbitMQ 虚拟主机virtual-host: /# 开启发送方确认机制publisher-confirm-type: correlated# 开启发送方退回机制publisher-returns: true# RabbitMQ 模板配置template:# 设置为 true 时,RabbitMQ 将确认消息是否成功投递到队列mandatory: true# 消息监听器配置listener:simple:# 消费者最小数量concurrency: 5# 消费者最大数量max-concurrency: 10# 每次从队列中获取的消息数量prefetch: 1# 消费者手动 ackacknowledge-mode: manual
3、创建 RabbitMQ 配置类
package com.lw.mqdemo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ配置类,用于声明队列、交换机、绑定关系以及配置消息转换器和RabbitTemplate*/
@Slf4j
@Configuration
public class RabbitMQConfig {// 定义队列、交换机和路由键名称// 定义直连交换机相关的常量// 队列名称public static final String DIRECT_QUEUE = "test.direct.queue";// 交换机名称public static final String DIRECT_EXCHANGE = "test.direct.exchange";// 路由键名称public static final String DIRECT_ROUTING_KEY = "test.direct.routingkey";// 定义主题交换机相关的常量// 队列1名称public static final String TOPIC_QUEUE_1 = "test.topic.queue1";// 队列2名称public static final String TOPIC_QUEUE_2 = "test.topic.queue2";// 交换机名称public static final String TOPIC_EXCHANGE = "test.topic.exchange";// 路由键名称public static final String TOPIC_ROUTING_KEY_1 = "test.topic.routingkey1";// 路由键名称public static final String TOPIC_ROUTING_KEY_2 = "test.topic.#";// 定义扇形交换机相关的常量// 队列1名称public static final String FANOUT_QUEUE_1 = "test.fanout.queue1";// 队列2名称public static final String FANOUT_QUEUE_2 = "test.fanout.queue2";// 交换机名称public static final String FANOUT_EXCHANGE = "test.fanout.exchange";// 1. 直连型交换机队列/*** 声明直连型交换机的队列** @return 队列对象*/@Beanpublic Queue directQueue() {log.info("创建队列: " + DIRECT_QUEUE);return new Queue(DIRECT_QUEUE, true);}// 2. 主题型交换机队列/*** 声明主题型交换机的第一个队列** @return 队列对象*/@Beanpublic Queue topicQueue1() {log.info("创建队列: " + TOPIC_QUEUE_1);return new Queue(TOPIC_QUEUE_1, true);}/*** 声明主题型交换机的第二个队列** @return 队列对象*/@Beanpublic Queue topicQueue2() {log.info("创建队列: " + TOPIC_QUEUE_2);return new Queue(TOPIC_QUEUE_2, true);}// 3. 扇形交换机队列/*** 声明扇形交换机的第一个队列** @return 队列对象*/@Beanpublic Queue fanoutQueue1() {log.info("创建队列: " + FANOUT_QUEUE_1);return new Queue(FANOUT_QUEUE_1, true);}/*** 声明扇形交换机的第二个队列** @return 队列对象*/@Beanpublic Queue fanoutQueue2() {log.info("创建队列: " + FANOUT_QUEUE_2);return new Queue(FANOUT_QUEUE_2, true);}// 1. 直连型交换机/*** 声明直连型交换机** @return 交换机对象*/@Beanpublic DirectExchange directExchange() {log.info("创建交换机: " + DIRECT_EXCHANGE);return new DirectExchange(DIRECT_EXCHANGE, true, false);}// 2. 主题型交换机/*** 声明主题型交换机** @return 交换机对象*/@Beanpublic TopicExchange topicExchange() {log.info("创建交换机: " + TOPIC_EXCHANGE);return new TopicExchange(TOPIC_EXCHANGE, true, false);}// 3. 扇形交换机/*** 声明扇形交换机** @return 交换机对象*/@Beanpublic FanoutExchange fanoutExchange() {log.info("创建交换机: " + FANOUT_EXCHANGE);return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 绑定直连型交换机和队列/*** 绑定直连型交换机和队列** @return 绑定对象*/@Beanpublic Binding bindingDirect() {log.info("绑定队列: " + DIRECT_QUEUE + " 到交换机: " + DIRECT_EXCHANGE + ",路由键: " + DIRECT_ROUTING_KEY);return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);}// 绑定主题型交换机和队列/*** 绑定主题型交换机的第一个队列** @return 绑定对象*/@Beanpublic Binding bindingTopic1() {log.info("绑定队列: " + TOPIC_QUEUE_1 + " 到交换机: " + TOPIC_EXCHANGE + ",路由键: " + TOPIC_ROUTING_KEY_1);return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);}/*** 绑定主题型交换机的第二个队列** @return 绑定对象*/@Beanpublic Binding bindingTopic2() {log.info("绑定队列: " + TOPIC_QUEUE_2 + " 到交换机: " + TOPIC_EXCHANGE + ",路由键: " + TOPIC_ROUTING_KEY_2);return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);}// 绑定扇形交换机和队列/*** 绑定扇形交换机的第一个队列** @return 绑定对象*/@Beanpublic Binding bindingFanout1() {log.info("绑定队列: " + FANOUT_QUEUE_1 + " 到交换机: " + FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 绑定扇形交换机的第二个队列** @return 绑定对象*/@Beanpublic Binding bindingFanout2() {log.info("绑定队列: " + FANOUT_QUEUE_2 + " 到交换机: " + FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}/*** 创建延迟交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed.exchange","x-delayed-message", true, false, args);}/*** 绑定延迟交换机* @return*/@Beanpublic Binding bindingDelayed() {return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(DIRECT_ROUTING_KEY).noargs();}// 使用JSON序列化消息/*** 配置JSON消息转换器** @return 消息转换器对象*/@Beanpublic MessageConverter jsonMessageConverter() {log.info("配置JSON消息转换器");return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate/*** 配置RabbitTemplate,设置消息转换器以及消息发送确认和返回回调** @param connectionFactory 连接工厂* @return 配置好的RabbitTemplate对象*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jsonMessageConverter());// 消息发送到交换器后确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息成功发送到Exchange: " + correlationData);} else {log.error("消息发送到Exchange失败: " + correlationData);}});// 消息从交换器发送到队列失败回调rabbitTemplate.setReturnsCallback(returned -> {log.info("消息从Exchange路由到Queue失败: " + returned.getMessage());log.info("交换机: " + returned.getExchange());log.info("路由键: " + returned.getRoutingKey());log.info("返回码: " + returned.getReplyCode());log.info("返回信息: " + returned.getReplyText());});return rabbitTemplate;}
}
4、创建消息生产者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** RabbitMQ生产者类,用于发送不同类型的交换机消息*/
@Component
public class RabbitMQProducer {/*** 注入RabbitTemplate模板,用于发送消息*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 注入MessageConverter转换器,用于将对象转换为消息*/@Autowiredprivate MessageConverter messageConverter;/*** 发送直连型交换机消息** @param message 要发送的消息对象*/public void sendDirectMessage(Object message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, // 交换机名称RabbitMQConfig.DIRECT_ROUTING_KEY, // 路由键 用于消息路由message, // 要发送的消息对象correlationData // 消息的唯一标识);}/*** 发送主题型交换机消息1** @param message 要发送的消息对象*/public void sendTopicMessage1(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交换机名称RabbitMQConfig.TOPIC_ROUTING_KEY_1, // 路由键 用于消息路由message // 要发送的消息对象);}/*** 发送主题型交换机消息2** @param message 要发送的消息对象*/public void sendTopicMessage2(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交换机名称"test.topic.routingkey2.test", // 路由键 用于消息路由message);}/*** 发送扇形交换机消息** @param message 要发送的消息对象*/public void sendFanoutMessage(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, // 交换机名称"", // 扇形交换机不需要路由键message);}/*** 发送延迟消息** @param message 要发送的消息对象* @param delayMillis 消息延迟的时间(毫秒)*/public void sendDelayedMessage(Object message, int delayMillis) {MessageProperties props = new MessageProperties();props.setDelay(delayMillis);Message msg = messageConverter.toMessage(message, props);rabbitTemplate.convertAndSend("delayed.exchange", // 关键修改点RabbitMQConfig.DIRECT_ROUTING_KEY,msg);}
}
5、创建消息消费者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** RabbitMQ消费者类* 该类包含了对不同类型队列(直连型、主题型、扇形)的消息消费方法*/
@Component
@Slf4j
public class RabbitMQConsumer {/*** 直连型队列消费者* 监听直连型队列并处理收到的消息** @param message 消息内容* @param channel 消息通道* @throws IOException 当消息处理失败时抛出异常*/@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void processDirectMessage(Message message, Channel channel) throws IOException {try {// 打印收到的消息内容log.info("直连型队列收到消息: " + new String(message.getBody()));// 手动ACK确认 如果不确认会消息Unacked状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主题型队列1消费者* 监听主题型队列1并处理收到的消息** @param message 消息内容* @param channel 消息通道* @param msg 消息对象* @throws IOException 当消息处理失败时抛出异常*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1)public void processTopicMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info("主题型队列1收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主题型队列2消费者* 监听主题型队列2并处理收到的消息** @param message 消息内容* @param channel 消息通道* @param msg 消息对象* @throws IOException 当消息处理失败时抛出异常*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2)public void processTopicMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info("主题型队列2收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形队列1消费者* 监听扇形队列1并处理收到的消息** @param message 消息内容* @param channel 消息通道* @param msg 消息对象* @throws IOException 当消息处理失败时抛出异1常*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)public void processFanoutMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info("扇形队列1收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形队列2消费者* 监听扇形队列2并处理收到的消息** @param message 消息内容* @param channel 消息通道* @param msg 消息对象* @throws IOException 当消息处理失败时抛出异常*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)public void processFanoutMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info("扇形队列2收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}
}
6、创建测试控制器
package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.RabbitMQProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** RabitMQ控制器* @author lw*/
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {// 注入RabbitMQ生产者@Autowiredprivate RabbitMQProducer rabbitMQProducer;/*** 发送直连型消息* @return*/@GetMapping("/direct")public String sendDirectMessage() {log.info("发送直连型消息");rabbitMQProducer.sendDirectMessage("这是一条直连型交换机消息");return "直连型消息发送成功";}/*** 发送主题型消息1* @return*/@GetMapping("/topic1")public String sendTopicMessage1() {log.info("发送主题型消息1");rabbitMQProducer.sendTopicMessage1("这是一条主题型交换机消息1");return "主题型消息1发送成功";}/*** 发送主题型消息2* @return*/@GetMapping("/topic2")public String sendTopicMessage2() {log.info("发送主题型消息2");rabbitMQProducer.sendTopicMessage2("这是一条主题型交换机消息2");return "主题型消息2发送成功";}@GetMapping("/fanout")public String sendFanoutMessage() {log.info("发送扇形消息");rabbitMQProducer.sendFanoutMessage("这是一条扇形交换机消息");return "扇形消息发送成功";}/*** 发送延迟消息* @return*/@GetMapping("/delay")public String sendDelayedMessage() {log.info("发送延迟消息");// 延迟5秒rabbitMQProducer.sendDelayedMessage("这是一条延迟消息", 5000);return "延迟消息发送成功,5秒后消费";}
}