Redis实战(7)-- 高级特性 Redis Stream数据结构与基础命令
Redis stream的数据结构:

【注意】消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5;而使用*,进行设置对于的消息ID,则是表示系统默认生成。
每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。

【重点】每个消费组都有一个Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量。
同时每个消费组状态都是独立的,互不影响,同一个消息,可以被每个组都消费。
命令端操作
生产者端
(1)xadd追加消息:
![]()
(2)xrange获取消息队列,会自动过滤已删除信息
![]()
其中-表示最小值,+表示最大值

获取指定ID信息
![]()
(3)xlen获取消息长度
![]()
即代表当前Stream中有多少消息存在且未被消费
(4)del删除Stream
Del streamtest
即表示删除整个Stream流
而xdel streamtest ID 可以删除对应ID消息。
消费端
单消费者【了解】
虽然Stream中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。
【注意】0-0表示头消息,$表示最新消息开始
单消费者操作:
(1)读取指定消息ID后消息:
Xread count 1 streams stream2 1665644057756-0
【注意】当指定位置是0-0时候,则会从第一条消息开始读。$则是读取最尾部消息,如果这个尾部消息已经被读过了,则返回nil (null),通常会配合阻塞block进行使用。
(2)以阻塞方式从尾部一次读取一个信息,并阻塞直至最新消息到来
xread block 0 count 1 streams stream1 $
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。
消费群【重点】
【面试考题】为什么会用到群组?
Redis群组消费是为了应对生产者发送过多消息到redis stream中,但是消费者消费不过来的情况,这种情况可以使用多个群组进行消费,从而合理提升消费者组整体消费能力。
结构图展示:

针对消费组进行解析,消费组会有一个值last_delivered_id进行标识偏移值,表明当前消费到哪条消息。而消费者中也有 pending_ids[] 即消费ID记录表,其中记录当前消费者消费过哪些内容,相当于一个ack机制(其实这个就是PEL机制)。
(1)消费组群组创建:
xgroup create stream1 c1 0-0
(2)只接受最新的消息,当前Stream消息会被全部忽略处理:
xgroup create stream1 c2 $
(3)查看对应Stream状态
xinfo stream stream1

(4)查看消费组状态:
Xinfo groups stream1

(5)查看一组中多个消费者状态:
Xinfo consumers stream2 c1

可以看到目前c1这个消费者有 7 条等待ACK的消息,空闲了2086176ms 没有读取消息。
(6)消费组中固定消费者消费
xreadgroup GROUP c1 consumer1 count 1 block 0 streams stream1 >
【解析】
让c1消费组中的consumer1消费者进行消费一条消息,">"表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进。Block 0表示进行阻塞等待。
【注意】
由于有偏移下标,群组中任意消费者进行消费消息后,偏移量都会向后移动,所以说除非消费失败导致异常,消费者组中消费者不会消费已经比消费的数据。
(7)消费者确认
Xack stream1 cg1 160219318311-0
确认消费者消费stream1消息队列中160219318311-0的消息。(主要用于PEL机制确定)
