使用RedisTemplate设计一个消息队列?
使用RedisTemplate设计消息队列
下面我将详细介绍如何使用Spring Boot的RedisTemplate来设计各种类型的消息队列。
前言
在代码实现之前,我们先了解一下消息队列的基本需求。要保证数据的可靠性与性能之间提供选择。
1. 核心需求分析
在设计消息队列之前,首先要明确核心需求:
基本功能需求
- 消息发送:生产者能够发送消息到队列
- 消息接收:消费者能够从队列获取消息
- 消息存储:消息需要持久化存储,防止丢失
- 顺序保证:确保消息的消费顺序(FIFO或其他策略)
- 可靠性:消息不丢失,至少被消费一次
高级功能需求
- 消息确认机制:确保消息被成功处理
- 重试机制:处理失败的消息可以重新投递
- 延迟消息:支持定时或延迟发送
- 优先级队列:不同优先级的消息处理顺序不同
- 死信队列:处理多次重试失败的消息
- 监控管理:队列状态监控和管理功能
2. 架构设计思路
2.1 核心组件设计
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │───▶│ Message │───▶│ Consumer │
│ (生产者) │ │ Queue │ │ (消费者) │
└─────────────┘ └─────────────┘ └─────────────┘│┌────┴────┐│ Storage ││ (存储) │└─────────┘
2.2 数据流设计
- 生产者 → 队列:发送消息
- 队列 → 存储:持久化消息
- 队列 → 消费者:分发消息
- 消费者 → 队列:确认消息处理
3. 存储设计思路
3.1 存储选型考虑
- 内存存储:高性能,但数据易失
- 磁盘存储:数据持久,但性能较低
- 混合存储:内存+磁盘,兼顾性能和持久性
一. 项目配置
Maven依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>
Redis配置类
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer来序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.activateDefaultTyping(LazyCollectionDefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);serializer.setObjectMapper(mapper);template.setValueSerializer(serializer);template.setHashValueSerializer(serializer);template.afterPropertiesSet();return template;}@Beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {return new StringRedisTemplate(factory);}
}
二. 基本消息队列实现
消息实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QueueMessage implements Serializable {private String id;private String topic;private Object payload;private Long timestamp;private Map<String, Object> headers;public QueueMessage(String topic, Object payload) {this.id = UUID.randomUUID().toString();this.topic = topic;this.payload = payload;this.timestamp = System.currentTimeMillis();this.headers = new HashMap<>();}public void addHeader(String key, Object value) {this.headers.put(key, value);}
}
基础队列服务
@Service
public class BasicQueueService {private static final Logger logger = LoggerFactory.getLogger(BasicQueueService.class);@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送消息到队列*/public void sendMessage(String queueName, Object message) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}redisTemplate.opsForList().leftPush(queueName, queueMessage);logger.info("Sent message to queue [{}]: {}", queueName, queueMessage.getId());}/*** 批量发送消息*/public void batchSendMessages(String queueName, List<Object> messages) {List<Object> queueMessages = messages.stream().map(msg -> {if (msg instanceof QueueMessage) {return msg;} else {return new QueueMessage(queueName, msg);}}).collect(Collectors.toList());redisTemplate.opsForList().leftPushAll(queueName, queueMessages);logger.info("Batch sent {} messages to queue [{}]", messages.size(), queueName);}/*** 阻塞获取消息*/public QueueMessage blockingReceiveMessage(String queueName, long timeout, TimeUnit unit) {Object result = redisTemplate.opsForList().rightPop(queueName, timeout, unit);if (result instanceof QueueMessage) {QueueMessage message = (QueueMessage) result;logger.info("Received message from queue [{}]: {}", queueName, message.getId());return message;}return null;}/*** 非阻塞获取消息*/public QueueMessage receiveMessage(String queueName) {Object result = redisTemplate.opsForList().rightPop(queueName);if (result instanceof QueueMessage) {QueueMessage message = (QueueMessage) result;logger.info("Received message from queue [{}]: {}", queueName, message.getId());return message;}return null;}/*** 获取队列长度*/public long getQueueSize(String queueName) {Long size = redisTemplate.opsForList().size(queueName);return size != null ? size : 0;}/*** 查看消息但不消费*/public List<QueueMessage> peekMessages(String queueName, int count) {List<Object> messages = redisTemplate.opsForList().range(queueName, 0, count - 1);return messages.stream().filter(msg -> msg instanceof QueueMessage).map(msg -> (QueueMessage) msg).collect(Collectors.toList());}
}
三. 可靠消息队列(支持消息确认)
@Service
public class ReliableQueueService {private static final Logger logger = LoggerFactory.getLogger(ReliableQueueService.class);@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送消息到主队列,同时备份到处理中队列*/public void sendReliableMessage(String queueName, Object message) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}String processingQueue = getProcessingQueueName(queueName);// 使用事务确保原子性操作redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();operations.opsForList().leftPush(queueName, queueMessage);operations.opsForHash().put(processingQueue, queueMessage.getId(), queueMessage);return operations.exec();}});logger.info("Sent reliable message to queue [{}]: {}", queueName, queueMessage.getId());}/*** 获取并处理消息(移动到处理中状态)*/public QueueMessage receiveAndProcess(String queueName) {String processingQueue = getProcessingQueueName(queueName);// 从主队列移动到处理队列Object message = redisTemplate.opsForList().rightPopAndLeftPush(queueName, processingQueue);if (message instanceof QueueMessage) {QueueMessage queueMessage = (QueueMessage) message;logger.info("Message moved to processing: {}", queueMessage.getId());return queueMessage;}return null;}/*** 确认消息处理完成*/public void acknowledge(String queueName, String messageId) {String processingQueue = getProcessingQueueName(queueName);redisTemplate.opsForHash().delete(processingQueue, messageId);logger.info("Message acknowledged: {}", messageId);}/*** 消息处理失败,重新入队*/public void requeue(String queueName, String messageId) {String processingQueue = getProcessingQueueName(queueName);redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();QueueMessage message = (QueueMessage) operations.opsForHash().get(processingQueue, messageId);if (message != null) {operations.opsForList().leftPush(queueName, message);operations.opsForHash().delete(processingQueue, messageId);}return operations.exec();}});logger.info("Message requeued: {}", messageId);}/*** 恢复处理中的消息(应用重启时调用)*/public void recoverProcessingMessages(String queueName) {String processingQueue = getProcessingQueueName(queueName);Map<Object, Object> processingMessages = redisTemplate.opsForHash().entries(processingQueue);if (!processingMessages.isEmpty()) {logger.info("Recovering {} processing messages for queue [{}]", processingMessages.size(), queueName);redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();for (Object message : processingMessages.values()) {operations.opsForList().leftPush(queueName, message);}operations.delete(processingQueue);return operations.exec();}});}}/*** 获取处理中的消息数量*/public long getProcessingCount(String queueName) {String processingQueue = getProcessingQueueName(queueName);Long size = redisTemplate.opsForHash().size(processingQueue);return size != null ? size : 0;}private String getProcessingQueueName(String queueName) {return queueName + ":processing";}
}
四. 延迟队列实现
@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate BasicQueueService basicQueueService;/*** 发送延迟消息*/public void sendDelayedMessage(String targetQueue, Object message, long delay, TimeUnit timeUnit) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(targetQueue, message);}// 添加延迟相关信息queueMessage.addHeader("targetQueue", targetQueue);queueMessage.addHeader("deliverTime", System.currentTimeMillis() + timeUnit.toMillis(delay));String delayedQueue = getDelayedQueueName();double score = System.currentTimeMillis() + timeUnit.toMillis(delay);redisTemplate.opsForZSet().add(delayedQueue, queueMessage, score);}/*** 处理到期的延迟消息*/@Scheduled(fixedRate = 5000) // 每5秒执行一次public void processDelayedMessages() {String delayedQueue = getDelayedQueueName();long currentTime = System.currentTimeMillis();// 获取所有到期的消息Set<Object> expiredMessages = redisTemplate.opsForZSet().rangeByScore(delayedQueue, 0, currentTime);if (expiredMessages != null && !expiredMessages.isEmpty()) {for (Object messageObj : expiredMessages) {if (messageObj instanceof QueueMessage) {QueueMessage message = (QueueMessage) messageObj;String targetQueue = (String) message.getHeaders().get("targetQueue");// 发送到目标队列basicQueueService.sendMessage(targetQueue, message);// 从延迟队列中移除redisTemplate.opsForZSet().remove(delayedQueue, message);}}}}/*** 获取延迟队列中的消息数量*/public long getDelayedMessageCount() {String delayedQueue = getDelayedQueueName();Long count = redisTemplate.opsForZSet().size(delayedQueue);return count != null ? count : 0;}private String getDelayedQueueName() {return "delayed:queue";}
}
五. 优先级队列实现
@Service
public class PriorityQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送优先级消息*/public void sendPriorityMessage(String queueName, Object message, int priority) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}queueMessage.addHeader("priority", priority);// 使用不同的队列表示不同优先级String priorityQueue = getPriorityQueueName(queueName, priority);redisTemplate.opsForList().leftPush(priorityQueue, queueMessage);}/*** 按优先级获取消息(高优先级先出)*/public QueueMessage receivePriorityMessage(String queueName, int maxPriority) {for (int priority = maxPriority; priority >= 0; priority--) {String priorityQueue = getPriorityQueueName(queueName, priority);Object message = redisTemplate.opsForList().rightPop(priorityQueue);if (message instanceof QueueMessage) {return (QueueMessage) message;}}return null;}/*** 阻塞式按优先级获取消息*/public QueueMessage blockingReceivePriorityMessage(String queueName, int maxPriority, long timeout, TimeUnit unit) {List<String> priorityQueues = new ArrayList<>();for (int priority = maxPriority; priority >= 0; priority--) {priorityQueues.add(getPriorityQueueName(queueName, priority));}// 使用BRPOP命令,按优先级顺序检查队列for (String queue : priorityQueues) {Object message = redisTemplate.opsForList().rightPop(queue, timeout, unit);if (message instanceof QueueMessage) {return (QueueMessage) message;}}return null;}private String getPriorityQueueName(String baseQueue, int priority) {return baseQueue + ":priority:" + priority;}
}
六. 完整的队列管理器
@Service
public class RedisQueueManager {@Autowiredprivate BasicQueueService basicQueueService;@Autowiredprivate ReliableQueueService reliableQueueService;@Autowiredprivate DelayedQueueService delayedQueueService;@Autowiredprivate PriorityQueueService priorityQueueService;/*** 发送简单消息*/public void send(String queueName, Object message) {basicQueueService.sendMessage(queueName, message);}/*** 发送可靠消息*/public void sendReliable(String queueName, Object message) {reliableQueueService.sendReliableMessage(queueName, message);}/*** 发送延迟消息*/public void sendDelayed(String targetQueue, Object message, long delay, TimeUnit timeUnit) {delayedQueueService.sendDelayedMessage(targetQueue, message, delay, timeUnit);}/*** 发送优先级消息*/public void sendPriority(String queueName, Object message, int priority) {priorityQueueService.sendPriorityMessage(queueName, message, priority);}/*** 接收消息*/public QueueMessage receive(String queueName) {return basicQueueService.receiveMessage(queueName);}/*** 阻塞接收消息*/public QueueMessage blockingReceive(String queueName, long timeout, TimeUnit unit) {return basicQueueService.blockingReceiveMessage(queueName, timeout, unit);}/*** 接收并处理可靠消息*/public QueueMessage receiveAndProcess(String queueName) {return reliableQueueService.receiveAndProcess(queueName);}/*** 确认消息处理完成*/public void acknowledge(String queueName, String messageId) {reliableQueueService.acknowledge(queueName, messageId);}/*** 获取队列统计信息*/public Map<String, Object> getQueueStats(String queueName) {Map<String, Object> stats = new HashMap<>();stats.put("queueSize", basicQueueService.getQueueSize(queueName));stats.put("processingCount", reliableQueueService.getProcessingCount(queueName));stats.put("delayedCount", delayedQueueService.getDelayedMessageCount());return stats;}
}
七. 控制器示例
@RestController
@RequestMapping("/api/queue")
public class QueueController {@Autowiredprivate RedisQueueManager queueManager;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestParam String queueName, @RequestBody Object message) {queueManager.send(queueName, message);return ResponseEntity.ok("Message sent successfully");}@PostMapping("/send/reliable")public ResponseEntity<String> sendReliableMessage(@RequestParam String queueName, @RequestBody Object message) {queueManager.sendReliable(queueName, message);return ResponseEntity.ok("Reliable message sent successfully");}@PostMapping("/send/delayed")public ResponseEntity<String> sendDelayedMessage(@RequestParam String queueName,@RequestParam long delay,@RequestParam TimeUnit unit,@RequestBody Object message) {queueManager.sendDelayed(queueName, message, delay, unit);return ResponseEntity.ok("Delayed message sent successfully");}@GetMapping("/receive")public ResponseEntity<QueueMessage> receiveMessage(@RequestParam String queueName) {QueueMessage message = queueManager.receive(queueName);return message != null ? ResponseEntity.ok(message) : ResponseEntity.noContent().build();}@PostMapping("/acknowledge")public ResponseEntity<String> acknowledgeMessage(@RequestParam String queueName,@RequestParam String messageId) {queueManager.acknowledge(queueName, messageId);return ResponseEntity.ok("Message acknowledged");}@GetMapping("/stats")public ResponseEntity<Map<String, Object>> getQueueStats(@RequestParam String queueName) {return ResponseEntity.ok(queueManager.getQueueStats(queueName));}
}
八. 消费者服务示例
@Service
public class MessageConsumerService {private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);@Autowiredprivate RedisQueueManager queueManager;/*** 启动简单消息消费者*/@Asyncpublic void startSimpleConsumer(String queueName) {logger.info("Starting simple consumer for queue: {}", queueName);while (true) {try {QueueMessage message = queueManager.blockingReceive(queueName, 30, TimeUnit.SECONDS);if (message != null) {processMessage(message);}} catch (Exception e) {logger.error("Error in simple consumer for queue {}: {}", queueName, e.getMessage(), e);}}}/*** 启动可靠消息消费者*/@Asyncpublic void startReliableConsumer(String queueName) {logger.info("Starting reliable consumer for queue: {}", queueName);while (true) {try {QueueMessage message = queueManager.receiveAndProcess(queueName);if (message != null) {boolean success = processMessage(message);if (success) {queueManager.acknowledge(queueName, message.getId());} else {// 处理失败,可以选择重试或记录日志logger.warn("Message processing failed: {}", message.getId());}} else {Thread.sleep(1000); // 队列为空时休眠}} catch (Exception e) {logger.error("Error in reliable consumer for queue {}: {}", queueName, e.getMessage(), e);}}}private boolean processMessage(QueueMessage message) {try {logger.info("Processing message: {}", message.getId());// 模拟业务处理Thread.sleep(100);logger.info("Message processed successfully: {}", message.getId());return true;} catch (Exception e) {logger.error("Error processing message {}: {}", message.getId(), e.getMessage(), e);return false;}}
}
9. 配置类
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig {@Beanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("RedisQueue-");executor.initialize();return executor;}
}
十. 使用示例
@Component
public class QueueDemo implements CommandLineRunner {@Autowiredprivate RedisQueueManager queueManager;@Autowiredprivate MessageConsumerService consumerService;@Overridepublic void run(String... args) throws Exception {// 启动消费者consumerService.startSimpleConsumer("test.queue");consumerService.startReliableConsumer("reliable.queue");// 发送测试消息for (int i = 0; i < 10; i++) {Map<String, Object> data = new HashMap<>();data.put("index", i);data.put("timestamp", System.currentTimeMillis());// 发送简单消息queueManager.send("test.queue", data);// 发送可靠消息queueManager.sendReliable("reliable.queue", data);// 发送延迟消息if (i % 3 == 0) {queueManager.sendDelayed("test.queue", data, 10, TimeUnit.SECONDS);}Thread.sleep(1000);}}
}
设计要点总结
- 序列化配置:正确配置RedisTemplate的序列化方式,支持复杂对象存储
- 事务支持:使用SessionCallback确保多个操作的原子性
- 异常处理:完善的异常处理和日志记录
- 性能考虑:使用连接池、批量操作、异步处理
- 可靠性:实现消息确认、重试、死信处理机制
- 扩展性:支持延迟消息、优先级队列等高级特性
- 监控管理:提供队列统计和监控接口
这种基于RedisTemplate的消息队列方案适合中小型应用,具有实现简单、部署方便、性能良好的特点。对于高吞吐量、高可靠性的生产环境,建议还是使用专业的消息中间件如RabbitMQ、Kafka等。
