基于 Redis Stream 实现消息队列功能
好长时间没更新了。。。。。。
背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至可以选择自定义线程池+DelayQueue 这种操作去进行异步操作,而大多数人会在第一时间想到消息丢列,但是消息引入消息队列这件事对于一个并发量不大、后半段消息允许失败的情况单独引入一个中间件对系统的开发维护难度都会提升一个等级,所以我就想到应用 Redis Stream 这种方式来实现异步任务的执行
废话不多说,直接上代码
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisCountdownTaskProducer implements CountdownStrategy {
private final StringRedisTemplate stringRedisTemplate;
@Override
public void startCountdown(long duration, Runnable onFinish, String userId, String taskId) {
log.info("使用redis stream 的延时任务开始执行 userId:{},taskId:{}",userId,taskId);
Map<String,String> producerMap = Map.of(
"userId",userId,
"taskId",taskId,
"duration",String.valueOf(duration)
);
// 发送延迟消费信息 topic: pickUpTheLightRecord
stringRedisTemplate.opsForStream().add("streamKey", producerMap);
}
@Override
public void cancelCountdown(String userId, String taskId) {
}
}
以我的应用场景为例,大家可以忽略这个继承的 CountdownStrategy 的接口,我这是用策略模式来实现多种方式的动态切换
最核心的代码就是一行 stringRedisTemplate.opsForStream().add(“pickUpTheLightRecord”, producerMap);
应用 redis 提供的 stream 功能,直接发送你的 topic 和你的 key(这个 key 可以是你的某个实体,某个信息,或者说某种标识,以便后续取出的时候可以知道自己要进行什么操作)
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {
private final RedisConnectionFactory redisConnectionFactory;
private final RedisCountdownTaskConsumer redisCountdownTaskConsumer;
@Bean
public ExecutorService asyncStreamConsumer() {
AtomicInteger index = new AtomicInteger();
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(processors,
processors + processors >> 1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("stream_consumer_countdown_task_" + index.incrementAndGet());
thread.setDaemon(true);
return thread;
}
);
}
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {
// 创建消费者组
String consumerGroup = "your_comsumer_group";
String streamKey = "streamKey"; // 与生产者topic一致
try {
redisConnectionFactory.getConnection()
.xGroupCreate(streamKey.getBytes(), consumerGroup, ReadOffset.from("0-0"), true);
} catch (Exception e) {
// 捕获异常,检查是否是因为消费者组已存在导致的错误
if (e.getMessage().contains("BUSYGROUP")) {
// 如果消费者组已存在,则复用现有的消费者组
log.warn("消费者组已存在,复用现有的消费者组: {}", consumerGroup);
} else {
// 如果是其他错误,则记录日志
log.warn("消费者组创建失败: {}", e.getMessage());
}
}
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(10)
// 执行从 Stream 拉取到消息的任务流程
.executor(asyncStreamConsumer)
// 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时
.pollTimeout(Duration.ofSeconds(3))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
streamMessageListenerContainer.receiveAutoAck(
Consumer.from(consumerGroup,"countdownTaskConsumer"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
redisCountdownTaskConsumer);
return streamMessageListenerContainer;
}
}
以上是对消费者的配置,配置过后我们通过 Bean 的形式直接注入 Spring 容器,方便在应用启动时它可以自动创建,应用结束时可以自动销毁,避免资源浪费
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCountdownTaskConsumer implements StreamListener<String, MapRecord<String, String, String>> {
private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
private final TaskCommonPomodoroTechniqueMapper taskCommonPomodoroTechniqueMapper;
private final StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("receive message:{},倒计时任务开始执行 userId:{},taskId:{}",message);
// 执行倒计时任务
String stream = message.getStream();
RecordId id = message.getId();
String consumerGroup = "pickUpTheLightRecord_consumer_group";
//消息幂等处理
if(!messageQueueIdempotentHandler.isMessageProcessed(id.toString())){
//判断当前消息是否执行完成
if(messageQueueIdempotentHandler.isAccomplish(id.toString())){
log.info("消息已处理完成: {}", id);
stringRedisTemplate.opsForStream().acknowledge(stream, consumerGroup, id); // 显式AC
return;
}
throw new RuntimeException("消息未完成流程,选择消息队列重试");
}
try {
Map<String, String> value = message.getValue();
String userId = value.get("userId");
String taskId = value.get("taskId");
LambdaUpdateWrapper<TaskCommonPomodoroTechnique> wrapper = new LambdaUpdateWrapper<>();
wrapper.eq(TaskCommonPomodoroTechnique::getUserId,userId);
wrapper.eq(TaskCommonPomodoroTechnique::getId,taskId);
TaskCommonPomodoroTechnique taskCommonPomodoroTechnique = taskCommonPomodoroTechniqueMapper.selectOne(wrapper);
if(taskCommonPomodoroTechnique == null){
log.error("倒计时任务不存在,userId:{},taskId:{}",userId,taskId);
return;
}
taskCommonPomodoroTechnique.setCompletionTimes(taskCommonPomodoroTechnique.getCompletionTimes()+1);
taskCommonPomodoroTechniqueMapper.updateById(taskCommonPomodoroTechnique);
log.info("倒计时任务执行成功,userId:{},taskId:{}",userId,taskId);
stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
}catch (Throwable e){
messageQueueIdempotentHandler.deleteMessageProcessed(id.toString());
log.error("记录倒计时任务异常",e);
throw e;
}
messageQueueIdempotentHandler.setAccomplish(id.toString());
}
}
最后我们来实现具体的消费者,直接继承 StreamListener 这个类,重写 onMessage 方法,在这个方法中定义你要执行的业务逻辑(大家可以忽略幂等的处理,这个不是讲解的重点)
这样就可以使用基于 Redis Stream 的消息丢列啦
总的来说和 RocketMQ 的使用差不多,但是多了一些配置的过程