当前位置: 首页 > news >正文

redis实现简易消息队列

本系统没有集成MQ,但是又要实现削峰,刚好redis有队列的结构,可以实现简易的MQ

一、核心实现思路

  1. 生产者:使用Redis的LPUSH命令将消息插入队列头部
  2. 消费者
    • 采用RPOP命令配合超时机制实现阻塞读取
    • 无消息时休眠指定时间并累加休眠时长
    • 当累计休眠时间达到max值时强制退出

二、关键代码实现

1. 添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 生产者实现
@Service
public class RedisProducer {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void sendMessage(String queueName, String message) {
        redisTemplate.opsForList().leftPush(queueName, message);
    }
}
3. 消费者实现
@Component
public class RedisConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RedisConsumer.class);
    private final String queueName;
    private final int maxSleepTime; // 最大累计休眠时间(毫秒)
    private long accumulatedSleepTime = 0;

    public RedisConsumer(String queueName, int maxSleepTime) {
        this.queueName = queueName;
        this.maxSleepTime = maxSleepTime;
    }

    @Override
    public void run() {
        while (true) {
            String message = redisTemplate.opsForList().rightPop(queueName, 1000); // 1秒超时
            if (message != null) {
                try {
                    processMessage(message);
                    accumulatedSleepTime = 0; // 成功消费重置计时
                } catch (Exception e) {
                    logger.error("消息处理失败: {}", e.getMessage());
                }
            } else {
                long sleepDuration = Math.min(1000, maxSleepTime - accumulatedSleepTime);
                logger.info("队列为空,休眠{}ms", sleepDuration);
                try {
                    Thread.sleep(sleepDuration);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                accumulatedSleepTime += sleepDuration;
                if (accumulatedSleepTime >= maxSleepTime) {
                    logger.warn("累计休眠时间已达上限,强制退出");
                    break;
                }
            }
        }
    }

    private void processMessage(String message) {
        // 模拟消息处理逻辑
        logger.info("处理消息: {}", message);
    }
}
4. 配置类
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        return template;
    }
}

三、运行与测试

  1. 启动消费者
@SpringBootApplication
public class RedisQueueDemo {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(RedisQueueDemo.class, args);
        RedisConsumer consumer = context.getBean(RedisConsumer.class);
        new Thread(consumer).start();
    }
}
  1. 发送测试消息
@RestController
public class TestController {
    @Autowired
    private RedisProducer producer;

    @GetMapping("/send")
    public String sendMessage() {
        producer.sendMessage("testQueue", "Hello Redis!");
        return "消息已发送";
    }
}

四、关键特性说明

  1. 阻塞读取:通过rightPop(1000)实现非阻塞等待,避免忙等待
  2. 休眠累加:每次无消息时休眠1秒,累计时间超过max值时退出,避免cpu时间片一直被占用
  3. 异常处理:消费失败时记录日志,避免中断循环
  4. 序列化支持:可扩展为支持复杂对象的消息序列化

五、扩展建议

  1. 分布式锁:多实例部署时需添加分布式锁保证消费唯一性
  2. 消息确认:可结合Redis的WATCH命令实现消息确认机制
  3. 监控指标:添加消费速率、队列长度等监控指标

该方案综合了Redis的高性能和Spring Boot的便捷性,适用于低延迟、高可靠性的消息处理场景。实际生产环境中建议结合具体业务需求调整超时时间和休眠策略。

相关文章:

  • Python代码调用Java接口的简单demo
  • 基于本人猜想和尼古拉特斯拉的结合的植物发电站系统
  • DeepSeek-V3-0324 版本升级概要
  • 关于embedding向量模型的知识
  • Kafka中的消息如何分配给不同的消费者?
  • 多线程—synchronized原理
  • Ubuntu24.04 配置远程桌面服务
  • 当前环境下,数据安全何去何从?
  • [数据结构]并查集(系统整理版)
  • vscode 打开工程 看不到文件目录
  • FlexAlign.SpaceBetween`、`FlexAlign.SpaceAround` 和 `FlexAlign.SpaceEvenly三个属性的区别
  • 解决Dify:failed to init dify plugin db问题
  • C - 通讯录2.0(详细解析)
  • AI知识补全(八):多模态大模型是什么?
  • 第4期:重构软件测试体系——生成式AI如何让BUG无所遁形
  • Python包下载路径 Chrome用户数据 修改到非C盘
  • Elasticsearch 搜索高级
  • C#高级:启动、中止一个指定路径的exe程序
  • 六十天Linux从0到项目搭建(第十天)(系统调用 vs 库函数/进程管理的建模/为什么进程管理中需要PCB?/exec 函数/fork原理与行为详解)
  • 【Linux加餐-网络命令】
  • 国铁集团去年收入12830亿元增3%,全年铁路运输利润总额创新高
  • 五一去哪儿|外国朋友来中国,“买买买”成为跨境旅游新趋势
  • 吴志朴当选福建德化县人民政府县长
  • 君亭酒店:2024年营业收入约6.76亿元, “酒店行业传统增长模式面临巨大挑战”
  • 报告显示2024年全球军费开支增幅达冷战后最大
  • 科学时代重读“老子”的意义——对谈《老子智慧八十一讲》