RabbitMQ 异步化抗洪实战
说明:本文仅展示架构思路与安全片段,所有敏感字段已用占位符;不含可直接复刻的生产细节。数据与接口均为演示/虚拟。
0. 背景与目标
长耗时/不确定接口(如对接第三方机器人平台)的同步阻塞,容易造成请求堆积与峰值雪崩。本次改造目标:
接口 202 Accepted + 任务轮询,释放前端/网关压力
消息队列削峰:生产入队 → 手动 ack 消费 → 失败入 DLQ
幂等键(
X-Request-Id
)与任务态存储,防重下发、可追踪动态配置(示例以配置中心为主),便于多环境切换
结果:写请求在峰值时平稳可控,消费端有序处理;失败统一入 DLQ 供排障复盘。
1. 轻量数据流
Client --POST--> Gateway --→ API| ||<--202 + taskId----------||--GET /tasks/{id} 轮询-->|API --(Produce)--> RabbitMQ(Topic) --→ Consumer(手动 ack) --→ 第三方平台|成功/失败|状态落库/缓存 <--+失败 → DLQ
注意:仅示意关键节点,隐藏了内部服务名、完整路由与业务字段。
2. 动态配置
# DataId: robot-mq-public.yml
spring:rabbitmq:host: ${ENV_MQ_HOST}port: ${ENV_MQ_PORT:5672}username: ${ENV_MQ_USER}password: ${ENV_MQ_PASS}virtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemq:exchange: robot.task.topic.public # 非生产命名queue: robot.task.q.public # 非生产命名rk: robot.task.dispatch.public # 非生产命名dlx: robot.task.dlx.publicdlq: robot.task.dlq.publicdlrk: "#"async:idem-ttl-seconds: 3600
说明:占位符 ${ENV_*}
仅示意变量化做法;真实值不要写进文章或仓库。
3. 关键代码片段
以下为骨架式片段,只展示接口与关键点,隐藏了内部实现、异常策略、业务字段映射等细节。
3.1 交换机/队列与手动 ack
@Configuration
public class MqBasicsConfig {@Value("${mq.exchange}") private String exchange;@Value("${mq.queue}") private String queue;@Value("${mq.rk}") private String rk;@Value("${mq.dlx}") private String dlx;@Value("${mq.dlq}") private String dlq;@Value("${mq.dlrk}") private String dlrk;@Beanpublic Declarables mqDeclarables() {Queue q = QueueBuilder.durable(queue).withArgument("x-dead-letter-exchange", dlx).withArgument("x-dead-letter-routing-key", dlrk).build();TopicExchange biz = ExchangeBuilder.topicExchange(exchange).durable(true).build();Binding b = BindingBuilder.bind(q).to(biz).with(rk);TopicExchange dead = ExchangeBuilder.topicExchange(dlx).durable(true).build();Queue deadQ = QueueBuilder.durable(dlq).build();Binding db = BindingBuilder.bind(deadQ).to(dead).with(dlrk);return new Declarables(biz, q, b, dead, deadQ, db);}@Beanpublic SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory cf,MessageConverter converter) {var f = new SimpleRabbitListenerContainerFactory();f.setConnectionFactory(cf);f.setMessageConverter(converter);f.setAcknowledgeMode(AcknowledgeMode.MANUAL);return f;}
}
3.2 生产者(骨架)
@Component
public class TaskProducer {private final RabbitTemplate tpl;@Value("${mq.exchange}") private String exchange;@Value("${mq.rk}") private String rk;public TaskProducer(RabbitTemplate tpl) { this.tpl = tpl; }/** 仅演示:设置 CorrelationData 绑定 taskId,方便确认回调 */public void send(String taskId, Object payload, @Nullable String requestId) {MessagePostProcessor mpp = msg -> {msg.getMessageProperties().setMessageId(taskId);if (requestId != null) msg.getMessageProperties().setHeader("X-Request-Id", requestId);return msg;};tpl.convertAndSend(exchange, rk, payload, mpp, new CorrelationData(taskId));}
}
3.3 消费者(骨架,手动 ack + 幂等占位)
@Slf4j
@Component
public class TaskConsumer {@RabbitListener(queues = "${mq.queue}", containerFactory = "manualAckFactory")public void onMessage(Message message, Channel channel) throws Exception {String taskId = message.getMessageProperties().getMessageId();String reqId = (String) message.getMessageProperties().getHeaders().get("X-Request-Id");try {// TODO 幂等占位:检查 reqId / taskId 是否已处理// TODO 业务占位:调用第三方平台(已脱敏),记录状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {log.error("consume failed, taskId={}", taskId, ex);// 直接拒绝入 DLQ:公开文章不贴重试/回退策略细节channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}
3.4 接口契约(202 + 轮询)(骨架)
@RestController
@RequestMapping("/external/async")
public class AsyncController {// POST 接口:只返回 202 + taskId(隐藏业务入参/第三方字段)@PostMapping("/command")public ResponseEntity<Map<String, Object>> postCommand(@RequestHeader(value="X-Request-Id", required=false) String reqId) {String taskId = UUID.randomUUID().toString();// TODO 写入 PENDING 状态(缓存/DB),并生产 MQ 消息return ResponseEntity.accepted().body(Map.of("taskId", taskId));}// 轮询接口:返回 PENDING / DONE / FAILED(隐藏具体结果结构)@GetMapping("/tasks/{taskId}")public Map<String, Object> query(@PathVariable String taskId) {// TODO 读取存储的任务态与简要结果return Map.of("taskId", taskId, "status", "PENDING");}
}
骨架片段仅演示“怎么做”,刻意省略:完整领域模型、重试/退避参数、幂等键落盘、第三方错误分型、限流灰度策略等生产细节。
4. 测试与验收要点
压测入口:仅关注POST→202 延迟与消费者侧消费速率,而非业务同步 RT
人为制造失败,确认DLQ 入列与排障链路可达
幂等:同
X-Request-Id
重复请求不导致重复外呼限流:网关/服务内双层策略命中时,优先保证系统稳定
监控:最少监控入队速率、堆积深度、消费速率、DLQ 数量
5. 常见坑位
回调未开启:生产者没开 confirm/return,消息“丢哪儿了”难定位
死信配置缺失:Nack(false,false) 却没有 DLX/DLQ
幂等只在接口层:消费者未做幂等,仍可能二次下游外呼