当前位置: 首页 > news >正文

基于可靠消息确保分布式事务的最终一致性:以电商系统中订单服务的新建订单为例

文章目录

  • 应用场景分析
  • 概念分析:基于可靠消息的最终一致性
  • 在 Go 当中使用 RocketMQ
    • 发送普通的同步消息
    • 消费消息
    • 发送延时消息
    • 发送事务消息
  • 实操篇之订单系统开发:基于可靠消息的分布式事务最终一致性解决方案 and. 订单超时归还方案
    • 跨服务调用与本地事务执行可能存在的数据不一致情况分析
    • 基于可靠消息的最终一致性方案解决数据不一致问题
    • 最终的方案实现:基于 Go 编写 gRPC 的服务接口
      • CreateOrder:订单服务的新建订单接口
      • OrderListener:执行本地事务以及实现事务回查的逻辑
        • OrderListener
        • ExecuteLocalTransaction
        • CheckLocalTransaction
      • Sell:库存服务的扣减库存接口
      • AutoReback:消费者端的库存自动归还接口
      • OrderTimeout:订单超时未支付时归还扣减的库存

应用场景分析

今天我们来学习“基于可靠消息确保分布式事务最终一致性”的解决方案,该方案基于 MQ 当中的可靠消息确保生产者(服务 A)发出的消息不会丢失,消费者(服务 B)对这条消息的消费状态一定可知,如果消费者确认消费消息,就代表分布式事务顺利执行,否则代表分布式事务执行失败,需要回滚分布式事务。

为了便于学习理解,我们设置一个应用场景,即:在商城系统当中,用户下订单之后,服务端的「订单服务」会开始新建订单,订单创建好之后,需要跨服务调用「库存服务」来根据订单当中的商品号及商品数量扣减相应的库存。此外,如果订单新建之后用户没有及时支付,订单应该判定为超时,占用的库存应该在超时之后被退还。

对上述场景进行分析,我们不难发现订单服务与库存服务之间涉及到跨服务的分布式事务一致性。此外,在订单服务当中,还涉及到本地的数据库事务,也就是在新建订单的时候需要将订单表保存到数据库当中。因此,在这个场景当中,我们要做的就是确保本地事务 + 分布式事务的一致性。由于还涉及到“订单超时归还”功能的实现,因此综合上述分析,采用“基于 MQ 可靠消息的最终一致性”的技术选型最合适,以 RocketMQ 为例,它不仅可以发送半消息(事务消息),还可以发送延时消息,用于订单超时归还。
在这里插入图片描述

概念分析:基于可靠消息的最终一致性

基于可靠消息的最终一致性方案是目前使用最广泛的分布式事务解决方案,它也是一种基于 MQ 的最终一致性方案,但是最大的区别在于其使用到的消息是 RocketMQ 当中的事务消息

单纯的基于 MQ 的最终一致性方案存在的问题是,服务调用链的上游服务发送到 MQ 当中的消息未必是可靠的,如果消息已经发送了,那么下游的其他服务一定会消费消息。RocketMQ 的事务消息引入了“确认机制”,也就是说如果 MQ 当中的消息经过确认之后才对消费者可见,否则可以随时撤回。

画板

较大的缺点在于:整个事务对 RocketMQ 这个中间件的依赖程度很高,因此需要对 RocketMQ 进行维护。

在 Go 当中使用 RocketMQ

Go 的 RocketMQ 客户端是:apache/rocketmq-client-go。

我目前使用的开发环境是 Apple Silicon 的 MacOS,可以通过 Docker 拉取 RocketMQ 服务实例来在本地进行服务开发测试,使用 M 系列芯片的 MacOS 通过 Docker 安装 RocketMQ 的教程可以参考这篇文章:https://blog.csdn.net/Acloasia/article/details/130548105。(除了需要拉取 Docker 镜像,还需要在本地配置 broker 的持久化,如果电脑重启,那么再次开机之后,分配的局域网 IP 地址可能会变化,此时就需要修改broker.conf,然后重启两个容器)

发送普通的同步消息

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.1.3:9876"})) // 设置 NameServer 的 IP:Port, 也就是本机在局域网的 IP 地址, 根据需要修改if err != nil {panic("生成 Producer 失败.")}err = p.Start() // 启动 Producerif err != nil {panic("启动 producer 失败.")}// 按照发送消息的特点对消息进行划分, 可以划分为同步发送/异步发送/单向发送// p.SendSync()		同步发送// p.SendAsync()	异步发送// p.SendOneWay()	单向发送res, err := p.SendSync(context.Background(), primitive.NewMessage("TestRocketMQ", []byte("This is a message for testing"))) // 涉及到网络的接口调用一般都会加入 context 这个参数// ⬆️ 使用 primitive.NewMessage({Topic}, {Message, byte slice}) 来通过让 Producer 给 Topic 发送一条消息if err != nil {fmt.Printf("消息发送失败: %s\n", err)} else {fmt.Printf("消息发送成功: %s\n", res.String()) // res.String() 获取发送成功的信息}err = p.Shutdown()if err != nil {panic("关闭 producer 失败.")}
}

消费消息

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive""time"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.1.3:9876"}), // 设置 NameServer 的 IP:Port, 也就是本机在局域网的 IP 地址, 根据需要修改consumer.WithGroupName("GeeShop"), // GroupName 标识一个集群, 通过设定 GroupName 指定消费者为一组) // push consumer: broker 当中的 Topic 一旦有数据, 就推送给 Consumer. 与之相对的, pull consumer 主动从服务器拉取.if err != nil {panic("消费者初始化失败.")}if err = c.Subscribe("TestRocketMQ", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {// 这个函数是一个匿名函数, 当然也可以在外部定义一个函数// 它接收的参数是 context 和 *primitive.MessageExt// 这个函数用于处理接收到消息之后处理消息的逻辑for i := range msgs {fmt.Printf("获取值: %v\n", msgs[i])}// 返回 consumer.ConsumeSuccess, 消息就被成功消费掉了, 不会再重新出现return consumer.ConsumeSuccess, nil}); err != nil {panic("读取消息失败")}err = c.Start()if err != nil {panic("消费者启动失败.")}// 不能让 main goroutine 退出, 因为拉取消息是异步的time.Sleep(1 * time.Hour)err = c.Shutdown()if err != nil {panic("消费者关闭失败.")}
}

发送延时消息

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.1.3:9876"})) // 设置 NameServer 的 IP:Port, 也就是本机在局域网的 IP 地址, 根据需要修改if err != nil {panic("生成 Producer 失败.")}err = p.Start() // 启动 Producerif err != nil {panic("启动 producer 失败.")}// 发送延时消息msg := primitive.NewMessage("TestRocketMQ", []byte("This is a message for testing delay message~"))msg.WithDelayTimeLevel(2)                         // 设置延时级别, 这在 RocketMQ 当中是固定的res, err := p.SendSync(context.Background(), msg) // 涉及到网络的接口调用一般都会加入 context 这个参数// ⬆️ 使用 primitive.NewMessage({Topic}, {Message, byte slice}) 来通过让 Producer 给 Topic 发送一条消息if err != nil {fmt.Printf("消息发送失败: %s\n", err)} else {fmt.Printf("消息发送成功: %s\n", res.String()) // res.String() 获取发送成功的信息}err = p.Shutdown()if err != nil {panic("关闭 producer 失败.")}/*为什么需要延时消息?在支付的时候(比如12306/大麦网), 由于资源有限, 同时为了确保用户体验, 用户下订单之后, 会首先扣减库存, 并要求用户在某个时间范围内支付订单. 如果用户不支付, 那么到期的时候就应该归还库存.为了实现上述方法, 最简单的策略是开启一个线程进行轮询, 但是轮询存在许多问题, 比如轮询的时间很难把控, 归还库存的时间不确定, 用户体验非常差.因此可以通过采用 RocketMQ 的延时消息, 时间一到就会归还库存, 避免了多余的轮询. 此外, 消息中包含了订单编号, 我们可以直接查看订单编号.*/
}

发送事务消息

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer""time"
)type OrderListener struct {}	// 这个 struct 本身的属性不重要, 重要的是绑定两个方法func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 执行本地事务, 并返回本地事务的执行状态// 在这个函数体当中可以随时返回半消息的状态, 比如 Commit 或 Rollback, Commit 就提交半消息, Rollback 就回滚半消息return primitive.CommitMessageState	// 投递消息
}func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {return primitive.RollbackMessageState	// 不投递消息
}func main() {p, err := rocketmq.NewTransactionProducer(&OrderListener{},producer.WithNameServer([]string{"192.168.1.3:9876"}),	// 设置 NameServer 的 IP:Port, 也就是本机在局域网的 IP 地址, 根据需要修改) /*  TransactionLister 这个接口具有两个方法, 分别是 ExecuteLocalTransaction 和 CheckLocalTransaction.1. ExecuteLocalTransaction: 在 Producer 发送半消息成功之后, 执行其中的逻辑2. CheckLocalTransaction: 当半消息在 MQ 的消息太长时, 会执行消息回查, 确定半消息是 Commit 还是 Rollback.*/if err != nil {panic("生成 Producer 失败.")}err = p.Start() // 启动 Producerif err != nil {panic("启动 producer 失败.")}res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TestRocketMQ_TransationMessage", []byte("This is a message for testing TransactionMessage")))// 发送事务消息之后就开始执行 ExecuteLocalTransaction 函数体if err != nil {fmt.Printf("消息发送失败: %s\n", err)} else {fmt.Printf("消息发送成功: %s\n", res.String()) // res.String() 获取发送成功的信息}time.Sleep(1 * time.Hour)	// 休眠一段时间, 测试事务回查if err = p.Shutdown(); err != nil {panic("关闭 Producer 失败.")}}

实操篇之订单系统开发:基于可靠消息的分布式事务最终一致性解决方案 and. 订单超时归还方案

由于订单系统涉及到库存扣减,因此与订单相关的接口需要确保强一致性。由于新建订单的接口需要跨服务调用库存扣减的接口,因此需要确保分布式事务的一致性。

跨服务调用与本地事务执行可能存在的数据不一致情况分析

无论是分布式事务还是本地数据库事务,我们最不愿意看到的状态就是事务执行的中间状态,比如:

  1. 本地订单创建失败,但是库存被扣减了;
  2. 本地订单创建成功,但是库存扣减失败了。

我们希望看到的是:订单创建成功,库存就扣减成功;订单创建失败,库存也扣减失败。这两种最终一致的情况我们都可以接受。

采取先跨服务调用,再执行本地事务的方案,其弊端如下:

  • 如果先进行库存扣减,再执行本地事务,如果本地事务执行失败了,就应该执行回滚,此时应该同时调用库存服务的库存归还接口,归还扣减了的库存。此时如果库存服务宕机了,会导致库存归还接口不能够被成功调用,从而导致数据不一致。
  • 如果本地事务并未失败,而是本地代码出现了没有被捕获的异常,导致本地服务的代码挂掉了,此时我们不知道本地事务的执行情况如何(不知道事务是成功提交了还是没有成功提交),但是库存已经扣减了,这就会带来数据的不一致。

画板

既然先跨服务调用,再执行本地事务,可能会产生本地事务失败,导致跨服务调用的结果无法回滚的情况产生,那么先执行本地事务,再进行跨服务调用,能否解决数据不一致的问题呢?答案仍然是「不能」:

  • 本地事务执行完成后,发送网络请求,跨服务调用库存扣减接口,库存扣减之后,由于网络拥塞或抖动,成功执行的响应没能够按时到达本地服务。当本地服务达到超时重试的次数上限时,会误认为库存服务宕掉了,遂回滚本地事务,但是库存确实扣减了,导致数据不一致。
  • 本地事务执行完成后,发送网络请求,此时本地服务突然宕掉了,导致本地事务没能成功提交,但是库存服务确实被调用了,导致数据不一致。

画板

因此不管是先执行本地事务,还是先执行跨服务调用,都不能确保数据最终一致。

基于可靠消息的最终一致性方案解决数据不一致问题

我们采用基于可靠消息的分布式事务一致性方案解决数据不一致的问题。但是发送的消息我们要进行一些逻辑上的优化。在初学这个主题时,我认为发送的半消息应该是与库存扣减有关的,比如我们在订单服务端首先发送一条半消息到 RocketMQ,然后执行本地事务,如果本地事务执行成功,那么就提交半消息扣减库存,否则回滚半消息,但这样做存在的问题是,提交半消息后库存可能已经不够了,导致「库存没有扣减,但是订单创建了」的不一致情况。

因此,优化后的逻辑是,首先直接跨服务调用库存扣减,如果扣减成功,再向 RocketMQ 发送一条「库存归还」的半消息,之后我们执行本地事务,如果订单服务的本地事务执行成功,那么就回滚「库存归还」的半消息,否则如果本地事务失败,就提交「库存归还」的半消息,这样就确保了库存扣减时一定有库存可扣,如果连库存都没有了,那么跨服务调用直接失败,执行不到半消息发送以及本地事务这些环节。

此外,如果我们设置半消息的逻辑是进行「库存归还」,还可以顺便实现「订单超时未支付时库存归还」的功能。

最终的方案实现:基于 Go 编写 gRPC 的服务接口

CreateOrder:订单服务的新建订单接口

func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {/*新建订单1. 从购物车中获取到选中的商品2. 商品的价格自己查询 - 访问商品服务 (跨微服务)3. 库存的扣减 - 访问库存服务 (跨微服务)4. 订单的基本信息表 - 订单的商品信息表5. 从购物车中删除已购买的记录*/orderListener := OrderListener{Ctx:ctx}	// OrderListener 是事务消息的监控器// 需要实现消息发送后执行事务以及消息状态回查两个接口来确保半消息的发送或回滚p, err := rocketmq.NewTransactionProducer(	// 新建发送事务消息的 Producer&orderListener,	// 绑定我们自己实现的 OrderListenerproducer.WithNameServer([]string{"192.168.0.104:9876"}), // 绑定 NameServer)if err != nil {zap.S().Errorf("生成producer失败: %s", err.Error())return nil, err}if err = p.Start(); err != nil {zap.S().Errorf("启动producer失败: %s", err.Error())return nil, err}order := model.OrderInfo{OrderSn: GenerateOrderSn(req.UserId),	// 新建订单编号, 这一步一定要在 CreateOrder 中完成Address: req.Address,					// 下面都是一些与订单相关的信息SignerName: req.Name,SingerMobile: req.Mobile,Post: req.Post,User: req.UserId,}//应该在消息中具体指明一个订单的具体的商品的扣减情况jsonString, _ := json.Marshal(order)		// 将 order 信息通过 json 编码, 因为 MQ 只能发送 []byte_, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString))	// order_reback 是 RocketMQ 的 Topicif err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")}if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)}return &proto.OrderInfoResponse{Id:orderListener.ID, OrderSn:order.OrderSn, Total:orderListener.OrderAmount}, nil
}

在 CreateOrder 这个接口当中我们需要做的就是为订单初始化一个唯一的订单编号,然后将订单的相关信息编码为 JSON 字符串,通过 Producer 发送半消息到 RocketMQ。消息一经发送,就开始执行 OrderListener 实现的本地事务逻辑。此外,OrderListener 还需要实现消息回查的逻辑,以确保因网络抖动或服务宕机而导致消息滞留在 MQ 当中时,可以通过消息回查的逻辑回滚或发送滞留的半消息。

OrderListener:执行本地事务以及实现事务回查的逻辑

需要明确的一点是,OrderListener 实际上是实现了 RocketMQ 监控事务消息的接口,这个接口需要 struct 实现两个方法,分别是ExecuteLocalTransactionCheckLocalTransaction。前者会执行具体的事务逻辑,以确认当前发送的半消息是 Commit 还是 Rollback,而后者在消息状态不确定时执行回查的逻辑,以确认滞留的半消息是 Commit 还是 Rollback。

由于 OrderListener 是一个 struct,其在实现了接口相应的方法的基础上,可以添加一些保存事务执行状态的成员,以方便消息发送方查看事务的执行状态,比如 CreateOrder 中久出现了相关的用法:

/* ... ... ... */
if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)
}
/* ... ... ... */
OrderListener

OrderListener 的定义如下:

type OrderListener struct{Code codes.Code			// 记录事务的执行状态Detail string			// 如果事务执行期间出错, 记录详细信息, 以在日志中记录错误细节ID int32				// 消息的 IDOrderAmount float32		// 订单金额Ctx context.Context		// 并发控制
}

它还需要进一步实现 ExecuteLocalTransaction 以及 CheckLocalTransaction 两个方法。

ExecuteLocalTransaction

ExecuteLocalTransaction 接口的实现方法如下,它的参数就是 Message,也就是我们在 CreateOrder 当中发送给 MQ 的消息,它是一个 JSON 结构的字符串,其中记载了订单编号相关的信息,下面我直接对 ExecuteLocalTransaction 这个 OrderListener 的方法进行注释解析。在解读代码之前,首先要明确的是,这个方法实现的是半消息发送到 RocketMQ 之后,执行分布式事务的逻辑,半消息承载的是“库存放回”的逻辑,因此如果分布式事务执行失败,就需要 Commit 这个消息以归还库存,否则 Rollback 消息代表分布式事务成功执行。我们首先通过 gRPC 跨服务进行库存扣减服务的调用,如果这一步失败了,说明库存不足,没有继续进行下去的必要,否则开始执行本地事务,即新建订单表。本地事务如果失败,那么 Commit 半消息归还库存,否则,若本地事务成功,则分布式事务执行结束,Rollback 半消息并发送一条延时消息,到期检查订单表的支付状态,如果订单仍然未支付,再由另一个消费者通知库存服务归还库存。以下是 ExecuteLocalTransaction 的代码及注释:

func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 1. 从购物车中获取选中的商品, 选中商品的 checked 状态为 truevar orderInfo model.OrderInfo	_ = json.Unmarshal(msg.Body, &orderInfo)	// 通过 Unmarshal 解析 JSON 字符串var shopCarts []model.ShoppingCart			// 记录购物车中选中的商品if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {o.Code = codes.InvalidArgumento.Detail = "没有选中结算商品"// 本地没有选中结算的商品, 不需要扣减库存(连 Sell 方法都还没有被调用), 因此回滚「归还库存」的半消息return primitive.RollbackMessageState}var goodsIds []int32                  // 拿商品的 ID, 这样才能跨服务查商品的金额goodsNumsMap := make(map[int32]int32) // 建立一个 hashmap, 从 good 的 Id 获取 good 的 pricefor _, shopCart := range shopCarts {goodsIds = append(goodsIds, shopCart.Goods)goodsNumsMap[shopCart.Goods] = shopCart.Nums}// 2. 跨微服务调用商品服务, 用于查找上一步所选中商品的价格goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})if err != nil {	// 由于跨服务调用商品服务来查询商品信息不涉及有限资源的变动, 因此商品服务不需要确保数据一致性, 因为商品服务是请求资源, 不是变更资源o.Code = codes.InvalidArgumento.Detail = "批量查询商品信息失败"// 只要还没有执行 Sell, 就可以直接 Rollback 半消息return primitive.RollbackMessageState}var orderAmount float32					// 订单总金额var orderGoods []*model.OrderGoods		// 订单商品var goodsInvInfo []*proto.GoodsInvInfo	// 商品库存信息for _, good := range goods.Data {orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])orderGoods = append(orderGoods, &model.OrderGoods{Goods:      good.Id,GoodsName:  good.Name,GoodsPrice: good.ShopPrice,GoodsImage: good.GoodsFrontImage,Nums:       goodsNumsMap[good.Id],})goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{GoodsId: good.Id,Num:     goodsNumsMap[good.Id],})}// 3. 跨微服务调用库存服务, 扣减库存if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {o.Code = codes.ResourceExhaustedo.Detail = "扣减库存失败"return primitive.RollbackMessageState}// 至此, 库存扣减已经成功了, 分布式事务的第一步已经完成, 后面就需要根据本地事务的执行情况判断半消息是 Commit 还是 Rollback// 实际上, 只要本地事务出错, 半消息就要 Commit, 即「归还库存」// 如果执行到最后, 本地事务全部执行成功, 分布式事务完全执行, 就要 Rollback 半消息, 不执行「归还库存」// 同时发送一条延时消息, 处理「订单超时未支付」的特殊场景// 4. 生成两张表// 通过 MySQL 数据库事务来对下述一系列数据库行为进行操作, 确保一致性tx := global.DB.Begin()// 4.1 生成订单表orderInfo.OrderMount = orderAmountif result := tx.Save(&orderInfo); result.RowsAffected == 0 {tx.Rollback()o.Code = codes.Internalo.Detail = "订单创建失败"return primitive.CommitMessageState // 订单创建失败, 提交半消息, 归还库存} // 保存之后, 数据库会自动创建 order 的 Id// 创建的 Id 直接保存在了 order 的 ID 字段当中o.OrderAmount = orderAmounto.ID = orderInfo.IDfor _, orderGood := range orderGoods {orderGood.Order = orderInfo.ID}// 4.2 批量插入 orderGoods, 即插入商品订单表if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {tx.Rollback()o.Code = codes.Internalo.Detail = "批量插入订单商品失败"return primitive.CommitMessageState}if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {tx.Rollback()o.Code = codes.Internalo.Detail = "删除购物车记录失败"return primitive.CommitMessageState} // 从购物车中删除已经购买的记录// 发送延时消息, 处理超时未支付的订单p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.1.3:9876"}))msg = primitive.NewMessage("order_timeout", msg.Body) // 将延时消息发送到另外一个 Topicmsg.WithDelayTimeLevel(16)                            // 根据文档查看不同 level 对应的延迟时间, 将 level 设置为 16, 延时时间就是 30 分钟_, err = p.SendSync(context.Background(), msg)if err != nil {zap.S().Errorf("发送延时消息失败: %v\n", err.Error())tx.Rollback()o.Code = codes.Internalo.Detail = "发送延时消息失败"return primitive.CommitMessageState // 本地事务在发送延时消息的时候失败了, 整个分布式事务应该判定为失败, 所以需要执行库存归还}// 提交事务tx.Commit()o.Code = codes.OKreturn primitive.RollbackMessageState
}
CheckLocalTransaction

由于网络抖动或服务突然宕机,ExecuteLocalTransaction 的过程中当然有可能出现逻辑执行到一半服务崩溃的情况,此时很有可能半消息没有得到 Commit 或 Rollback,滞留在了 MQ 当中。这个时候就需要事务回查的逻辑来确认消息是应该被 Commit 还是应该 Rollback。

由于半消息的逻辑是「归还库存」,所以我们应该查看订单表,确认本地事务的执行是否成功(注意,本地事务的执行要么全部成功,要么全部失败,因此一旦订单表可以被查到,就代表本地事务已经提交并且执行成功,也就代表分布式事务成功了)。

具体的实现如下:

func (o *OrderListener) CheckLocalTransaction(ext *primitive.MessageExt) primitive.LocalTransactionState {// rocketmq 消息回查var orderInfo model.OrderInfo_ = json.Unmarshal(ext.Body, &orderInfo)// 怎么检查之前的逻辑是否完成? 根据 OrderSn, 查询一下之前的订单是否存在即可// 消息回查具体就是直接根据消息当中保存的 OrderSn 来查看订单表中是否有这一条记录即可if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {// 订单不存在, 说明事务保存失败, 即分布式事务的最后一步(本地事务)出错return primitive.CommitMessageState // 但并不能说明库存已经扣减了, 因为可能根本就没有执行到 Sell 这一步, LocalTransaction 就宕机了// 因此, 在进行库存归还时, 必须要做好幂等性的保证, 没有扣减的库存, 就不能归还回去}// 如果订单表中有相应的记录, 说明本地事务的执行肯定是成功了, 即分布式事务的最后一步已经成功完成, 那么就说明分布式事务成功了, 将半消息撤回, 不需要归还库存, 因为订单创建成功了return primitive.RollbackMessageState
}

但是如果订单不存在,本地事务就真的失败了吗?有没有可能根本就没有执行到本地事务,服务就宕机了?这就意味着库存很有可能还没有被扣减,如果此时归还库存,那么就是归还了没有被扣减的库存

此外,消息有没有可能因为网络抖动的原因而重复发送呢?如果库存服务重复消费了消息,同样会导致数据不一致。

根据以上两条分析,在消费者端,即库存服务当中,我们需要进一步实现接口的幂等性,也就是“不要重复地归还库存”以及“不要归还没有扣减的库存”。

Sell:库存服务的扣减库存接口

Sell 是库存服务的扣减库存接口,在订单服务当中,生产者发送半消息之后,需要执行分布式事务,此时订单服务跨服务调用了库存服务的库存扣减接口。

单纯的“库存扣减”其实不难,就是根据传入的商品信息以及购买数量,在 for loop 中查询当前库存并进行扣减即可,如果库存不足,则返回错误。但对于高并发场景,还需要考虑多个服务实例同时对库存进行扣减可能产生的数据不一致,此时就需要引入锁机制来进行并发控制。本例采用 Redis 实现的分布式锁,来进行并发控制(从编码的角度来说其实很简单,使用 Go 的 Redis 分布式锁客户端加锁和释放锁即可)。

需要额外考虑的一点与“库存归还”有关,方才我们已经提到,在分布式事务开启之前,订单服务已经发送了一条“库存归还”的半消息,如果分布式事务执行失败,那么需要归还扣减的库存。但是一种极端的情况是如果分布式事务执行过程中出现网络抖动,或订单服务宕机了,那么半消息没有得到确认,就需要通过事务回查确定消息是发送还是回滚。如果本地事务执行失败,没有订单表创建,我们判定分布式事务执行失败,提交半消息执行库存归还的逻辑,但是订单表创建失败的另一个可能原因是订单服务还没有执行到库存扣减这一步就宕机了,此时归还的库存是还没有被扣减的库存,这显然是不合理的,因此在“库存扣减”这个接口我们就需要考虑到这一点,新引入一张库存扣减细节表「StockSellDetail」来记录库存扣减的细节信息,这样的话,在库存归还时,可以先查这张表确认订单的库存是否被扣减了,只归还被扣减的库存,这样就避免了数据的不一致。

我们通过 GORM 来与数据库交互,因此 StockSellDetail 的定义是:

type GoodsDetail struct {Goods int32Num   int32
}type GoodsDetailList []GoodsDetailtype StockSellDetail struct {OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique;"` // 需要建立 Index, 因为要对它进行查询Status  int32           `gorm:"type:varchar(200)"`                            // 1 表示已扣减; 2 表示已归还, 已经归还的库存不应该重复归还Detail  GoodsDetailList `gorm:"type:varchar(200)"`                            // 一个数组, 详细地记录了每一件商品扣减了多少件// MySQL 没有数组类型, 因此 GORM 将数组转为字符串存储在了数据库表中, GORM 在查询时会将字符串转为数组
}

使用 Status 字段来确定库存是否被扣减,1表示已扣减,2表示已归还(避免重复归还库存)。如果根据OrderSn(包含在消息当中)在这张表上差不到相应的记录,就代表库存根本就没有扣减,不需要归还库存。

在执行 Sell 的逻辑时,我们需要一并创建这张表相应的记录,具体的逻辑详见注释:

func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {// 扣减库存, 需要根据传入的切片, 逐一扣除每个商品的库存// 数据库的一个基本使用场景: 数据库事务, 同时成功或同时失败tx := global.DB.Begin()sellDetail := model.StockSellDetail{	// 设置 StockSellDetail 的记录OrderSn: req.OrderSn,				// 记录扣减库存的订单编号Status:  1, // Status 设置为 1, 表示已经扣减了}var details []model.GoodsDetailfor _, good := range req.GoodsInfo {details = append(details, model.GoodsDetail{Goods: good.GoodsId,Num:   good.Num,})var inv model.Inventory// ⬇️ 加分布式锁扣减库存mutex := global.RS.NewMutex(fmt.Sprintf("goods_%d", good.GoodsId))if err := mutex.Lock(); err != nil {return nil, status.Error(codes.Internal, err.Error())}// 判断参数是否有误// 查询操作可以不写在事务当中, 因为查询不涉及修改数据// 先查询当前商品有多少库存if result := global.DB.Where(&model.Inventory{Goods: good.GoodsId}).First(&inv); result.RowsAffected == 0 {// 参数有误, 事务回滚tx.Rollback()return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")}// 判断库存是否充足if inv.Stocks < good.Num {// 如果商品的库存不足, 那么直接事务回滚tx.Rollback()return nil, status.Errorf(codes.ResourceExhausted, "库存不足")}// 前面的判断没问题, 开始执行库存扣减的逻辑// 注意, 此处在并发场景下会出现数据不一致的问题, 应该使用分布式锁来解决这个问题// 如果库存充足, 扣减相应的库存inv.Stocks -= good.Numtx.Save(&inv)// 保存 inv 之后才释放锁if ok, err := mutex.Unlock(); !ok || err != nil {tx.Rollback()return nil, status.Errorf(codes.Internal, "释放 Redis 分布式锁异常")}}sellDetail.Detail = details// 写 sellDetail 表if result := tx.Create(&sellDetail); result.RowsAffected == 0 {tx.Rollback()return nil, status.Errorf(codes.Internal, "保存库存扣减历史失败")}tx.Commit() // 需要自己手动提交操作, 这样才能够执行事务return &emptypb.Empty{}, nil
}

至此,我们已经实现了 Sell 接口,最后还需要实现 AutoReback 接口,它是库存服务自动归还库存的消息消费逻辑。

AutoReback:消费者端的库存自动归还接口

AutoReback 是库存服务中库存自动归还的消费者逻辑,当一条半消息得到 Commit 确认提交到 MQ 之后,订阅了“order_reback”这个 Topic 的消费者都会收到这样一条消息,根据 AutoReback 当中定义的业务逻辑来对这条消息进行消费。

需要明确的是,方才已经提到,MQ 不能保证消息不被重复发送。此外,由于半消息的逻辑是“归还库存”,AutoReback 也不应该归还没有被扣减的库存,因此 AutoReback 本身需要做好接口的幂等性。

在 Sell 接口当中,我们已经在 StockSellDetail 这张表中对订单的商品库存扣减状态进行了记录,AutoReback 应该率先根据消息提供的OrderSn来查这张表,如果根本查不到记录,就说明库存没有被扣减,不应该归还没有被扣减的库存。而如果这张表能够查到记录,但Status2,就说明这个订单的库存已经被归还过了,不应该重复归还。只有当Status1的时候,才需要归还库存。

代码的实现及逻辑注释如下:

// AutoReback: 自动归还库存, 它是库存服务作为消费者用来消费来自生产者消息的具体业务逻辑, 需要做好接口的幂等性
// 具体来说, 如果接收到了事务消息, 就需要根据消息当中的信息对订单商品进行库存归还.
// 但是需要额外考虑的一个问题是, 发送过来的「库存归还」消息, 对应的库存可能并没有扣减, 不能归还没有被扣减的库存, 所以 AutoReback 应该做好接口的幂等性
func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {type OrderInfo struct {OrderSn string}// 参数是 msgs, 说明可能有多个消息, 使用 for loop + range 处理for i := range msgs {// 既然是归还库存, 那么我们应该具体地知道这个订单中的每件商品应该归还多少(比如, A 商品归还 2, B 商品归还 10), 因此需要记录订单商品的扣减情况// 但是还需要考虑重复归还的问题, 比如因为网络问题, 消息被重发了多次, 或是在新建订单服务的本地事务回查时, 可能执行到「库存扣减」之前服务就挂掉了, 但是本地事务回查仍然需要 Commit 这条「库存归还消息」, 这就会出现「归还了没有扣减的商品」的情况// 所以这个接口应该确保幂等性, 不能因为消息重复发送导致一个订单的库存归还多次, 且没有扣减的库存不应该归还// 如何确保这些都没有问题?(即, 如何确保消费者库存归还接口的幂等性?) 其实很简单, 那就是新建一张表, 这张表记录了详细的订单扣减细节和归还细节// 对消息队列进行监听, 收到的是可以解码的 stringvar orderInfo OrderInfo// 只对 OrderSn 进行 Unmarshall 即可, 因为我们需要通过 OrderSn 来查询 StockSellDetail 这张「库存扣减历史表」if err := json.Unmarshal(msgs[i].Body, &orderInfo); err != nil {zap.S().Errorf("解析 JSON 失败: %v\n", msgs[i].Body)return consumer.ConsumeSuccess, nil // ConsumeSuccess 在此指的是消息成功消费, 直接将消息丢弃}// 去将 inv 的库存加回去, 将 sellDetail 的 status 设置为 2 (标识为已归还, 防止重复归还), 要在事务中进行tx := global.DB.Begin()var sellDetail model.StockSellDetailif result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {// 实际上只能查询到一条数据, 因为 OrderSn 是唯一索引, 使用 First 可以加速// 在查询时指定 Status == 1, 只查询扣减的数据, 因为要对已经被扣减的数据进行归还// 说明已经归还过了return consumer.ConsumeSuccess, nil}// 如果查询到了, 那么逐个归还库存for _, orderGood := range sellDetail.Detail {// update 怎么用?// 先查询一下 inventory 表if result := tx.Model(&model.Inventory{}).Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("stocks+?", orderGood.Num)); result.RowsAffected == 0 {// 代表没有更新上tx.Rollback()// ⬇️ 如果库存归还失败, 那么标记消息的消费状态为 RetryLater, 即重新加入 MQ 稍后再消费return consumer.ConsumeRetryLater, nil}}sellDetail.Status = 2if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).Update("status", 2); result.RowsAffected == 0 {tx.Rollback()return consumer.ConsumeRetryLater, nil}tx.Commit()return consumer.ConsumeSuccess, nil}return consumer.ConsumeSuccess, nil
}

OrderTimeout:订单超时未支付时归还扣减的库存

在订单超时未支付时,我们应该归还这个订单占有的库存,在 ExecuteLocalTransaction 中本地事务执行结束之后,我们进一步发送了一条延时消息给订单服务(订单服务同时是延时消息的生产者和消费者),因此我们需要实现消费这条延时消息的消费者逻辑。

具体的实现在 OrderTimeout 当中,它会消费到来的延时消息。首先需要根据订单的编号查找订单的支付状态,如果订单已经支付,则直接成功消费消息并丢弃。否则,就需要发送消息给订单服务归还库存,此时发送一条普通消息即可。

具体的实现及逻辑注释如下:

func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {var orderInfo model.OrderInfo_ = json.Unmarshal(msgs[i].Body, &orderInfo)//查询订单的支付状态,如果已支付什么都不做,如果未支付,归还库存var order model.OrderInfoif result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {return consumer.ConsumeSuccess, nil}if order.Status != "TRADE_SUCCESS" {// 如果订单的支付状态不是成功支付tx := global.DB.Begin()// 归还库存,我们可以模仿order中发送一个消息到 order_reback 中去// 修改订单的状态为已支付order.Status = "TRADE_CLOSED"tx.Save(&order)p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"}))if err != nil {zap.S().Errorf("生成 Producer 失败")return consumer.ConsumeRetryLater, err}if err = p.Start(); err != nil {zap.S().Errorf("启动 Producer 失败")return consumer.ConsumeRetryLater, err}// 发送一条普通的同步消息, 消息到达 MQ 后 MQ 需要确认消息已送达_, err = p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))if err != nil {tx.Rollback()zap.S().Errorf("消息发送失败: %v\n", err.Error())return consumer.ConsumeRetryLater, err}return consumer.ConsumeSuccess, nil}}return consumer.ConsumeSuccess, nil
}

至此,我们便完整地实现了「基于可靠消息的分布式事务最终一致性」方案,以「电商系统」当中的订单服务为例,我们完整地了解了如何设计一种在订单创建时通过分布式事务确保订单创建与库存扣减最终状态一致的解决方案,并了解了如何通过发送延时消息确保超时未支付的订单被关闭并归还其占有的库存。

相关文章:

  • springboot音乐网站与分享平台
  • wget批量调用shell脚本
  • Missing Semester计算机教育中缺失的一课:Vim
  • 【力扣 简单 C】141. 环形链表
  • LeetCode 第72题:编辑距离(巧妙的动态规划方法)
  • MCP前后端技术研究和应用实践
  • 中科院医学1区Top:解放军医学院利用多组学+网络药理学+转录组测序联合解析苗药七角生白胶囊抗白细胞减少症的分子机制
  • DataHub 架构设计与核心工作原理
  • Python----OpenCV(图像的绘制——绘制直线,绘制矩形,绘制圆形,绘制多边形)
  • win11修改DNS
  • python基础与数据类型
  • 【和春笋一起学C++】(十九)C++函数新特性——对象的引用作为函数参数
  • springAI 大模型应用开发
  • WooCommerce独立站商城的最大优势
  • PCB设计杂谈之一
  • C# 中的Async 和 Await 的用法详解
  • Python应用八股文
  • Java大模型开发入门 (10/15):连接外部世界(下) - 端到端构建完整的RAG问答系统
  • 高效同步Linux服务器文件技巧
  • 计算机网络-自顶向下—第二章应用层-重点复习笔记
  • wordpress shopify/河南seo关键词排名优化
  • 网站推广营销联系方式/深圳百度推广公司
  • 广州做网站好的公司/如何使用免费b站推广网站
  • 深圳正规网站建设/广州seo全网营销
  • 百度免费网站建设/关键词指数
  • 南昌谁做网站设计/自媒体账号申请