当前位置: 首页 > news >正文

[Java实战]Spring Boot整合RabbitMQ:实现异步通信与消息确认机制(二十七)

[Java实战]Spring Boot整合RabbitMQ:实现异步通信与消息确认机制(二十七)

摘要:本文通过完整案例演示Spring Boot与RabbitMQ的整合过程,深入讲解异步通信原理与消息可靠性保证机制。包含交换机类型选择、消息持久化配置、手动ACK确认等核心功能实现。

一、RabbitMQ核心概念

1.1 异步通信的优势

  • 系统解耦:生产者和消费者独立运行
  • 流量削峰:应对突发流量冲击
  • 异步处理:提升接口响应速度
  • 失败重试:通过死信队列实现异常处理

1.2 核心组件说明

组件作用类比现实场景
Producer消息生产者快递发货方
Consumer消息消费者快递收货方
Exchange消息路由规则制定者快递分拣中心
Queue消息存储队列快递暂存仓库
Binding交换机和队列的绑定规则快递配送路线

二、环境准备与配置

2.1 快速部署RabbitMQ

# 使用Docker启动RabbitMQ(带管理界面)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 docker.1ms.run/rabbitmq:3-management

2.2 Spring Boot依赖配置

<!-- pom.xml -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# application.yml
spring:rabbitmq:host: localhostport: 5672username: adminpassword: 123456virtual-host: /# 开启发送方确认机制publisher-confirm-type: correlated# 开启发送方回退模式publisher-returns: true

三、基础消息收发实现

3.1 队列与交换机配置

@Configuration
public class RabbitMQConfig {// 订单队列@Beanpublic Queue orderQueue() {return new Queue("order.queue", true); // 持久化队列}// 直连交换机@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order.exchange");}// 绑定关系@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routingKey");}
}

通过RabbitMQ管理界面创建(可选)

  1. 打开浏览器,访问 http://localhost:15672
  2. 使用用户名 admin 和密码 123456 登录。
  3. 在左侧菜单中选择 Queues,点击 Add a new queue,输入队列名称 order.queue,并勾选 Durable
  4. 在左侧菜单中选择 Exchanges,点击 Add a new exchange,输入交换机名称 order.exchange,选择类型为 Direct,并勾选 Durable
  5. 在交换机页面,点击 Bindings 标签,点击 Add binding,选择队列 order.queue,输入路由键 order.routingKey

3.2 消息生产者

@Service
@RequiredArgsConstructor
public class OrderProducer {private final RabbitTemplate rabbitTemplate;public void sendOrder(Order order) {// 消息唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("order.exchange", "order.routingKey", order,message -> {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},correlationData);}
}

3.3 消息消费者

@Component
@Slf4j
public class OrderConsumer {@RabbitListener(queues = "order.queue")public void processOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {// 业务处理逻辑log.info("收到订单消息: {}", order);// 手动ACK确认channel.basicAck(tag, false);} catch (Exception e) {// 消息重试或进入死信队列channel.basicNack(tag, false, true);}}
}

3.4 编写controller测试

    @PostMapping("/sendMs")public void sendMs(@RequestBody Ms ms){Order o = new Order();o.setId(1L);o.setName("test 1234");// 发送消息orderProducer.sendOrder(o);}

测试截图:

在这里插入图片描述

四、消息可靠性保障

4.1 消息确认机制

Producer Broker Consumer 发送消息 Confirm回调 投递消息 发送ACK 删除消息 Producer Broker Consumer
4.1.1 生产者确认模式
  • Confirm模式:确保消息到达Broker
  • Return模式:处理不可路由消息
@Slf4j
@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@Autowiredpublic RabbitConfirmCallback(RabbitTemplate rabbitTemplate) {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息到达交换机,ID: {}", correlationData.getId());} else {log.error("消息投递失败,ID: {},原因: {}", correlationData.getId(), cause);}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.warn("消息无法路由,即将返回给生产者: {}", returned.getMessage());}
}
4.1.2 消费者手动ACK
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认retry:enabled: true          # 开启重试max-attempts: 3       # 最大重试次数

五、高级特性应用

5.1 死信队列配置

@Bean
public Queue deadLetterQueue() {return QueueBuilder.durable("dead.letter.queue").build();
}@Bean
public DirectExchange deadLetterExchange() {return new DirectExchange("dead.letter.exchange");
}@Bean
public Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter.routingKey");
}@Bean
public Queue orderQueueWithDLX() {return QueueBuilder.durable("order.queue").deadLetterExchange("dead.letter.exchange").deadLetterRoutingKey("dead.letter.routingKey").build();
}

5.2 消息TTL设置

// 队列级别TTL
@Bean
public Queue ttlQueue() {return QueueBuilder.durable("ttl.queue").ttl(60000) // 单位毫秒.build();
}// 消息级别TTL
MessageProperties props = MessagePropertiesBuilder.newInstance().setExpiration("30000") .build();
rabbitTemplate.convertAndSend(exchange, routingKey, message, props);

六、性能优化建议

  1. 连接池配置
spring:rabbitmq:cache:channel:size: 50      # 通道缓存数量checkout-timeout: 1000 # 获取通道超时时间
  1. 消费端限流
spring:rabbitmq:listener:simple:prefetch: 10 # 每个消费者最大未确认数
  1. 消息压缩
    在消息头中添加压缩标识:
message.getMessageProperties().setHeader("compression", "gzip");

七、常见问题排查

问题1:消息重复消费

  • 解决方案:实现幂等校验(Redis原子操作/数据库唯一约束)

问题2:队列消息堆积

  • 临时方案:增加消费者实例
  • 长期方案:优化消费逻辑性能

问题3:连接自动断开

  • 检查心跳配置:
    spring:rabbitmq:requested-heartbeat: 60 # 心跳间隔(秒)
    

参考资料

RabbitMQ官方文档

希望本教程对您有帮助,请点赞❤️收藏⭐关注支持!欢迎在评论区留言交流技术细节!

相关文章:

  • day29 python深入探索类装饰器
  • 给大模型“贴膏药”:LoRA微调原理说明书
  • Java面试实战:从Spring Boot到分布式缓存的深度探索
  • 多指标组合策略思路
  • Vue3学习(组合式API——provide和inject)(跨多层级组件通信/跨多层级共享数据)
  • java加强 -多线程 -创建与常用方法
  • 如何完美安装GPU版本的torch、torchvision----解决torch安装慢 无法安装 需要翻墙安装 安装的是GPU版本但无法使用的GPU的错误
  • ​Docker 网络
  • vue3_flask实现mysql数据库对比功能
  • 一款适配国内的视频软件,畅享大屏与局域网播放
  • sparkSQL读入csv文件写入mysql(2)
  • STM32SPI实战-Flash模板
  • html文件cdn一键下载并替换
  • 计算机图形学中MVP变换的理论推导
  • R for Data Science(3)
  • windows环境下c语言链接sql数据库
  • Spring 框架线程安全的五大保障策略解析
  • 山东大学计算机图形学期末复习11——CG13上
  • NAT(网络地址转换)逻辑图解+实验详解
  • symfonos: 2靶场
  • 半年不到再换岗:伊春市委常委、政法委书记方春彪任伊春森工集团党委书记
  • 出走的苏敏阿姨一路走到了戛纳,这块红毯因她而多元
  • 男子入户强奸高龄独居妇女致其死亡,法院:属实,已执行死刑
  • 首次采用“顶置主星+侧挂从星”布局,长二丁“1箭12星”发射成功
  • 中国-拉共体成员国重点领域合作共同行动计划(2025-2027)
  • 外企聊营商|波音速度:创新审批促“起飞”