当前位置: 首页 > news >正文

31.stream数据类型应用

1.创建一个stream类型的消息队列,名为streams.orders。通过命令创建,因为队列一开始就创建好,后续也不会再去重复创建了。

创建队列和消费者组命令:

streams.orders为队列名称

g1为组名称

0表示从队列的第一条消息开始

mkstream 表示没有组或者队列则创建

2.将之前秒杀业务中lua脚本调整,在判断抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1
end-- 3.2 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then-- 3.3 存在,说明重复下单返回2return 2
end-- 3.4扣减库存incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5下单,保存用户 sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 k3 v3
redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId, "id", orderId)
return 0

注意:通过java代码传参给lua脚本都是string字符串类型的,需要类型转换一下。

public Long seckillForStream(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行lua脚本//判断购买资格//发送消息到队列Long result = stringRedisTemplate.execute(SECKILL_STREAM_SCRIPT, Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//2.判断结果是否为0,0代表成功int i = result.intValue();if(i != 0) {throw new ServiceException(i == 1? "库存不足":"不能重复下单");}proxy = (IVoucherOrderService) AopContext.currentProxy();return orderId;}

3.项目启动时,开启一个线程任务,尝试获取stream.orders队列中的消息,完成下单。

读取消息命令:

xreadgroup group g1 c1 count 1 block 2000 streams streams.orders >

g1组名称

c1 消费者名称,这里先写死

block 2000 阻塞等待2s

streams.orders队列名称

> 最近一条未消费的消息

private static final DefaultRedisScript<Long> SECKILL_STREAM_SCRIPT;static {SECKILL_STREAM_SCRIPT = new DefaultRedisScript<>();SECKILL_STREAM_SCRIPT.setLocation(new ClassPathResource("seckill_stream.lua"));SECKILL_STREAM_SCRIPT.setResultType(Long.class);}/*** 当项目一启动,当前类被加载初始化的时候此方法就会被执行*/@PostConstructprivate void init() {
//        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderStreamHandler());}private class VoucherOrderStreamHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1.获取消息队列中的消息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("streams.orders", ReadOffset.lastConsumed()));//2.判断消息获取是否成功if(CollectionUtil.isEmpty(list)) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> values = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//3.如果获取成功,创建订单handleVoucherOrder(voucherOrder);//4.ACK确认 sack stream.orders g1 消息idstringRedisTemplate.opsForStream().acknowledge("streams.orders", "g1", entries.getId());}catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try{//1.获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("streams.orders", ReadOffset.from("0")));//2.判断消息是否获取成功if(CollectionUtil.isEmpty(list)) {//2.1如果获取失败,说明pending-list中没有消息,结束循环break;}MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> values = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//3.如果获取成功,创建订单handleVoucherOrder(voucherOrder);//4.ACK确认 sack stream.orders g1 消息idstringRedisTemplate.opsForStream().acknowledge("streams.orders", "g1", entries.getId());}catch (Exception e) {log.error("处理pending-list订单异常", e);//防止调用太频繁,休眠try {Thread.sleep(50);} catch (InterruptedException e1) {e1.printStackTrace();}}}}}
http://www.dtcms.com/a/520306.html

相关文章:

  • 蓝牙协议6.1
  • 微服务中的服务熔断、降级与限流
  • 查网站的建站系统百度搜索网站图片
  • 网站界面风格设计描述网站类网站开发犯罪吗
  • 新站点seo联系方式设计工作室网站首页
  • Adobe Lightroom Classic 2025解锁版 (专业照片管理)
  • univla复现libero
  • kubernets的pod管理
  • 14、【Ubuntu】【VSCode】VSCode 断联问题分析:hostname(二)
  • Java 堆排序(Heap Sort)详解教程
  • 软件设计师知识点总结:操作系统
  • 黄岩路桥网站设计网站流量提升方案
  • 设计师网站欣赏店铺只做商品展示网站怎么做
  • dify部署及SSL自签实现
  • 云南省建设厅标准员网站手机兼职赚钱
  • Redis哈希表渐进式rehash深度解析:为何百万数据迁移不阻塞服务?
  • 广东省省考备考(第一百三十一天10.23)——科学推理:电学(第六节课)
  • Spring的三级缓存和SpringMVC的流程
  • 为什么麒麟信创系统需要开启overcommit_memory才能安装postgresql成功
  • PostGresql All语法
  • [java] 图文示八股
  • 【图像处理】图像形态学操作
  • 网站上传 空间 数据库开发一个电商平台app要多少钱
  • 如何制作网站链接数字镭网站开发
  • 使用python的matplotlib进行绘图
  • Nginx使用auth_request模块做外部认证集成Kibana
  • 【题解】洛谷 P2218 [HAOI2007] 覆盖问题 [二分 + 思维]
  • xss-labs pass-12
  • 企业网站建设服务电话做网站什么主题好做
  • 注册电气工程师(供配电)执业资格考试专业考试规范及设计手册(2025版)