当前位置: 首页 > news >正文

go-stream(一些常用命令介绍,以及在go-tcp中使用)

一.有关消费组的函数

XGroupCreate 创建消费组

err := rdb.XGroupCreate(ctx, "mystream", "mygroup", "$").Err()
  • mystream:目标 Stream 名称
  • mygroup:消费组名称
  • $:表示从最新消息开始消费(后续添加的消息才会被消费)
  • 注意:若 Stream 不存在会报错,需改用 XGroupCreateMkStream

XGroupCreateMkStream 自动创建流

err := rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$").Err()

功能与 XGroupCreate 相同,但会自动创建不存在的 Stream,推荐优先使用。

XGroupDestroy 删除消费组

err := rdb.XGroupDestroy(ctx, "mystream", "mygroup").Err()

直接删除指定消费组,组内消费者和消息一并清除。

XGroupDelConsumer 删除消费者

res, err := rdb.XGroupDelConsumer(ctx, "mystream", "mygroup", "worker-1").Result()
fmt.Println("删除的消息数:", res)
  • 仅删除消费者,不删除消费组
  • 返回值为该消费者未确认的消息数量

XReadGroup 消费组读取消息

msgs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group:    "mygroup",Consumer: "worker-1",Streams:  []string{"mystream", ">"},Count:    10,Block:    0,
}).Result()
  • >:仅读取未分配的新消息
  • 0:重新读取 Pending 消息
  • Block: 0:无限阻塞等待

XAck 确认消息处理完成

rdb.XAck(ctx, "mystream", "mygroup", "1673451223-0")

确认后消息从 Pending 列表移除,避免重复消费。

XPending 查看未确认消息

pending, err := rdb.XPending(ctx, "mystream", "mygroup").Result()
fmt.Printf("待确认消息数:%d\n", pending.Count)

支持扩展查询:

details, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{Stream: "mystream",Group:  "mygroup",Start:  "-",End:    "+",Count:  10,
}).Result()

XAutoClaim 自动重分配消息

msgs, err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{Stream:   "mystream",Group:    "mygroup",Consumer: "worker-2",MinIdle:  time.Minute, // 超时1分钟未确认的消息Start:    "0-0",Count:    10,
}).Result()

用于将长时间未确认的消息重新分配给其他消费者,实现消息重试机制。

二.向流中写入数据

XAdd —— 向流中添加一条消息

功能是往指定 Stream 中添加一条消息。
代码示例:

id, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream",Values: map[string]interface{}{"user": "alice","msg":  "hello world",},
}).Result()
fmt.Println("添加的消息 ID:", id)

参数说明

  • Stream:流名称
  • Values:要写入的键值对数据
  • ID(可选):消息ID,通常留空让Redis自动生成(格式如 1715678901234-0
  • MaxLen(可选):限制流的最大长度(自动裁剪旧消息)
  • Approx(可选):是否使用近似修剪(性能更高)

示例:限制流长度

id, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream",MaxLen: 1000,   // 限制最多1000条Approx: true,   // 近似修剪Values: map[string]interface{}{"event": "signup","user":  "bob",},
}).Result()

Redis 会自动删除旧消息,保持流大小在 1000 条左右。


XAddN —— 批量写入(如果使用 go-redis v9+)

某些版本支持批量添加多条记录(非官方标准 Redis 命令,而是客户端优化)。
示例

ids, err := rdb.XAddN(ctx, []*redis.XAddArgs{{Stream: "mystream",Values: map[string]interface{}{"msg": "first"},},{Stream: "mystream",Values: map[string]interface{}{"msg": "second"},},
}).Result()
fmt.Println("写入的消息 ID 列表:", ids)

如果不支持 XAddN,可以用循环调用 XAdd 实现同样效果。


XTrim —— 主动修剪流

如果想手动清理旧消息(比如后台定期任务),可以使用 XTrim
示例

trimmed, err := rdb.XTrim(ctx, "mystream", &redis.XTrimArgs{MaxLen: 1000,Approx: true,
}).Result()
fmt.Printf("被删除的消息数量: %d\n", trimmed)


XDel —— 删除指定 ID 的消息

虽然 Redis Stream 一般不常直接删除单条消息,但有时为了清理错误或重复数据可以用:

deleted, err := rdb.XDel(ctx, "mystream", "1715678901234-0").Result()
fmt.Printf("删除了 %d 条消息\n", deleted)

三.从流上读取数据

XRead 基础读取功能

从单个或多个 Stream 直接读取消息,不涉及消费组。

典型用法示例

streams, err := rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{"mystream", "0"}, // 从头开始读取Count:   10,                        // 最多读取10条Block:   0,                         // 阻塞读取(0表示无限等待)
}).Result()

核心特点

  • 无消费组概念,需手动指定读取起点(0$ 或具体消息 ID)。
  • 消息不会被自动确认或标记,适合一次性拉取或测试场景。
  • 适用场景:简单消息拉取、历史数据回溯、调试等。
for _, stream := range result { //通过循环每一条流for _, msg := range stream.Messages { //循环读取每条流的每个信息
}
}

XReadGroup 消费组读取功能

基于消费组机制读取消息,支持多消费者协同处理,适用于生产环境。

典型用法示例

streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group:    "mygroup",      // 消费组名称Consumer: "worker-1",     // 消费者标识Streams:  []string{"mystream", ">"}, // ">" 表示仅读取未分配的新消息Count:    5,              // 每次最多读取5条Block:    0,              // 阻塞等待
}).Result()

核心特点

  • 需预先通过 XGROUP CREATE 创建消费组。
  • 参数 > 表示只读取未分配的消息;指定 ID 可处理未确认消息(如故障恢复)。
  • 需调用 XAck 显式确认消息,确保可靠性。
  • 适用场景:分布式任务队列、需消息重试或严格确认的系统。

XReadStreams 简化读取功能

部分客户端库提供的语法糖,本质是对 XRead 的封装,适用于单一 Stream 场景。

伪代码示例

streams, err := rdb.XReadStreams(ctx, "mystream", "0").Result()
// 等效于:
rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{"mystream", "0"},
}).Result()

注意事项

  • 非所有库均提供此方法,建议直接使用 XRead
  • 功能与 XRead 完全一致,仅简化参数传递。

四.redis确认机制

XAck() 函数简介

在 Go 中使用 go-redis 时,XAck() 用于确认一条或多条消息已被消费,是消费组模式下的关键操作。

基本用法示例

ackCount, err := rdb.XAck(ctx, "mystream", "mygroup", "1715678901234-0").Result()
if err != nil {log.Fatal(err)
}
fmt.Printf("确认了 %d 条消息\n", ackCount)

参数说明

  • "mystream":目标流名称。
  • "mygroup":消费组名称。
  • "1715678901234-0":消息 ID(支持传递多个 ID)。
  • 返回值ackCount 表示成功确认的消息数量。

确认机制的本质

通过 XReadGroup 读取消息时:

  • Redis 将消息分配给消费者后,消息会被加入待确认列表(Pending Entries List, PEL)。
  • 调用 XAck() 后,Redis 从 PEL 中移除该消息,表明消息已处理完毕,无需重发。

未调用 XAck 的后果

若消费者读取消息后未确认:

  • 消息滞留:消息持续存在于 Pending 列表,无法自动重新分配。
  • 手动处理:需通过 XClaimXAutoClaim 转移给其他消费者。
  • 资源占用:Pending 列表堆积导致内存压力增大,可能影响系统性能。
  • 监控异常XPENDING 命令显示的未确认消息数量持续增加。

关键点

  • 及时调用 XAck() 是确保消息可靠处理的关键。
  • 未确认消息可能导致重复消费或系统资源浪费。

六.stream在go-tcp的使用

1.多消费组思路

每个消费组都有自己独立的 Pending 列表、游标和确认状态。从而实现每一个客户作为一个消费组,从而都能接受所传来的数据,互不干扰,从而作为广播模式。

也就是说:

  • 如果 Group A 中的消费者没有确认消息(未调用 XAck),
    只会影响 Group A 的状态(这条消息在 A 的 Pending 列表中)。

  • Group B 不受影响,可以正常读取、确认同一条消息。

虽然多个消费组互不影响,但:

  • Redis Stream 中的原始消息不会因为某个组确认了就被删除

  • 消息只有在所有消费组都不再需要它且你主动修剪(XTrimMAXLEN)时,才会被清理。

也就是说:

消息删除和 XAck 是两件事。
XAck 只标记该组“处理完了”,不会删除消息本体。

2.流程

1.首先进行登录注册,创建用户实例

2.服务端创建成功,为每一个在线用户分配消费组,并读取所用的唯一流中的所有数据(通过函数XRange,或XRead,),从而获取离线时的历史消息,并传入到服务端

3.在客户端发消息时,通过写入(发送者,接收者,消息类型,消息内容)作为一个消息集合写入到流中

4.服务端通过实时阻塞接受流中的数据,并做出相应的功能,或者打印

5.服务端接受数据后,在选择群发还是私信发给客户,并将消息存入流中

http://www.dtcms.com/a/531736.html

相关文章:

  • 中职 网站建设与管理海口快速建站公司推荐
  • Qt TCP 网络通信详解(笔记)
  • RandLA-net-pytorch 复现
  • Rocky 9 安装 Elasticsearch分布式集群基于非安全特性
  • 使用现代C++构建高效日志系统的分步指南
  • Nacos 环境搭建:从单机开发到集群生产部署
  • OpenWrt | 实现限制只有指定设备才能访问 luci 和 使用 SSH 等方式管理设备的方法
  • 数据库圣经-----最终章JDBC
  • 小贷做网站客户推广渠道有哪些
  • SAP SD交货单明细查询接口分享
  • Java Spring原理 --- Bean的作用域,Bean的生命周期,Spring自动配置
  • TCP三次握手与四次挥手通俗理解
  • 电商网站如何设计内容能源产品网站建设多少钱
  • 门户网站的发布特点网站子栏目设计
  • 赣州企业网站建设公司苏州网站定制公司哪家好
  • 网页设计与网站建设的课后习题答案外贸 企业网站 建设
  • 呼伦贝尔网站建设平台海口网站运营托管费用
  • 网站外包费用怎么做分录个人做广播网站需要注意什么
  • 教育网站设计欣赏建一个公司网站要多少钱
  • 广州快速建站公司推荐做关于植物的网站
  • 厦门网站建设哪家专业黔东南小程序开发公司
  • 网站开发公司安心加盟苏州排名搜索优化
  • 响应式网站公司怎么做网站
  • 老师问我做网站用到什么创新技术江苏网站建设空间
  • wordpress 菜单没了网站关键词排名seo
  • 立网站系网站开发用什么工具好
  • html 网站开发软件网站永久免费建站
  • 万网网站开发世界500强企业中国有哪些
  • 漳州招商局规划建设局网站饿了么网站做生鲜吗
  • 官方网站优化方法字体设计转换器