当前位置: 首页 > news >正文

基于 Redis Stream 实现消息队列功能

好长时间没更新了。。。。。。

背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至可以选择自定义线程池+DelayQueue 这种操作去进行异步操作,而大多数人会在第一时间想到消息丢列,但是消息引入消息队列这件事对于一个并发量不大、后半段消息允许失败的情况单独引入一个中间件对系统的开发维护难度都会提升一个等级,所以我就想到应用 Redis Stream 这种方式来实现异步任务的执行

废话不多说,直接上代码

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisCountdownTaskProducer implements CountdownStrategy {
    private final StringRedisTemplate stringRedisTemplate;
    @Override
    public void startCountdown(long duration, Runnable onFinish, String userId, String taskId) {
        log.info("使用redis stream 的延时任务开始执行 userId:{},taskId:{}",userId,taskId);
        Map<String,String> producerMap = Map.of(
                "userId",userId,
                "taskId",taskId,
                "duration",String.valueOf(duration)
        );
        // 发送延迟消费信息  topic: pickUpTheLightRecord
        stringRedisTemplate.opsForStream().add("streamKey", producerMap);
    }

    @Override
    public void cancelCountdown(String userId, String taskId) {

    }
}

以我的应用场景为例,大家可以忽略这个继承的 CountdownStrategy 的接口,我这是用策略模式来实现多种方式的动态切换

最核心的代码就是一行 stringRedisTemplate.opsForStream().add(“pickUpTheLightRecord”, producerMap);
应用 redis 提供的 stream 功能,直接发送你的 topic 和你的 key(这个 key 可以是你的某个实体,某个信息,或者说某种标识,以便后续取出的时候可以知道自己要进行什么操作)

@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private final RedisConnectionFactory redisConnectionFactory;
    private final RedisCountdownTaskConsumer redisCountdownTaskConsumer;

    @Bean
    public ExecutorService asyncStreamConsumer() {
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(processors,
                processors + processors >> 1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName("stream_consumer_countdown_task_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {

        // 创建消费者组
        String consumerGroup = "your_comsumer_group";
        String streamKey = "streamKey"; // 与生产者topic一致

        try {
            redisConnectionFactory.getConnection()
                    .xGroupCreate(streamKey.getBytes(), consumerGroup, ReadOffset.from("0-0"), true);
        } catch (Exception e) {
            // 捕获异常,检查是否是因为消费者组已存在导致的错误
            if (e.getMessage().contains("BUSYGROUP")) {
                // 如果消费者组已存在,则复用现有的消费者组
                log.warn("消费者组已存在,复用现有的消费者组: {}", consumerGroup);
            } else {
                // 如果是其他错误,则记录日志
                log.warn("消费者组创建失败: {}", e.getMessage());
            }
        }

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(10)
                        // 执行从 Stream 拉取到消息的任务流程
                        .executor(asyncStreamConsumer)
                        // 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时
                        .pollTimeout(Duration.ofSeconds(3))
                        .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(consumerGroup,"countdownTaskConsumer"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                redisCountdownTaskConsumer);
        return streamMessageListenerContainer;
    }
}

以上是对消费者的配置,配置过后我们通过 Bean 的形式直接注入 Spring 容器,方便在应用启动时它可以自动创建,应用结束时可以自动销毁,避免资源浪费

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCountdownTaskConsumer implements StreamListener<String, MapRecord<String, String, String>> {

    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
    private final TaskCommonPomodoroTechniqueMapper taskCommonPomodoroTechniqueMapper;
    private final StringRedisTemplate stringRedisTemplate;


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        log.info("receive message:{},倒计时任务开始执行 userId:{},taskId:{}",message);
        // 执行倒计时任务
        String stream = message.getStream();
        RecordId id = message.getId();
        String consumerGroup = "pickUpTheLightRecord_consumer_group";
        //消息幂等处理
        if(!messageQueueIdempotentHandler.isMessageProcessed(id.toString())){
            //判断当前消息是否执行完成
            if(messageQueueIdempotentHandler.isAccomplish(id.toString())){
                log.info("消息已处理完成: {}", id);
                stringRedisTemplate.opsForStream().acknowledge(stream, consumerGroup, id); // 显式AC
                return;
            }
            throw new RuntimeException("消息未完成流程,选择消息队列重试");
        }

        try {

            Map<String, String> value = message.getValue();
            String userId = value.get("userId");
            String taskId = value.get("taskId");
            LambdaUpdateWrapper<TaskCommonPomodoroTechnique> wrapper = new LambdaUpdateWrapper<>();
            wrapper.eq(TaskCommonPomodoroTechnique::getUserId,userId);
            wrapper.eq(TaskCommonPomodoroTechnique::getId,taskId);
            TaskCommonPomodoroTechnique taskCommonPomodoroTechnique = taskCommonPomodoroTechniqueMapper.selectOne(wrapper);
            if(taskCommonPomodoroTechnique == null){
                log.error("倒计时任务不存在,userId:{},taskId:{}",userId,taskId);
                return;
            }
            taskCommonPomodoroTechnique.setCompletionTimes(taskCommonPomodoroTechnique.getCompletionTimes()+1);
            taskCommonPomodoroTechniqueMapper.updateById(taskCommonPomodoroTechnique);
            log.info("倒计时任务执行成功,userId:{},taskId:{}",userId,taskId);
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        }catch (Throwable e){
            messageQueueIdempotentHandler.deleteMessageProcessed(id.toString());
            log.error("记录倒计时任务异常",e);
            throw e;
        }

        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }
}

最后我们来实现具体的消费者,直接继承 StreamListener 这个类,重写 onMessage 方法,在这个方法中定义你要执行的业务逻辑(大家可以忽略幂等的处理,这个不是讲解的重点)

这样就可以使用基于 Redis Stream 的消息丢列啦

总的来说和 RocketMQ 的使用差不多,但是多了一些配置的过程

相关文章:

  • Java基础关键_021_集合(五)
  • Spring TX配置(声明式事务管理+annotation)
  • 计算矩阵边缘元素之和(信息学奥赛一本通-1121)
  • Python 实现的采集诸葛灵签
  • 研发团队协作软件推荐:18款工具对比
  • win10升级到22H2版本后无法联网
  • 01-Canvas-使用fabric初始
  • 从零搭建微服务项目Pro(第2-2章——JSR303自定义文件校验+整合至微服务公共模块)
  • Java使用JDBC连接操作Sqlite 笔记250314
  • 【算法】 【c++】字符串s1 中删除所有 s2 中出现的字符
  • 总结 HTTP 协议的基本格式, 相关知识以及抓包工具fiddler的使用
  • 67.Harmonyos NEXT 图片预览组件之性能优化策略
  • 【Scala】
  • 论文分享 | HE-Nav: 一种适用于复杂环境中空地机器人的高性能高效导航系统
  • TIA博途在编译 PLC时出现崩溃的解决方法
  • 测不准关系
  • 【redis】zset 类型:基本命令(上)
  • Java中架构DDD:理解聚合、实体和值对象三种核心构造块
  • C#+EF+SqlServer性能优化笔记
  • Python 科学计算与机器学习入门:NumPy + Scikit-Learn 实战指南
  • 高飞已任南航集团党组副书记
  • 泽连斯基:俄代表团级别低,没人能做决定
  • 一涉嫌开设赌场的网上在逃人员在山东威海落网
  • 泽连斯基:乌代表团已启程,谈判可能于今晚或明天举行
  • 中拉互联网发展与合作论坛在西安开幕
  • 最高人民法院原副院长唐德华逝世,享年89岁