go-stream(一些常用命令介绍,以及在go-tcp中使用)
一.有关消费组的函数
XGroupCreate 创建消费组
err := rdb.XGroupCreate(ctx, "mystream", "mygroup", "$").Err()
mystream:目标 Stream 名称mygroup:消费组名称$:表示从最新消息开始消费(后续添加的消息才会被消费)- 注意:若 Stream 不存在会报错,需改用
XGroupCreateMkStream
XGroupCreateMkStream 自动创建流
err := rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$").Err()
功能与 XGroupCreate 相同,但会自动创建不存在的 Stream,推荐优先使用。
XGroupDestroy 删除消费组
err := rdb.XGroupDestroy(ctx, "mystream", "mygroup").Err()
直接删除指定消费组,组内消费者和消息一并清除。
XGroupDelConsumer 删除消费者
res, err := rdb.XGroupDelConsumer(ctx, "mystream", "mygroup", "worker-1").Result()
fmt.Println("删除的消息数:", res)
- 仅删除消费者,不删除消费组
- 返回值为该消费者未确认的消息数量
XReadGroup 消费组读取消息
msgs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "mygroup",Consumer: "worker-1",Streams: []string{"mystream", ">"},Count: 10,Block: 0,
}).Result()
>:仅读取未分配的新消息0:重新读取 Pending 消息Block: 0:无限阻塞等待
XAck 确认消息处理完成
rdb.XAck(ctx, "mystream", "mygroup", "1673451223-0")
确认后消息从 Pending 列表移除,避免重复消费。
XPending 查看未确认消息
pending, err := rdb.XPending(ctx, "mystream", "mygroup").Result()
fmt.Printf("待确认消息数:%d\n", pending.Count)
支持扩展查询:
details, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{Stream: "mystream",Group: "mygroup",Start: "-",End: "+",Count: 10,
}).Result()
XAutoClaim 自动重分配消息
msgs, err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{Stream: "mystream",Group: "mygroup",Consumer: "worker-2",MinIdle: time.Minute, // 超时1分钟未确认的消息Start: "0-0",Count: 10,
}).Result()
用于将长时间未确认的消息重新分配给其他消费者,实现消息重试机制。
二.向流中写入数据
XAdd —— 向流中添加一条消息
功能是往指定 Stream 中添加一条消息。
代码示例:
id, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream",Values: map[string]interface{}{"user": "alice","msg": "hello world",},
}).Result()
fmt.Println("添加的消息 ID:", id)
参数说明
Stream:流名称Values:要写入的键值对数据ID(可选):消息ID,通常留空让Redis自动生成(格式如1715678901234-0)MaxLen(可选):限制流的最大长度(自动裁剪旧消息)Approx(可选):是否使用近似修剪(性能更高)
示例:限制流长度
id, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream",MaxLen: 1000, // 限制最多1000条Approx: true, // 近似修剪Values: map[string]interface{}{"event": "signup","user": "bob",},
}).Result()
Redis 会自动删除旧消息,保持流大小在 1000 条左右。
XAddN —— 批量写入(如果使用 go-redis v9+)
某些版本支持批量添加多条记录(非官方标准 Redis 命令,而是客户端优化)。
示例:
ids, err := rdb.XAddN(ctx, []*redis.XAddArgs{{Stream: "mystream",Values: map[string]interface{}{"msg": "first"},},{Stream: "mystream",Values: map[string]interface{}{"msg": "second"},},
}).Result()
fmt.Println("写入的消息 ID 列表:", ids)
如果不支持 XAddN,可以用循环调用 XAdd 实现同样效果。
XTrim —— 主动修剪流
如果想手动清理旧消息(比如后台定期任务),可以使用 XTrim。
示例:
trimmed, err := rdb.XTrim(ctx, "mystream", &redis.XTrimArgs{MaxLen: 1000,Approx: true,
}).Result()
fmt.Printf("被删除的消息数量: %d\n", trimmed)
XDel —— 删除指定 ID 的消息
虽然 Redis Stream 一般不常直接删除单条消息,但有时为了清理错误或重复数据可以用:
deleted, err := rdb.XDel(ctx, "mystream", "1715678901234-0").Result()
fmt.Printf("删除了 %d 条消息\n", deleted)
三.从流上读取数据
XRead 基础读取功能
从单个或多个 Stream 直接读取消息,不涉及消费组。
典型用法示例
streams, err := rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{"mystream", "0"}, // 从头开始读取Count: 10, // 最多读取10条Block: 0, // 阻塞读取(0表示无限等待)
}).Result()
核心特点
- 无消费组概念,需手动指定读取起点(
0、$或具体消息 ID)。 - 消息不会被自动确认或标记,适合一次性拉取或测试场景。
- 适用场景:简单消息拉取、历史数据回溯、调试等。
for _, stream := range result { //通过循环每一条流for _, msg := range stream.Messages { //循环读取每条流的每个信息
}
}
XReadGroup 消费组读取功能
基于消费组机制读取消息,支持多消费者协同处理,适用于生产环境。
典型用法示例
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "mygroup", // 消费组名称Consumer: "worker-1", // 消费者标识Streams: []string{"mystream", ">"}, // ">" 表示仅读取未分配的新消息Count: 5, // 每次最多读取5条Block: 0, // 阻塞等待
}).Result()
核心特点
- 需预先通过
XGROUP CREATE创建消费组。 - 参数
>表示只读取未分配的消息;指定 ID 可处理未确认消息(如故障恢复)。 - 需调用
XAck显式确认消息,确保可靠性。 - 适用场景:分布式任务队列、需消息重试或严格确认的系统。
XReadStreams 简化读取功能
部分客户端库提供的语法糖,本质是对 XRead 的封装,适用于单一 Stream 场景。
伪代码示例
streams, err := rdb.XReadStreams(ctx, "mystream", "0").Result()
// 等效于:
rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{"mystream", "0"},
}).Result()
注意事项
- 非所有库均提供此方法,建议直接使用
XRead。 - 功能与
XRead完全一致,仅简化参数传递。
四.redis确认机制
XAck() 函数简介
在 Go 中使用 go-redis 时,XAck() 用于确认一条或多条消息已被消费,是消费组模式下的关键操作。
基本用法示例
ackCount, err := rdb.XAck(ctx, "mystream", "mygroup", "1715678901234-0").Result()
if err != nil {log.Fatal(err)
}
fmt.Printf("确认了 %d 条消息\n", ackCount)
参数说明
"mystream":目标流名称。"mygroup":消费组名称。"1715678901234-0":消息 ID(支持传递多个 ID)。- 返回值:
ackCount表示成功确认的消息数量。
确认机制的本质
通过 XReadGroup 读取消息时:
- Redis 将消息分配给消费者后,消息会被加入待确认列表(Pending Entries List, PEL)。
- 调用
XAck()后,Redis 从 PEL 中移除该消息,表明消息已处理完毕,无需重发。
未调用 XAck 的后果
若消费者读取消息后未确认:
- 消息滞留:消息持续存在于 Pending 列表,无法自动重新分配。
- 手动处理:需通过
XClaim或XAutoClaim转移给其他消费者。 - 资源占用:Pending 列表堆积导致内存压力增大,可能影响系统性能。
- 监控异常:
XPENDING命令显示的未确认消息数量持续增加。
关键点
- 及时调用
XAck()是确保消息可靠处理的关键。 - 未确认消息可能导致重复消费或系统资源浪费。
六.stream在go-tcp的使用
1.多消费组思路
每个消费组都有自己独立的 Pending 列表、游标和确认状态。从而实现每一个客户作为一个消费组,从而都能接受所传来的数据,互不干扰,从而作为广播模式。
也就是说:
如果 Group A 中的消费者没有确认消息(未调用
XAck),
只会影响 Group A 的状态(这条消息在 A 的 Pending 列表中)。Group B 不受影响,可以正常读取、确认同一条消息。
虽然多个消费组互不影响,但:
Redis Stream 中的原始消息不会因为某个组确认了就被删除。
消息只有在所有消费组都不再需要它且你主动修剪(
XTrim或MAXLEN)时,才会被清理。
也就是说:
消息删除和
XAck是两件事。
XAck只标记该组“处理完了”,不会删除消息本体。
2.流程
1.首先进行登录注册,创建用户实例
2.服务端创建成功,为每一个在线用户分配消费组,并读取所用的唯一流中的所有数据(通过函数XRange,或XRead,),从而获取离线时的历史消息,并传入到服务端
3.在客户端发消息时,通过写入(发送者,接收者,消息类型,消息内容)作为一个消息集合写入到流中
4.服务端通过实时阻塞接受流中的数据,并做出相应的功能,或者打印
5.服务端接受数据后,在选择群发还是私信发给客户,并将消息存入流中
