Spring Boot使用Redis实现消息队列
Spring Boot使用Redis实现消息队列
消息队列有很多现成的产品,但是服务器上的东西有时候不是开发者能说了算的,一般情况下服务器上都会有Mysql和Redis。消息队列又是在开发中大部分情况下要用到的,所以在不增加额外的东西的情况下,Redis实现消息队列就是成必须了。
队列的好处
- 解耦: 生产消息的和处理消息的模块分开了,互不影响。以后就算处理消息的逻辑再复杂,也不会拖慢用户请求的速度。
- 异步: 用户请求(生产消息)可以立即返回,大大提升了用户体验。
- 削峰: 如果瞬间有大量用户注册(比如搞活动),任务会先在Redis里排队,消费者再慢慢处理,保护了后端服务(如邮件服务器)不被冲垮。
实现方法
- 发布/订阅(Pub/Sub)模式
- List或者Stream 模式
两种方式的对比
特性 Feature | Pub/Sub (学校广播) | Stream (智能厨房订单系统) |
---|---|---|
消息持久化 | ❌ 不支持 | ✅ 完全支持 |
消费者组 | ❌ 不支持 | ✅ 核心功能 |
负载均衡 | ❌ 不支持 (广播模式) | ✅ 完美支持 (组内竞争) |
消息确认(ACK) | ❌ 不支持 | ✅ 核心功能 |
失败重试 | ❌ 无法实现 | ✅ 可以轻松实现 |
历史追溯 | ❌ 不支持 | ✅ 任意回溯 |
实现复杂度 | ⭐ (非常简单) | ⭐⭐⭐⭐ (相对复杂) |
最适合的场景 | 实时通知、聊天、数据可视化 | 任务队列、订单处理、日志收集 |
什么时候用 Pub/Sub?
当你追求极致的速度,并且不介意偶尔丢失消息时。
场景举例:网页上的实时股价推送(丢一两个点无所谓,下一个马上就来了)。在线游戏里,广播玩家的位置信息(这次没收到,0.1秒后下次更新就收到了)。多个微服务系统状态的实时监控看板。
什么时候用 Stream?
当你的业务绝不能丢失任何一条数据,并且需要协同处理大量任务时。
场景举例:电商订单系统: 每个订单都必须被准确处理。用户注册后的欢迎邮件/短信发送: 每个用户都必须收到。日志处理系统: 收集所有系统的日志,进行分析。任何需要“任务队列”思想的场景。
代码实现
1. Pub/Sub
- 生产者
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;/*** 消息生产者(Producer)* 就像奶茶店的前台,负责接收点单,并把订单信息发送到传送带(Redis)*/
@Component // 把这个类声明为一个组件,让Spring Boot来管理它
public class MessageProducer {// 这是一个Spring Boot提供的、专门用来操作Redis的强大工具@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发送消息到指定的频道(Channel)* @param channel 频道名称,就像传送带的不同泳道,比如"邮件传送带"、"短信传送带"* @param message 要发送的具体消息内容,比如"给用户xxx发送欢迎邮件"*/public void sendMessage(String channel, String message) {System.out.println("发送消息 -> 频道: " + channel + ", 内容: " + message);// 使用convertAndSend方法,将消息发布到指定的频道redisTemplate.convertAndSend(channel, message);}
}
- 消费者
package com.example.redismqdemo;import org.springframework.stereotype.Component;/*** 消息消费者(Consumer)* 就像奶茶店的师傅,时刻准备从传送带(Redis)上接收新订单并处理*/
@Component // 同样,让Spring Boot管理它
public class MessageConsumer {/*** 这是处理消息的方法。* 当我们订阅的频道(比如 "email-channel")收到消息时,这个方法就会被自动调用。* @param message 从频道接收到的消息内容*/public void receiveMessage(String message) {// 为了演示,我们只在控制台打印出来,表示我们已经收到并处理了// 在真实场景中,这里会是发送邮件、处理订单等具体业务代码System.out.println("收到消息 <- 内容: " + message);System.out.println("正在处理任务...(例如:发送邮件)");// 模拟处理任务需要的时间try {Thread.sleep(2000); // 暂停2秒} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务处理完成!");}
}
- 消费关系
package com.example.redismqdemo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** Redis配置类* 作用:建立“订阅关系”,即把我们的消费者(MessageConsumer)和指定的频道(Channel)绑定起来*/
@Configuration // 告诉Spring Boot,这是一个配置类
public class RedisConfig {// 我们要监听的频道名称public static final String EMAIL_CHANNEL = "email-channel";/*** 创建一个消息监听容器* 就像为奶茶师傅分配一个固定的工作站,让他专门监听某个传送带* @param connectionFactory Redis连接工厂,Spring Boot会自动提供* @param listenerAdapter 监听适配器,告诉监听到消息后该怎么办* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// addMessageListener方法将监听器和频道绑定// 这里我们让它监听 "email-channel" 这个频道container.addMessageListener(listenerAdapter, new PatternTopic(EMAIL_CHANNEL));return container;}/*** 创建消息监听适配器* 它定义了当消息来了之后,具体要调用哪个对象的哪个方法* @param consumer 我们自己写的消费者实例(MessageConsumer)* @return*/@BeanMessageListenerAdapter listenerAdapter(MessageConsumer consumer) {// "receiveMessage" 是我们在 MessageConsumer 类中定义的方法名return new MessageListenerAdapter(consumer, "receiveMessage");}
}
- 测试
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController // 声明这是一个网页接口控制器
public class TestController {@Autowiredprivate MessageProducer messageProducer;/*** 创建一个网页接口,用来发送消息* @param message 要发送的消息内容,可以从浏览器传递* @return*/@GetMapping("/send") // 任何人访问 http://localhost:8080/send 就会触发这个方法public String sendMessage(@RequestParam(defaultValue = "这是一个默认消息") String message) {// 调用我们之前写的生产者,把消息发送到 "email-channel" 频道messageProducer.sendMessage(RedisConfig.EMAIL_CHANNEL, message);return "消息发送成功!内容: '" + message + "'。请查看后台控制台的接收情况。";}
}
- 控制台
发送消息 -> 频道: email-channel, 内容: 欢迎新用户_张三
收到消息 <- 内容: 欢迎新用户_张三
正在处理任务...(例如:发送邮件)
// (等待2秒)
任务处理完成!
2、Stream
- 生产者
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息生产者(Producer) - 升级版* 负责把订单信息发送到【智能传送带】(Redis Stream)*/
@Component
public class MessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发送消息到Stream* @param streamKey Stream的名称,比如 "stream-email"* @param messageData 消息内容,可以是一个包含多个键值对的Map*/public void sendMessage(String streamKey, Map<String, String> messageData) {// 将Map数据转换成Stream需要的数据结构// ObjectRecord<String, Map<String, String>> record = StreamRecords.newRecord()// .ofObject(messageData) // 设置消息体// .withStreamKey(streamKey); // 设置Stream的KeySystem.out.println("发送消息到Stream -> Stream: " + streamKey + ", 内容: " + messageData);// 使用 xadd 命令将消息添加到Stream的末尾redisTemplate.opsForStream().add(streamKey, messageData);}
}
- 消费者
package com.example.redismqdemo;import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** 消息消费者(Consumer) - 升级版* 模拟一个“奶茶师傅班组”的工作流程*/
@Component
public class StreamConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;// --- Stream基本信息 ---public static final String STREAM_KEY = "stream-email"; // 智能传送带的名称public static final String GROUP_NAME = "email-group"; // 奶茶师傅班组的名称public static final String CONSUMER_NAME_PREFIX = "consumer-"; // 师傅名称前缀// --- 死信队列信息 ---public static final String DLQ_STREAM_KEY = "stream-email-dlq"; // 疑难问题板(死信队列)public static final int MAX_RETRIES = 3; // 最大重试次数// 当前消费者的名字,用机器名+线程ID保证唯一private final String consumerName = CONSUMER_NAME_PREFIX + java.lang.management.ManagementFactory.getRuntimeMXBean().getName();/*** 初始化方法:项目启动时,自动创建Stream的消费者组* 就像奶茶店开门前,先确认传送带和班组都已就位*/@PostConstructpublic void init() {try {// 尝试创建消费者组。如果Stream不存在,这个命令会自动创建它。// ReadOffset.latest() 表示只从最新的消息开始消费,忽略历史消息。redisTemplate.opsForStream().createGroup(STREAM_KEY, ReadOffset.latest(), GROUP_NAME);System.out.println("消费者组 '" + GROUP_NAME + "' 创建成功或已存在。");} catch (Exception e) {// 如果Stream或组已经存在,可能会抛异常,这里可以安全地忽略System.out.println("消费者组 '" + GROUP_NAME + "' 已存在,无需创建。");}}/*** 定时任务:每隔5秒,来检查并处理传送带上的新订单* 使用 @Scheduled 注解,让Spring Boot自动周期性地调用这个方法*/@Scheduled(fixedRate = 5000) // 每5000毫秒(5秒)执行一次public void consumeMessages() {// 1. 从传送带为我们这个班组(GROUP_NAME)获取最多1条新订单// > 表示新消息,block(Duration.ofSeconds(2)) 表示如果没有新消息,愿意等待2秒List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(Consumer.from(GROUP_NAME, consumerName),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()));if (messages == null || messages.isEmpty()) {return; // 没有新消息,直接返回}// 2. 遍历获取到的新订单并处理for (MapRecord<String, Object, Object> message : messages) {Map<Object, Object> body = message.getValue();RecordId messageId = message.getId();System.out.println("消费者 '" + consumerName + "' 收到新订单: " + body + ", 订单ID: " + messageId);try {// 模拟处理订单的业务逻辑processOrder(body);// 3. 处理成功!告诉传送带,这个订单我搞定了(ACK)redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);System.out.println("订单 " + messageId + " 处理成功,已ACK。");} catch (Exception e) {// 4. 处理失败!System.err.println("订单 " + messageId + " 处理失败: " + e.getMessage());handleFailure(message);}}}/*** 模拟订单处理逻辑* @param orderData 订单数据*/private void processOrder(Map<Object, Object> orderData) {// 模拟一个可能失败的操作if ("fail".equals(orderData.get("action"))) {throw new RuntimeException("原料不足,无法制作!");}// 正常的处理逻辑System.out.println("正在处理订单: " + orderData);}/*** 专门处理失败的订单(消息)* @param failedMessage 失败的消息记录*/private void handleFailure(MapRecord<String, Object, Object> failedMessage) {RecordId messageId = failedMessage.getId();String streamKey = failedMessage.getStream();Map<Object, Object> body = failedMessage.getValue();// 查询这条消息被投递了几次(检查重试次数)// pending方法可以查询一个组里,哪些消息被拿走但还没ACKPendingMessagesSummary pendingSummary = redisTemplate.opsForStream().pending(streamKey, GROUP_NAME);long deliveryCount = pendingSummary.getRecordsFor(messageId) != null ?pendingSummary.getRecordsFor(messageId).getTotalDeliveryCount() : 1;System.out.println("订单 " + messageId + " 已被尝试处理 " + deliveryCount + " 次。");if (deliveryCount >= MAX_RETRIES) {// 达到最大重试次数,放入死信队列System.err.println("达到最大重试次数(" + MAX_RETRIES + "),将订单 " + messageId + " 移入死信队列 " + DLQ_STREAM_KEY);// 将原始消息内容和失败信息一起,发送到DLQ// 注意:我们这里只是简单地用add方法,实际生产中可能会添加更多错误信息redisTemplate.opsForStream().add(DLQ_STREAM_KEY, body);// 在原Stream中确认(ACK)此消息,防止无限重试redisTemplate.opsForStream().acknowledge(streamKey, GROUP_NAME, messageId);} else {// 未达到最大重-试次数,不做任何事。// 消息会保留在“待处理列表”(Pending Entries List)中,// 其他消费者可以通过 XCLAIM 或者等待超时后自动重新投递来再次消费。// 这里我们为了简单,就让它暂时悬挂,等待下次被重新发现。System.out.println("订单 " + messageId + " 将等待下一次重试。");}}
}
- 开启定时任务
package com.example.redismqdemo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling; // 导入@SpringBootApplication
@EnableScheduling // <-- 添加这个注解
public class RedisMqDemoApplication {public static void main(String[] args) {SpringApplication.run(RedisMqDemoApplication.class, args);}
}
- 测试
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;@RestController
public class TestController {@Autowiredprivate MessageProducer messageProducer;/*** 发送一个成功的消息*/@GetMapping("/send-success")public String sendSuccessMessage() {Map<String, String> message = new HashMap<>();message.put("userId", "1001");message.put("email", "user1@example.com");message.put("content", "Welcome to our platform!");messageProducer.sendMessage(StreamConsumer.STREAM_KEY, message);return "发送了一个【成功】消息,请观察后台日志。";}/*** 发送一个注定会失败的消息*/@GetMapping("/send-fail")public String sendFailMessage() {Map<String, String> message = new HashMap<>();message.put("userId", "9999");message.put("action", "fail"); // 特殊标记,让消费者处理时抛出异常message.put("reason", "This message is designed to fail.");messageProducer.sendMessage(StreamConsumer.STREAM_KEY, message);return "发送了一个【失败】消息,请观察后台的重试和死信队列逻辑。";}
}