当前位置: 首页 > news >正文

霸王餐返利app的分布式架构设计:基于事件驱动的订单处理系统

霸王餐返利app的分布式架构设计:基于事件驱动的订单处理系统

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

在霸王餐返利app的业务场景中,订单处理涉及用户下单、商家确认、返利计算、资金结算等多个环节,传统单体架构面临耦合度高、扩展性差、故障传播快等问题。基于此,我们采用分布式架构+事件驱动模式设计订单处理系统,通过事件解耦服务、异步化处理流程,提升系统吞吐量与稳定性。以下从架构整体设计、核心组件实现、代码示例三方面展开说明。
在这里插入图片描述

一、架构整体设计:事件驱动的分布式分层模型

系统采用四层分布式架构,分别为接入层、业务服务层、事件中间件层、数据存储层,各层通过事件实现松耦合通信,架构图如下(文字描述):

  1. 接入层:负责用户请求转发,采用Nginx+Gateway实现负载均衡与接口鉴权;
  2. 业务服务层:拆分为订单服务、商家服务、返利服务、支付服务等微服务,每个服务专注于单一领域;
  3. 事件中间件层:基于RocketMQ实现事件发布与订阅,确保事件可靠投递;
  4. 数据存储层:采用MySQL分库分表存储订单数据,Redis缓存热点数据,Elasticsearch存储订单日志。

核心设计原则:通过事件驱动实现服务解耦,每个服务仅关注自身业务逻辑,通过订阅事件触发流程,例如用户下单后,订单服务发布“订单创建事件”,商家服务、返利服务订阅该事件分别执行确认订单、计算返利操作。

二、核心组件实现:事件定义与服务交互

2.1 事件模型设计

事件是系统通信的核心载体,需包含事件ID、事件类型、业务数据、时间戳等字段。基于Java实现通用事件模型,代码如下:

package cn.juwatech.bawangcan.event;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.UUID;/*** 通用事件模型*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class BaseEvent {// 事件唯一标识private String eventId;// 事件类型(如ORDER_CREATED、MERCHANT_CONFIRMED)private String eventType;// 业务数据(存储订单ID、用户ID等关键信息)private Map<String, Object> bizData;// 事件创建时间戳private Long createTime;// 构建事件的静态方法public static BaseEvent build(String eventType, Map<String, Object> bizData) {BaseEvent event = new BaseEvent();event.setEventId(UUID.randomUUID().toString().replace("-", ""));event.setEventType(eventType);event.setBizData(bizData);event.setCreateTime(System.currentTimeMillis());return event;}
}

2.2 事件发布与订阅组件

基于RocketMQ封装事件发布者与订阅者,统一事件处理入口,代码如下:

2.2.1 事件发布者(生产者)
package cn.juwatech.bawangcan.event.producer;import cn.juwatech.bawangcan.event.BaseEvent;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;/*** 事件生产者:负责发布事件到RocketMQ*/
@Component
public class EventProducer {@Value("${rocketmq.producer.group}")private String producerGroup;@Value("${rocketmq.namesrv.addr}")private String namesrvAddr;private DefaultMQProducer producer;@PostConstructpublic void init() throws Exception {producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);// 重试次数producer.setRetryTimesWhenSendFailed(3);producer.start();}/*** 发布事件* @param topic 事件主题(如bawangcan_order_topic)* @param event 事件对象*/public void publishEvent(String topic, BaseEvent event) throws Exception {Message message = new Message(topic,event.getEventType(),event.getEventId(),JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8));producer.send(message);}@PreDestroypublic void destroy() {if (producer != null) {producer.shutdown();}}
}
2.2.2 事件订阅者(消费者)
package cn.juwatech.bawangcan.event.consumer;import cn.juwatech.bawangcan.event.BaseEvent;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;/*** 事件消费者:订阅事件并触发业务处理*/
@Component
public class OrderEventConsumer {@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.namesrv.addr}")private String namesrvAddr;private DefaultMQPushConsumer consumer;@PostConstructpublic void init() throws Exception {consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);// 订阅订单相关主题,监听所有标签(事件类型)consumer.subscribe("bawangcan_order_topic", "*");// 注册事件处理监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);BaseEvent event = JSON.parseObject(msgBody, BaseEvent.class);// 根据事件类型分发处理handleEvent(event);} catch (Exception e) {// 消费失败,返回重试状态return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}/*** 事件分发处理*/private void handleEvent(BaseEvent event) {switch (event.getEventType()) {case "ORDER_CREATED":// 处理订单创建事件(如通知商家)handleOrderCreatedEvent(event);break;case "MERCHANT_CONFIRMED":// 处理商家确认事件(如计算返利)handleMerchantConfirmedEvent(event);break;default:throw new IllegalArgumentException("未知事件类型:" + event.getEventType());}}private void handleOrderCreatedEvent(BaseEvent event) {// 业务逻辑:获取订单ID,调用商家服务发送确认通知String orderId = (String) event.getBizData().get("orderId");// 省略商家通知逻辑...System.out.println("订单创建事件处理完成,订单ID:" + orderId);}private void handleMerchantConfirmedEvent(BaseEvent event) {// 业务逻辑:获取订单ID,调用返利服务计算返利String orderId = (String) event.getBizData().get("orderId");// 省略返利计算逻辑...System.out.println("商家确认事件处理完成,订单ID:" + orderId);}@PreDestroypublic void destroy() {if (consumer != null) {consumer.shutdown();}}
}

2.3 订单服务核心逻辑

订单服务作为核心服务,负责订单创建、状态更新,并发布相关事件,代码如下:

package cn.juwatech.bawangcan.service;import cn.juwatech.bawangcan.event.BaseEvent;
import cn.juwatech.bawangcan.event.producer.EventProducer;
import cn.juwatech.bawangcan.mapper.OrderMapper;
import cn.juwatech.bawangcan.model.Order;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;/*** 订单服务*/
@Service
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate EventProducer eventProducer;/*** 创建订单* @param order 订单信息* @return 订单ID*/@Transactional(rollbackFor = Exception.class)public String createOrder(Order order) throws Exception {// 1. 保存订单到数据库orderMapper.insert(order);String orderId = order.getOrderId();// 2. 发布“订单创建事件”Map<String, Object> bizData = new HashMap<>();bizData.put("orderId", orderId);bizData.put("userId", order.getUserId());bizData.put("merchantId", order.getMerchantId());BaseEvent event = BaseEvent.build("ORDER_CREATED", bizData);eventProducer.publishEvent("bawangcan_order_topic", event);return orderId;}/*** 更新订单状态(由商家服务调用)*/public void updateOrderStatus(String orderId, String status) {LambdaUpdateWrapper<Order> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(Order::getOrderId, orderId).set(Order::getStatus, status);orderMapper.update(null, wrapper);}
}

三、关键技术亮点与问题解决

  1. 事件幂等性处理:由于RocketMQ可能存在重复投递,每个事件处理逻辑需实现幂等。例如在handleOrderCreatedEvent方法中,先查询订单状态,仅当状态为“未处理”时执行后续操作;
  2. 分布式事务保障:采用“本地消息表+事务消息”方案,订单服务在保存订单时,同时将事件存入本地消息表,事务提交后再发布事件,确保订单创建与事件发布的一致性;
  3. 系统可观测性:通过SkyWalking实现分布式追踪,每个事件携带traceId,可追踪从订单创建到返利结算的完整链路;同时通过Prometheus监控各服务的事件处理延迟、成功率等指标。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!


文章转载自:

http://nMZyR5rV.mdpcz.cn
http://PqS1xUSO.mdpcz.cn
http://nSFMWeRe.mdpcz.cn
http://YKxPlutz.mdpcz.cn
http://RWKTEFMY.mdpcz.cn
http://L76MX5vf.mdpcz.cn
http://P1xwl2zH.mdpcz.cn
http://pk25yJyY.mdpcz.cn
http://s2pQBMyc.mdpcz.cn
http://pI9eLmbM.mdpcz.cn
http://OTuqUguN.mdpcz.cn
http://1jXlucwG.mdpcz.cn
http://DdOLBV1I.mdpcz.cn
http://H0DxYDfv.mdpcz.cn
http://qH1Oy6Hf.mdpcz.cn
http://7LoWzS89.mdpcz.cn
http://vlsAqQMK.mdpcz.cn
http://oW4JTopY.mdpcz.cn
http://bzNvo8Mr.mdpcz.cn
http://EfstFoEG.mdpcz.cn
http://JXsAUneq.mdpcz.cn
http://qzQwBpmw.mdpcz.cn
http://lpGzGKGb.mdpcz.cn
http://HgIKlG6e.mdpcz.cn
http://eCOVSWNf.mdpcz.cn
http://ATNlOhff.mdpcz.cn
http://evZbBuNu.mdpcz.cn
http://ovNd57j6.mdpcz.cn
http://J4eVT0dP.mdpcz.cn
http://PCgJpL5h.mdpcz.cn
http://www.dtcms.com/a/379852.html

相关文章:

  • Android SystemServer 启动 service源码分析
  • CentOS搭建本地源
  • Python的pip镜像源配置
  • ES6 面试题及详细答案 80题 (55-61)-- 类与继承
  • 云手机在办公领域中自动化的应用
  • Flink面试题及详细答案100道(21-40)- 基础概念与架构
  • 用Python打造专业级老照片修复工具:让时光倒流的数字魔法
  • 第八章:移动端着色器的优化-Mobile Shader Adjustment《Unity Shaders and Effets Cookbook》
  • 前端性能优化:Webpack Tree Shaking 的实践与踩坑前端性能优化:Webpack Tree Shaking 的实践与踩坑
  • 国产凝思debian系Linux离线安装rabbitmq教程步骤
  • how to setup k3s on an offline ubuntu
  • RabbitMQ对接MQTT消息发布指南
  • ⸢ 肆-Ⅰ⸥ ⤳ 默认安全建设方案:d.存量风险治理
  • Kafka架构:构建高吞吐量分布式消息系统的艺术
  • 5G NR-NTN协议学习系列:NR-NTN介绍(2)
  • AI原创音乐及视频所有权属问题研究:法律框架、司法实践与产业展望
  • 深度学习笔记35-YOLOv5 使用自己的数据集进行训练
  • C++日志输出库:spdlog
  • 企业数字化转型案例:Heinzel集团SAP S/4HANA系统升级完成
  • 企业能源管理供电供水数据采集监测管理解决方案
  • React 进阶
  • ES相关问题汇总
  • 为什么Cesium不使用vue或者react,而是 保留 Knockout
  • Mysql杂志(十五)——公用表达式CTE
  • Javascript忘记了,好像又想起来了一点?
  • AI + 制造:NebulaAI 场景实践来了!
  • mosdns缓存dns服务器配置记录
  • android14 硬键盘ESC改BACK按键返回无效问题
  • 代码随想录算法训练营第62天 | Floyd 算法精讲、A * 算法精讲 (A star算法)、最短路算法总结篇、图论总结
  • 教程:用免费 Google Translate API 在 VSCode 中实现中文注释自动翻译英文