当前位置: 首页 > news >正文

RabbitMQ 异步化抗洪实战

说明:本文仅展示架构思路与安全片段,所有敏感字段已用占位符;不含可直接复刻的生产细节。数据与接口均为演示/虚拟

0. 背景与目标

长耗时/不确定接口(如对接第三方机器人平台)的同步阻塞,容易造成请求堆积与峰值雪崩。本次改造目标:

  • 接口 202 Accepted + 任务轮询,释放前端/网关压力

  • 消息队列削峰:生产入队 → 手动 ack 消费 → 失败入 DLQ

  • 幂等键X-Request-Id)与任务态存储,防重下发、可追踪

  • 动态配置(示例以配置中心为主),便于多环境切换

结果:写请求在峰值时平稳可控,消费端有序处理;失败统一入 DLQ 供排障复盘。

1. 轻量数据流

Client --POST--> Gateway --→ API|                         ||<--202 + taskId----------||--GET /tasks/{id} 轮询-->|API --(Produce)--> RabbitMQ(Topic) --→ Consumer(手动 ack) --→ 第三方平台|成功/失败|状态落库/缓存 <--+失败 → DLQ

注意:仅示意关键节点,隐藏了内部服务名、完整路由与业务字段。

2. 动态配置

# DataId: robot-mq-public.yml
spring:rabbitmq:host: ${ENV_MQ_HOST}port: ${ENV_MQ_PORT:5672}username: ${ENV_MQ_USER}password: ${ENV_MQ_PASS}virtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemq:exchange: robot.task.topic.public       # 非生产命名queue:    robot.task.q.public           # 非生产命名rk:       robot.task.dispatch.public    # 非生产命名dlx:      robot.task.dlx.publicdlq:      robot.task.dlq.publicdlrk:     "#"async:idem-ttl-seconds: 3600

说明:占位符 ${ENV_*} 仅示意变量化做法;真实值不要写进文章或仓库。

3. 关键代码片段

以下为骨架式片段,只展示接口与关键点,隐藏了内部实现、异常策略、业务字段映射等细节。

3.1 交换机/队列与手动 ack

@Configuration
public class MqBasicsConfig {@Value("${mq.exchange}") private String exchange;@Value("${mq.queue}")    private String queue;@Value("${mq.rk}")       private String rk;@Value("${mq.dlx}")      private String dlx;@Value("${mq.dlq}")      private String dlq;@Value("${mq.dlrk}")     private String dlrk;@Beanpublic Declarables mqDeclarables() {Queue q = QueueBuilder.durable(queue).withArgument("x-dead-letter-exchange", dlx).withArgument("x-dead-letter-routing-key", dlrk).build();TopicExchange biz = ExchangeBuilder.topicExchange(exchange).durable(true).build();Binding b = BindingBuilder.bind(q).to(biz).with(rk);TopicExchange dead = ExchangeBuilder.topicExchange(dlx).durable(true).build();Queue deadQ = QueueBuilder.durable(dlq).build();Binding db = BindingBuilder.bind(deadQ).to(dead).with(dlrk);return new Declarables(biz, q, b, dead, deadQ, db);}@Beanpublic SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory cf,MessageConverter converter) {var f = new SimpleRabbitListenerContainerFactory();f.setConnectionFactory(cf);f.setMessageConverter(converter);f.setAcknowledgeMode(AcknowledgeMode.MANUAL);return f;}
}

3.2 生产者(骨架)

@Component
public class TaskProducer {private final RabbitTemplate tpl;@Value("${mq.exchange}") private String exchange;@Value("${mq.rk}")       private String rk;public TaskProducer(RabbitTemplate tpl) { this.tpl = tpl; }/** 仅演示:设置 CorrelationData 绑定 taskId,方便确认回调 */public void send(String taskId, Object payload, @Nullable String requestId) {MessagePostProcessor mpp = msg -> {msg.getMessageProperties().setMessageId(taskId);if (requestId != null) msg.getMessageProperties().setHeader("X-Request-Id", requestId);return msg;};tpl.convertAndSend(exchange, rk, payload, mpp, new CorrelationData(taskId));}
}

3.3 消费者(骨架,手动 ack + 幂等占位)

@Slf4j
@Component
public class TaskConsumer {@RabbitListener(queues = "${mq.queue}", containerFactory = "manualAckFactory")public void onMessage(Message message, Channel channel) throws Exception {String taskId = message.getMessageProperties().getMessageId();String reqId  = (String) message.getMessageProperties().getHeaders().get("X-Request-Id");try {// TODO 幂等占位:检查 reqId / taskId 是否已处理// TODO 业务占位:调用第三方平台(已脱敏),记录状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {log.error("consume failed, taskId={}", taskId, ex);// 直接拒绝入 DLQ:公开文章不贴重试/回退策略细节channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}

3.4 接口契约(202 + 轮询)(骨架)

@RestController
@RequestMapping("/external/async")
public class AsyncController {// POST 接口:只返回 202 + taskId(隐藏业务入参/第三方字段)@PostMapping("/command")public ResponseEntity<Map<String, Object>> postCommand(@RequestHeader(value="X-Request-Id", required=false) String reqId) {String taskId = UUID.randomUUID().toString();// TODO 写入 PENDING 状态(缓存/DB),并生产 MQ 消息return ResponseEntity.accepted().body(Map.of("taskId", taskId));}// 轮询接口:返回 PENDING / DONE / FAILED(隐藏具体结果结构)@GetMapping("/tasks/{taskId}")public Map<String, Object> query(@PathVariable String taskId) {// TODO 读取存储的任务态与简要结果return Map.of("taskId", taskId, "status", "PENDING");}
}

骨架片段仅演示“怎么做”,刻意省略:完整领域模型、重试/退避参数、幂等键落盘、第三方错误分型、限流灰度策略等生产细节。

4. 测试与验收要点

  • 压测入口:仅关注POST→202 延迟与消费者侧消费速率,而非业务同步 RT

  • 人为制造失败,确认DLQ 入列与排障链路可达

  • 幂等:同 X-Request-Id 重复请求不导致重复外呼

  • 限流:网关/服务内双层策略命中时,优先保证系统稳定

  • 监控:最少监控入队速率、堆积深度、消费速率、DLQ 数量


5. 常见坑位

  • 回调未开启:生产者没开 confirm/return,消息“丢哪儿了”难定位

  • 死信配置缺失:Nack(false,false) 却没有 DLX/DLQ

  • 幂等只在接口层:消费者未做幂等,仍可能二次下游外呼


文章转载自:

http://PB5jySpN.nzdks.cn
http://9G3PCECb.nzdks.cn
http://X5FV670M.nzdks.cn
http://9w6xR4ZL.nzdks.cn
http://esRKotTB.nzdks.cn
http://lrUuHdxc.nzdks.cn
http://Z5NW93RQ.nzdks.cn
http://GKi7A6wA.nzdks.cn
http://IThUGAjr.nzdks.cn
http://8sXDJkSc.nzdks.cn
http://H9d94lhG.nzdks.cn
http://ojvodtZi.nzdks.cn
http://LqE5tKJW.nzdks.cn
http://HthqvPIY.nzdks.cn
http://Xqu8gK9d.nzdks.cn
http://Rnvy0wG5.nzdks.cn
http://sN5UmzC7.nzdks.cn
http://YUhw2tXl.nzdks.cn
http://c6dDKLBb.nzdks.cn
http://jza1wKo9.nzdks.cn
http://vL3zcCUM.nzdks.cn
http://ozRAuaSS.nzdks.cn
http://qeBU0UJg.nzdks.cn
http://N4Ki4HzU.nzdks.cn
http://CwXKivAT.nzdks.cn
http://K7mgBeej.nzdks.cn
http://cbnMMJrc.nzdks.cn
http://Szx11OEi.nzdks.cn
http://DfDa8HfB.nzdks.cn
http://NL67YzJO.nzdks.cn
http://www.dtcms.com/a/386622.html

相关文章:

  • 《Java集合框架核心解析》
  • 二维码生成器
  • OSI七层模型
  • 【原创·极简新视角剖析】【组局域网】设备在同一局域网的2个条件
  • 第8课:高级检索技术:HyDE与RAG-Fusion原理与DeepSeek实战
  • Windows 命令行:路径的概念,绝对路径
  • 异常检测在网络安全中的应用
  • 【ubuntu】ubuntu 22.04 虚拟机中扩容操作
  • 【数值分析】05-绪论-章节课后1-7习题及答案
  • Java NIO 核心机制与应用
  • Roo Code 诊断集成功能:智能识别与修复代码问题
  • ANA Pay不再接受海外信用卡储值 日eShop生路再断一条
  • 一阶惯性环节的迭代公式
  • AWS 热门服务(2025 年版)
  • 拷打字节算法面试官之-深入c语言递归算法
  • Vehiclehal的VehicleService.cpp
  • 【传奇开心果系列】基于Flet框架实现的允许调整大小的开关自定义组件customswitch示例模板特色和实现原理深度解析
  • 八股整理xdsm
  • SpringBoot 配置文件详解:从基础语法到实战应用
  • lesson62:JavaScript对象进化:ES2025新特性深度解析与实战指南
  • ARM C1-Premium core简介
  • 机器学习-深度神经网络架构
  • godot+c#实现玩家动画
  • 【Axure高保真原型】标签树分类查询案例
  • 系统架构设计(一)
  • RK3568下QT实简易文件浏览器
  • 设备综合效率(OEE)讲解与计算案例
  • STM32G4 电流环闭环(二) 霍尔有感运行
  • git-gui --批量处理文件
  • 【代码随想录day 28】 力扣 55.跳跃游戏