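Spring Boot + Redis Publish/Subscribe with an ACK Mechanism (Event-Driven Approach)
Spring Boot and Redis publish/subscribe (with ACK): the full workflow
I. Overall Architecture
II. Implementation Steps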
Step 1: Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
Step 2: Configure the Redis connection
# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8
# Redis Stream settings
app:
  redis:
    stream: app-events
    group: app-group
    consumer: consumer-${random.int(1000)}
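Step 3: Create the consumer group
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisConfig {
    @Value("${app.redis.stream}")
    private String streamKey;
    @Value("${app.redis.group}")
    private String groupName;
    @Autowired
    private StringRedisTemplate redisTemplate;

    // A @Bean method has to return an object, so the group is created
    // in an initialization callback instead of a void @Bean method.
    @PostConstruct
    public void createConsumerGroup() {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (Exception e) {
            // Most often the group already exists (BUSYGROUP); other failures,
            // such as Redis being unreachable, also end up here.
            System.out.println("Consumer group not created (it may already exist): "
                    + groupName + " - " + e.getMessage());
        }
    }
}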
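Step 4: Configure the message listener container
import java.time.Duration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// These beans belong in the same RedisConfig class as the consumer-group setup from Step 3.
@Configuration
public class RedisConfig {
    // Thread pool used by the listener container for polling
    @Bean(name = "redisStreamTaskExecutor")
    public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("redis-stream-");
        return executor;
    }

    // Message listener container
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory factory,
            @Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {
        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(Duration.ofSeconds(1)) // block up to 1s per poll
                        .executor(executor)
                        .batchSize(10)                      // read at most 10 records per poll
                        .build();
        return StreamMessageListenerContainer.create(factory, options);
    }
}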
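Step 5: Register the message listener
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamReadRequest;
import org.springframework.stereotype.Component;

@Component
public class StreamListenerRegistrar {
    @Value("${app.redis.stream}")
    private String streamKey;
    @Value("${app.redis.group}")
    private String groupName;
    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMessageProcessor processor;

    // @PostConstruct methods cannot take parameters, so the collaborators
    // are injected through the constructor.
    public StreamListenerRegistrar(
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
            RedisMessageProcessor processor) {
        this.container = container;
        this.processor = processor;
    }

    @PostConstruct
    public void registerListener() {
        StreamReadRequest<String> readRequest = StreamReadRequest
                .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                .consumer(Consumer.from(groupName, consumerName))
                .autoAcknowledge(false) // manual ACK
                .build();
        container.register(readRequest, processor);
    }
}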
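Step 6: Implement the message processor
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {
    @Value("${app.redis.stream}")
    private String streamKey;
    @Value("${app.redis.group}")
    private String groupName;
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        CompletableFuture.runAsync(() -> {
            try {
                // Business logic
                processBusiness(record);
                // ACK only after successful processing
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            } catch (Exception e) {
                // No ACK is sent, so the message stays in the Pending List for a later retry
                log.error("Processing failed, message {} stays pending", record.getId(), e);
            }
        });
    }

    // Public so that PendingMessageProcessor (Step 7) can reuse it for retries
    public void processBusiness(MapRecord<String, String, String> record) throws Exception {
        String eventType = record.getValue().get("eventType");
        String payload = record.getValue().get("payload");
        if (eventType == null) {
            // switch on a null String would throw a NullPointerException
            throw new IllegalArgumentException("Missing eventType in message " + record.getId());
        }
        // Dispatch by event type
        switch (eventType) {
            case "ORDER_CREATED": handleOrder(payload); break;
            case "PAYMENT_PROCESSED": handlePayment(payload); break;
            default: log.warn("No handler for event type {}", eventType);
        }
    }

    // Placeholders for the application's own business logic
    private void handleOrder(String payload) { }
    private void handlePayment(String payload) { }
}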
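The `switch` in `processBusiness` can also be written as a handler registry, which makes the "no handler for this event type" case explicit to the caller. The following is a minimal plain-Java sketch under that assumption; `EventDispatcher`, `register`, and `dispatch` are hypothetical names introduced for illustration and are not part of Spring Data Redis.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical registry that maps an eventType to the handler for its payload. */
class EventDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler that receives payloads of the given event type. */
    EventDispatcher register(String eventType, Consumer<String> handler) {
        handlers.put(eventType, handler);
        return this;
    }

    /** Returns true if a handler ran; false when eventType is missing or unregistered. */
    boolean dispatch(Map<String, String> fields) {
        String eventType = fields.get("eventType");
        Consumer<String> handler = eventType == null ? null : handlers.get(eventType);
        if (handler == null) {
            return false;
        }
        handler.accept(fields.get("payload"));
        return true;
    }
}
```

In the processor, `dispatch(record.getValue())` could stand in for the `switch`, and a `false` result could be logged or treated as a failure so that the message is not acknowledged silently.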
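Step 7: Implement the pending-message processor
import java.time.Duration;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PendingMessageProcessor {
    @Value("${app.redis.stream}")
    private String streamKey;
    @Value("${app.redis.group}")
    private String groupName;
    @Value("${app.redis.consumer}")
    private String consumerName;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private RedisMessageProcessor messageProcessor;

    // Runs once per minute; requires @EnableScheduling on a configuration class
    @Scheduled(fixedRate = 60000)
    public void processPendingMessages() {
        // 1. Query up to 100 pending messages for the group
        PendingMessages pending = redisTemplate.opsForStream()
                .pending(streamKey, groupName, Range.unbounded(), 100);
        pending.forEach(this::handlePendingMessage);
    }

    private void handlePendingMessage(PendingMessage pending) {
        try {
            // 2. Claim the message for this consumer if it has been idle for at least 30s
            List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                    .claim(streamKey, groupName, consumerName, Duration.ofSeconds(30), pending.getId());
            if (!records.isEmpty()) {
                MapRecord<String, String, String> record = records.get(0);
                // 3. Retry the business logic
                messageProcessor.processBusiness(record);
                // 4. ACK after successful processing
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            }
        } catch (Exception e) {
            log.warn("Retry failed for message {}", pending.getIdAsString(), e);
            // 5. Move to the dead-letter queue once the delivery count exceeds 3
            if (pending.getTotalDeliveryCount() > 3) {
                moveToDeadLetterQueue(pending);
            }
        }
    }

    private void moveToDeadLetterQueue(PendingMessage pending) {
        // Read the message body: a closed range from the ID to itself returns exactly that entry
        String id = pending.getIdAsString();
        List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                .range(streamKey, Range.closed(id, id));
        if (!records.isEmpty()) {
            // Copy the body to the dead-letter stream
            redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());
            // ACK the original so it leaves the Pending List
            redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());
        }
    }
}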
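Step 8: Implement the message producer
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RedisMessageProducer {
    @Value("${app.redis.stream}")
    private String streamKey;
    @Autowired
    private StringRedisTemplate redisTemplate;

    // Appends the event to the stream and returns the generated record ID
    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of(
                "eventType", eventType,
                "payload", payload,
                "timestamp", String.valueOf(System.currentTimeMillis()));
        return redisTemplate.opsForStream().add(streamKey, message).getValue();
    }
}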
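Step 9: Create the REST endpoint
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor // generates the constructor that injects the final field
public class MessageController {
    private final RedisMessageProducer producer;

    @PostMapping
    public String sendMessage(@RequestBody MessageRequest request) {
        return producer.sendMessage(request.getEventType(), request.getPayload());
    }

    @Data
    public static class MessageRequest {
        private String eventType;
        private String payload;
    }
}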
III. Message Lifecycle Flowcharts
1. Normal message processing flow
2. Pending message processing flow
3. How the ACK mechanism works
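The ACK bookkeeping used throughout the steps above can be illustrated with a toy model of a consumer group's Pending Entries List (PEL). This is a simplified in-memory sketch, not how Redis stores the list: reading through a consumer group puts a message into the PEL, `XACK` removes it, and `XCLAIM` moves it to another consumer while increasing its delivery count. `PendingEntriesModel` and its methods are hypothetical names, and the minimum-idle-time check that a real claim performs is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of one consumer group's Pending Entries List (PEL). */
class PendingEntriesModel {
    /** One pending entry: its current owner and how often it has been delivered. */
    static final class Entry {
        String consumer;
        int deliveryCount;

        Entry(String consumer) {
            this.consumer = consumer;
            this.deliveryCount = 1;
        }
    }

    private final Map<String, Entry> pending = new LinkedHashMap<>();

    /** Models a group read: a delivered message enters the PEL with count 1. */
    void deliver(String id, String consumer) {
        pending.put(id, new Entry(consumer));
    }

    /** Models XACK: returns true if the id was pending and has been removed. */
    boolean ack(String id) {
        return pending.remove(id) != null;
    }

    /** Models XCLAIM: transfers ownership and counts one more delivery. */
    boolean claim(String id, String newConsumer) {
        Entry entry = pending.get(id);
        if (entry == null) {
            return false; // already acknowledged or never delivered
        }
        entry.consumer = newConsumer;
        entry.deliveryCount++;
        return true;
    }

    /** Delivery count of a pending id, or 0 if the id is not pending. */
    int deliveryCount(String id) {
        Entry entry = pending.get(id);
        return entry == null ? 0 : entry.deliveryCount;
    }

    /** Ids that are still waiting for an ACK, in delivery order. */
    List<String> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }
}
```

A message that is processed successfully goes through `deliver` then `ack`; one whose processing fails is never acknowledged, stays in `pendingIds()`, and is later picked up through `claim`, which is why the delivery count can serve as a retry counter.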
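IV. Production Recommendations
Consumer naming strategy
@Value("${app.redis.consumer}")
private String consumerName;
@Autowired
private Environment environment;

// Set at application startup: derive a stable, unique name from host and port
@PostConstruct
public void initConsumerName() {
    try {
        String hostName = InetAddress.getLocalHost().getHostName();
        String port = environment.getProperty("server.port", "8080");
        consumerName = "consumer-" + hostName + "-" + port;
    } catch (UnknownHostException e) {
        // Host name cannot be resolved: keep the name from the configuration
    }
}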
Configurable retry policy
app:
  pending:
    max_retry: 5
    retry_interval: 30000 # 30 seconds
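One way these two settings could drive the pending-message handling is a small policy object that replaces the hard-coded `3` and `Duration.ofSeconds(30)` from Step 7. The sketch below is plain Java and makes assumptions: `PendingRetryPolicy` and `Action` are hypothetical names, and the inputs are expected to come from `PendingMessage.getTotalDeliveryCount()` and `PendingMessage.getElapsedTimeSinceLastDelivery()`.

```java
import java.time.Duration;

/** Hypothetical policy built from app.pending.max_retry and app.pending.retry_interval. */
class PendingRetryPolicy {
    enum Action { WAIT, RETRY, DEAD_LETTER }

    private final int maxRetry;
    private final Duration retryInterval;

    PendingRetryPolicy(int maxRetry, Duration retryInterval) {
        this.maxRetry = maxRetry;
        this.retryInterval = retryInterval;
    }

    /** Decides what to do with a pending entry from its delivery count and idle time. */
    Action decide(long deliveryCount, Duration idle) {
        if (deliveryCount > maxRetry) {
            return Action.DEAD_LETTER; // retries exhausted
        }
        if (idle.compareTo(retryInterval) < 0) {
            return Action.WAIT;        // delivered too recently, may still be in progress
        }
        return Action.RETRY;
    }
}
```

With the values above, `new PendingRetryPolicy(5, Duration.ofMillis(30000))` retries an entry that has been idle for at least 30 seconds and dead-letters it once it has been delivered more than five times.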
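Dead-letter queue monitoring
// alertService is the application's own alerting component (not shown here)
@Scheduled(fixedRate = 3600000) // check once per hour
public void checkDeadLetterQueue() {
    Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);
    if (size != null && size > 0) {
        alertService.sendAlert("Dead-letter queue holds " + size + " unprocessed messages");
    }
}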
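Message retention (capping the stream length)
// Redis Streams have no per-message TTL; cap the stream length when sending instead
public String sendMessage(String eventType, String payload) {
    Map<String, String> message = ...; // built as in Step 8
    RecordId id = redisTemplate.opsForStream().add(streamKey, message);
    // Keep roughly the newest 10,000 entries (XTRIM with approximate MAXLEN).
    // The three-argument overload assumes Spring Data Redis 2.4 or later;
    // trim(streamKey, 10000) trims exactly on older versions.
    redisTemplate.opsForStream().trim(streamKey, 10000, true);
    return id.getValue();
}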
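V. Summary
This article walked through a complete approach to publish/subscribe messaging with an ACK mechanism using Spring Boot and Redis:
Event-driven architecture: a Redis Stream listener container pushes messages to consumers as they arrive, giving publish/subscribe behavior through consumer groups
Reliable ACK mechanism: with manual acknowledgement, a message leaves the Pending List only after it has been processed successfully
Automatic recovery: the pending-message processor re-claims and retries failed messages
Dead-letter queue: messages that keep failing are isolated so they do not block the rest of the stream
Production readiness: covers multi-instance deployment, configurable retries, and monitoring with alerts
The approach suits scenarios that need reliable message delivery, such as order processing, payment systems, and event sourcing, and it adds delivery guarantees while keeping throughput high.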