RabbitMQ在SpringBoot中的使用详解
📋目录
🎯 RabbitMQ基础概念
什么是RabbitMQ?
核心概念详解
1. 队列(Queue)
2. 交换机(Exchange)
3. 绑定(Binding)
🛠️ 环境搭建
1. 安装RabbitMQ
Windows安装
Docker安装(推荐)
2. 访问管理界面
🚀 SpringBoot集成RabbitMQ
1. 添加依赖
2. 配置文件
⚙️ 基础配置
1. RabbitMQ配置类
📨 简单队列模式
1. 队列配置
2. 生产者
3. 消费者
👥 工作队列模式
1. 配置类
2. 生产者
3. 消费者
📡 发布订阅模式
1. 配置类
2. 生产者
3. 消费者
🎯 路由模式
1. 配置类
2. 生产者
3. 消费者
🏷️ 主题模式
1. 配置类
2. 生产者
3. 消费者
✅ 消息确认机制
1. 生产者确认
2. 消费者确认
💼 实际项目应用
1. 订单处理系统
2. 延迟队列实现
🔧 常见问题解决
1. 消息丢失问题
2. 消息重复消费问题
3. 性能优化
4. 监控和日志
📚 总结
使用建议
最佳实践
🎯 RabbitMQ基础概念
什么是RabbitMQ?
RabbitMQ是一个开源的消息队列中间件,就像一个邮局一样:
- 生产者(Producer):发送邮件的人
- 队列(Queue):邮箱,存放邮件的地方
- 消费者(Consumer):收邮件的人
- 交换机(Exchange):邮局的分拣中心,决定邮件发到哪个邮箱
核心概念详解
1. 队列(Queue)
队列就像一个容器,消息在这里排队等待被处理
[消息1] [消息2] [消息3] → 消费者取走处理
2. 交换机(Exchange)
交换机有4种类型:
- Direct(直连):根据路由键精确匹配
- Fanout(扇出):广播到所有绑定的队列
- Topic(主题):根据路由键模式匹配
- Headers:根据消息头匹配(较少使用)
3. 绑定(Binding)
绑定是交换机和队列之间的连接规则
🛠️ 环境搭建
1. 安装RabbitMQ
Windows安装
# 使用Chocolatey安装
choco install rabbitmq# 启动RabbitMQ服务
rabbitmq-server
Docker安装(推荐)
# 拉取并运行RabbitMQ容器
docker run -d --name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \rabbitmq:3-management
2. 访问管理界面
- 地址:http://localhost:15672
- 用户名:admin
- 密码:123456
🚀 SpringBoot集成RabbitMQ
1. 添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
2. 配置文件
# application.yml
spring:rabbitmq:host: localhostport: 5672username: adminpassword: 123456virtual-host: /# 发送者开启确认模式publisher-confirms: true# 发送者开启return确认机制publisher-returns: true# 设置消费者手动确认listener:simple:acknowledge-mode: manual# 限制每次只处理一个消息prefetch: 1
⚙️ 基础配置
1. RabbitMQ配置类
@Configuration
@EnableRabbit
public class RabbitConfig {/*** 创建RabbitTemplate,用于发送消息*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 设置消息转换器(可选)template.setMessageConverter(new Jackson2JsonMessageConverter());// 设置发送确认回调template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功");} else {System.out.println("消息发送失败:" + cause);}});return template;}
}
📨 简单队列模式
这是最基本的模式:一个生产者发送消息到队列,一个消费者接收消息。
1. 队列配置
@Configuration
public class SimpleQueueConfig {public static final String SIMPLE_QUEUE = "simple.queue";/*** 声明简单队列*/@Beanpublic Queue simpleQueue() {return QueueBuilder.durable(SIMPLE_QUEUE).build();}
}
2. 生产者
@RestController
@RequestMapping("/simple")
public class SimpleProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMapping("/send")public String sendMessage(@RequestParam String message) {// 发送消息到队列rabbitTemplate.convertAndSend(SimpleQueueConfig.SIMPLE_QUEUE, message);return "消息发送成功:" + message;}
}
3. 消费者
@Component
public class SimpleConsumer {/*** 监听简单队列*/@RabbitListener(queues = SimpleQueueConfig.SIMPLE_QUEUE)public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("接收到消息:" + message);// 模拟业务处理Thread.sleep(1000);// 手动确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {try {// 处理失败,拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}}
}
👥 工作队列模式
多个消费者共同处理一个队列中的消息,实现任务分发。
1. 配置类
@Configuration
public class WorkQueueConfig {public static final String WORK_QUEUE = "work.queue";@Beanpublic Queue workQueue() {return QueueBuilder.durable(WORK_QUEUE).build();}
}
2. 生产者
@RestController
@RequestMapping("/work")
public class WorkProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMapping("/send")public String sendWork(@RequestParam String task) {for (int i = 1; i <= 10; i++) {String message = task + " - 任务" + i;rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message);}return "批量任务发送完成";}
}
3. 消费者
@Component
public class WorkConsumer {/*** 工作者1*/@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)public void worker1(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("工作者1处理:" + message);Thread.sleep(2000); // 模拟较慢的处理channel.basicAck(deliveryTag, false);} catch (Exception e) {handleError(channel, deliveryTag);}}