微服务商城-商品微服务
数据表
CREATE TABLE `product` (`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品id',`cateid` smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT '类别Id',`name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',`subtitle` varchar(200) NOT NULL DEFAULT '' COMMENT '商品副标题',`images` varchar(1024) NOT NULL DEFAULT '' COMMENT '图片地址,逗号分隔',`detail` varchar(1024) NOT NULL DEFAULT '' COMMENT '商品详情',`price` decimal(20,2) NOT NULL DEFAULT 0 COMMENT '价格,单位-元保留两位小数',`stock` int(11) NOT NULL DEFAULT 0 COMMENT '库存数量',`status` int(6) NOT NULL DEFAULT 1 COMMENT '商品状态.1-在售 2-下架 3-删除',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),KEY `ix_cateid` (`cateid`),KEY `ix_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表';CREATE TABLE `category` (`id` smallint(6) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '分类id',`parentid` smallint(6) NOT NULL DEFAULT 0 COMMENT '父类别id当id=0时说明是根节点,一级类别',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '类别名称',`status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '类别状态1-正常,2-已废弃',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品类别表';CREATE TABLE `product_operation` (`id` bigint unsigned NOT NULL AUTO_INCREMENT,`product_id` bigint unsigned NOT NULL DEFAULT 0 COMMENT '商品id',`status` int NOT NULL DEFAULT '1' COMMENT '运营商品状态 0-下线 1-上线',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),KEY `ix_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品运营表';
商品微服务方法
获取商品缓存
func (l *ProductLogic) Product(in *product.ProductItemRequest) (*product.ProductItem, error) {v, err, _ := l.svcCtx.SingleGroup.Do(fmt.Sprintf("product:%d", in.ProductId), func() (interface{}, error) {return l.svcCtx.ProductModel.FindOne(l.ctx, in.ProductId)})if err != nil {return nil, err}p := v.(*model.Product)return &product.ProductItem{ProductId: p.Id,Name: p.Name,Stock: p.Stock,}, nil
}
查询缓存中的信息,如果缓存中不存在product:id, 从数据库中查询
获取多个商品
func (l *ProductsLogic) Products(in *product.ProductRequest) (*product.ProductResponse, error) {products := make(map[int64]*product.ProductItem)pdis := strings.Split(in.ProductIds, ",")ps, err := mr.MapReduce(func(source chan<- interface{}) {for _, pid := range pdis {source <- pid}}, func(item interface{}, writer mr.Writer, cancel func(error)) {pidStr := item.(string)pid, err := strconv.ParseInt(pidStr, 10, 64)if err != nil {return}p, err := l.svcCtx.ProductModel.FindOne(l.ctx, pid)if err != nil {return}writer.Write(p)}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {var r []*model.Productfor p := range pipe {r = append(r, p.(*model.Product))}writer.Write(r)})if err != nil {return nil, err}for _, p := range ps.([]*model.Product) {products[p.Id] = &product.ProductItem{ProductId: p.Id,Name: p.Name,}}return &product.ProductResponse{Products: products}, nil
}
map函数将所有的pid写入source
reduce函数取出pid 查询数据库得到商品信息
final函数将查询结果聚合到一个切片中
获取商品列表
根据类别id获取指定类别的商品列表
首先判断是否存在缓存,如果不存在则查询数据库,且写入缓存
获取上架商品的信息
获取商品操作表中的为上架状态的商品id,获取相应的商品列表
修改商品库存
直接扣减mysql库存数量
检查并修改商品库存
使用lua脚本先判断再扣减
检查商品库存是否足够扣减
查询mysql判断
回滚库存
直接操作数据库
扣减库存 分布式服务
参考:https://juejin.cn/post/7051205679217901599
果是在单体架构的业务当中,是不需要用到分布式事务的.单体架构中,涉及到需要保证多个事务同时成功的场景,只需要创建一个全局的事务对象 如:tx := db.Begin(),然后统一用这一个tx去管理接下来的业务逻辑即可。
绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。
处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。
因为这里的产品库存更新是跨服务操作的,也没有办法使用本地事务来处理,所以我们需要使用分布式事务来处理它。这里我们需要借助 DTM 的 SAGA 协议来实现订单创建和产品库存更新的跨服务分布式事务操作。
- 将dtm注册到etcd中
# 微服务
MicroService:Driver: 'dtm-driver-gozero' # 要处理注册/发现的驱动程序的名称Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址EndPoint: 'dtm:36790'
- 添加 dtm_barrier 数据表
- 我们需要为 product rpc 服务添加 DecrStock、DecrStockRevert 两个接口方法,分别用于产品库存更新 和 产品库存更新的补偿。
- 实现 DecrStock 接口方法
在这里只有库存不足时,我们不需要再重试,直接回滚。 - 在 DecrStockRevert 接口方法中,产品库存是减去指定的数量,在这里我们把它给加回来。这样产品库存就回到在 DecrStock 接口方法减去之前的数量。
package logicimport ("context""mall/service/order/api/internal/svc""mall/service/order/api/internal/types""mall/service/order/rpc/types/order""mall/service/product/rpc/product""github.com/dtm-labs/dtmgrpc""github.com/zeromicro/go-zero/core/logx""google.golang.org/grpc/status"
)type CreateLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {return &CreateLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *CreateLogic) Create(req *types.CreateRequest) (resp *types.CreateResponse, err error) {// 获取 OrderRpc BuildTargetorderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()if err != nil {return nil, status.Error(100, "订单创建异常")}// 获取 ProductRpc BuildTargetproductRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()if err != nil {return nil, status.Error(100, "订单创建异常")}// dtm 服务的 etcd 注册地址var dtmServer = "etcd://etcd:2379/dtmservice"// 创建一个gidgid := dtmgrpc.MustGenGid(dtmServer)// 创建一个saga协议的事务saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).Add(orderRpcBusiServer+"/order.Order/Create", orderRpcBusiServer+"/order.Order/CreateRevert", &order.CreateRequest{Uid: req.Uid,Pid: req.Pid,Amount: req.Amount,Status: 0,}).Add(productRpcBusiServer+"/product.Product/DecrStock", productRpcBusiServer+"/product.Product/DecrStockRevert", &product.DecrStockRequest{Id: req.Pid,Num: 1,})// 事务提交err = saga.Submit()if err != nil {return nil, status.Error(500, err.Error())}return &types.CreateResponse{}, nil
}
商品API
func (l *ProductDetailLogic) ProductDetail(req *types.ProductDetailRequest) (resp *types.ProductDetailResponse, err error) {var (p *product.ProductItemcs *reply.CommentsResponse)if err := mr.Finish(func() error {var err errorif p, err = l.svcCtx.ProductRPC.Product(l.ctx, &product.ProductItemRequest{ProductId: req.ProductID}); err != nil {return err}return nil}, func() error {var err errorif cs, err = l.svcCtx.ReplyRPC.Comments(l.ctx, &reply.CommentsRequest{TargetId: req.ProductID}); err != nil {logx.Errorf("get comments error: %v", err)}return nil}); err != nil {return nil, err}var comments []*types.Commentfor _, c := range cs.Comments {comments = append(comments, &types.Comment{ID: c.Id,Content: c.Content,})}return &types.ProductDetailResponse{Product: &types.Product{ID: p.ProductId,Name: p.Name,},Comments: comments,}, nil
}
mr.Finsh 处理并发任务的结果
代码实现了两个远程调用的并发执行:一个获取产品信息,另一个获取评论。通过 mr.Finish 管理这两个操作,确保执行的顺序和错误处理。第一个操作如果失败会直接返回错误,而第二个操作即使失败也仅仅记录错误,不影响整体流程。
索引缓存
怎么在缓存中存储分类的商品呢?我们使用Sorted Set来存储,member为商品的id,即我们只在Sorted Set中存储缓存索引,查出缓存索引后,因为我们自动生成了以主键id索引为key的缓存,所以查出索引列表后我们再查询行记录缓存即可获取商品的详情,Sorted Set的score为商品的创建时间。
- 首先先从缓存中读取当前页的商品id索引,调用cacheProductList方法,注意,这里调用查询缓存方法忽略了error,为什么要忽略这个error呢,因为我们期望的是尽最大可能的给用户返回数据,也就是redis挂掉了的话那我们就会从数据库查询数据返回给用户,而不会因为redis挂掉而返回错误。
- 如果从缓存中查出的数据为0条,那么我们就从数据库中查询该分类下的数据,这里要注意从数据库查询数据的时候我们要限制查询的条数,我们默认一次查询300条,因为我们每页大小为10,300条可以让用户下翻30页,大多数情况下用户根本不会翻那么多页,所以我们不会全部加载以降低我们的缓存资源,当用户真的翻页超过30页后,我们再按需加载到缓存中
- 获取到当前页的数据后,我们还需要做去重,因为如果我们只以createTime作为游标的话,很可能数据会重复,所以我们还需要加上id作为去重条件,去重逻辑如下
- 如果没有命中缓存的话,我们需要把从数据库查出的数据写入缓存,这里需要注意的是如果数据已经到了末尾需要加上数据结束的标识符,即val为-1,score为0,这里我们异步的写会缓存,因为写缓存并不是主逻辑,不需要等待完成,写失败也没有影响呢,通过异步方式降低接口耗时
func (l *ProductListLogic) ProductList(in *product.ProductListRequest) (*product.ProductListResponse, error) {// 判断类别是否存在_, err := l.svcCtx.CategoryModel.FindOne(l.ctx, int64(in.CategoryId))if err == model.ErrNotFound {return nil, status.Error(codes.NotFound, "category not found")}// 设置游标为当前时间if in.Cursor == 0 {in.Cursor = time.Now().Unix()}// 如果页大小为0,则设置为默认的大小if in.Ps == 0 {in.Ps = defaultPageSize}var (isCache, isEnd boollastID, lastTime int64firstPage []*product.ProductItemproducts []*model.Product)// 查询缓存中的数据pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))// 满一页if len(pids) == int(in.Ps) {isCache = true// 判断是否结束if pids[len(pids)-1] == -1 {isEnd = true}products, err := l.productsByIds(l.ctx, pids)if err != nil {return nil, err}// 商品数据for _, p := range products {firstPage = append(firstPage, &product.ProductItem{ProductId: p.Id,Name: p.Name,CreateTime: p.CreateTime.Unix(),})}} else {var (err errorctime = time.Unix(in.Cursor, 0).Format("2006-01-02 15:04:05"))// 查询数据库products, err = l.svcCtx.ProductModel.CategoryProducts(l.ctx, ctime, int64(in.CategoryId), defaultLimit)if err != nil {return nil, err}var firstPageProducts []*model.Product// 分页if len(products) > int(in.Ps) {firstPageProducts = products[:int(in.Ps)]} else {firstPageProducts = productsisEnd = true}for _, p := range firstPageProducts {firstPage = append(firstPage, &product.ProductItem{ProductId: p.Id,Name: p.Name,CreateTime: p.CreateTime.Unix(),})}}if len(firstPage) > 0 {pageLast := firstPage[len(firstPage)-1]lastID = pageLast.ProductIdlastTime = pageLast.CreateTimeif lastTime < 0 {lastTime = 0}for k, p := range firstPage {if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {firstPage = firstPage[k:]break}}}ret := &product.ProductListResponse{IsEnd: isEnd,Timestamp: lastTime,ProductId: lastID,Products: firstPage,}// 添加缓存if !isCache {threading.GoSafe(func() {if len(products) < defaultLimit && len(products) > 0 {endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")products = append(products, &model.Product{Id: -1, CreateTime: endTime})}_ = l.addCacheProductList(context.Background(), products)})}return ret, nil
}
func (l *ProductListLogic) cacheProductList(ctx context.Context, cid int32, cursor, ps int64) ([]int64, error) {// 上下文、开始的时间游标、结束限、当前页码、页大小pairs, err := l.svcCtx.BizRedis.ZrevrangebyscoreWithScoresAndLimitCtx(ctx, categoryKey(cid), cursor, 0, 0, int(ps))if err != nil {return nil, err}var ids []int64for _, pair := range pairs {id, _ := strconv.ParseInt(pair.Key, 10, 64)ids = append(ids, id)}return ids, nil
}