当前位置: 首页 > news >正文

返利app的消息队列架构:基于RabbitMQ的异步通信与解耦实践

返利app的消息队列架构:基于RabbitMQ的异步通信与解耦实践

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

在返利app的业务流程中,用户下单、返利计算、佣金到账、消息通知等环节存在强依赖关系——传统同步调用模式下,若“返利计算服务”响应延迟,会导致整个下单流程卡顿,甚至引发连锁故障。为解决这一问题,我们引入RabbitMQ消息队列,基于“生产者-交换机-队列-消费者”架构,实现服务间异步通信与业务解耦,将下单流程响应时间从500ms缩短至150ms,系统峰值吞吐量提升2倍。以下从架构设计、核心组件实现、业务场景落地三方面展开,附完整代码示例。
返利app

一、返利app RabbitMQ架构设计

1.1 架构分层与组件职责

针对返利app的业务特性,设计三层消息通信架构,各组件职责如下:

  • 生产者层:各微服务(订单服务、用户服务、返利服务)作为生产者,将业务事件(如“订单创建”“返利生成”)封装为消息发送至RabbitMQ;
  • 中间件层:RabbitMQ通过交换机(Exchange)与队列(Queue)的绑定关系,实现消息路由——采用Topic交换机支持按规则匹配路由,Fanout交换机实现广播通知;
  • 消费者层:下游服务(如通知服务、统计服务)作为消费者,监听指定队列,异步处理消息,避免与上游服务强耦合。

1.2 核心业务消息流转路径

以“用户下单”场景为例,消息流转路径为:

  1. 订单服务(生产者)创建“订单创建”消息,发送至order-exchange交换机;
  2. 交换机按路由键order.created,将消息路由至order-confirm-queue(商家确认队列)与rebate-calculate-queue(返利计算队列);
  3. 商家服务监听order-confirm-queue,异步处理订单确认;返利服务监听rebate-calculate-queue,异步计算返利金额;
  4. 返利服务计算完成后,作为生产者发送“返利生成”消息至rebate-exchange,由通知服务监听队列并发送用户到账通知。

二、RabbitMQ核心组件代码实现

2.1 消息生产者封装(通用发送组件)

基于Spring AMQP封装通用消息发送组件,支持指定交换机、路由键与消息属性,代码如下:

package cn.juwatech.rebate.mq.producer;import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;/*** RabbitMQ通用消息生产者*/
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;/*** 发送消息(带确认机制,确保消息可靠投递)* @param exchange 交换机名称* @param routingKey 路由键* @param message 消息体*/public void sendMessage(String exchange, String routingKey, Object message) {// 1. 生成消息唯一ID(用于消息确认与追踪)String messageId = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(messageId);// 2. 设置消息确认回调(确保消息到达交换机)rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {if (ack) {// 消息成功到达交换机System.out.printf("消息[%s]已到达交换机,exchange:%s%n", messageId, exchange);} else {// 消息投递失败,可记录日志并触发重试System.err.printf("消息[%s]投递交换机失败,原因:%s%n", messageId, cause);}});// 3. 设置消息返回回调(交换机无法路由时触发)rabbitTemplate.setReturnsCallback(returned -> {System.err.printf("消息[%s]路由失败,routingKey:%s,原因:%s%n",messageId, returned.getRoutingKey(), returned.getReplyText());// 路由失败处理:如发送至死信队列handleReturnedMessage(returned, message);});// 4. 发送消息rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);}/*** 处理路由失败的消息(发送至死信队列)*/private void handleReturnedMessage(org.springframework.amqp.core.ReturnedMessage returned, Object message) {String deadExchange = RabbitMqConfig.DEAD_LETTER_EXCHANGE;String deadRoutingKey = RabbitMqConfig.DEAD_LETTER_ROUTING_KEY;rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, new CorrelationData(UUID.randomUUID().toString()));}
}

2.2 RabbitMQ配置类(交换机、队列、绑定关系)

通过配置类定义业务所需的交换机、队列及绑定规则,包含死信队列配置以处理失败消息,代码如下:

package cn.juwatech.rebate.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ交换机、队列、绑定关系配置*/
@Configuration
public class RabbitMqConfig {// 1. 订单相关配置public static final String ORDER_EXCHANGE = "order-exchange";public static final String ORDER_CONFIRM_QUEUE = "order-confirm-queue";public static final String REBATE_CALCULATE_QUEUE = "rebate-calculate-queue";public static final String ROUTING_KEY_ORDER_CREATED = "order.created";// 2. 返利相关配置public static final String REBATE_EXCHANGE = "rebate-exchange";public static final String REBATE_NOTIFY_QUEUE = "rebate-notify-queue";public static final String ROUTING_KEY_REBATE_GENERATED = "rebate.generated";// 3. 死信队列配置public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter";/*** 1. 声明死信交换机与死信队列(处理失败消息)*/@Beanpublic DirectExchange deadLetterExchange() {// Direct交换机:精确匹配路由键return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}@Beanpublic Queue deadLetterQueue() {// 持久化队列,避免消息丢失return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}@Beanpublic Binding deadLetterBinding() {// 绑定死信交换机与队列return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}/*** 2. 声明订单交换机(Topic类型,支持模糊匹配路由键)*/@Beanpublic TopicExchange orderExchange() {return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}/*** 声明订单确认队列(绑定死信交换机,消息消费失败时转发)*/@Beanpublic Queue orderConfirmQueue() {return QueueBuilder.durable(ORDER_CONFIRM_QUEUE)// 配置死信交换机.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)// 配置死信路由键.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)// 配置消息过期时间(30分钟).withArgument("x-message-ttl", 1800000).build();}/*** 声明返利计算队列*/@Beanpublic Queue rebateCalculateQueue() {return QueueBuilder.durable(REBATE_CALCULATE_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).withArgument("x-message-ttl", 1800000).build();}/*** 绑定订单交换机与订单确认队列*/@Beanpublic Binding orderConfirmBinding() {return BindingBuilder.bind(orderConfirmQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 绑定订单交换机与返利计算队列*/@Beanpublic Binding rebateCalculateBinding() {return BindingBuilder.bind(rebateCalculateQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 3. 声明返利交换机与通知队列*/@Beanpublic TopicExchange rebateExchange() {return ExchangeBuilder.topicExchange(REBATE_EXCHANGE).durable(true).build();}@Beanpublic Queue rebateNotifyQueue() {return QueueBuilder.durable(REBATE_NOTIFY_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).build();}@Beanpublic Binding rebateNotifyBinding() {return BindingBuilder.bind(rebateNotifyQueue()).to(rebateExchange()).with(ROUTING_KEY_REBATE_GENERATED);}
}

2.3 消息消费者实现(业务处理)

以“返利计算消费者”为例,监听rebate-calculate-queue队列,异步处理订单返利计算,代码如下:

package cn.juwatech.rebate.mq.consumer;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.service.RebateCalculateService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 返利计算消息消费者*/
@Component
@RequiredArgsConstructor
public class RebateCalculateConsumer {private final RebateCalculateService rebateCalculateService;/*** 监听返利计算队列,处理订单返利* @param orderDTO 订单数据(消息体)*/@RabbitListener(queues = RabbitMqConfig.REBATE_CALCULATE_QUEUE)public void handleRebateCalculate(OrderDTO orderDTO) {try {System.out.printf("开始处理订单返利,订单ID:%s,用户ID:%s%n", orderDTO.getOrderId(), orderDTO.getUserId());// 调用返利计算服务(核心业务逻辑)rebateCalculateService.calculateRebate(orderDTO);// 手动确认消息(默认AUTO模式,此处显式确认确保业务处理完成)// 注:若使用AUTO模式,方法无异常则自动确认,抛出异常则拒绝并重回队列} catch (Exception e) {System.err.printf("订单返利处理失败,订单ID:%s,原因:%s%n", orderDTO.getOrderId(), e.getMessage());// 消费失败处理:可记录日志,触发告警,避免消息重复重试throw new RuntimeException("返利计算失败,消息将转发至死信队列", e);}}
}

2.4 业务层消息发送示例(订单服务)

订单服务创建订单后,通过生产者发送“订单创建”消息,触发后续异步流程,代码如下:

package cn.juwatech.rebate.service.impl;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.mq.producer.RabbitMqProducer;
import cn.juwatech.rebate.service.OrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** 订单服务实现类*/
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {private final RabbitMqProducer rabbitMqProducer;// 省略订单DAO层依赖...@Override@Transactional(rollbackFor = Exception.class)public void createOrder(OrderDTO orderDTO) {// 1. 保存订单数据(本地事务)saveOrder(orderDTO);// 2. 发送订单创建消息(异步触发商家确认与返利计算)rabbitMqProducer.sendMessage(RabbitMqConfig.ORDER_EXCHANGE,RabbitMqConfig.ROUTING_KEY_ORDER_CREATED,orderDTO);System.out.printf("订单创建成功,订单ID:%s,消息已发送%n", orderDTO.getOrderId());}// 省略订单保存逻辑...
}

三、消息可靠性保障与性能优化

3.1 可靠性保障措施

  1. 消息持久化:交换机、队列均配置为durable=true,消息发送时设置deliveryMode=2(持久化),避免RabbitMQ重启丢失消息;
  2. 生产者确认:通过ConfirmCallback确保消息到达交换机,ReturnsCallback处理路由失败消息,转发至死信队列;
  3. 消费者确认:采用手动确认模式(或AUTO模式结合异常处理),确保业务逻辑执行完成后再确认消息,避免消息丢失;
  4. 死信队列:配置消息过期时间(TTL)与死信路由,消费失败的消息最终进入死信队列,避免无限重试导致系统资源浪费。

3.2 性能优化策略

  1. 消息批量发送:对高频低时延要求的场景(如用户行为日志),采用rabbitTemplate.convertAndSend批量发送,减少网络请求次数;
  2. 消费者线程池配置:通过spring.rabbitmq.listener.simple.concurrencymax-concurrency设置消费者线程池大小,默认1-10,根据业务调整为5-20;
  3. 队列分片:对高流量队列(如rebate-calculate-queue),按用户ID哈希拆分多个队列(如rebate-calculate-queue-1-4),分散消费压力;
  4. 消息压缩:对大体积消息(如包含商品详情的订单数据),发送前通过Gzip压缩,接收后解压,减少网络传输与存储开销。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!


文章转载自:

http://gCJebWrv.zqcsj.cn
http://tWKNGmnV.zqcsj.cn
http://Hak5OQ8g.zqcsj.cn
http://a9hXwOM3.zqcsj.cn
http://7rfFfrld.zqcsj.cn
http://33WgoIQ2.zqcsj.cn
http://g5AwAits.zqcsj.cn
http://5LBUjspy.zqcsj.cn
http://VyyaoFYU.zqcsj.cn
http://JwfdB7px.zqcsj.cn
http://PpwVCGFz.zqcsj.cn
http://5foYoUSX.zqcsj.cn
http://G2WdWVTD.zqcsj.cn
http://fDbKPIk4.zqcsj.cn
http://C6ysrN6j.zqcsj.cn
http://z87rje98.zqcsj.cn
http://k3S05ddS.zqcsj.cn
http://xyL6oXNh.zqcsj.cn
http://cmXuoucV.zqcsj.cn
http://KqKUGLaN.zqcsj.cn
http://bDLyOgvD.zqcsj.cn
http://RtGbyFyR.zqcsj.cn
http://ecPdJubn.zqcsj.cn
http://uRxUGtUW.zqcsj.cn
http://iKeVRgch.zqcsj.cn
http://u4Iii7Kv.zqcsj.cn
http://4Arj6E2N.zqcsj.cn
http://W0TisoZl.zqcsj.cn
http://RcxF08yp.zqcsj.cn
http://ni2REmUP.zqcsj.cn
http://www.dtcms.com/a/383102.html

相关文章:

  • React Native架构革命:从Bridge到JSI性能飞跃
  • Qt---描述网络请求QNetworkRequest
  • XLua教程之Lua调用C#
  • 第七章:AI进阶之------条件语句(if-elif-else)(一)
  • 从希格斯玻色子到QPU:C++在高能物理与量子计算领域的跨界征程与深度融合
  • 二、vue3后台项目系列——安装相关依赖、项目常用辅助开发工具
  • Knockout.js 备忘录模块详解
  • VS2022下载+海康SDK环境配置实现实时预览
  • 前端基础 —— C / JavaScript基础语法
  • 手搓一个 DELL EMC Unity存储系统健康检查清单
  • 字节M3-Agent:如何实现一个支持多模态长期记忆与推理的Agent
  • TCL华星计划投建第8.6代印刷OLED产线
  • Qt学习:moc生成的元对象信息
  • Java—JDBC 和数据库连接池
  • 软件工程实践四:MyBatis-Plus 教程(连接、分页、查询)
  • 用 Go 快速上手 Protocol Buffers
  • Java Stream 流学习笔记
  • Linux线程id与简易封装线程实现
  • 公链分析报告 - Secret Network
  • JavaScript 简单链表题目试析
  • 【ZYNQ开发篇】Petalinux和电脑端的静态ip地址配置
  • 电商AI导购系统的模型部署架构:TensorFlow Serving在实时推荐中的实践
  • 光射三缝实验
  • K8s部署 Redis 主从集群
  • Android点击桌面图库应用启动流程trace分析
  • 【抗量子安全】全球视角下 PQC 与 QKD 技术洞察:政策引领与产业演进
  • 代码随想录学习摘抄day9(回溯1-11)
  • 数据处理指令
  • SpringBoot 中 ZK 与 Kafka 节点选择逻辑:底层原理与实践解析
  • 事务与mysql数据库锁的关系