Redis 监听过期Key
Redis 监听过期Key
- Redis 键空间通知
- 事件驱动
- 文件事件
- 时间事件
- 打开方式
- 动态配置(临时开,重启失效)
- 静态配置(重启redis-server生效)
- 事件类型表
- 事件是怎么样发送的?
- 实测
- 使用场景
- 缺点
- 在SpringBoot应用
- 依赖
- KeyspaceEventMessageListener
- 样例
- KeyExpirationEventMessageListener
- 样例
- @EventListener
- 继承并重写
Redis 键空间通知
实时监控Redis键和值的变化
注意:Redis Pub/Sub
是即发即弃的;也就是说,如果 Pub/Sub
客户端断开连接,然后稍后重新连接,则客户端断开连接期间传递的所有事件都将丢失。
事件驱动
Redis
定义了两种事件类型,一种是文件事件,一种是时间事件。不同了事件类型通过不同的处理去处理。
文件事件
可以理解为是IO
操作:建立连接,读请求,写响应。
时间事件
定时执行:过期key,AOF等。
打开方式
redis 默认是不发送任意事件通知的。
动态配置(临时开,重启失效)
CONFIG SET notify-keyspace-events AKE
静态配置(重启redis-server生效)
在redis.conf
中找到 notify-keyspace-events ""
修改为 notify-keyspace-events "AKE"
则表示开启所有事件通知。
事件类型表
字母 | 所属维度 | 含义说明 | 典型命令/场景 |
---|---|---|---|
K | 通道维度 | Keyspace 事件,频道名格式 __keyspace@<db>__:<key> | 监听“某个键发生过什么” |
E | 通道维度 | Keyevent 事件,频道名格式 __keyevent@<db>__:<cmd> | 监听“某个命令影响了哪些键” |
g | 命令维度 | 通用命令(与类型无关) | DEL、EXPIRE、RENAME、MOVE、SWAPDB 等 |
$ | 命令维度 | 字符串专属命令 | SET、MSET、APPEND、INCR、DECR … |
l | 命令维度 | 列表专属命令 | LPUSH、RPOP、LTRIM … |
s | 命令维度 | 集合专属命令 | SADD、SPOP、SINTER … |
h | 命令维度 | 哈希专属命令 | HSET、HDEL、HINCRBY … |
z | 命令维度 | 有序集合专属命令 | ZADD、ZREM、ZINCRBY … |
t | 命令维度 | Stream 专属命令 | XADD、XDEL、XTRIM … |
d | 命令维度 | 模块键类型事件(Module key) | 自定义模块产生的键 |
x | 生命周期 | 过期事件(expired) | 键因 TTL 到达被删除 |
e | 生命周期 | 淘汰事件(evicted) | 键因 maxmemory 策略被逐出 |
m | 生命周期 | 未命中事件(miss) | 访问了一个不存在的键 |
n | 生命周期 | 新建事件(new) | 第一次向一个键写入值 |
A | 快捷别名 | 相当于 g$lshztxed 的集合(不含 m、n) | 想“一键全开”时常用 |
事件是怎么样发送的?
本质:Redis 发布/订阅
。
执行命令/键修改时 -> 触发对应回调-> 调用 publish
向固定的频道(__keyspace@<db>__:<key>||__keyevent@<db>__:<event>
)发送消息事件。
口诀:“双下划线,keyspace / keyevent 二选一;@库号,冒号后跟事件名;过期找 expired,淘汰找 evicted。”
维度 | 频道格式 | 示例(库 0) | 说明 |
---|---|---|---|
keyspace | __keyspace@<db>__:<key> | __keyspace@0__:mylock | 告诉你“哪个键发生了事” |
keyevent | __keyevent@<db>__:<event> | __keyevent@0__:expired | 告诉你“什么事发生了 |
不同命令产生的事件:官方文档 https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/#events-generated-by-different-commands
**重要提示:**所有命令仅在目标键真正被修改时才会生成事件。例如, DELETE 删除一个不存在的元素并不会真正改变该键的值,因此不会生成任何事件。
监听过期key的时候只需要关注:expired
。
补充:每次根据**maxmemory**
策略从数据集中逐出一个键以释放内存时(内存不够,驱逐未使用过的key): <font style="color:rgb(17, 24, 39);">evicted
实测
监听:**psubscribe __key*__:***
127.0.0.1:6379> psubscribe __key*__:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 通过模式频道监听
2) "__key*__:*" # 监听的模式平道
3) (integer) 1 # 当前客户端监听的频道
另外启动一个客户端
127.0.0.1:6379> set hello world; # 创建hello这个键
OK
127.0.0.1:6379> expire hello 10 # 给hello这个键设置一个过期时间
(integer) 1
127.0.0.1:6379>
观察监听的哪个客户端
1) "pmessage" # 发送消息
2) "__key*__:*" # 发送消息的模式频道
3) "__keyspace@0__:hello" # 发送消息的实际频道
4) "set" # 发送消息的内容 这里表示对hello这个key进行了set操作
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "hello" # 这里表示通过set命名操作了hello这个key
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:hello"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire" # 这里表示通过expire对hello设置了一个过期时间
4) "hello"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:hello"
4) "expired"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expired" # hello 这个key过期后会触发发送事件
4) "hello"
了解以上内容后我们便可以监听事件来实现业务中的一些功能了。但是注意redis 发布/订阅是不可靠的。他不能保证一定会你发消息。如果要用一定要添加补偿机制。
使用场景
- 订单超市未支付-----订单的有效期为ttl,ttl到期后处理监听事件,来做关闭订单,解锁库存
- 配置刷新-----读取配置后给配置一个ttl监听该key,如果ttl则重新拉取配置
- 限流窗口----重新计数–可以做一个简单的限流-> 当某一个第一次接受到请求的时候往redis添加一个key并将value给1,并设置ttl为60,然后再接受同样请求的时候对这个key的value+1,如果该key的value值大于某个界限的时候可以在程序中直接拦截掉->如果该key过期了,可以通过监听重新设置该key的value为1。如此循环。
缺点
- 消息丢失:redis pub/sub 是无持久化。
- 消息重复消费:多节点下多个会同时收到过期key的消息
- 大key同时过期:节点压力会剧增
- 只发一次:过期后立刻就发送消息,当时没收到以后也不会收到。
在SpringBoot应用
Java:21
SpringBoot:3.5.5
Redis-client:Spring-data-redis 3.5.5+Lettuce-6.x
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
KeyspaceEventMessageListener
这是spring-data-redis 内置的一个 抽象类KeyspaceEventMessageListener
。只需要继承该类,并重写他的doHandleMessage方法即可监听__keyevent@*
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {// 监听的渠道private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");public void init() {RedisConnectionFactory connectionFactory = listenerContainer.getConnectionFactory();if (StringUtils.hasText(keyspaceNotificationsConfigParameter) && connectionFactory != null) {try (RedisConnection connection = connectionFactory.getConnection()) {RedisServerCommands commands = connection.serverCommands();Properties config = commands.getConfig("notify-keyspace-events");// 这个是配置是否包含notify-keyspace-events 用于为连接动态打开键空间通知if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {commands.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}}}// 1.注册doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {// 2.向listener容器中添加该渠道表示监听渠道listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message* 实际消费消息的地方* @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);
}
样例
// 注意要讲该对象添加为Bean对象
@Slf4j
@Component
public class ConsumeKeyspaceEventMessageListenerImpl extends KeyspaceEventMessageListener {/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public ConsumeKeyspaceEventMessageListenerImpl(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overrideprotected void doHandleMessage(Message message) {log.info("channel:{},body:{}", new String(message.getChannel()), new String(message.getBody()));}
}
KeyExpirationEventMessageListener
键过期的专用消息监听器
它继承了KeyspaceEventMessageListener
只关注__keyevent@*__:expired
它默认给我们实现的操作是:发送一个event(RedisKeyExpiredEvent)给Spring容器。让Spring的事件机制再去处理。
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListenerimplements ApplicationEventPublisherAware {// 只关注该频道private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {// 1 注册listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}// 默认实现@Overrideprotected void doHandleMessage(Message message) {// // 2. 将内容包装RedisKeyExpiredEvent为发送给Spring容器publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}}
如果我们自己想对键过期做专门的处理,有两种方式
- 直接通过
@EventListener
处理RedisKeyExpiredEvent
该事件即可。 - 继续
KeyExpirationEventMessageListener
重写doHandleMessage
即可。
样例
@EventListener
要保证容器中存在KeyExpirationEventMessageListener
的组件才会在key过期的时候发送RedisKeyExpiredEvent
@Bean
public KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer){return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
}@Bean
public RedisKeyExpiredEventListener redisKeyExpiredEventListener() {return new RedisKeyExpiredEventListener();
}@Slf4j
public class RedisKeyExpiredEventListener {/*** 通过Spring的EventListener处理过期key** @param event*/@EventListenerpublic void onMessage(RedisKeyExpiredEvent<byte[]> event) {log.info("expired key: {}", new String(event.getId()));log.info("timestamp: {}", event.getTimestamp());}
}
继承并重写
需要重写doHandleMessage
来实现自定义逻辑
@Bean
public ConsumeKeyExpirationEventMessageListener consumeKeyExpirationEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {return new ConsumeKeyExpirationEventMessageListener(redisMessageListenerContainer);
}
@Slf4j
public class ConsumeKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public ConsumeKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** 通过继承并重写doHandleMessage来达到监听某个key过期* @param message never {@literal null}.*/@Overrideprotected void doHandleMessage(Message message) {// 保留发送RedisKeyExpiredEvent事件,以便其他地方使用super.doHandleMessage(message);log.info("expired key: {}", new String(message.getBody()));log.info("channel: {}", new String(message.getChannel()));}
}