31.stream数据类型应用
1.创建一个stream类型的消息队列,名为streams.orders。通过命令创建,因为队列一开始就创建好,后续也不会再去重复创建了。
创建队列和消费者组命令:

streams.orders为队列名称
g1为组名称
0表示从队列的第一条消息开始
mkstream 表示没有组或者队列则创建
2.将之前秒杀业务中lua脚本调整,在判断抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId
-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1
end-- 3.2 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明重复下单返回2return 2
end-- 3.4扣减库存incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5下单,保存用户 sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 k3 v3
redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId, "id", orderId)
return 0
注意:通过java代码传参给lua脚本都是string字符串类型的,需要类型转换一下。
public Long seckillForStream(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行lua脚本//判断购买资格//发送消息到队列Long result = stringRedisTemplate.execute(SECKILL_STREAM_SCRIPT, Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//2.判断结果是否为0,0代表成功int i = result.intValue();if(i != 0) {throw new ServiceException(i == 1? "库存不足":"不能重复下单");}proxy = (IVoucherOrderService) AopContext.currentProxy();return orderId;}
3.项目启动时,开启一个线程任务,尝试获取stream.orders队列中的消息,完成下单。
读取消息命令:
xreadgroup group g1 c1 count 1 block 2000 streams streams.orders >
g1组名称
c1 消费者名称,这里先写死
block 2000 阻塞等待2s
streams.orders队列名称
> 最近一条未消费的消息
private static final DefaultRedisScript<Long> SECKILL_STREAM_SCRIPT;static {SECKILL_STREAM_SCRIPT = new DefaultRedisScript<>();SECKILL_STREAM_SCRIPT.setLocation(new ClassPathResource("seckill_stream.lua"));SECKILL_STREAM_SCRIPT.setResultType(Long.class);}/*** 当项目一启动,当前类被加载初始化的时候此方法就会被执行*/@PostConstructprivate void init() {
// SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderStreamHandler());}private class VoucherOrderStreamHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1.获取消息队列中的消息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("streams.orders", ReadOffset.lastConsumed()));//2.判断消息获取是否成功if(CollectionUtil.isEmpty(list)) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> values = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//3.如果获取成功,创建订单handleVoucherOrder(voucherOrder);//4.ACK确认 sack stream.orders g1 消息idstringRedisTemplate.opsForStream().acknowledge("streams.orders", "g1", entries.getId());}catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try{//1.获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("streams.orders", ReadOffset.from("0")));//2.判断消息是否获取成功if(CollectionUtil.isEmpty(list)) {//2.1如果获取失败,说明pending-list中没有消息,结束循环break;}MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> values = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//3.如果获取成功,创建订单handleVoucherOrder(voucherOrder);//4.ACK确认 sack stream.orders g1 消息idstringRedisTemplate.opsForStream().acknowledge("streams.orders", "g1", entries.getId());}catch (Exception e) {log.error("处理pending-list订单异常", e);//防止调用太频繁,休眠try {Thread.sleep(50);} catch (InterruptedException e1) {e1.printStackTrace();}}}}}