redis实现简易消息队列
本系统没有集成MQ,但是又要实现削峰,刚好redis有队列的结构,可以实现简易的MQ
一、核心实现思路
- 生产者:使用Redis的
LPUSH
命令将消息插入队列头部 - 消费者:
- 采用
RPOP
命令配合超时机制实现阻塞读取 - 无消息时休眠指定时间并累加休眠时长
- 当累计休眠时间达到
max
值时强制退出
- 采用
二、关键代码实现
1. 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 生产者实现
@Service
public class RedisProducer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void sendMessage(String queueName, String message) {
redisTemplate.opsForList().leftPush(queueName, message);
}
}
3. 消费者实现
@Component
public class RedisConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(RedisConsumer.class);
private final String queueName;
private final int maxSleepTime; // 最大累计休眠时间(毫秒)
private long accumulatedSleepTime = 0;
public RedisConsumer(String queueName, int maxSleepTime) {
this.queueName = queueName;
this.maxSleepTime = maxSleepTime;
}
@Override
public void run() {
while (true) {
String message = redisTemplate.opsForList().rightPop(queueName, 1000); // 1秒超时
if (message != null) {
try {
processMessage(message);
accumulatedSleepTime = 0; // 成功消费重置计时
} catch (Exception e) {
logger.error("消息处理失败: {}", e.getMessage());
}
} else {
long sleepDuration = Math.min(1000, maxSleepTime - accumulatedSleepTime);
logger.info("队列为空,休眠{}ms", sleepDuration);
try {
Thread.sleep(sleepDuration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
accumulatedSleepTime += sleepDuration;
if (accumulatedSleepTime >= maxSleepTime) {
logger.warn("累计休眠时间已达上限,强制退出");
break;
}
}
}
}
private void processMessage(String message) {
// 模拟消息处理逻辑
logger.info("处理消息: {}", message);
}
}
4. 配置类
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
return template;
}
}
三、运行与测试
- 启动消费者:
@SpringBootApplication
public class RedisQueueDemo {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(RedisQueueDemo.class, args);
RedisConsumer consumer = context.getBean(RedisConsumer.class);
new Thread(consumer).start();
}
}
- 发送测试消息:
@RestController
public class TestController {
@Autowired
private RedisProducer producer;
@GetMapping("/send")
public String sendMessage() {
producer.sendMessage("testQueue", "Hello Redis!");
return "消息已发送";
}
}
四、关键特性说明
- 阻塞读取:通过
rightPop(1000)
实现非阻塞等待,避免忙等待 - 休眠累加:每次无消息时休眠1秒,累计时间超过
max
值时退出,避免cpu时间片一直被占用 - 异常处理:消费失败时记录日志,避免中断循环
- 序列化支持:可扩展为支持复杂对象的消息序列化
五、扩展建议
- 分布式锁:多实例部署时需添加分布式锁保证消费唯一性
- 消息确认:可结合Redis的
WATCH
命令实现消息确认机制 - 监控指标:添加消费速率、队列长度等监控指标
该方案综合了Redis的高性能和Spring Boot的便捷性,适用于低延迟、高可靠性的消息处理场景。实际生产环境中建议结合具体业务需求调整超时时间和休眠策略。