Rabbitmq基础篇
文章目录
- RabbitMq基础篇
 - 1. 初识Mq
 - 1.1同步与异步
 - 1.2 消息队列的作用
 - 异步调用
 - 系统解耦
 - 流量削峰
 
- 1.3消息队列技术选型
 
- 2. 安装RabbitMq
 - 2.1系统准备
 - 2.2 安装 Erlang & RabbitMQ
 - 2.3 启用 Web 管理插件
 - 2.4创建管理员用户
 - 2.5开放防火墙端口
 - 2.6访问管理界面
 
- 3. 快速入门
 - 3.1依赖配置
 - 3.2 消息发送与接受
 
- 4.业务改造
 - 4.1.配置MQ
 - 4.2.接收消息
 - 4.3.发送消息
 
RabbitMq基础篇
1. 初识Mq
1.1同步与异步
正常情况下,我们调用一个服务,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?
 
解读:
- 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
 - 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。
 
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。
所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
1.2 消息队列的作用
基本上消息队列主要有以下几个作用:异步,解耦,削峰。
异步调用
异步处理指的是将一些不需要立即返回结果的操作,通过消息队列延后处理,从而提高系统的响应速度。
例如:用户注册后需要发送欢迎邮件,这个操作不需要立即完成,可以异步处理。
系统解耦
解耦是指系统各个模块之间不直接依赖,通过消息队列进行通信,从而降低模块间的耦合度,提高系统的可维护性和扩展性。
例如:订单服务不直接调用库存服务,而是通过消息队列通知库存服务扣减库存。
流量削峰
削峰是指在高并发场景下,通过消息队列缓冲瞬时大量请求,避免后端服务被压垮,实现流量的平滑处理。
例如:秒杀活动中,大量用户同时下单,消息队列将请求排队,后端服务按处理能力逐步消费。
1.3消息队列技术选型
几种常见MQ的对比:
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache | 
| 开发语言 | Erlang | Java | Java | Scala&Java | 
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 | 
| 可用性 | 高 | 一般 | 高 | 高 | 
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 | 
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 | 
| 消息可靠性 | 高 | 一般 | 高 | 一般 | 
追求可用性:Kafka、 RocketMQ 、RabbitMQ
 追求可靠性:RabbitMQ、RocketMQ
 追求吞吐能力:RocketMQ、Kafka
 追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好,因此接下来我们将详细介绍一下RabbitMq。
2. 安装RabbitMq
下面介绍一下在Ubuntu24.x系统上安装RabbitMq的步骤
2.1系统准备
sudo apt update && sudo apt -y upgrade
sudo apt install -y curl gnupg apt-transport-https lsb-release
 
2.2 安装 Erlang & RabbitMQ
sudo apt update
# 安装 Erlang/OTP 26 全量包
sudo apt install -y erlang-base \erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools \erlang-tftp erlang-tools erlang-xmerl# 安装 RabbitMQ Server
sudo apt install -y rabbitmq-server
 
安装完成后服务已自动启动并设为开机自启:
sudo systemctl status rabbitmq-server
 
2.3 启用 Web 管理插件
sudo rabbitmq-plugins enable rabbitmq_management
 
2.4创建管理员用户
# 建议删除默认 guest 用户(仅 localhost 可访问)
sudo rabbitmqctl delete_user guest# 创建新管理员
sudo rabbitmqctl add_user admin YOUR_PASSWORD
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
 
2.5开放防火墙端口
# 如启用了 ufw
sudo ufw allow 5672/tcp  # AMQP
sudo ufw allow 15672/tcp # Web UI
sudo ufw reload
 
云主机请同步在 安全组 放行上述端口。
2.6访问管理界面
浏览器打开:
http://<服务器IP>:15672
 
使用刚才创建的 admin / YOUR_PASSWORD 登录。
3. 快速入门
注意:一下操作均在SpringBoot3.5x版本使用
3.1依赖配置
引入RabbitMq的依赖:
<!--引入RabbitMQ : AMQP(高级消息队列协议) Advanced-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
 
配置:
# 在生产者或者消费者引入MQ服务端信息
spring:rabbitmq:host: <Your Host>port: 5672 virtual-host: "/" # 虚拟主机username: <Your User Name>password: <Your Password>
 
3.2 消息发送与接受
1)配置RabbitConfig配置类。用于初始化队列,交换机等
@Configuration
public class RabbitMQConfig {// 声明队列@Beanpublic Queue helloQueue1() {return new Queue("hello.queue1", true); // durable=true 持久化队列}@Beanpublic Queue chengfuNewsQueue() {return new Queue("chengfu.news", true);}// 声明交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct.exchange");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic.exchange");}// 声明绑定关系@Beanpublic Binding bindingHelloQueue(Queue helloQueue1, DirectExchange directExchange) {return BindingBuilder.bind(helloQueue1).to(directExchange).with("hello.queue1");}@Beanpublic Binding bindingChengfuNews(Queue chengfuNewsQueue, TopicExchange topicExchange) {return BindingBuilder.bind(chengfuNewsQueue).to(topicExchange).with("chengfu.news.#");}
}
 
2)编写消息发送与接受方法
@Slf4j
@RestController
@RequestMapping("/rabbit")
public class Controller {@Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/send")public void send() {rabbitTemplate.convertAndSend("chengfu.news", "hello rabbitmq");log.info("发送消息成功");}@RabbitListener(queues = {"chengfu.news"})public void listen(String message) {System.out.println("接收到消息:" + message);}
}
 
3)进行测试
###
POST http://localhost:8080/rabbit/send
 
日志:
2025-10-29T15:47:43.645+08:00  INFO 16948 --- [rabbitmq-demo] [nio-8080-exec-6] com.chengfu.rabbitmqdemo.Controller      : 发送消息成功
接收到消息:hello rabbitmq
 
4.业务改造
案例需求:改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用,改为基于RabbitMQ的异步通知。
 如图:
 
 说明,我们只关注交易服务,步骤如下:
- 定义topic类型交换机,命名为
pay.topic - 定义消息队列,命名为
mark.order.pay.queue - 将
mark.order.pay.queue与pay.topic绑定,BindingKey为pay.success - 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到
pay.topic,发送消息的RoutingKey为pay.success,消息内容是订单id - 交易服务监听
mark.order.pay.queue队列,接收到消息后更新订单状态为已支付 
4.1.配置MQ
不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
 1)添加依赖:
  <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
 
2)配置MQ地址:
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
 
4.2.接收消息
在trade-service服务中定义一个消息监听类
 其代码如下:
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.order.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}
}
 
4.3.发送消息
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:
private final RabbitTemplate rabbitTemplate;@Override
@Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
}BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
}
 
技术不要为了用而用,适合场景才重要。
