当前位置: 首页 > news >正文

Redis实现消息队列三种方式

参考

Redis队列详解(springboot实战)_redis 队列-CSDN博客


前言

MQ消息队列有很多种,比如RabbitMQ,RocketMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过springboot实战使其更易懂。

1. 基于List的 LPUSH+BRPOP 的实现

2. PUB/SUB,订阅/发布模式

3. 基于Stream类型的实现


1、基于List的的实现

原理

使用rpush和lpush操作入队列,lpop和rpop操作出队列。

List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

优点

一旦数据到来则立刻醒过来,消息延迟几乎为零。

缺点

  • 不能重复消费,一旦消费就会被删除

  • 不能做广播模式 , 不支持分组消费

  • lpop和rpop会一直空轮训,消耗资源 ,但可以 引入阻塞读blpop和brpop 同时也有新的问题 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常

代码

引入依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId></dependency>

配置文件

server:port: ${SERVER_PORT:9210}# Spring
spring:application:# 应用名称name: ruoyi-redis-messageredis:host: localhostport: 6379password: 123456

启动类

@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class RuoYiRedisMessageApplication
{public static void main(String[] args){SpringApplication.run(RuoYiRedisMessageApplication.class, args);System.out.println("(♥◠‿◠)ノ゙  ruoyi-redis-message启动成功");}
}

添加redis配置类

/*** redis配置*/
@Configuration
public class RedisConfig {private static final RedisSerializer<Object> SERIALIZER = createSerializer();@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 创建 RedisTemplate 对象RedisTemplate<String, Object> template = new RedisTemplate<>();// 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());template.setHashKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。template.setValueSerializer(SERIALIZER);template.setHashValueSerializer(SERIALIZER);return template;}private static RedisSerializer<Object> createSerializer() {ObjectMapper mapper = new ObjectMapper();mapper.registerModules(new JavaTimeModule());// 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXXmapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);return new GenericJackson2JsonRedisSerializer(mapper);}}

队列方法

@Slf4j
@Service
public class ListRedisQueue {//队列名public static final String KEY = "listQueue";@Resourceprivate RedisTemplate redisTemplate;public void produce(String message) {redisTemplate.opsForList().rightPush(KEY, message);}public void consume() {while (true) {String msg = (String) redisTemplate.opsForList().leftPop(KEY);log.info("疯狂获取消息:" + msg);}}public void blockingConsume() {while (true) {List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {//队列没有元素会阻塞操作,直到队列获取新的元素或超时,5表示如果没元素就每五秒去拿一次消息return connection.bRPop(5, KEY.getBytes());}}, new StringRedisSerializer());for (Object str : obj) {log.info("blockingConsume获取消息 : {}", str);}}}}

测试

lPop/rPop消费数据

@Slf4j
@SpringBootTest
public class ListQueueTest {@Autowiredprivate ListRedisQueue listRedisQueue;@Testpublic void produce() {for (int i = 0; i < 5; i++) {listRedisQueue.produce("第"+i + "个数据");}}@Testpublic void consume() {produce();log.info("生产消息完毕");listRedisQueue.consume();}}

blpop / brpop 消费数据

    @Testpublic void blockingConsume() {produce();log.info("生产消息完毕");listRedisQueue.blockingConsume();}


2、PUB/SUB,订阅/发布模式

原理

SUBSCRIBE,用于订阅信道

PUBLISH,向信道发送消息

UNSUBSCRIBE,取消订阅

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

优点

  • 一个消息可以发布到多个消费者

  • 消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)

  • 消息即时发送,消费者会自动接收到信道发布的消息

缺点

  • 消息发布时,如果客户端不在线,则消息丢失

  • 消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失

  • 消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)

代码

http://www.dtcms.com/a/323927.html

相关文章:

  • 前端学习日记 - 前端函数防抖详解
  • c#属性(Property)的概念定义及使用详解
  • 音视频学习(五十二):ADTS
  • i2c dump工具使用(202589)
  • WAV音频数据集MFCC特征提取处理办法
  • 人工智能正在学习自我提升的方式
  • Agent在游戏行业的应用:NPC智能化与游戏体验提升
  • PySpark
  • Java集合中的 LinkedList
  • 通过sealos工具在ubuntu 24.02上安装k8s集群
  • JavaScript性能优化30招实战指南
  • JUC学习笔记-----ReentrantLock
  • 怎么用java实现视频逐帧截图并保存
  • ELK分布式日志采集系统
  • 二、Linux 设置文件系统扩展属性
  • 242. 有效的字母异位词
  • 【Html网页模板】炫酷科技风公司首页
  • 元数据管理与数据治理平台:Apache Atlas 通知和业务元数据 Notifications And Business Metadata
  • Java学习第一百二十二部分——HTTPS
  • Apache Pulsar性能与可用性优化实践指南
  • JavaWeb(苍穹外卖)--学习笔记17(Apache Echarts)
  • JavaWeb(苍穹外卖)--学习笔记18(Apache POI)
  • 元数据管理与数据治理平台:Apache Atlas 分类传播 Classification Propagation
  • Qt 框架全面解析:从基础到应用
  • Android 四大布局:使用方式与性能优化原理
  • ES 调优帖:Gateway 批量写入性能优化实践
  • Redis基本原理,性能优化和参数调优简述
  • #C语言——刷题攻略:牛客编程入门训练(八):分支控制(二)
  • es-drager-blog
  • 编程与数学 03-003 计算机操作系统 15_设备管理(三):缓冲技术与I/O性能优化