Redis实现消息队列三种方式
参考
Redis队列详解(springboot实战)_redis 队列-CSDN博客
前言
MQ消息队列有很多种,比如RabbitMQ,RocketMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过springboot实战使其更易懂。
1. 基于List的 LPUSH+BRPOP 的实现
2. PUB/SUB,订阅/发布模式
3. 基于Stream类型的实现
1、基于List的的实现
原理
使用rpush和lpush操作入队列,lpop和rpop操作出队列。
List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。
优点
一旦数据到来则立刻醒过来,消息延迟几乎为零。
缺点
-
不能重复消费,一旦消费就会被删除
-
不能做广播模式 , 不支持分组消费
-
lpop和rpop会一直空轮训,消耗资源 ,但可以 引入阻塞读blpop和brpop 同时也有新的问题 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常
代码
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId></dependency>
配置文件
server:port: ${SERVER_PORT:9210}# Spring
spring:application:# 应用名称name: ruoyi-redis-messageredis:host: localhostport: 6379password: 123456
启动类
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class RuoYiRedisMessageApplication
{public static void main(String[] args){SpringApplication.run(RuoYiRedisMessageApplication.class, args);System.out.println("(♥◠‿◠)ノ゙ ruoyi-redis-message启动成功");}
}
添加redis配置类
/*** redis配置*/
@Configuration
public class RedisConfig {private static final RedisSerializer<Object> SERIALIZER = createSerializer();@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 创建 RedisTemplate 对象RedisTemplate<String, Object> template = new RedisTemplate<>();// 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());template.setHashKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。template.setValueSerializer(SERIALIZER);template.setHashValueSerializer(SERIALIZER);return template;}private static RedisSerializer<Object> createSerializer() {ObjectMapper mapper = new ObjectMapper();mapper.registerModules(new JavaTimeModule());// 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXXmapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);return new GenericJackson2JsonRedisSerializer(mapper);}}
队列方法
@Slf4j
@Service
public class ListRedisQueue {//队列名public static final String KEY = "listQueue";@Resourceprivate RedisTemplate redisTemplate;public void produce(String message) {redisTemplate.opsForList().rightPush(KEY, message);}public void consume() {while (true) {String msg = (String) redisTemplate.opsForList().leftPop(KEY);log.info("疯狂获取消息:" + msg);}}public void blockingConsume() {while (true) {List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {//队列没有元素会阻塞操作,直到队列获取新的元素或超时,5表示如果没元素就每五秒去拿一次消息return connection.bRPop(5, KEY.getBytes());}}, new StringRedisSerializer());for (Object str : obj) {log.info("blockingConsume获取消息 : {}", str);}}}}
测试
lPop/rPop消费数据
@Slf4j
@SpringBootTest
public class ListQueueTest {@Autowiredprivate ListRedisQueue listRedisQueue;@Testpublic void produce() {for (int i = 0; i < 5; i++) {listRedisQueue.produce("第"+i + "个数据");}}@Testpublic void consume() {produce();log.info("生产消息完毕");listRedisQueue.consume();}}
blpop / brpop 消费数据
@Testpublic void blockingConsume() {produce();log.info("生产消息完毕");listRedisQueue.blockingConsume();}
2、PUB/SUB,订阅/发布模式
原理
SUBSCRIBE,用于订阅信道
PUBLISH,向信道发送消息
UNSUBSCRIBE,取消订阅
此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。
优点
-
一个消息可以发布到多个消费者
-
消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)
-
消息即时发送,消费者会自动接收到信道发布的消息
缺点
-
消息发布时,如果客户端不在线,则消息丢失
-
消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失
-
消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)