Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统
Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统
一、Spring AMQP 是什么?
Spring AMQP(Application Messaging Protocol)是 Spring
官方提供的对 AMQP
协议的封装,其核心模块有两个:
spring-amqp
: 提供AMQP
抽象封装spring-rabbit
:RabbitMQ
的具体实现
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了
RabbitTemplate
工具,用于发送消息
常见的场景包括:
- 微服务之间的异步通信
- 秒杀系统削峰
- 用户注册发送邮件/短信通知
- 分布式事务的最终一致性方案
二、Spring Boot 集成 RabbitMQ
2.1. 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
它会自动引入 spring-rabbit 和 spring-amqp 模块。
2.2. 配置 RabbitMQ
spring:rabbitmq:host: 192.168.184.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
三、快速构建消息系统
- 一个消息队列
- 一个消息发送者
- 一个消息监听者(消费者)
构建示例项目:
mq-demo
:父工程,管理项目依赖publisher
:消息的发送者consumer
:消息的消费者
引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>
3.1.消息发送
在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送:
package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
3.2.消息接收
在consumer
服务的com.itheima.consumer.listener
包中新建一个类SpringRabbitListener
,代码如下:
package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
四、WorkQueues模型
4.1. 介绍
Work Queues
(工作队列)又叫 任务队列(Task Queues),主要用于将一个任务分发给多个消费者(工作线程)处理,每个任务只会被一个消费者处理。
核心思想是:生产者只管发送任务,多个消费者竞争获取任务并处理,达到并发消费、分担压力的目的。
Producer
(生产者):发送任务消息。Queue
(队列):缓存任务。Consumer
(消费者):从队列中获取任务并处理。
每个任务只会被一个消费者处理,多个消费者之间互不干扰。
4.2. 消息发送
在publisher
服务中的SpringAmqpTest
类中添加一个测试方法实现循环发送:
/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
4.3. 消息接收
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
多个消费者监听同一个队列,消息将被平均分配(默认轮询方式)。
4.4. 公平分发 vs 轮询分发
🔁 默认行为:轮询分发
RabbitMQ 默认采用 Round-Robin(轮询) 分发方式,消费者不论是否处理完当前消息,下一条消息仍然会发给它。
这可能导致:处理慢的消费者积压任务,处理快的消费者反而闲着。
✅ 公平分发(prefetch)
设置每个消费者的最大未确认消息数,让 RabbitMQ 只向空闲的消费者发送消息。
spring:rabbitmq:listener:simple:prefetch: 1 # 每个消费者同一时间只能处理1条消息