Spring Boot 使用 Redis 实现消息队列
Spring Boot 使用 Redis 实现消息队列
一、概念引入:Redis 消息队列是什么?
你可以把 Redis 消息队列 想象成一个「留言板」系统。
想象一下,你和朋友在一个繁忙的咖啡厅里,但你们不能直接交流。你可以把想说的话写在便签上,贴在留言板上,朋友稍后过来看留言板就能看到你的消息。Redis 消息队列就是这样的"数字留言板":
- 生产者:写便签的人(把消息放到队列中)
- Redis 队列:留言板(存储消息的地方)
- 消费者:看留言板的人(从队列中取出消息处理)
Redis 本身是一个内存数据库,但它的数据结构(如 List、Stream)可以轻松实现消息队列功能,比传统的消息中间件更轻量级,部署更简单。
二、原理讲解:Redis 消息队列的核心机制
1. Redis 实现消息队列的三种方式
a) 基于 List 的简单队列
Redis 的 List 数据结构天然支持队列操作:
LPUSH/RPUSH:从列表左/右侧推入元素(入队)RPOP/LPOP:从列表右/左侧弹出元素(出队)
工作原理:
生产者: LPUSH queue_name "消息1" -> [消息1]
生产者: LPUSH queue_name "消息2" -> [消息2, 消息1]
消费者: RPOP queue_name -> 返回"消息1", 队列变为[消息2]
b) 基于 Pub/Sub 的发布订阅模式
Redis 的 Pub/Sub 功能实现消息广播:
- 生产者发布消息到频道(Channel)
- 多个消费者订阅同一频道,都能收到消息
工作原理:
生产者: PUBLISH channel_name "消息" -> 所有订阅者收到
消费者: SUBSCRIBE channel_name -> 等待接收消息
c) 基于 Stream 的高级队列(Redis 5.0+)
Redis Stream 是专门为消息队列设计的数据结构:
- 支持消息持久化
- 支持消费者组(Consumer Group)
- 支持消息确认(ACK)机制
- 支持消息回溯
工作原理:
生产者: XADD stream_name * field value -> 添加消息到流
消费者组: XREADGROUP GROUP group_name consumer_name COUNT 1 STREAMS stream_name >
2. Redis 消息队列 vs 传统消息中间件
| 特性 | Redis 队列 | RabbitMQ/Kafka |
|---|---|---|
| 部署复杂度 | 简单(单机) | 复杂(集群) |
| 消息持久化 | 有限(Stream支持) | 完善 |
| 吞吐量 | 高(内存操作) | 中高 |
| 消息确认 | 有限(Stream支持) | 完善 |
| 适用场景 | 轻量级、高吞吐 | 企业级、可靠性要求高 |
三、场景应用:什么时候适合用 Redis 消息队列?
1. 异步任务处理
场景:用户注册后需要发送欢迎邮件、初始化用户数据等耗时操作。
传统方式:
用户注册 -> 发送邮件(2秒) -> 初始化数据(1秒) -> 返回成功(总耗时3秒)
使用 Redis 队列:
用户注册 -> 写入队列(0.01秒) -> 返回成功
后台进程 -> 从队列读取 -> 发送邮件 -> 初始化数据
2. 流量削峰
场景:秒杀活动中,瞬间有大量请求涌入系统。
传统方式:直接处理请求可能导致数据库崩溃
使用 Redis 队列:将请求放入队列,后台按处理能力逐步消费
3. 系统解耦
场景:订单系统需要通知库存系统、物流系统、支付系统。
传统方式:订单系统直接调用各系统接口,耦合度高
使用 Redis 队列:订单系统发送消息到队列,各系统自行订阅处理
四、代码实现:Spring Boot 整合 Redis 消息队列
1. 项目准备
a) 添加依赖
<!-- pom.xml -->
<dependencies><!-- Spring Boot Starter Data Redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- Spring Boot Starter Web (如果需要Web接口) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Lombok (简化代码) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
b) 配置 Redis 连接
# application.yml
spring:redis:host: localhostport: 6379password: # 如果有密码database: 0timeout: 3000mslettuce:pool:max-active: 8max-wait: -1msmax-idle: 8min-idle: 0
2. 基于 List 的简单队列实现
a) 消息生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;/*** 基于 List 的消息队列生产者*/
@Slf4j
@Service
public class ListQueueProducer {@Autowiredprivate StringRedisTemplate redisTemplate;// 队列名称private static final String QUEUE_NAME = "simple:queue";/*** 发送消息到队列* @param message 消息内容*/public void sendMessage(String message) {try {// 使用 LPUSH 将消息添加到队列左侧redisTemplate.opsForList().leftPush(QUEUE_NAME, message);log.info("消息发送成功: {}", message);} catch (Exception e) {log.error("消息发送失败: {}", message, e);}}/*** 发送延迟消息(通过设置过期时间实现)* @param message 消息内容* @param delayTime 延迟时间* @param timeUnit 时间单位*/public void sendDelayedMessage(String message, long delayTime, TimeUnit timeUnit) {try {// 将消息放入延迟队列String delayedQueue = QUEUE_NAME + ":delayed";redisTemplate.opsForList().rightPush(delayedQueue, message);// 设置延迟队列的过期时间(简化实现,实际生产环境需要更精确的延迟队列)redisTemplate.expire(delayedQueue, delayTime, timeUnit);log.info("延迟消息发送成功: {}, 延迟时间: {} {}", message, delayTime, timeUnit);} catch (Exception e) {log.error("延迟消息发送失败: {}", message, e);}}
}
b) 消息消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;/*** 基于 List 的消息队列消费者*/
@Slf4j
@Service
public class ListQueueConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;// 队列名称private static final String QUEUE_NAME = "simple:queue";/*** 启动时开始消费消息*/@PostConstructpublic void startConsumer() {// 在实际应用中,这应该在单独的线程中运行// 这里简化处理,使用定时任务模拟log.info("消息队列消费者已启动");}/*** 定时从队列中获取消息(模拟持续监听)* 在实际应用中,这应该在独立线程中使用阻塞式操作*/@Scheduled(fixedDelay = 1000) // 每秒执行一次public void consumeMessage() {try {// 使用 BRPOP 阻塞式获取消息(这里简化为非阻塞)String message = redisTemplate.opsForList().rightPop(QUEUE_NAME);if (message != null) {// 处理消息processMessage(message);}} catch (Exception e) {log.error("消费消息失败", e);}}/*** 处理消息的业务逻辑* @param message 消息内容*/private void processMessage(String message) {log.info("开始处理消息: {}", message);try {// 模拟业务处理Thread.sleep(500); // 模拟处理耗时// 这里可以添加具体的业务逻辑// 例如:发送邮件、更新数据库等log.info("消息处理完成: {}", message);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("消息处理被中断: {}", message);} catch (Exception e) {log.error("消息处理失败: {}", message, e);// 可以考虑将失败的消息放入重试队列或死信队列}}
}
3. 基于 Pub/Sub 的发布订阅实现
a) 消息发布者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;/*** 基于 Pub/Sub 的消息发布者*/
@Slf4j
@Service
public class PubSubPublisher {@Autowiredprivate StringRedisTemplate redisTemplate;// 频道名称private static final String CHANNEL_NAME = "notifications";/*** 发布消息到指定频道* @param message 消息内容*/public void publishMessage(String message) {try {// 使用 convertAndSend 发布消息redisTemplate.convertAndSend(CHANNEL_NAME, message);log.info("消息发布成功: {}", message);} catch (Exception e) {log.error("消息发布失败: {}", message, e);}}/*** 发布对象消息(自动序列化为JSON)* @param object 消息对象*/public void publishObject(Object object) {try {// 在实际应用中,可能需要使用JSON序列化工具// 这里简化处理,直接使用toString()String message = object.toString();redisTemplate.convertAndSend(CHANNEL_NAME, message);log.info("对象消息发布成功: {}", message);} catch (Exception e) {log.error("对象消息发布失败: {}", object, e);}}
}
b) 消息订阅者
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;/*** 基于 Pub/Sub 的消息订阅者*/
@Slf4j
@Service
public class PubSubSubscriber implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {try {// 获取频道名称String channel = new String(message.getChannel());// 获取消息内容String messageBody = new String(message.getBody());log.info("接收到消息 - 频道: {}, 内容: {}", channel, messageBody);// 处理消息processMessage(messageBody);} catch (Exception e) {log.error("处理订阅消息失败", e);}}/*** 处理消息的业务逻辑* @param message 消息内容*/private void processMessage(String message) {log.info("开始处理订阅消息: {}", message);try {// 模拟业务处理Thread.sleep(200);// 这里可以添加具体的业务逻辑// 例如:更新UI、发送通知等log.info("订阅消息处理完成: {}", message);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("订阅消息处理被中断: {}", message);} catch (Exception e) {log.error("订阅消息处理失败: {}", message, e);}}
}
c) Redis 监听器配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** Redis 消息监听器配置*/
@Configuration
public class RedisListenerConfig {/*** 配置消息监听容器* @param connectionFactory Redis连接工厂* @param pubSubSubscriber 消息订阅者* @return 监听容器*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,PubSubSubscriber pubSubSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅指定频道container.addMessageListener(pubSubSubscriber, new ChannelTopic("notifications"));return container;}
}
4. 基于 Stream 的高级队列实现(Redis 5.0+)
a) 消息生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.Map;/*** 基于 Stream 的消息队列生产者*/
@Slf4j
@Service
public class StreamQueueProducer {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// 流名称private static final String STREAM_NAME = "mystream";/*** 发送消息到流* @param message 消息内容* @return 消息ID*/public String sendMessage(String message) {try {// 创建消息记录ObjectRecord<String, String> record = StreamRecords.newRecord().ofObject(message).withStreamKey(STREAM_NAME);// 发送消息并获取消息IDString messageId = redisTemplate.opsForStream().add(record).getValue();log.info("消息发送成功 - ID: {}, 内容: {}", messageId, message);return messageId;} catch (Exception e) {log.error("消息发送失败: {}", message, e);return null;}}/*** 发送带字段的消息* @param fields 消息字段* @return 消息ID*/public String sendMessageWithFields(Map<String, Object> fields) {try {// 发送带字段的消息String messageId = redisTemplate.opsForStream().add(STREAM_NAME, fields).getValue();log.info("字段消息发送成功 - ID: {}, 字段: {}", messageId, fields);return messageId;} catch (Exception e) {log.error("字段消息发送失败: {}", fields, e);return null;}}
}
b) 消息消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;/*** 基于 Stream 的消息队列消费者*/
@Slf4j
@Service
public class StreamQueueConsumer {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// 流名称private static final String STREAM_NAME = "mystream";// 消费者组名称private static final String CONSUMER_GROUP = "mygroup";// 消费者名称private static final String CONSUMER_NAME = "consumer-1";/*** 初始化消费者组*/@PostConstructpublic void initConsumerGroup() {try {// 尝试创建消费者组(如果不存在)redisTemplate.opsForStream().createGroup(STREAM_NAME, CONSUMER_GROUP);log.info("消费者组创建成功: {}", CONSUMER_GROUP);} catch (Exception e) {// 消费者组可能已存在,忽略错误log.info("消费者组可能已存在: {}", CONSUMER_GROUP);}}/*** 定时消费消息*/@Scheduled(fixedDelay = 1000) // 每秒执行一次public void consumeMessages() {try {// 从消费者组读取消息StreamOffset<String> streamOffset = StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed());Consumer consumer = Consumer.from(CONSUMER_GROUP, CONSUMER_NAME);StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofMillis(100));List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(consumer, options, streamOffset);if (!records.isEmpty()) {log.info("接收到 {} 条消息", records.size());for (MapRecord<String, Object, Object> record : records) {try {// 处理消息processMessage(record);// 确认消息处理完成redisTemplate.opsForStream().acknowledge(STREAM_NAME, CONSUMER_GROUP, record.getId());} catch (Exception e) {log.error("处理消息失败 - ID: {}", record.getId(), e);// 处理失败的消息可以重试或放入死信队列}}}} catch (Exception e) {log.error("消费消息失败", e);}}/*** 处理消息的业务逻辑* @param record 消息记录*/private void processMessage(MapRecord<String, Object, Object> record) {String messageId = record.getId().getValue();Map<Object, Object> messageFields = record.getValue();log.info("开始处理消息 - ID: {}, 内容: {}", messageId, messageFields);try {// 模拟业务处理Thread.sleep(300);// 这里可以添加具体的业务逻辑// 例如:解析消息字段、更新数据库等log.info("消息处理完成 - ID: {}", messageId);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("消息处理被中断 - ID: {}", messageId);} catch (Exception e) {log.error("消息处理失败 - ID: {}", messageId, e);throw e; // 重新抛出异常,使消息不被确认}}
}
5. 实际应用示例:订单处理系统
a) 订单实体类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.math.BigDecimal;
import java.time.LocalDateTime;/*** 订单实体类*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {private String orderId; // 订单IDprivate String userId; // 用户IDprivate String productId; // 产品IDprivate String productName; // 产品名称private Integer quantity; // 数量private BigDecimal price; // 单价private BigDecimal totalAmount; // 总金额private LocalDateTime createTime; // 创建时间private Integer status; // 订单状态:0-待处理,1-处理中,2-已完成,-1-已取消
}
b) 订单服务(生产者)
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;/*** 订单服务(生产者)*/
@Slf4j
@Service
public class OrderService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate ObjectMapper objectMapper;// 订单队列名称private static final String ORDER_QUEUE = "order:queue";/*** 创建订单并发送到队列* @param userId 用户ID* @param productId 产品ID* @param productName 产品名称* @param quantity 数量* @param price 单价* @return 订单ID*/public String createOrder(String userId, String productId, String productName, Integer quantity, BigDecimal price) {try {// 创建订单Order order = new Order();order.setOrderId(UUID.randomUUID().toString().replace("-", ""));order.setUserId(userId);order.setProductId(productId);order.setProductName(productName);order.setQuantity(quantity);order.setPrice(price);order.setTotalAmount(price.multiply(new BigDecimal(quantity)));order.setCreateTime(LocalDateTime.now());order.setStatus(0); // 待处理// 将订单转换为JSON字符串String orderJson = objectMapper.writeValueAsString(order);// 将订单发送到队列redisTemplate.opsForList().leftPush(ORDER_QUEUE, orderJson);log.info("订单创建成功并发送到队列: {}", order.getOrderId());return order.getOrderId();} catch (Exception e) {log.error("创建订单失败", e);throw new RuntimeException("创建订单失败", e);}}
}
c) 订单处理器(消费者)
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;/*** 订单处理器(消费者)*/
@Slf4j
@Service
public class OrderProcessor {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate ObjectMapper objectMapper;// 订单队列名称private static final String ORDER_QUEUE = "order:queue";// 重试队列名称private static final String RETRY_QUEUE = "order:retry:queue";// 死信队列名称private static final String DEAD_LETTER_QUEUE = "order:dead:queue";/*** 启动订单处理器*/@PostConstructpublic void init() {log.info("订单处理器已启动");}/*** 定时处理订单*/@Scheduled(fixedDelay = 1000) // 每秒执行一次public void processOrders() {try {// 从队列中获取订单String orderJson = redisTemplate.opsForList().rightPop(ORDER_QUEUE);if (orderJson != null) {// 处理订单processOrder(orderJson);}// 处理重试队列中的订单processRetryOrders();} catch (Exception e) {log.error("处理订单失败", e);}}/*** 处理单个订单* @param orderJson 订单JSON字符串*/private void processOrder(String orderJson) {try {// 解析订单Order order = objectMapper.readValue(orderJson, Order.class);log.info("开始处理订单: {}", order.getOrderId());// 更新订单状态为处理中order.setStatus(1);// 模拟订单处理// 1. 检查库存boolean stockAvailable = checkInventory(order.getProductId(), order.getQuantity());if (!stockAvailable) {log.warn("订单 {} 库存不足", order.getOrderId());// 将订单放入重试队列addToRetryQueue(orderJson);return;}// 2. 扣减库存reduceInventory(order.getProductId(), order.getQuantity());// 3. 计算优惠BigDecimal discount = calculateDiscount(order);// 4. 更新订单状态为已完成order.setStatus(2);// 5. 保存订单到数据库(这里简化处理)saveOrderToDatabase(order);// 6. 发送通知(可以发送到另一个队列)sendNotification(order);log.info("订单处理完成: {}", order.getOrderId());} catch (Exception e) {log.error("处理订单失败: {}", orderJson, e);// 将失败订单放入重试队列addToRetryQueue(orderJson);}}/*** 处理重试队列中的订单*/private void processRetryOrders() {try {// 获取重试队列中的订单String orderJson = redisTemplate.opsForList().rightPop(RETRY_QUEUE);if (orderJson != null) {// 解析订单Order order = objectMapper.readValue(orderJson, Order.class);// 检查重试次数(这里简化处理,实际应该记录重试次数)// 如果重试次数超过阈值,放入死信队列// 否则重新处理// 这里简化处理,直接重新处理log.info("重新处理订单: {}", order.getOrderId());processOrder(orderJson);}} catch (Exception e) {log.error("处理重试订单失败", e);}}/*** 将订单添加到重试队列* @param orderJson 订单JSON字符串*/private void addToRetryQueue(String orderJson) {try {redisTemplate.opsForList().leftPush(RETRY_QUEUE, orderJson);log.info("订单已添加到重试队列");} catch (Exception e) {log.error("添加订单到重试队列失败", e);// 如果重试队列也失败,放入死信队列addToDeadLetterQueue(orderJson);}}/*** 将订单添加到死信队列* @param orderJson 订单JSON字符串*/private void addToDeadLetterQueue(String orderJson) {try {redisTemplate.opsForList().leftPush(DEAD_LETTER_QUEUE, orderJson);log.warn("订单已添加到死信队列");} catch (Exception e) {log.error("添加订单到死信队列失败", e);}}// 以下是模拟的业务方法private boolean checkInventory(String productId, Integer quantity) {// 模拟检查库存// 90%的概率有库存return Math.random() > 0.1;}private void reduceInventory(String productId, Integer quantity) {// 模拟扣减库存log.info("产品 {} 库存已扣减 {}", productId, quantity);}private BigDecimal calculateDiscount(Order order) {// 模拟计算优惠return BigDecimal.ZERO;}private void saveOrderToDatabase(Order order) {// 模拟保存订单到数据库log.info("订单 {} 已保存到数据库", order.getOrderId());}private void sendNotification(Order order) {// 模拟发送通知log.info("订单 {} 通知已发送", order.getOrderId());}
}
d) 控制器(提供Web接口)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.*;import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 订单控制器*/
@Slf4j
@RestController
@RequestMapping("/api/orders")
public class OrderController {@Autowiredprivate OrderService orderService;@Autowiredprivate StringRedisTemplate redisTemplate;// 死信队列名称private static final String DEAD_LETTER_QUEUE = "order:dead:queue";/*** 创建订单*/@PostMappingpublic Map<String, Object> createOrder(@RequestBody Map<String, Object> request) {Map<String, Object> response = new HashMap<>();try {String userId = (String) request.get("userId");String productId = (String) request.get("productId");String productName = (String) request.get("productName");Integer quantity = (Integer) request.get("quantity");BigDecimal price = new BigDecimal(request.get("price").toString());String orderId = orderService.createOrder(userId, productId, productName, quantity, price);response.put("success", true);response.put("orderId", orderId);response.put("message", "订单创建成功");} catch (Exception e) {log.error("创建订单失败", e);response.put("success", false);response.put("message", "创建订单失败: " + e.getMessage());}return response;}/*** 查看死信队列中的订单*/@GetMapping("/dead-letters")public Map<String, Object> getDeadLetterOrders() {Map<String, Object> response = new HashMap<>();try {// 获取死信队列中的所有订单List<String> orders = redisTemplate.opsForList().range(DEAD_LETTER_QUEUE, 0, -1);response.put("success", true);response.put("count", orders != null ? orders.size() : 0);response.put("orders", orders);} catch (Exception e) {log.error("获取死信订单失败", e);response.put("success", false);response.put("message", "获取死信订单失败: " + e.getMessage());}return response;}
}
五、问题解决:常见问题与解决方案
1. 消息丢失问题
问题描述
在高并发或系统异常情况下,消息可能会丢失。
解决方案
a) 使用持久化机制
# application.yml
spring:redis:# 开启持久化# 注意:这需要在Redis服务器端配置
b) 使用 Redis Stream 替代 List
// Redis Stream 支持持久化,即使Redis重启消息也不会丢失
String messageId = redisTemplate.opsForStream().add(record).getValue();
c) 生产者确认机制
// 发送消息后检查返回值
Long result = redisTemplate.opsForList().leftPush(QUEUE_NAME, message);
if (result != null && result > 0) {// 发送成功
} else {// 发送失败,重试或记录日志
}
2. 消息重复消费问题
问题描述
由于网络问题或消费者异常,可能导致消息被重复消费。
解决方案
a) 消息去重表
// 使用Redis Set实现消息去重
private boolean isMessageProcessed(String messageId) {String key = "processed:messages";return !redisTemplate.opsForSet().add(key, messageId);
}
b) 幂等性设计
// 在处理消息前检查是否已处理
public void processMessage(String messageId, String message) {// 检查消息是否已处理if (isMessageProcessed(messageId)) {log.info("消息已处理,跳过: {}", messageId);return;}// 处理消息...// 标记消息已处理markMessageAsProcessed(messageId);
}
3. 消息积压问题
问题描述
生产者生产速度大于消费者消费速度,导致消息积压。
解决方案
a) 动态增加消费者
// 根据队列长度动态调整消费者数量
@Scheduled(fixedDelay = 5000)
public void adjustConsumers() {Long queueSize = redisTemplate.opsForList().size(QUEUE_NAME);int currentConsumers = getCurrentConsumerCount();// 如果队列积压严重,增加消费者if (queueSize > 1000 && currentConsumers < MAX_CONSUMERS) {addNewConsumer();}// 如果队列为空,减少消费者else if (queueSize == 0 && currentConsumers > MIN_CONSUMERS) {removeConsumer();}
}
b) 消息优先级处理
// 使用有序集合实现优先级队列
public void sendPriorityMessage(String message, int priority) {redisTemplate.opsForZSet().add(PRIORITY_QUEUE, message, priority);
}// 消费高优先级消息
public String consumeHighPriorityMessage() {Set<String> messages = redisTemplate.opsForZSet().reverseRange(PRIORITY_QUEUE, 0, 0);if (!messages.isEmpty()) {String message = messages.iterator().next();redisTemplate.opsForZSet().remove(PRIORITY_QUEUE, message);return message;}return null;
}
4. 消息顺序性问题
问题描述
在某些业务场景中,需要保证消息的顺序性。
解决方案
a) 单消费者队列
// 对于需要保证顺序的消息,使用单消费者
@RabbitListener(queues = "ordered.queue", concurrency = "1")
public void processOrderedMessage(String message) {// 处理有序消息
}
b) 分片队列
// 根据消息键分片到不同队列
public void sendMessage(String key, String message) {// 计算分片int shard = Math.abs(key.hashCode()) % SHARD_COUNT;String queueName = QUEUE_NAME + ":" + shard;// 发送到对应分片队列redisTemplate.opsForList().leftPush(queueName, message);
}
5. 消息监控问题
问题描述
需要监控消息队列的状态,及时发现异常。
解决方案
a) 队列监控接口
@RestController
@RequestMapping("/api/monitor")
public class QueueMonitorController {@Autowiredprivate StringRedisTemplate redisTemplate;@GetMapping("/queue/status")public Map<String, Object> getQueueStatus() {Map<String, Object> status = new HashMap<>();// 获取队列长度Long queueSize = redisTemplate.opsForList().size(QUEUE_NAME);status.put("queueSize", queueSize);// 获取消费者数量(如果有注册)Integer consumerCount = getConsumerCount();status.put("consumerCount", consumerCount);// 计算处理速度Double processingRate = calculateProcessingRate();status.put("processingRate", processingRate);return status;}
}
b) 告警机制
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void checkQueueHealth() {Long queueSize = redisTemplate.opsForList().size(QUEUE_NAME);// 如果队列积压超过阈值,发送告警if (queueSize > WARNING_THRESHOLD) {alertService.sendAlert("队列积压严重: " + queueSize);}
}
六、总结提炼
1. Redis 消息队列的核心优势
- 轻量级:无需额外部署,直接使用Redis
- 高性能:基于内存操作,吞吐量高
- 简单易用:API简洁,上手快
- 多种实现:支持List、Pub/Sub、Stream等多种方式
2. 三种实现方式的适用场景
| 实现方式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| List | 简单队列、任务队列 | 实现简单、性能高 | 不支持消息确认、不支持消费者组 |
| Pub/Sub | 实时通知、广播 | 支持多消费者、实时性高 | 消息不持久化、可能丢失 |
| Stream | 可靠消息队列 | 支持持久化、消费者组、消息确认 | 实现相对复杂、需要Redis 5.0+ |
3. 实施步骤总结
- 需求分析:明确业务场景,选择合适的实现方式
- 环境准备:安装配置Redis,添加Spring Boot依赖
- 队列设计:定义队列名称、消息格式、处理流程
- 生产者实现:编写消息发送逻辑
- 消费者实现:编写消息处理逻辑,考虑异常处理
- 测试验证:进行功能测试和性能测试
- 监控告警:实现队列监控和异常告警
4. 最佳实践建议
- 消息设计:消息应包含唯一ID和时间戳,便于追踪和去重
- 异常处理:实现重试机制和死信队列,确保消息不丢失
- 性能优化:根据业务特点调整消费者数量和批处理大小
- 监控告警:实时监控队列状态,及时发现问题
- 容量规划:评估消息量和大小,合理配置Redis内存
