当前位置: 首页 > news >正文

2025 年 Redis 面试天花板

前言

Redis 作为当前最流行的内存数据库,已成为大厂面试的必考题。无论是初级开发工程师还是资深架构师,Redis 相关知识点都是面试中的重中之重。本文汇总了 2025 年各大互联网公司(阿里、腾讯、字节、美团、拼多多等)最新 Redis 面试真题,从基础到进阶,从原理到实战,全方位解析 Redis 核心知识点,助你轻松应对各类 Redis 面试挑战。

一、Redis 基础概念

1. 什么是 Redis?它有哪些核心特性?

Redis(Remote Dictionary Server)是一个开源的、高性能的键值对存储数据库,它支持多种数据结构,采用内存存储,同时提供持久化功能。

核心特性:

  • 基于内存操作,速度快(读速度约 110000 次 / 秒,写速度约 81000 次 / 秒)
  • 支持多种数据结构:String、Hash、List、Set、Sorted Set、Bitmaps、HyperLogLogs、Geospatial 等
  • 支持持久化(RDB 和 AOF 两种方式)
  • 支持主从复制、哨兵模式和集群
  • 支持事务、Lua 脚本
  • 支持过期键自动删除
  • 支持发布 / 订阅模式
  • 单线程架构,避免了多线程切换的开销

2. Redis 和 Memcached 有什么区别?

特性RedisMemcached
数据结构丰富(String、Hash、List 等)单一(主要是 String)
持久化支持(RDB、AOF)不支持
主从复制支持不支持原生,需第三方
集群支持不支持原生,需第三方
内存管理自己管理,支持多种淘汰策略使用 Slab Allocation
多线程网络 IO 多线程,数据操作单线程多线程
适用场景复杂数据结构、持久化需求、高可用需求简单缓存、高并发场景

3. Redis 为什么这么快?

Redis 之所以性能优异,主要原因有以下几点:

  1. 基于内存存储:所有数据都在内存中,避免了磁盘 IO 的开销
  2. 单线程架构:避免了多线程上下文切换和锁竞争的开销
  3. 高效的数据结构:针对不同场景设计了高效的数据结构(如跳表、压缩列表等)
  4. IO 多路复用:采用 epoll/kqueue 等 IO 多路复用技术,高效处理并发连接
  5. 精简的代码实现:代码量小,逻辑清晰,执行效率高

二、Redis 数据结构

4. Redis 有哪些数据结构?各自的使用场景是什么?

Redis 提供了多种数据结构,每种结构都有其特定的应用场景:

  1. String(字符串)

    • 最基本的数据结构,可存储字符串、整数或浮点数
    • 应用场景:缓存用户信息、计数器、分布式锁等
  2. Hash(哈希)

    • 键值对集合,适合存储对象
    • 应用场景:存储用户信息、商品详情等对象数据
  3. List(列表)

    • 有序字符串列表,可在两端操作
    • 应用场景:消息队列、最新消息展示、排行榜等
  4. Set(集合)

    • 无序且唯一的字符串集合
    • 应用场景:好友关系、标签、去重等
  5. Sorted Set(有序集合)

    • 有序且唯一的字符串集合,通过分数排序
    • 应用场景:排行榜、带权重的消息队列等
  6. Bitmaps(位图)

    • 按位存储,节省空间
    • 应用场景:用户签到、活跃用户统计等
  7. HyperLogLogs(基数统计)

    • 用于基数统计,占用空间小
    • 应用场景:UV 统计、独立访客数统计等
  8. Geospatial(地理空间)

    • 存储地理位置信息
    • 应用场景:附近的人、地理位置查询等

5. 详细介绍 Redis 的 String 数据结构及其实现原理

String 是 Redis 最基本的数据结构,它可以存储字符串(二进制安全)、整数和浮点数,最大长度为 512MB。

String 的内部实现:

  • 当存储的是整数且范围在 long 类型范围内时,Redis 会直接用 long 类型存储
  • 当存储的是字符串时,Redis 使用 SDS(Simple Dynamic String)结构存储

SDS 结构定义(Redis 6.0+):

struct __attribute__ ((__packed__)) sdshdr8 {uint8_t len; /* 已使用长度 */uint8_t alloc; /* 总分配长度,不包括头部和空终止符 */unsigned char flags; /* 标志位,区分不同的sdshdr类型 */char buf[]; /* 柔性数组,存储实际字符串 */
};

SDS 相比 C 语言字符串的优势:

  1. 常数时间获取字符串长度
  2. 避免缓冲区溢出
  3. 减少字符串修改时的内存重分配次数
  4. 二进制安全
  5. 兼容部分 C 字符串函数

示例:使用 String 存储用户信息

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;/*** Redis String 数据结构示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis String 示例")
public class RedisStringService {private final RedisTemplate<String, Object> redisTemplate;/*** 存储用户信息到Redis* @param userId 用户ID* @param user 用户对象*/@ApiOperation("存储用户信息")public void saveUser(Long userId, User user) {if (ObjectUtils.isEmpty(userId) || ObjectUtils.isEmpty(user)) {log.error("用户ID或用户信息不能为空");throw new IllegalArgumentException("用户ID或用户信息不能为空");}String key = "user:" + userId;// 将用户对象转为JSON字符串存储redisTemplate.opsForValue().set(key, JSON.toJSONString(user));log.info("用户信息已保存到Redis,key: {}", key);}/*** 从Redis获取用户信息* @param userId 用户ID* @return 用户对象*/@ApiOperation("获取用户信息")public User getUser(Long userId) {if (ObjectUtils.isEmpty(userId)) {log.error("用户ID不能为空");throw new IllegalArgumentException("用户ID不能为空");}String key = "user:" + userId;String userJson = (String) redisTemplate.opsForValue().get(key);if (StringUtils.hasText(userJson)) {return JSON.parseObject(userJson, User.class);}return null;}/*** 自增计数器* @param counterKey 计数器键名* @param increment 增量* @return 自增后的值*/@ApiOperation("计数器自增")public Long incrementCounter(String counterKey, long increment) {if (!StringUtils.hasText(counterKey)) {log.error("计数器键名不能为空");throw new IllegalArgumentException("计数器键名不能为空");}return redisTemplate.opsForValue().increment(counterKey, increment);}
}

6. Redis 的 Hash 数据结构底层是如何实现的?

Hash 数据结构用于存储键值对集合,适合存储对象。其底层实现有两种方式:

  1. 压缩列表(ziplist):当哈希表中元素数量少且元素值小时使用

    • 元素数量少于 hash-max-ziplist-entries(默认 512)
    • 每个元素的键和值的长度都小于 hash-max-ziplist-value(默认 64 字节)
  2. 哈希表(hashtable):当不满足压缩列表条件时使用

压缩列表是一种紧凑的连续内存结构,由一系列特殊编码的连续内存块组成,相比哈希表更节省内存,但操作效率较低。当数据量增大时,Redis 会自动将压缩列表转换为哈希表。

哈希表的实现类似于 Java 中的 HashMap,采用数组 + 链表(或红黑树)的结构,通过链地址法解决哈希冲突。

示例:使用 Hash 存储商品信息

import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Set;/*** Redis Hash 数据结构示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Hash 示例")
public class RedisHashService {private final RedisTemplate<String, Object> redisTemplate;/*** 存储商品信息到Hash* @param productId 商品ID* @param productInfo 商品信息键值对*/@ApiOperation("存储商品信息")public void saveProduct(Long productId, Map<String, Object> productInfo) {if (ObjectUtils.isEmpty(productId) || CollectionUtils.isEmpty(productInfo)) {log.error("商品ID或商品信息不能为空");throw new IllegalArgumentException("商品ID或商品信息不能为空");}String key = "product:" + productId;HashOperations<String, String, Object> hashOps = redisTemplate.opsForHash();hashOps.putAll(key, productInfo);log.info("商品信息已保存到Redis Hash,key: {}", key);}/*** 获取商品的某个属性* @param productId 商品ID* @param field 属性名* @return 属性值*/@ApiOperation("获取商品属性")public Object getProductField(Long productId, String field) {if (ObjectUtils.isEmpty(productId) || !StringUtils.hasText(field)) {log.error("商品ID或属性名不能为空");throw new IllegalArgumentException("商品ID或属性名不能为空");}String key = "product:" + productId;HashOperations<String, String, Object> hashOps = redisTemplate.opsForHash();return hashOps.get(key, field);}/*** 获取商品的所有信息* @param productId 商品ID* @return 商品所有属性键值对*/@ApiOperation("获取商品所有信息")public Map<String, Object> getProductAllFields(Long productId) {if (ObjectUtils.isEmpty(productId)) {log.error("商品ID不能为空");throw new IllegalArgumentException("商品ID不能为空");}String key = "product:" + productId;HashOperations<String, String, Object> hashOps = redisTemplate.opsForHash();return hashOps.entries(key);}/*** 删除商品的某个属性* @param productId 商品ID* @param fields 要删除的属性名* @return 删除的数量*/@ApiOperation("删除商品属性")public Long deleteProductFields(Long productId, String... fields) {if (ObjectUtils.isEmpty(productId) || ObjectUtils.isEmpty(fields)) {log.error("商品ID或属性名不能为空");throw new IllegalArgumentException("商品ID或属性名不能为空");}String key = "product:" + productId;HashOperations<String, String, Object> hashOps = redisTemplate.opsForHash();return hashOps.delete(key, (Object[]) fields);}
}

7. Redis 的 Sorted Set 底层实现原理是什么?

Sorted Set(有序集合)是 Redis 中最有特色的数据结构之一,它兼具集合和排序的功能,每个元素都有一个分数(score),用于排序。

Sorted Set 的底层实现有两种方式:

  1. 压缩列表(ziplist):当元素数量少且元素值小时使用

    • 元素数量少于 zset-max-ziplist-entries(默认 128)
    • 每个元素的成员(member)长度小于 zset-max-ziplist-value(默认 64 字节)
  2. 跳表(skiplist)+ 哈希表:当不满足压缩列表条件时使用

    • 跳表:用于根据分数范围获取元素,支持快速插入、删除和查找
    • 哈希表:用于根据成员快速查找分数,时间复杂度为 O (1)

跳表是一种有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。跳表的查找、插入、删除操作的平均时间复杂度为 O (logN),最坏情况下为 O (N)。

示例:使用 Sorted Set 实现排行榜功能

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Set;/*** Redis Sorted Set 数据结构示例服务(排行榜)* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Sorted Set 示例(排行榜)")
public class RedisSortedSetService {private final RedisTemplate<String, Object> redisTemplate;/*** 添加用户分数到排行榜* @param rankKey 排行榜键名* @param userId 用户ID* @param score 分数*/@ApiOperation("添加用户分数到排行榜")public void addUserScore(String rankKey, Long userId, double score) {if (!StringUtils.hasText(rankKey) || ObjectUtils.isEmpty(userId)) {log.error("排行榜键名或用户ID不能为空");throw new IllegalArgumentException("排行榜键名或用户ID不能为空");}ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();zSetOps.add(rankKey, userId, score);log.info("用户{}已添加到排行榜{},分数: {}", userId, rankKey, score);}/*** 增加用户分数* @param rankKey 排行榜键名* @param userId 用户ID* @param increment 增加的分数* @return 增加后的分数*/@ApiOperation("增加用户分数")public Double incrementUserScore(String rankKey, Long userId, double increment) {if (!StringUtils.hasText(rankKey) || ObjectUtils.isEmpty(userId)) {log.error("排行榜键名或用户ID不能为空");throw new IllegalArgumentException("排行榜键名或用户ID不能为空");}ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();return zSetOps.incrementScore(rankKey, userId, increment);}/*** 获取用户排名(从高到低)* @param rankKey 排行榜键名* @param userId 用户ID* @return 排名(从0开始)*/@ApiOperation("获取用户排名")public Long getUserRank(String rankKey, Long userId) {if (!StringUtils.hasText(rankKey) || ObjectUtils.isEmpty(userId)) {log.error("排行榜键名或用户ID不能为空");throw new IllegalArgumentException("排行榜键名或用户ID不能为空");}ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();// 注意:zRank是从低到高排名,zReverseRank是从高到低排名return zSetOps.reverseRank(rankKey, userId);}/*** 获取排行榜前N名用户* @param rankKey 排行榜键名* @param topN 前N名* @return 前N名用户及分数*/@ApiOperation("获取排行榜前N名")public Set<ZSetOperations.TypedTuple<Object>> getTopUsers(String rankKey, long topN) {if (!StringUtils.hasText(rankKey) || topN <= 0) {log.error("排行榜键名不能为空且topN必须大于0");throw new IllegalArgumentException("排行榜键名不能为空且topN必须大于0");}ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();// 0到topN-1表示前topN名,true表示按分数从高到低return zSetOps.reverseRangeWithScores(rankKey, 0, topN - 1);}/*** 获取用户分数* @param rankKey 排行榜键名* @param userId 用户ID* @return 用户分数*/@ApiOperation("获取用户分数")public Double getUserScore(String rankKey, Long userId) {if (!StringUtils.hasText(rankKey) || ObjectUtils.isEmpty(userId)) {log.error("排行榜键名或用户ID不能为空");throw new IllegalArgumentException("排行榜键名或用户ID不能为空");}ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();return zSetOps.score(rankKey, userId);}
}

三、Redis 持久化

8. Redis 有哪些持久化方式?各有什么优缺点?

Redis 提供两种主要的持久化方式:RDB(Redis Database)和 AOF(Append Only File)。

RDB(快照持久化)

RDB 是指在指定的时间间隔内将内存中的数据集快照写入磁盘。

优点

  • RDB 文件是紧凑的二进制文件,体积小,适合备份和灾难恢复
  • 恢复大数据集时,RDB 方式比 AOF 方式快
  • RDB 持久化对 Redis 性能影响较小,因为主进程只需 fork 一个子进程来完成 RDB 文件的创建

缺点

  • 数据安全性较低,如果 Redis 意外宕机,可能会丢失最后一次 RDB 快照后的所有数据
  • 如果数据集很大,fork 子进程可能会阻塞 Redis 服务器一段时间
AOF(追加文件持久化)

AOF 是以日志的形式记录 Redis 的每一个写操作,在 Redis 重启时,通过重新执行这些命令来恢复数据。

优点

  • 数据安全性更高,支持多种同步策略(always、everysec、no)
  • AOF 文件是可读的文本文件,便于理解和修改
  • 即使 AOF 文件损坏,也可以通过 redis-check-aof 工具修复

缺点

  • AOF 文件通常比 RDB 文件大很多
  • 在高写入负载下,AOF 可能会影响 Redis 性能
  • 恢复大数据集时,AOF 方式比 RDB 方式慢

9. RDB 和 AOF 的实现原理是什么?

RDB 实现原理

RDB 持久化通过以下步骤实现:

  1. Redis 主进程收到 RDB 持久化命令(save 或 bgsave)
  2. 如果是 save 命令,主进程会直接进行 RDB 持久化,期间会阻塞所有客户端请求
  3. 如果是 bgsave 命令,主进程会 fork 一个子进程
  4. 子进程负责将内存中的数据写入临时 RDB 文件
  5. 写入完成后,原子性地替换旧的 RDB 文件
  6. 子进程退出

RDB 文件是一个经过压缩的二进制文件,包含了 Redis 在某个时间点的所有数据。

AOF 实现原理

AOF 持久化通过以下步骤实现:

  1. 开启 AOF 后,Redis 会将每个写命令追加到 AOF 缓冲区
  2. 根据配置的同步策略(appendfsync),将缓冲区内容同步到磁盘
    • always:每个写命令都立即同步到磁盘,安全性最高,性能最差
    • everysec:每秒同步一次,平衡安全性和性能
    • no:由操作系统决定何时同步,性能最好,安全性最差
  3. AOF 文件会随着时间变得越来越大,Redis 通过 AOF 重写(rewrite)机制来压缩 AOF 文件
  4. AOF 重写可以通过 bgrewriteaof 命令触发,也可以根据配置自动触发
  5. 重写过程与 RDB 类似,也是通过 fork 子进程来完成,不会阻塞主进程

AOF 重写不是对旧文件进行修改,而是直接读取当前内存中的数据,生成一系列新的写命令来替代旧的命令序列,从而减小文件体积。

10. RDB 和 AOF 如何选择?Redis 4.0 后的混合持久化是什么?

RDB 和 AOF 的选择取决于具体的业务需求:

  • 如果追求数据安全性,愿意牺牲一些性能和存储空间,选择 AOF
  • 如果追求性能和快速恢复,对数据安全性要求不高,选择 RDB
  • 实际生产环境中,通常建议同时开启 RDB 和 AOF,以兼顾安全性和性能
Redis 4.0 后的混合持久化

Redis 4.0 引入了混合持久化(RDB-AOF 混合模式),它结合了 RDB 和 AOF 的优点:

  • AOF 文件的开头是 RDB 格式的全量数据快照
  • 后续是 AOF 格式的增量命令
  • 这样既保证了快速恢复(利用 RDB),又保证了数据安全性(利用 AOF)

混合持久化的配置:

aof-use-rdb-preamble yes

混合持久化的恢复过程:

  1. 先加载 RDB 部分的全量数据
  2. 再执行 AOF 部分的增量命令

四、Redis 高可用

11. Redis 主从复制的原理是什么?如何配置?

Redis 主从复制是实现高可用的基础,它允许将一台 Redis 服务器(主节点)的数据复制到其他 Redis 服务器(从节点)。

主从复制的原理
  1. 建立连接:从节点通过 slaveof 命令连接主节点,发送 SYNC 命令
  2. 全量复制:主节点收到 SYNC 命令后,执行 bgsave 生成 RDB 文件,同时记录此后的写命令到缓冲区
  3. 传输 RDB 文件:主节点将 RDB 文件发送给从节点,从节点接收并加载 RDB 文件
  4. 增量复制:主节点将缓冲区中的写命令发送给从节点,从节点执行这些命令
  5. 命令传播:此后主节点每执行一个写命令,都会将该命令发送给从节点,从节点执行该命令以保持数据同步

从 Redis 2.8 开始,引入了部分重同步功能,通过偏移量(offset)和复制积压缓冲区(replication backlog)来实现:

  • 主从节点都维护一个偏移量,记录已复制的命令字节数
  • 主节点有一个固定大小的环形缓冲区,保存最近执行的写命令
  • 当从节点断线重连后,会发送自己的偏移量,如果主节点的缓冲区中包含该偏移量之后的所有命令,则进行部分重同步,否则进行全量重同步
主从复制配置

主节点通常不需要特殊配置,从节点配置如下:

# 从节点配置文件 redis.conf
# 连接主节点的IP和端口
slaveof 192.168.1.100 6379# 如果主节点有密码,需要配置密码
masterauth password# 从节点是否只读(建议设置为yes)
slave-read-only yes# 复制缓冲区大小,默认1MB
repl-backlog-size 1mb# 主节点断线后,从节点等待多久后成为主节点(仅在哨兵模式下有效)
replica-priority 100

也可以通过命令动态配置:

redis-cli

# 从节点执行
127.0.0.1:6379> slaveof 192.168.1.100 6379
OK# 如果需要取消从节点身份
127.0.0.1:6379> slaveof no one
OK

示例:Java 代码中使用主从复制

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** Redis 主从复制配置* @author ken*/
@Configuration
public class RedisConfig {/*** 配置Redis连接工厂,连接从节点进行读操作* @return Redis连接工厂*/@Beanpublic RedisConnectionFactory redisConnectionFactory() {// 配置从节点信息RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("192.168.1.101"); // 从节点IPconfig.setPort(6379); // 从节点端口config.setPassword("password"); // 密码,如果有的话return new LettuceConnectionFactory(config);}/*** 配置RedisTemplate* @param connectionFactory Redis连接工厂* @return RedisTemplate*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);// 设置键序列化器template.setKeySerializer(new StringRedisSerializer());// 设置值序列化器template.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 设置Hash键序列化器template.setHashKeySerializer(new StringRedisSerializer());// 设置Hash值序列化器template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}/*** 配置主节点RedisTemplate,用于写操作* @return 主节点RedisTemplate*/@Bean("masterRedisTemplate")public RedisTemplate<String, Object> masterRedisTemplate() {// 配置主节点信息RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("192.168.1.100"); // 主节点IPconfig.setPort(6379); // 主节点端口config.setPassword("password"); // 密码,如果有的话LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(config);connectionFactory.afterPropertiesSet();RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);// 设置序列化器,同上template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}
}

12. Redis 哨兵(Sentinel)的工作原理是什么?如何配置?

Redis 哨兵是 Redis 官方提供的高可用解决方案,它可以监控 Redis 主从节点,并在主节点故障时自动将从节点晋升为主节点,实现故障自动转移。

哨兵的工作原理
  1. 监控(Monitoring):哨兵不断检查主节点和从节点是否正常运行
  2. 通知(Notification):当某个节点出现故障时,哨兵可以通过 API 通知管理员或其他应用程序
  3. 自动故障转移(Automatic failover):当主节点不可用时,哨兵会自动将一个从节点晋升为主节点,并让其他从节点指向新的主节点

哨兵的核心功能是通过以下机制实现的:

  • 心跳检测:哨兵向所有节点发送 PING 命令,如果在指定时间内没有收到有效回复,则认为该节点主观下线(SDOWN)
  • 客观下线:当多个哨兵都认为主节点主观下线时,会通过投票机制确定主节点是否客观下线(ODOWN)
  • 领导者选举:当主节点客观下线后,哨兵之间会进行领导者选举,选出一个哨兵来执行故障转移
  • 故障转移:领导者哨兵会从所有健康的从节点中选择一个作为新的主节点,然后让其他从节点复制新的主节点

哨兵配置

哨兵通常以集群形式部署,至少需要 3 个哨兵实例以保证高可用。

哨兵配置文件(sentinel.conf)示例:

# 哨兵实例端口
port 26379# 监控主节点,mymaster是主节点名称,192.168.1.100 6379是主节点地址,2是投票数
sentinel monitor mymaster 192.168.1.100 6379 2# 主节点密码
sentinel auth-pass mymaster password# 哨兵认为主节点主观下线的时间(毫秒)
sentinel down-after-milliseconds mymaster 30000# 故障转移超时时间(毫秒)
sentinel failover-timeout mymaster 180000# 故障转移时,最多有多少个从节点同时对新主节点进行同步
sentinel parallel-syncs mymaster 1

启动哨兵:

redis-sentinel /path/to/sentinel.conf

Java 代码中使用哨兵模式:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.Collections;/*** Redis 哨兵模式配置* @author ken*/
@Configuration
public class RedisSentinelConfig {/*** 配置Redis哨兵连接工厂* @return Redis连接工厂*/@Beanpublic LettuceConnectionFactory redisConnectionFactory() {// 配置哨兵RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration();// 主节点名称sentinelConfig.setMaster("mymaster");// 哨兵节点列表sentinelConfig.setSentinels(Collections.singletonList(new RedisNode("192.168.1.100", 26379)));// 添加更多哨兵节点sentinelConfig.addSentinel(new RedisNode("192.168.1.101", 26379));sentinelConfig.addSentinel(new RedisNode("192.168.1.102", 26379));// 设置密码sentinelConfig.setPassword("password");return new LettuceConnectionFactory(sentinelConfig);}/*** 配置RedisTemplate* @param connectionFactory Redis连接工厂* @return RedisTemplate*/@Beanpublic RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);// 设置序列化器template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}
}

13. Redis Cluster(集群)的原理和架构是什么?如何配置?

Redis Cluster 是 Redis 官方提供的分布式解决方案,用于解决单节点 Redis 的容量和性能瓶颈,支持数据分片、高可用和自动故障转移。

Redis Cluster 的原理和架构
  1. 数据分片:Redis Cluster 将数据分散存储在多个节点上,采用哈希槽(hash slot)机制,共有 16384 个哈希槽
  2. 哈希槽分配:每个节点负责一部分哈希槽,键通过 CRC16 (key) % 16384 计算所属的哈希槽
  3. 节点通信:节点之间通过 Gossip 协议进行通信,交换节点状态信息
  4. 高可用:每个主节点可以有多个从节点,当主节点故障时,从节点会晋升为主节点
  5. 客户端路由:客户端可以连接任意节点,如果该节点不负责请求的哈希槽,会返回 MOVED 或 ASK 重定向

Redis Cluster 的最小配置是 3 个主节点,每个主节点至少有 1 个从节点,共 6 个节点,以保证高可用。

Redis Cluster 配置
  1. 每个节点的配置文件(redis.conf)需要添加以下配置:
# 开启集群模式
cluster-enabled yes# 集群配置文件,由Redis自动生成和更新
cluster-config-file nodes-6379.conf# 节点超时时间(毫秒)
cluster-node-timeout 15000# 当主节点无法正常工作时,是否允许从节点自动晋升为主节点
cluster-require-full-coverage no
  1. 启动所有节点:
redis-server /path/to/redis.conf
  1. 创建集群:
# Redis 5.0+使用redis-cli创建集群
redis-cli --cluster create \
192.168.1.100:6379 \
192.168.1.101:6379 \
192.168.1.102:6379 \
192.168.1.103:6380 \
192.168.1.104:6380 \
192.168.1.105:6380 \
--cluster-replicas 1

Java 代码中使用 Redis Cluster:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.Arrays;
import java.util.List;/*** Redis Cluster 配置* @author ken*/
@Configuration
public class RedisClusterConfig {/*** 配置Redis集群连接工厂* @return Redis连接工厂*/@Beanpublic LettuceConnectionFactory redisConnectionFactory() {// 集群节点列表List<RedisNode> clusterNodes = Arrays.asList(new RedisNode("192.168.1.100", 6379),new RedisNode("192.168.1.101", 6379),new RedisNode("192.168.1.102", 6379),new RedisNode("192.168.1.103", 6380),new RedisNode("192.168.1.104", 6380),new RedisNode("192.168.1.105", 6380));// 配置集群RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();clusterConfig.setClusterNodes(clusterNodes);// 设置最大重定向次数clusterConfig.setMaxRedirects(3);// 设置密码clusterConfig.setPassword("password");return new LettuceConnectionFactory(clusterConfig);}/*** 配置RedisTemplate* @param connectionFactory Redis连接工厂* @return RedisTemplate*/@Beanpublic RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);// 设置序列化器template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}
}

五、Redis 缓存问题

14. 什么是缓存穿透?如何解决?

缓存穿透是指查询一个不存在的数据,由于缓存中没有该数据,所有请求都会穿透到数据库,导致数据库压力增大,甚至宕机。

缓存穿透的解决方案
  1. 空值缓存:对于查询结果为空的数据,也将其缓存起来,设置一个较短的过期时间

    • 优点:实现简单
    • 缺点:可能会缓存大量空值,浪费内存
  2. 布隆过滤器(Bloom Filter):在缓存之前设置布隆过滤器,过滤掉一定不存在的数据

    • 优点:高效判断数据是否存在,内存占用小
    • 缺点:有一定的误判率,删除困难
  3. 接口层校验:在接口层对请求参数进行校验,过滤掉明显不合理的参数

    • 优点:提前拦截无效请求
    • 缺点:只能过滤简单的无效请求
  4. 限流降级:对数据库访问进行限流,当请求量超过阈值时进行降级处理

    • 优点:保护数据库不被压垮
    • 缺点:可能影响正常用户体验

示例:使用布隆过滤器解决缓存穿透

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.List;/*** 布隆过滤器工具类,用于解决缓存穿透* @author ken*/
@Component
public class BloomFilterUtil {// 预计数据量private static final long EXPECTED_INSERTIONS = 1000000;// 误判率private static final double FALSE_POSITIVE_RATE = 0.01;// 布隆过滤器实例private BloomFilter<String> bloomFilter;/*** 初始化布隆过滤器,加载所有可能存在的ID*/@PostConstructpublic void init() {// 创建布隆过滤器bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8),EXPECTED_INSERTIONS,FALSE_POSITIVE_RATE);// 从数据库加载所有存在的ID并添加到布隆过滤器loadExistingIds();}/*** 从数据库加载所有存在的ID*/private void loadExistingIds() {// 实际项目中,这里会从数据库查询所有存在的IDList<String> existingIds = Arrays.asList("1", "2", "3", "4", "5");// 将ID添加到布隆过滤器for (String id : existingIds) {bloomFilter.put(id);}}/*** 判断ID是否可能存在* @param id 要判断的ID* @return true:可能存在;false:一定不存在*/public boolean mightContain(String id) {if (!StringUtils.hasText(id)) {return false;}return bloomFilter.mightContain(id);}/*** 添加ID到布隆过滤器* @param id 要添加的ID*/public void put(String id) {if (StringUtils.hasText(id)) {bloomFilter.put(id);}}
}

使用布隆过滤器的服务类:

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.StringUtils;/*** 使用布隆过滤器解决缓存穿透示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "缓存穿透解决方案示例")
public class CachePenetrationService {private final RedisTemplate<String, Object> redisTemplate;private final BloomFilterUtil bloomFilter;private final UserMapper userMapper;/*** 获取用户信息,解决缓存穿透问题* @param userId 用户ID* @return 用户信息*/@ApiOperation("获取用户信息(防缓存穿透)")public User getUserById(String userId) {if (!StringUtils.hasText(userId)) {log.error("用户ID不能为空");throw new IllegalArgumentException("用户ID不能为空");}String key = "user:" + userId;// 1. 先检查布隆过滤器,如果不存在,直接返回nullif (!bloomFilter.mightContain(userId)) {log.info("布隆过滤器判断用户ID:{}不存在,直接返回", userId);return null;}// 2. 从缓存获取数据User user = (User) redisTemplate.opsForValue().get(key);if (user != null) {log.info("从缓存获取用户信息,userId:{}", userId);return user;}// 3. 缓存未命中,从数据库获取log.info("缓存未命中,从数据库获取用户信息,userId:{}", userId);user = userMapper.selectById(userId);if (user != null) {// 4. 数据库存在,写入缓存,设置合理的过期时间redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);log.info("用户信息写入缓存,userId:{}", userId);} else {// 5. 数据库不存在,缓存空值,设置较短的过期时间redisTemplate.opsForValue().set(key, null, 5, TimeUnit.MINUTES);log.info("用户信息不存在,缓存空值,userId:{}", userId);}return user;}
}

15. 什么是缓存击穿?如何解决?

缓存击穿是指一个热点 key 在缓存中过期的瞬间,有大量并发请求同时访问这个 key,导致所有请求都穿透到数据库,造成数据库压力骤增。

缓存击穿的解决方案
  1. 互斥锁:当缓存失效时,只允许一个线程去数据库查询并更新缓存,其他线程等待

    • 优点:实现简单,能有效防止缓存击穿
    • 缺点:可能导致线程阻塞,影响性能
  2. 热点 key 永不过期:对于热点 key,不设置过期时间,通过后台线程定期更新

    • 优点:不会出现缓存过期的情况
    • 缺点:需要额外的线程维护,可能存在数据不一致
  3. 过期时间加随机值:为热点 key 的过期时间加上一个随机值,避免大量热点 key 同时过期

    • 优点:实现简单,能分散缓存过期时间
    • 缺点:只能减轻缓存击穿问题,不能完全解决
  4. 熔断降级:当请求量超过阈值时,对部分请求进行降级处理,返回默认值

    • 优点:保护数据库
    • 缺点:可能影响用户体验

示例:使用互斥锁解决缓存击穿

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.concurrent.TimeUnit;/*** 使用互斥锁解决缓存击穿示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "缓存击穿解决方案示例")
public class CacheBreakdownService {private final RedisTemplate<String, Object> redisTemplate;private final ProductMapper productMapper;// 分布式锁前缀private static final String LOCK_PREFIX = "lock:";// 锁过期时间(毫秒)private static final long LOCK_EXPIRE = 30000;// 重试间隔(毫秒)private static final long RETRY_INTERVAL = 100;/*** 获取商品信息,解决缓存击穿问题* @param productId 商品ID* @return 商品信息*/@ApiOperation("获取商品信息(防缓存击穿)")public Product getProductById(Long productId) {if (ObjectUtils.isEmpty(productId)) {log.error("商品ID不能为空");throw new IllegalArgumentException("商品ID不能为空");}String key = "product:" + productId;// 1. 从缓存获取数据Product product = (Product) redisTemplate.opsForValue().get(key);if (product != null) {log.info("从缓存获取商品信息,productId:{}", productId);return product;}// 2. 缓存未命中,尝试获取锁String lockKey = LOCK_PREFIX + key;boolean locked = false;try {// 尝试获取锁locked = tryLock(lockKey);if (locked) {// 3. 成功获取锁,再次检查缓存(防止其他线程已更新缓存)product = (Product) redisTemplate.opsForValue().get(key);if (product != null) {log.info("再次检查缓存命中,productId:{}", productId);return product;}// 4. 缓存仍未命中,从数据库获取log.info("从数据库获取商品信息,productId:{}", productId);product = productMapper.selectById(productId);if (product != null) {// 5. 写入缓存,设置过期时间,并添加随机值避免同时过期int expireTime = 30 + (int) (Math.random() * 10);redisTemplate.opsForValue().set(key, product, expireTime, TimeUnit.MINUTES);log.info("商品信息写入缓存,productId:{}, 过期时间:{}分钟", productId, expireTime);}} else {// 6. 未获取到锁,等待重试log.info("未获取到锁,等待重试,productId:{}", productId);Thread.sleep(RETRY_INTERVAL);// 递归重试return getProductById(productId);}} catch (InterruptedException e) {log.error("获取商品信息异常", e);Thread.currentThread().interrupt();} finally {// 7. 释放锁if (locked) {unlock(lockKey);log.info("释放锁,lockKey:{}", lockKey);}}return product;}/*** 尝试获取分布式锁* @param lockKey 锁的键名* @return 是否获取成功*/private boolean tryLock(String lockKey) {// 使用setIfAbsent实现分布式锁,相当于SETNX命令Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE, TimeUnit.MILLISECONDS);return Boolean.TRUE.equals(result);}/*** 释放分布式锁* @param lockKey 锁的键名*/private void unlock(String lockKey) {// 使用Lua脚本确保释放锁的原子性String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), "1");if (result != null && result > 0) {log.info("锁释放成功,lockKey:{}", lockKey);} else {log.info("锁释放失败或锁已过期,lockKey:{}", lockKey);}}
}

16. 什么是缓存雪崩?如何解决?

缓存雪崩是指在某一时间段内,缓存中大量 key 同时过期或 Redis 服务宕机,导致大量请求直接穿透到数据库,造成数据库压力骤增,甚至宕机。

缓存雪崩的解决方案
  1. 过期时间加随机值:为 key 的过期时间加上一个随机值,避免大量 key 同时过期

    • 优点:实现简单
    • 缺点:只能解决 key 同时过期的问题
  2. 多级缓存:使用本地缓存(如 Caffeine)+ 分布式缓存(Redis)的多级缓存架构

    • 优点:即使分布式缓存失效,本地缓存还能提供服务
    • 缺点:实现复杂,可能存在数据一致性问题
  3. 服务熔断与降级:当数据库压力过大时,对部分请求进行熔断或降级处理

    • 优点:保护数据库不被压垮
    • 缺点:可能影响用户体验
  4. Redis 高可用:部署 Redis 集群(主从 + 哨兵或 Redis Cluster),避免单点故障

    • 优点:提高 Redis 的可用性
    • 缺点:增加系统复杂度和运维成本
  5. 缓存预热:在系统启动或低峰期,提前将热点数据加载到缓存中

    • 优点:避免高峰期缓存未命中
    • 缺点:需要额外的预热机制
  6. 限流:对访问数据库的请求进行限流,控制并发量

    • 优点:保护数据库
    • 缺点:可能影响用户体验

示例:使用多级缓存和限流解决缓存雪崩

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** 使用多级缓存和限流解决缓存雪崩示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "缓存雪崩解决方案示例")
public class CacheAvalancheService {private final RedisTemplate<String, Object> redisTemplate;private final GoodsMapper goodsMapper;private final RateLimiterUtil rateLimiter;// 本地缓存,使用Caffeineprivate final Cache<Long, Goods> localCache = Caffeine.newBuilder().maximumSize(10000) // 最大缓存数量.expireAfterWrite(5, TimeUnit.MINUTES) // 写入后过期时间.build();// 互斥锁,用于本地缓存更新private final ReentrantLock lock = new ReentrantLock();/*** 获取商品信息,解决缓存雪崩问题* @param goodsId 商品ID* @return 商品信息*/@ApiOperation("获取商品信息(防缓存雪崩)")public Goods getGoodsById(Long goodsId) {if (ObjectUtils.isEmpty(goodsId)) {log.error("商品ID不能为空");throw new IllegalArgumentException("商品ID不能为空");}String redisKey = "goods:" + goodsId;Goods goods;// 1. 先查询本地缓存goods = localCache.getIfPresent(goodsId);if (goods != null) {log.info("从本地缓存获取商品信息,goodsId:{}", goodsId);return goods;}// 2. 本地缓存未命中,查询Redis缓存goods = (Goods) redisTemplate.opsForValue().get(redisKey);if (goods != null) {log.info("从Redis缓存获取商品信息,goodsId:{}", goodsId);// 更新本地缓存localCache.put(goodsId, goods);return goods;}// 3. Redis缓存未命中,尝试获取数据库访问权限(限流)boolean acquire = rateLimiter.tryAcquire();if (!acquire) {log.warn("数据库访问被限流,goodsId:{}", goodsId);// 返回降级数据return getFallbackGoods(goodsId);}try {// 4. 获取到访问权限,查询数据库// 使用本地锁防止并发查询数据库lock.lock();try {// 双重检查,防止其他线程已更新缓存goods = (Goods) redisTemplate.opsForValue().get(redisKey);if (goods != null) {log.info("双重检查Redis缓存命中,goodsId:{}", goodsId);localCache.put(goodsId, goods);return goods;}log.info("从数据库获取商品信息,goodsId:{}", goodsId);goods = goodsMapper.selectById(goodsId);if (goods != null) {// 5. 写入Redis缓存,添加随机过期时间,避免同时过期int baseExpire = 30; // 基础过期时间30分钟int randomExpire = (int) (Math.random() * 10); // 0-10分钟随机值redisTemplate.opsForValue().set(redisKey, goods, baseExpire + randomExpire, TimeUnit.MINUTES);log.info("商品信息写入Redis缓存,goodsId:{}, 过期时间:{}分钟", goodsId, baseExpire + randomExpire);// 更新本地缓存localCache.put(goodsId, goods);}} finally {lock.unlock();}} finally {// 释放限流令牌(如果需要)rateLimiter.release();}return goods;}/*** 获取降级商品信息* @param goodsId 商品ID* @return 降级商品信息*/private Goods getFallbackGoods(Long goodsId) {Goods fallback = new Goods();fallback.setId(goodsId);fallback.setName("商品信息加载中,请稍后重试");fallback.setPrice(0.0);return fallback;}/*** 缓存预热方法,在系统启动或低峰期调用*/@ApiOperation("缓存预热")public void preloadCache() {log.info("开始进行缓存预热");// 获取热点商品ID列表List<Long> hotGoodsIds = goodsMapper.selectHotGoodsIds();for (Long goodsId : hotGoodsIds) {try {Goods goods = goodsMapper.selectById(goodsId);if (goods != null) {String redisKey = "goods:" + goodsId;// 添加随机过期时间int baseExpire = 30;int randomExpire = (int) (Math.random() * 10);redisTemplate.opsForValue().set(redisKey, goods, baseExpire + randomExpire, TimeUnit.MINUTES);// 更新本地缓存localCache.put(goodsId, goods);log.info("缓存预热完成,goodsId:{}", goodsId);}} catch (Exception e) {log.error("缓存预热失败,goodsId:{}", goodsId, e);}}log.info("缓存预热完成,共预热{}个商品", hotGoodsIds.size());}
}

限流工具类实现:

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Component;/*** 限流工具类,使用Guava的RateLimiter* @author ken*/
@Component
public class RateLimiterUtil {// 每秒允许的请求数private static final double PERMITS_PER_SECOND = 100.0;// 创建RateLimiter实例private final RateLimiter rateLimiter = RateLimiter.create(PERMITS_PER_SECOND);/*** 尝试获取令牌* @return 是否获取成功*/public boolean tryAcquire() {return rateLimiter.tryAcquire();}/*** 获取令牌,会阻塞直到获取成功*/public void acquire() {rateLimiter.acquire();}/*** 释放令牌(对于RateLimiter来说,不需要显式释放,这里只是为了统一接口)*/public void release() {// RateLimiter不需要释放令牌,空实现}
}

六、Redis 分布式锁

17. 如何使用 Redis 实现分布式锁?需要注意哪些问题?

分布式锁是分布式系统中用于解决并发问题的一种机制,Redis 凭借其高性能和单线程特性,成为实现分布式锁的常用选择。

Redis 分布式锁的基本实现
  1. 获取锁:使用 SET 命令的 NX(只在键不存在时才设置)和 PX(设置过期时间)选项

    SET lock:key value NX PX 30000
    
    • 成功返回 OK,表示获取锁成功
    • 失败返回 nil,表示获取锁失败
  2. 释放锁:使用 Lua 脚本确保释放锁的原子性,防止误释放其他线程的锁

    if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1])
    elsereturn 0
    end
    
Redis 分布式锁需要注意的问题
  1. 锁超时问题:如果持有锁的线程在锁过期前未完成操作,会导致锁被其他线程获取,可能引发并发问题

    • 解决方案:设置合理的过期时间,或实现锁的自动续期(看门狗机制)
  2. 锁释放问题:如果持有锁的线程在释放锁前崩溃,会导致锁永远无法释放

    • 解决方案:必须为锁设置过期时间
  3. 误释放问题:如果线程 A 的锁已过期,线程 B 获取了锁,此时线程 A 执行完操作释放锁,可能会释放线程 B 的锁

    • 解决方案:释放锁时检查锁的所有者
  4. Redis 集群下的锁可靠性问题:在 Redis 集群中,主节点宕机可能导致锁丢失

    • 解决方案:使用 Redlock 算法,或使用 Redis Cluster 的同步机制
  5. 并发性能问题:大量线程竞争锁可能导致性能下降

    • 解决方案:合理设置锁粒度,或使用分段锁

示例:Redis 分布式锁实现(带看门狗机制)

java

运行

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** Redis分布式锁实现(带看门狗机制)* @author ken*/
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisDistributedLock {private final RedisTemplate<String, Object> redisTemplate;// 线程池,用于执行看门狗任务private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1,new ThreadFactory() {private int counter = 0;@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "redis-lock-watchdog-" + counter++);thread.setDaemon(true); // 守护线程,随主线程退出return thread;}});// 默认锁过期时间(毫秒)private static final long DEFAULT_LOCK_EXPIRE = 30000;// 看门狗续期间隔(为过期时间的1/3)private static final long WATCHDOG_INTERVAL = DEFAULT_LOCK_EXPIRE / 3;/*** 获取分布式锁* @param lockKey 锁的键名* @return 锁的标识(用于释放锁),null表示获取失败*/public String lock(String lockKey) {return lock(lockKey, DEFAULT_LOCK_EXPIRE);}/*** 获取分布式锁* @param lockKey 锁的键名* @param expireTime 锁的过期时间(毫秒)* @return 锁的标识(用于释放锁),null表示获取失败*/public String lock(String lockKey, long expireTime) {if (!StringUtils.hasText(lockKey)) {log.error("锁的键名不能为空");throw new IllegalArgumentException("锁的键名不能为空");}if (expireTime <= 0) {expireTime = DEFAULT_LOCK_EXPIRE;}// 生成唯一的锁标识String lockValue = UUID.randomUUID().toString();// 使用SET NX PX命令获取锁Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {log.info("获取分布式锁成功,lockKey:{}", lockKey);// 启动看门狗,自动续期startWatchdog(lockKey, lockValue, expireTime);return lockValue;}log.info("获取分布式锁失败,lockKey:{}", lockKey);return null;}/*** 尝试获取分布式锁,在指定时间内获取不到则返回失败* @param lockKey 锁的键名* @param waitTime 最大等待时间(毫秒)* @param expireTime 锁的过期时间(毫秒)* @return 锁的标识(用于释放锁),null表示获取失败* @throws InterruptedException 线程中断异常*/public String tryLock(String lockKey, long waitTime, long expireTime) throws InterruptedException {long startTime = System.currentTimeMillis();long interval = 100; // 重试间隔(毫秒)while (true) {// 尝试获取锁String lockValue = lock(lockKey, expireTime);if (lockValue != null) {return lockValue;}// 检查是否超时if (System.currentTimeMillis() - startTime >= waitTime) {log.info("尝试获取分布式锁超时,lockKey:{}", lockKey);return null;}// 等待一段时间后重试Thread.sleep(interval);}}/*** 释放分布式锁* @param lockKey 锁的键名* @param lockValue 锁的标识(获取锁时返回的值)* @return 是否释放成功*/public boolean unlock(String lockKey, String lockValue) {if (!StringUtils.hasText(lockKey) || !StringUtils.hasText(lockValue)) {log.error("锁的键名或标识不能为空");return false;}// Lua脚本:检查锁的标识是否匹配,匹配则释放锁String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockValue);// 停止看门狗stopWatchdog(lockKey, lockValue);if (result != null && result > 0) {log.info("释放分布式锁成功,lockKey:{}", lockKey);return true;} else {log.info("释放分布式锁失败或锁已过期,lockKey:{}", lockKey);return false;}}/*** 启动看门狗,自动续期* @param lockKey 锁的键名* @param lockValue 锁的标识* @param expireTime 锁的过期时间(毫秒)*/private void startWatchdog(String lockKey, String lockValue, long expireTime) {// 创建看门狗任务WatchdogTask task = new WatchdogTask(lockKey, lockValue, expireTime);// 存储任务,用于后续停止ThreadLocalHolder.set(lockKey + ":" + lockValue, task);// 定时执行任务,延迟expireTime/3后执行,之后每隔expireTime/3执行一次scheduler.scheduleWithFixedDelay(task,WATCHDOG_INTERVAL,WATCHDOG_INTERVAL,TimeUnit.MILLISECONDS);}/*** 停止看门狗* @param lockKey 锁的键名* @param lockValue 锁的标识*/private void stopWatchdog(String lockKey, String lockValue) {String key = lockKey + ":" + lockValue;WatchdogTask task = ThreadLocalHolder.get(key);if (task != null) {task.cancel();ThreadLocalHolder.remove(key);}}/*** 看门狗任务,负责锁的自动续期*/private class WatchdogTask implements Runnable {private final String lockKey;private final String lockValue;private final long expireTime;private final AtomicBoolean cancelled = new AtomicBoolean(false);public WatchdogTask(String lockKey, String lockValue, long expireTime) {this.lockKey = lockKey;this.lockValue = lockValue;this.expireTime = expireTime;}@Overridepublic void run() {if (cancelled.get()) {return;}// Lua脚本:检查锁的标识是否匹配,匹配则续期String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);Long result = redisTemplate.execute(redisScript,Collections.singletonList(lockKey),lockValue,String.valueOf(expireTime));if (result == null || result == 0) {log.info("锁续期失败或锁已释放,lockKey:{}", lockKey);cancel();} else {log.debug("锁续期成功,lockKey:{}, 过期时间:{}ms", lockKey, expireTime);}}public void cancel() {cancelled.set(true);}}/*** ThreadLocal工具类,用于存储看门狗任务*/private static class ThreadLocalHolder {private static final ThreadLocal<ConcurrentHashMap<String, WatchdogTask>> holder = ThreadLocal.withInitial(ConcurrentHashMap::new);public static void set(String key, WatchdogTask task) {holder.get().put(key, task);}public static WatchdogTask get(String key) {return holder.get().get(key);}public static void remove(String key) {holder.get().remove(key);}}
}

分布式锁使用示例:

java

运行

import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.StringUtils;/*** 分布式锁使用示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "分布式锁使用示例")
public class DistributedLockDemoService {private final RedisDistributedLock distributedLock;private final OrderMapper orderMapper;/*** 创建订单,使用分布式锁防止超卖* @param productId 商品ID* @param userId 用户ID* @return 订单ID*/@ApiOperation("创建订单(防超卖)")public Long createOrder(Long productId, Long userId) {if (productId == null || userId == null) {log.error("商品ID和用户ID不能为空");throw new IllegalArgumentException("商品ID和用户ID不能为空");}// 锁的键名,使用商品ID确保同一商品的并发控制String lockKey = "order:lock:" + productId;String lockValue = null;try {// 尝试获取锁,最多等待3秒,锁过期时间5秒lockValue = distributedLock.tryLock(lockKey, 3000, 5000);if (!StringUtils.hasText(lockValue)) {log.error("获取分布式锁失败,无法创建订单,productId:{}", productId);throw new RuntimeException("系统繁忙,请稍后再试");}// 检查库存Product product = orderMapper.selectProductById(productId);if (product == null) {log.error("商品不存在,productId:{}", productId);throw new RuntimeException("商品不存在");}if (product.getStock() <= 0) {log.error("商品库存不足,productId:{}", productId);throw new RuntimeException("商品库存不足");}// 扣减库存int rows = orderMapper.decreaseStock(productId, 1);if (rows <= 0) {log.error("扣减库存失败,productId:{}", productId);throw new RuntimeException("创建订单失败");}// 创建订单Order order = new Order();order.setProductId(productId);order.setUserId(userId);order.setStatus(1); // 待支付orderMapper.insert(order);log.info("订单创建成功,orderId:{}, productId:{}", order.getId(), productId);return order.getId();} catch (InterruptedException e) {log.error("创建订单异常", e);Thread.currentThread().interrupt();throw new RuntimeException("创建订单失败");} finally {// 释放锁if (StringUtils.hasText(lockValue)) {distributedLock.unlock(lockKey, lockValue);}}}
}

七、Redis 高级特性

18. Redis 事务的实现原理是什么?有什么局限性?

Redis 事务允许一次执行多个命令,提供了批量执行命令的功能,并保证这些命令的原子性:要么全部执行,要么全部不执行。

Redis 事务的实现原理

Redis 事务通过以下四个命令实现:

  1. MULTI:标记一个事务块的开始
  2. EXEC:执行事务块中的所有命令
  3. DISCARD:取消事务,放弃执行事务块中的所有命令
  4. WATCH:监视一个或多个键,如果在事务执行前这些键被其他命令修改,则事务被打断

Redis 事务的执行过程:

  1. 客户端发送 MULTI 命令,Redis 服务器返回 "OK",表示事务开始
  2. 客户端发送的所有命令都会被服务器缓存到一个事务队列中,而不是立即执行
  3. 客户端发送 EXEC 命令,服务器会按顺序执行事务队列中的所有命令,并将结果返回给客户端
  4. 如果在事务执行前客户端发送了 DISCARD 命令,服务器会清空事务队列,并放弃执行事务
  5. 如果使用了 WATCH 命令监视键,在事务执行前如果有被监视的键被修改,事务会被取消
Redis 事务的局限性
  1. 没有隔离级别的概念:事务执行过程中,其他客户端的命令可以正常执行,可能看到事务执行过程中的中间结果
  2. 不支持回滚:如果事务中的某个命令执行失败,其他命令仍然会继续执行,不会回滚
  3. WATCH 命令的局限性:WATCH 只能监视键的修改,无法监视键的过期或删除
  4. 事务队列中的命令不支持条件判断:无法根据前一个命令的结果决定是否执行下一个命令
  5. 长时间运行的事务会阻塞其他命令:Redis 是单线程的,长时间运行的事务会阻塞其他命令的执行

示例:Redis 事务使用

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.ObjectUtils;
import java.util.List;/*** Redis事务示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis事务示例")
public class RedisTransactionService {private final RedisTemplate<String, Object> redisTemplate;/*** 使用Redis事务进行转账操作* @param fromAccount 转出账户* @param toAccount 转入账户* @param amount 转账金额* @return 转账是否成功*/@ApiOperation("使用Redis事务进行转账")public boolean transfer(String fromAccount, String toAccount, double amount) {if (!StringUtils.hasText(fromAccount) || !StringUtils.hasText(toAccount)) {log.error("转出账户和转入账户不能为空");throw new IllegalArgumentException("转出账户和转入账户不能为空");}if (amount <= 0) {log.error("转账金额必须大于0");throw new IllegalArgumentException("转账金额必须大于0");}if (fromAccount.equals(toAccount)) {log.error("转出账户和转入账户不能相同");throw new IllegalArgumentException("转出账户和转入账户不能相同");}String fromKey = "account:" + fromAccount;String toKey = "account:" + toAccount;// 使用SessionCallback执行事务List<Object> results = redisTemplate.execute(new SessionCallback<List<Object>>() {@Override@SuppressWarnings("unchecked")public List<Object> execute(RedisOperations operations) throws Exception {// 监视转出账户,防止在事务执行前被修改operations.watch(fromKey);// 检查转出账户余额Double fromBalance = (Double) operations.opsForValue().get(fromKey);if (fromBalance == null) {log.error("转出账户不存在:{}", fromAccount);operations.unwatch(); // 取消监视return null;}if (fromBalance < amount) {log.error("转出账户余额不足:{},余额:{},转账金额:{}", fromAccount, fromBalance, amount);operations.unwatch(); // 取消监视return null;}// 开始事务operations.multi();// 扣减转出账户余额operations.opsForValue().increment(fromKey, -amount);// 增加转入账户余额,如果账户不存在则初始化为0后再增加operations.opsForValue().increment(toKey, amount);// 执行事务return operations.exec();}});if (ObjectUtils.isEmpty(results)) {log.error("转账失败,事务未执行");return false;}log.info("转账成功,从{}到{},金额:{}", fromAccount, toAccount, amount);return true;}/*** 初始化账户余额* @param account 账户名* @param balance 初始余额*/@ApiOperation("初始化账户余额")public void initAccount(String account, double balance) {if (!StringUtils.hasText(account)) {log.error("账户名不能为空");throw new IllegalArgumentException("账户名不能为空");}if (balance < 0) {log.error("初始余额不能为负数");throw new IllegalArgumentException("初始余额不能为负数");}String key = "account:" + account;redisTemplate.opsForValue().set(key, balance);log.info("账户初始化成功,account:{}, balance:{}", account, balance);}/*** 查询账户余额* @param account 账户名* @return 账户余额*/@ApiOperation("查询账户余额")public Double getAccountBalance(String account) {if (!StringUtils.hasText(account)) {log.error("账户名不能为空");throw new IllegalArgumentException("账户名不能为空");}String key = "account:" + account;Double balance = (Double) redisTemplate.opsForValue().get(key);return balance == null ? 0.0 : balance;}
}

19. Redis 的 Lua 脚本有什么作用?如何使用?

Lua 脚本是 Redis 支持的一种脚本语言,允许用户在 Redis 服务器端执行自定义的脚本,提供了更强大的功能和灵活性。

Redis Lua 脚本的作用
  1. 原子性操作:Lua 脚本在 Redis 中是原子执行的,不会被其他命令打断,保证了复杂操作的原子性
  2. 减少网络开销:将多个命令打包成一个 Lua 脚本执行,减少了客户端与服务器之间的网络交互
  3. 自定义命令:可以通过 Lua 脚本实现 Redis 不支持的自定义命令
  4. 提高性能:脚本在服务器端执行,减少了数据传输和命令解析的开销
Redis Lua 脚本的使用
  1. EVAL 命令:直接执行 Lua 脚本

    redis-cli

    EVAL "return KEYS[1] .. ARGV[1]" 1 "hello" "world"
    
  2. EVALSHA 命令:执行已缓存的 Lua 脚本(通过脚本的 SHA1 哈希值)

    redis-cli

    # 先加载脚本并获取SHA1哈希值
    SCRIPT LOAD "return KEYS[1] .. ARGV[1]"
    # 执行脚本
    EVALSHA <sha1> 1 "hello" "world"
    
  3. SCRIPT 命令:管理 Lua 脚本,如加载、删除、检查是否存在等

Lua 脚本中可以使用 Redis 命令:

-- 示例:实现一个原子性的增减操作,如果键不存在则初始化
local current = redis.call('get', KEYS[1])
if current == nil thenredis.call('set', KEYS[1], ARGV[1])return ARGV[1]
elselocal result = current + ARGV[1]redis.call('set', KEYS[1], result)return result
end
Lua 脚本的注意事项
  1. 避免长时间运行的脚本:Redis 是单线程的,长时间运行的脚本会阻塞其他命令的执行
  2. 脚本的原子性:脚本执行过程中,其他客户端的命令会被阻塞,直到脚本执行完成
  3. 脚本的安全性:避免在脚本中使用不确定的操作(如随机数、时间等),可能导致复制不一致
  4. 内存使用:大量的 Lua 脚本会占用 Redis 的内存,可以使用 SCRIPT FLUSH 命令清除所有缓存的脚本

示例:Redis Lua 脚本使用

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.List;/*** Redis Lua脚本示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Lua脚本示例")
public class RedisLuaScriptService {private final RedisTemplate<String, Object> redisTemplate;/*** 使用Lua脚本实现原子性的库存扣减* @param productId 商品ID* @param quantity 扣减数量* @return 扣减后的库存,-1表示库存不足,-2表示商品不存在*/@ApiOperation("使用Lua脚本扣减库存")public Long decreaseStockWithLua(Long productId, int quantity) {if (ObjectUtils.isEmpty(productId) || quantity <= 0) {log.error("商品ID不能为空且扣减数量必须大于0");throw new IllegalArgumentException("商品ID不能为空且扣减数量必须大于0");}String key = "stock:" + productId;// Lua脚本:原子性扣减库存String luaScript = "local current = redis.call('get', KEYS[1]) " +"if current == nil then " +"    return -2 " +  // 商品不存在"end " +"if tonumber(current) < tonumber(ARGV[1]) then " +"    return -1 " +  // 库存不足"end " +"local result = redis.call('decrby', KEYS[1], ARGV[1]) " +"return result";   // 返回扣减后的库存DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);// 执行Lua脚本Long result = redisTemplate.execute(redisScript,Collections.singletonList(key),String.valueOf(quantity));log.info("库存扣减结果,productId:{}, quantity:{}, result:{}", productId, quantity, result);return result;}/*** 使用Lua脚本实现分布式限流* @param limitKey 限流键名* @param limit 限制次数* @param period 时间窗口(秒)* @return 是否允许访问:1-允许,0-拒绝*/@ApiOperation("使用Lua脚本实现分布式限流")public Long rateLimitWithLua(String limitKey, int limit, int period) {if (!StringUtils.hasText(limitKey) || limit <= 0 || period <= 0) {log.error("限流键名不能为空且限制次数和时间窗口必须大于0");throw new IllegalArgumentException("限流键名不能为空且限制次数和时间窗口必须大于0");}// Lua脚本:实现滑动窗口限流String luaScript = "local current = redis.call('incr', KEYS[1]) " +"if current == 1 then " +"    redis.call('expire', KEYS[1], ARGV[2]) " +  // 设置过期时间"end " +"if current > tonumber(ARGV[1]) then " +"    return 0 " +  // 超过限制,拒绝访问"else " +"    return 1 " +  // 未超过限制,允许访问"end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);// 执行Lua脚本Long result = redisTemplate.execute(redisScript,Collections.singletonList(limitKey),String.valueOf(limit),String.valueOf(period));log.info("限流结果,limitKey:{}, limit:{}, period:{}, result:{}", limitKey, limit, period, result);return result;}/*** 加载Lua脚本并返回其SHA1哈希值* @param luaScript Lua脚本内容* @return 脚本的SHA1哈希值*/@ApiOperation("加载Lua脚本")public String loadLuaScript(String luaScript) {if (!StringUtils.hasText(luaScript)) {log.error("Lua脚本内容不能为空");throw new IllegalArgumentException("Lua脚本内容不能为空");}DefaultRedisScript<String> script = new DefaultRedisScript<>(luaScript, String.class);String sha1 = redisTemplate.execute(script, Collections.emptyList());log.info("Lua脚本加载成功,SHA1:{}", sha1);return sha1;}/*** 通过SHA1哈希值执行已加载的Lua脚本* @param sha1 脚本的SHA1哈希值* @param keys 键列表* @param args 参数列表* @return 脚本执行结果*/@ApiOperation("通过SHA1执行Lua脚本")public Object executeLuaScriptBySha1(String sha1, List<String> keys, List<String> args) {if (!StringUtils.hasText(sha1)) {log.error("SHA1哈希值不能为空");throw new IllegalArgumentException("SHA1哈希值不能为空");}if (CollectionUtils.isEmpty(keys)) {keys = Collections.emptyList();}if (CollectionUtils.isEmpty(args)) {args = Collections.emptyList();}DefaultRedisScript<Object> script = new DefaultRedisScript<>();script.setSha1(sha1);script.setResultType(Object.class);Object result = redisTemplate.execute(script, keys, args.toArray());log.info("通过SHA1执行Lua脚本,结果:{}", result);return result;}
}

20. Redis 的发布 / 订阅机制是什么?如何使用?

Redis 的发布 / 订阅(Pub/Sub)机制是一种消息通信模式,允许发送者(发布者)发送消息,接收者(订阅者)接收消息。Redis 作为中间代理,负责将发布者发送的消息转发给所有订阅了相关频道的订阅者。

Redis 发布 / 订阅的基本概念
  1. 频道(Channel):消息的传递通道,每个消息都发送到指定的频道
  2. 发布者(Publisher):发送消息到指定频道的客户端
  3. 订阅者(Subscriber):订阅一个或多个频道,接收发送到这些频道的消息
  4. 模式订阅(Pattern Matching):订阅者可以通过模式匹配订阅多个相关的频道(如订阅 "news.*" 可以接收 "news.sport"、"news.politics" 等频道的消息)
Redis 发布 / 订阅的命令
  1. 发布消息

    • PUBLISH channel message:向指定频道发布消息,返回接收消息的订阅者数量
  2. 订阅频道

    • SUBSCRIBE channel [channel ...]:订阅一个或多个频道
    • PSUBSCRIBE pattern [pattern ...]:通过模式匹配订阅多个频道
  3. 取消订阅

    • UNSUBSCRIBE [channel [channel ...]]:取消订阅一个或多个频道
    • PUNSUBSCRIBE [pattern [pattern ...]]:取消通过模式匹配订阅的频道
Redis 发布 / 订阅的特点
  1. 实时性:消息一经发布,立即发送给所有订阅者
  2. 无持久化:Redis 不会持久化发布的消息,如果订阅者在消息发布时未在线,则会错过该消息
  3. 多对多:一个发布者可以向多个频道发布消息,一个订阅者可以订阅多个频道
  4. 轻量级:实现简单,开销小
Redis 发布 / 订阅的局限性
  1. 消息不持久:没有消息堆积和持久化机制,不适合需要可靠消息传递的场景
  2. 没有消息确认机制:无法确认订阅者是否收到消息
  3. 不支持消息回溯:订阅者只能收到订阅之后发布的消息
  4. 集群支持有限:在 Redis 集群中,发布的消息只会发送给同一个节点上的订阅者

对于需要可靠消息传递的场景,建议使用专门的消息队列(如 RabbitMQ、Kafka 等)。

示例:Redis 发布 / 订阅使用

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;/*** Redis发布/订阅示例服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis发布/订阅示例")
public class RedisPubSubService {private final RedisTemplate<String, Object> redisTemplate;private final RedisMessageListenerContainer redisMessageListenerContainer;/*** 发布消息到指定频道* @param channel 频道名称* @param message 消息内容* @return 接收消息的订阅者数量*/@ApiOperation("发布消息")public Long publishMessage(String channel, String message) {if (!StringUtils.hasText(channel) || !StringUtils.hasText(message)) {log.error("频道名称和消息内容不能为空");throw new IllegalArgumentException("频道名称和消息内容不能为空");}Long count = redisTemplate.convertAndSend(channel, message);log.info("向频道{}发布消息:{},接收者数量:{}", channel, message, count);return count;}/*** 订阅指定频道* @param channel 频道名称* @param listener 消息监听器*/@ApiOperation("订阅频道")public void subscribeChannel(String channel, MessageListener listener) {if (!StringUtils.hasText(channel) || listener == null) {log.error("频道名称和监听器不能为空");throw new IllegalArgumentException("频道名称和监听器不能为空");}redisMessageListenerContainer.addMessageListener(listener, new ChannelTopic(channel));log.info("已订阅频道:{}", channel);}/*** 通过模式匹配订阅多个频道* @param pattern 模式(如"news.*")* @param listener 消息监听器*/@ApiOperation("模式订阅")public void psubscribePattern(String pattern, MessageListener listener) {if (!StringUtils.hasText(pattern) || listener == null) {log.error("模式和监听器不能为空");throw new IllegalArgumentException("模式和监听器不能为空");}redisMessageListenerContainer.addMessageListener(listener, new PatternTopic(pattern));log.info("已通过模式订阅:{}", pattern);}/*** 取消订阅指定频道* @param channel 频道名称* @param listener 消息监听器*/@ApiOperation("取消订阅")public void unsubscribeChannel(String channel, MessageListener listener) {if (!StringUtils.hasText(channel) || listener == null) {log.error("频道名称和监听器不能为空");throw new IllegalArgumentException("频道名称和监听器不能为空");}redisMessageListenerContainer.removeMessageListener(listener, new ChannelTopic(channel));log.info("已取消订阅频道:{}", channel);}/*** 示例:新闻消息监听器*/@Servicepublic static class NewsMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String body = new String(message.getBody());log.info("收到新闻消息,频道:{},内容:{}", channel, body);// 处理消息的业务逻辑processNewsMessage(channel, body);}private void processNewsMessage(String channel, String content) {// 实际业务处理逻辑log.info("处理新闻消息: {} - {}", channel, content);}}/*** 初始化示例订阅*/@PostConstructpublic void initSubscriptions() {// 创建新闻消息监听器NewsMessageListener newsListener = new NewsMessageListener();// 订阅体育新闻频道subscribeChannel("news.sport", newsListener);// 通过模式匹配订阅所有新闻频道psubscribePattern("news.*", newsListener);}/*** 销毁时关闭监听器容器*/@PreDestroypublic void destroy() {redisMessageListenerContainer.stop();log.info("Redis消息监听器容器已关闭");}
}

Redis 消息监听器容器配置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** Redis消息监听器配置* @author ken*/
@Configuration
public class RedisMessageListenerConfig {/*** 配置Redis消息监听器容器* @param connectionFactory Redis连接工厂* @return Redis消息监听器容器*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 配置线程池,用于处理消息Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);container.setTaskExecutor(executor);return container;}
}

八、Redis 性能优化

21. 如何优化 Redis 的性能?

Redis 是高性能的内存数据库,但在高并发场景下,仍可能遇到性能瓶颈。以下是一些常见的 Redis 性能优化方法:

1. 合理配置内存
  • 设置最大内存限制(maxmemory),避免 Redis 耗尽系统内存
  • 根据业务需求选择合适的内存淘汰策略(maxmemory-policy)
    • volatile-lru:从已设置过期时间的数据集挑选最近最少使用的数据淘汰
    • allkeys-lru:从所有数据集挑选最近最少使用的数据淘汰
    • volatile-lfu:从已设置过期时间的数据集挑选最不经常使用的数据淘汰
    • allkeys-lfu:从所有数据集挑选最不经常使用的数据淘汰
    • volatile-random:从已设置过期时间的数据集随机挑选数据淘汰
    • allkeys-random:从所有数据集随机挑选数据淘汰
    • volatile-ttl:从已设置过期时间的数据集挑选将要过期的数据淘汰
    • noeviction:不淘汰任何数据,内存满时新写入操作报错
# 配置示例
maxmemory 4gb
maxmemory-policy allkeys-lru
2. 优化数据结构和命令
  • 选择合适的数据结构,避免过度使用 String 存储复杂数据
  • 避免使用 O (N) 复杂度的命令(如 KEYS、HGETALL 等),大数据集优先使用 SCAN、HSCAN 等迭代命令
  • 使用批量命令(如 MSET、MGET、HMSET、HMGET)减少网络往返次数
  • 对大型集合使用分片处理,避免单个键过大
/*** 优化示例:使用SCAN替代KEYS命令* @author ken*/
@ApiOperation("使用SCAN命令遍历键,替代KEYS")
public List<String> scanKeys(String pattern) {if (!StringUtils.hasText(pattern)) {pattern = "*";}List<String> keys = Lists.newArrayList();ScanOptions options = ScanOptions.scanOptions().match(pattern).count(100) // 每次扫描数量.build();RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();try (Cursor<byte[]> cursor = connection.scan(options)) {while (cursor.hasNext()) {String key = new String(cursor.next(), StandardCharsets.UTF_8);keys.add(key);}} finally {connection.close();}return keys;
}
3. 持久化优化
  • 根据业务需求选择合适的持久化方式(RDB、AOF 或混合持久化)
  • 调整 AOF 重写触发条件,避免频繁重写
  • 对于 AOF,优先使用 everysec 同步策略平衡安全性和性能
  • 大内存实例建议关闭 RDB 自动保存,手动在低峰期执行 bgsave

redis.conf

# AOF重写配置示例
auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100
appendfsync everysec# 关闭自动RDB保存
save ""
4. 网络优化
  • 使用连接池管理 Redis 连接,避免频繁创建和关闭连接
  • 调整 TCP 参数,优化网络传输效率
  • 大 value 数据建议压缩后存储,减少网络传输量
  • 避免大量小命令,合并为批量命令执行
/*** Redis连接池配置优化* @author ken*/
@Configuration
public class RedisPoolConfig {@Beanpublic LettuceConnectionFactory redisConnectionFactory() {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("127.0.0.1");config.setPort(6379);// 连接池配置GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(16); // 最大连接数poolConfig.setMaxIdle(8); // 最大空闲连接poolConfig.setMinIdle(4); // 最小空闲连接poolConfig.setMaxWait(Duration.ofSeconds(3)); // 最大等待时间poolConfig.setTestOnBorrow(true); // 借用时测试连接可用性LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig).commandTimeout(Duration.ofSeconds(2)).build();return new LettuceConnectionFactory(config, clientConfig);}
}
5. 集群优化
  • 合理分配哈希槽,避免数据倾斜
  • 为每个主节点配置足够的从节点,提高可用性
  • 实现读写分离,将读请求分散到从节点
  • 热点数据分散存储,避免单节点压力过大
6. 硬件和系统优化
  • 使用高性能 CPU 和大内存,避免内存不足导致 swap
  • 配置大页内存(hugepage)提高内存访问效率
  • 关闭 THP(Transparent Huge Pages),避免 Redis 延迟波动
  • 使用 SSD 存储持久化文件,提升 RDB/AOF 读写速度
  • 调整系统内核参数,优化网络和内存性能
# 关闭THP示例(Linux系统)
echo never > /sys/kernel/mm/transparent_hugepage/enabled
7. 监控和调优
  • 监控 Redis 关键指标(内存使用、命中率、响应时间等)
  • 使用 Redis 自带的 INFO 命令分析性能瓶颈
  • 定期进行性能测试,验证优化效果
  • 根据监控数据持续调优配置

22. Redis 的内存淘汰策略有哪些?如何选择?

Redis 的内存淘汰策略是指当 Redis 的内存使用达到 maxmemory 限制时,如何选择要淘汰的键,以释放内存空间。

内存淘汰策略分类
  1. 基于过期时间的淘汰策略

    • volatile-lru:从已设置过期时间的键中,淘汰最近最少使用的键
    • volatile-lfu:从已设置过期时间的键中,淘汰最不经常使用的键
    • volatile-random:从已设置过期时间的键中,随机淘汰
    • volatile-ttl:从已设置过期时间的键中,淘汰剩余生存时间最短的键
  2. 全量数据淘汰策略

    • allkeys-lru:从所有键中,淘汰最近最少使用的键
    • allkeys-lfu:从所有键中,淘汰最不经常使用的键
    • allkeys-random:从所有键中,随机淘汰
  3. 禁止淘汰策略

    • noeviction:不淘汰任何键,内存满时新写入操作报错(默认策略)
淘汰策略选择依据
  1. 业务数据特性

    • 有明确过期时间的业务:优先选择 volatile-* 系列策略
    • 无过期时间的缓存业务:选择 allkeys-* 系列策略
    • 数据访问分布均匀:可选择 random 系列策略
  2. 访问模式

    • 热点数据集中:优先选择 LRU/LFU 策略,保留热点数据
    • 访问频率差异大:LFU 比 LRU 更适合,能淘汰长期访问频率低的键
  3. 数据重要性

    • 数据不允许丢失:选择 noeviction 策略,结合持久化保证数据安全
    • 允许部分数据丢失:选择 LRU/LFU 等策略,保证服务可用性
实际应用建议
  • 缓存场景:优先使用 allkeys-lru 或 allkeys-lfu,确保热点数据不被淘汰
  • 会话存储:使用 volatile-ttl,确保会话过期后及时释放内存
  • 计数器 / 限流器:使用 volatile-lru,保留近期活跃的计数数据
  • 高可用要求高的场景:避免使用 noeviction,防止服务不可用
/*** 动态调整Redis内存淘汰策略* @author ken*/
@ApiOperation("调整Redis内存淘汰策略")
public boolean setMemoryPolicy(String policy) {if (!StringUtils.hasText(policy)) {log.error("淘汰策略不能为空");throw new IllegalArgumentException("淘汰策略不能为空");}// 验证策略合法性List<String> validPolicies = Arrays.asList("volatile-lru", "volatile-lfu", "volatile-random", "volatile-ttl","allkeys-lru", "allkeys-lfu", "allkeys-random", "noeviction");if (!validPolicies.contains(policy)) {log.error("无效的淘汰策略:{}", policy);return false;}try {redisTemplate.execute((RedisCallback<Void>) connection -> {connection.setConfig("maxmemory-policy", policy);return null;});log.info("Redis内存淘汰策略已调整为:{}", policy);return true;} catch (Exception e) {log.error("调整内存淘汰策略失败", e);return false;}
}

23. 如何解决 Redis 的热点 key 问题?

热点 key 是指访问频率极高的 key,可能导致单个 Redis 节点压力过大,成为性能瓶颈。

热点 key 的危害
  1. 单个节点 CPU 利用率过高
  2. 网络带宽占用过大
  3. 可能导致缓存击穿
  4. 影响整个 Redis 集群的稳定性
热点 key 解决方案
  1. 客户端缓存

    • 在应用端本地缓存热点数据(如使用 Caffeine)
    • 减少对 Redis 的重复访问,降低 Redis 压力
  2. 数据分片

    • 将热点 key 分散到多个 Redis 节点
    • 通过添加随机后缀实现(如 key:{random_num})
  3. 读写分离

    • 主节点负责写操作,多个从节点负责读操作
    • 将热点 key 的读请求分散到多个从节点
  4. 热点 key 预加载

    • 系统启动时提前加载热点数据到 Redis
    • 定期更新热点数据,避免缓存失效
  5. 限流熔断

    • 对热点 key 的访问进行限流,避免节点被压垮
    • 超过阈值时返回降级数据,保护 Redis
热点 key 检测
  1. 通过 Redis 监控工具(如 RedisInsight)识别热点 key
  2. 利用 Redis 的 INFO 命令统计命令执行频率
  3. 结合业务日志分析高频访问的 key
/*** 热点key处理示例:客户端缓存+随机后缀分片* @author ken*/
@Service
public class HotKeyService {// 本地缓存热点数据,过期时间1分钟private final LoadingCache<String, Object> localCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).maximumSize(1000).build(this::loadDataFromRedis);/*** 获取热点数据(本地缓存+Redis分片)* @param baseKey 基础key* @return 数据*/@ApiOperation("获取热点数据(防热点key优化)")public Object getHotData(String baseKey) {if (!StringUtils.hasText(baseKey)) {throw new IllegalArgumentException("key不能为空");}try {// 优先从本地缓存获取return localCache.get(baseKey);} catch (Exception e) {log.error("获取热点数据失败", e);return null;}}/*** 从Redis加载数据(带随机后缀分片)* @param baseKey 基础key* @return 数据*/private Object loadDataFromRedis(String baseKey) {// 生成随机后缀(0-9),将热点key分散到10个不同的keyint randomSuffix = ThreadLocalRandom.current().nextInt(10);String shardedKey = baseKey + ":" + randomSuffix;// 从Redis获取数据Object data = redisTemplate.opsForValue().get(shardedKey);// 如果当前分片没有数据,从主key同步if (data == null) {String mainKey = baseKey + ":main";data = redisTemplate.opsForValue().get(mainKey);// 同步到当前分片,设置较短过期时间if (data != null) {redisTemplate.opsForValue().set(shardedKey, data, 5, TimeUnit.MINUTES);}}return data;}/*** 更新热点数据(同步所有分片)* @param baseKey 基础key* @param data 新数据*/@ApiOperation("更新热点数据")public void updateHotData(String baseKey, Object data) {if (!StringUtils.hasText(baseKey) || ObjectUtils.isEmpty(data)) {throw new IllegalArgumentException("key和数据不能为空");}// 更新主keyString mainKey = baseKey + ":main";redisTemplate.opsForValue().set(mainKey, data, 30, TimeUnit.MINUTES);// 批量更新所有分片keyList<String> shardedKeys = Lists.newArrayList();for (int i = 0; i < 10; i++) {shardedKeys.add(baseKey + ":" + i);}// 使用管道批量操作redisTemplate.executePipelined((RedisCallback<Void>) connection -> {for (String key : shardedKeys) {connection.set(key.getBytes(), JSON.toJSONBytes(data));connection.expire(key.getBytes(), 30 * 60);}return null;});// 清除本地缓存,下次重新加载localCache.invalidate(baseKey);}
}

九、Redis 集群实战

24. Redis Cluster 如何处理数据分片和迁移?

Redis Cluster 通过哈希槽(Hash Slot)实现数据分片,共有 16384 个哈希槽,每个键通过哈希算法分配到特定的槽位。

数据分片原理
  1. 哈希计算:键的槽位计算公式为 CRC16(key) % 16384
  2. 槽位分配:集群中的每个主节点负责一部分连续的哈希槽
  3. 数据存储:键根据计算出的槽位存储到对应的节点
数据迁移机制

Redis Cluster 支持在线数据迁移,用于节点扩容、缩容或负载均衡,迁移过程不影响集群可用性。

  1. 迁移触发方式

    • 手动触发:通过CLUSTER MIGRATE命令迁移单个槽
    • 自动触发:通过redis-cli --cluster reshard命令自动重新分片
  2. 迁移流程

    • 源节点标记槽为 "migrating" 状态
    • 目标节点标记槽为 "importing" 状态
    • 源节点将槽中的数据逐把迁移到目标节点
    • 迁移完成后,更新集群槽位映射表
    • 源节点和目标节点分别清除 "migrating" 和 "importing" 状态
  3. 迁移特点

    • 增量迁移:每次迁移一个键,避免大槽迁移阻塞
    • 原子性:单个键的迁移是原子的,不会出现数据不一致
    • 在线迁移:迁移过程中,槽仍可正常读写

数据迁移示例(Java 代码)
/*** Redis Cluster数据迁移工具类* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Cluster数据迁移")
public class RedisClusterMigrationService {private final RedisTemplate<String, Object> redisTemplate;/*** 迁移单个哈希槽* @param slot 槽位编号* @param targetNode 目标节点(格式:ip:port)* @return 迁移是否成功*/@ApiOperation("迁移单个哈希槽")public boolean migrateSlot(int slot, String targetNode) {if (slot < 0 || slot >= 16384) {log.error("无效的槽位编号:{}", slot);throw new IllegalArgumentException("槽位编号必须在0-16383之间");}if (!StringUtils.hasText(targetNode) || !targetNode.contains(":")) {log.error("无效的目标节点格式:{}", targetNode);throw new IllegalArgumentException("目标节点格式必须为ip:port");}String[] nodeInfo = targetNode.split(":");String targetHost = nodeInfo[0];int targetPort = Integer.parseInt(nodeInfo[1]);try {// 1. 获取当前槽位的源节点String sourceNode = getSlotNode(slot);if (sourceNode.equals(targetNode)) {log.info("槽位{}已在目标节点{}上,无需迁移", slot, targetNode);return true;}log.info("开始迁移槽位{},从{}到{}", slot, sourceNode, targetNode);// 2. 执行槽位迁移命令String migrateCmd = String.format("CLUSTER SETSLOT %d MIGRATING %s",slot, targetNode);executeClusterCommand(migrateCmd);// 3. 目标节点准备接收槽位String importCmd = String.format("CLUSTER SETSLOT %d IMPORTING %s",slot, sourceNode);executeClusterCommandOnNode(importCmd, targetHost, targetPort);// 4. 迁移槽位中的所有键migrateSlotKeys(slot, targetHost, targetPort);// 5. 更新集群槽位映射String setSlotCmd = String.format("CLUSTER SETSLOT %d NODE %s",slot, getNodeId(targetHost, targetPort));executeClusterCommand(setSlotCmd);log.info("槽位{}迁移完成", slot);return true;} catch (Exception e) {log.error("槽位{}迁移失败", slot, e);return false;}}/*** 获取槽位当前所在节点* @param slot 槽位编号* @return 节点信息(ip:port)*/private String getSlotNode(int slot) {return redisTemplate.execute((RedisCallback<String>) connection -> {ClusterSlotInformation slotInfo = connection.clusterGetSlotInformation(slot);return slotInfo.getMaster().getHost() + ":" + slotInfo.getMaster().getPort();});}/*** 迁移槽位中的所有键* @param slot 槽位编号* @param targetHost 目标节点IP* @param targetPort 目标节点端口*/private void migrateSlotKeys(int slot, String targetHost, int targetPort) {redisTemplate.execute((RedisCallback<Void>) connection -> {// 迭代获取槽位中的所有键ScanOptions options = ScanOptions.scanOptions().count(100).build();Cursor<byte[]> cursor = connection.scan(options);while (cursor.hasNext()) {byte[] keyBytes = cursor.next();String key = new String(keyBytes, StandardCharsets.UTF_8);// 验证键所属的槽位int keySlot = CRC16.crc16(key.getBytes()) % 16384;if (keySlot != slot) {continue;}// 迁移单个键到目标节点connection.migrate(targetHost,targetPort,keyBytes,0,Duration.ofSeconds(5),true,true);log.debug("已迁移键:{}到节点{}:{}", key, targetHost, targetPort);}return null;});}/*** 获取节点ID* @param host 节点IP* @param port 节点端口* @return 节点ID*/private String getNodeId(String host, int port) {return redisTemplate.execute((RedisCallback<String>) connection -> {Collection<ClusterNodeInformation> nodes = connection.clusterGetNodes();for (ClusterNodeInformation node : nodes) {if (node.getHost().equals(host) && node.getPort() == port) {return node.getId();}}throw new RuntimeException("节点" + host + ":" + port + "不存在");});}/*** 执行集群命令* @param command 命令内容* @return 命令执行结果*/private String executeClusterCommand(String command) {return redisTemplate.execute((RedisCallback<String>) connection -> {byte[][] commandBytes = parseCommand(command);return new String(connection.execute(commandBytes), StandardCharsets.UTF_8);});}/*** 在指定节点执行集群命令* @param command 命令内容* @param host 节点IP* @param port 节点端口* @return 命令执行结果*/private String executeClusterCommandOnNode(String command, String host, int port) {return redisTemplate.execute((RedisCallback<String>) connection -> {byte[][] commandBytes = parseCommand(command);return new String(connection.executeOnNode(commandBytes, new RedisNode(host, port)),StandardCharsets.UTF_8);});}/*** 解析命令字符串为字节数组* @param command 命令字符串* @return 命令字节数组*/private byte[][] parseCommand(String command) {String[] parts = command.split(" ");byte[][] result = new byte[parts.length][];for (int i = 0; i < parts.length; i++) {result[i] = parts[i].getBytes(StandardCharsets.UTF_8);}return result;}
}

25. Redis Cluster 如何处理节点故障?

Redis Cluster 具备自动故障检测和恢复能力,确保集群在节点故障时仍能正常运行。

故障检测机制
  1. 节点心跳检测:集群中的每个节点定期向其他节点发送 PING 消息
  2. 故障判断:如果一个节点在 cluster-node-timeout 时间内未响应,被标记为疑似下线(PFAIL)
  3. 故障确认:当超过半数主节点认为该节点疑似下线时,标记为确定下线(FAIL)
故障恢复流程
  1. 主节点故障恢复

    • 从节点检测到主节点下线
    • 从节点触发故障转移选举
    • 选出一个从节点晋升为主节点
    • 新主节点接管原主节点的哈希槽
    • 集群更新槽位映射表,通知所有节点
  2. 从节点故障恢复

    • 从节点故障不影响集群可用性
    • 仅需确保主节点有其他可用从节点
    • 故障从节点恢复后,自动重新加入集群并同步数据

故障恢复配置优化

redis.conf

# 节点超时时间(默认15000毫秒)
cluster-node-timeout 15000# 主节点故障后,从节点选举超时时间
cluster-slave-validity-factor 10# 允许从节点优先选举的偏移量差距
cluster-slave-no-failover-on-down-after-milliseconds 10000# 禁止故障转移时全量覆盖配置
cluster-config-file-update-on-failover yes
故障检测示例代码
/*** Redis Cluster故障检测服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Cluster故障检测")
public class RedisClusterMonitorService {private final RedisTemplate<String, Object> redisTemplate;private ScheduledExecutorService executor;/*** 启动集群监控* @param interval 监控间隔(秒)*/@ApiOperation("启动集群监控")public void startMonitor(int interval) {if (interval <= 0) {interval = 10; // 默认10秒}executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "redis-cluster-monitor"));executor.scheduleAtFixedRate(this::checkClusterHealth,0,interval,TimeUnit.SECONDS);log.info("Redis Cluster监控已启动,监控间隔:{}秒", interval);}/*** 停止集群监控*/@ApiOperation("停止集群监控")public void stopMonitor() {if (executor != null && !executor.isShutdown()) {executor.shutdown();log.info("Redis Cluster监控已停止");}}/*** 检查集群健康状态*/private void checkClusterHealth() {try {// 获取集群信息ClusterInfo clusterInfo = redisTemplate.execute(RedisCallback<ClusterInfo> :: connection -> connection.clusterGetClusterInfo());// 检查集群状态if (!"ok".equals(clusterInfo.getStatus())) {log.error("Redis Cluster状态异常:{}", clusterInfo.getStatus());sendAlert("Redis Cluster状态异常: " + clusterInfo.getStatus());return;}// 检查节点状态Collection<ClusterNodeInformation> nodes = redisTemplate.execute(RedisCallback<Collection<ClusterNodeInformation>> :: connection -> connection.clusterGetNodes());for (ClusterNodeInformation node : nodes) {// 检查节点是否下线if (node.getNodeStatus() == ClusterNodeInformation.NodeStatus.FAIL ||node.getNodeStatus() == ClusterNodeInformation.NodeStatus.PFAIL) {String alertMsg = String.format("节点%s:%d状态异常,状态:%s",node.getHost(), node.getPort(), node.getNodeStatus());log.error(alertMsg);sendAlert(alertMsg);}// 检查主节点的从节点数量if (node.getNodeRole() == ClusterNodeInformation.NodeRole.MASTER) {long slaveCount = nodes.stream().filter(n -> n.getNodeRole() == ClusterNodeInformation.NodeRole.SLAVE).filter(n -> n.getMasterId().equals(node.getId())).count();if (slaveCount < 1) {String alertMsg = String.format("主节点%s:%d没有从节点,存在单点故障风险",node.getHost(), node.getPort());log.warn(alertMsg);sendAlert(alertMsg);}}}// 检查哈希槽分配checkSlotAllocation();} catch (Exception e) {log.error("集群健康检查异常", e);sendAlert("Redis Cluster监控异常: " + e.getMessage());}}/*** 检查哈希槽分配情况*/private void checkSlotAllocation() {redisTemplate.execute((RedisCallback<Void>) connection -> {// 检查是否有未分配的槽位int unassignedSlots = 0;for (int slot = 0; slot < 16384; slot++) {ClusterSlotInformation slotInfo = connection.clusterGetSlotInformation(slot);if (slotInfo == null || slotInfo.getMaster() == null) {unassignedSlots++;}}if (unassignedSlots > 0) {String alertMsg = String.format("Redis Cluster有%d个哈希槽未分配,集群不可用",unassignedSlots);log.error(alertMsg);sendAlert(alertMsg);}return null;});}/*** 发送告警通知* @param message 告警信息*/private void sendAlert(String message) {// 实际项目中可集成短信、邮件、钉钉等告警方式log.error("【Redis Cluster告警】{}", message);// TODO: 集成告警渠道}
}

26. Redis Cluster 如何实现读写分离?

Redis Cluster 的读写分离是指将写操作路由到主节点,读操作分散到从节点,以提高集群的读性能和吞吐量。

读写分离实现原理
  1. 节点角色分工

    • 主节点:负责处理写操作和读操作
    • 从节点:仅负责处理读操作,数据从主节点同步
  2. 路由策略

    • 写操作:必须路由到键所在槽位的主节点
    • 读操作:可路由到主节点或其任意从节点
  3. 一致性保证

    • 同步策略:从节点通过复制机制同步主节点数据
    • 数据延迟:从节点数据可能存在短暂延迟,需业务容忍
读写分离配置实现
  1. 客户端配置

    • 配置读写分离规则,读请求优先路由到从节点
    • 支持故障转移,从节点不可用时自动切换到主节点
  2. 集群配置

    • 确保每个主节点有足够的从节点
    • 配置合理的复制同步策略
/*** Redis Cluster读写分离配置* @author ken*/
@Configuration
@Api(tags = "Redis Cluster读写分离配置")
public class RedisClusterReadWriteConfig {/*** 配置主节点RedisTemplate(写操作)* @return 主节点RedisTemplate*/@Bean("masterRedisTemplate")public RedisTemplate<String, Object> masterRedisTemplate() {RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();// 添加所有集群节点List<RedisNode> clusterNodes = Arrays.asList(new RedisNode("192.168.1.100", 6379),new RedisNode("192.168.1.101", 6379),new RedisNode("192.168.1.102", 6379),new RedisNode("192.168.1.103", 6380),new RedisNode("192.168.1.104", 6380),new RedisNode("192.168.1.105", 6380));clusterConfig.setClusterNodes(clusterNodes);clusterConfig.setMaxRedirects(3);clusterConfig.setPassword("password");// 配置仅连接主节点LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().readFrom(ReadFrom.MASTER) // 仅从主节点读取.commandTimeout(Duration.ofSeconds(2)).build();LettuceConnectionFactory factory = new LettuceConnectionFactory(clusterConfig, clientConfig);factory.afterPropertiesSet();return createRedisTemplate(factory);}/*** 配置从节点RedisTemplate(读操作)* @return 从节点RedisTemplate*/@Bean("slaveRedisTemplate")public RedisTemplate<String, Object> slaveRedisTemplate() {RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();List<RedisNode> clusterNodes = Arrays.asList(new RedisNode("192.168.1.100", 6379),new RedisNode("192.168.1.101", 6379),new RedisNode("192.168.1.102", 6379),new RedisNode("192.168.1.103", 6380),new RedisNode("192.168.1.104", 6380),new RedisNode("192.168.1.105", 6380));clusterConfig.setClusterNodes(clusterNodes);clusterConfig.setMaxRedirects(3);clusterConfig.setPassword("password");// 配置优先从从节点读取,从节点不可用时切换到主节点LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().readFrom(ReadFrom.REPLICA_PREFERRED) // 优先从从节点读取.commandTimeout(Duration.ofSeconds(2)).build();LettuceConnectionFactory factory = new LettuceConnectionFactory(clusterConfig, clientConfig);factory.afterPropertiesSet();return createRedisTemplate(factory);}/*** 创建RedisTemplate实例* @param factory 连接工厂* @return RedisTemplate*/private RedisTemplate<String, Object> createRedisTemplate(LettuceConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 设置序列化器StringRedisSerializer keySerializer = new StringRedisSerializer();GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();template.setKeySerializer(keySerializer);template.setValueSerializer(valueSerializer);template.setHashKeySerializer(keySerializer);template.setHashValueSerializer(valueSerializer);template.afterPropertiesSet();return template;}
}
读写分离服务实现
/*** Redis Cluster读写分离服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Cluster读写分离服务")
public class RedisReadWriteService {@Qualifier("masterRedisTemplate")private final RedisTemplate<String, Object> masterRedisTemplate;@Qualifier("slaveRedisTemplate")private final RedisTemplate<String, Object> slaveRedisTemplate;/*** 写操作(使用主节点)* @param key 键* @param value 值*/@ApiOperation("写操作(主节点)")public void set(String key, Object value) {if (!StringUtils.hasText(key) || ObjectUtils.isEmpty(value)) {throw new IllegalArgumentException("key和value不能为空");}masterRedisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);log.info("写操作完成,key:{}", key);}/*** 读操作(使用从节点,优先)* @param key 键* @return 值*/@ApiOperation("读操作(从节点优先)")public Object get(String key) {if (!StringUtils.hasText(key)) {throw new IllegalArgumentException("key不能为空");}try {// 优先从从节点读取Object value = slaveRedisTemplate.opsForValue().get(key);log.info("从从节点读取数据,key:{}, value:{}", key, value);return value;} catch (Exception e) {log.error("从从节点读取数据失败,切换到主节点", e);// 从节点读取失败,切换到主节点Object value = masterRedisTemplate.opsForValue().get(key);log.info("从主节点读取数据,key:{}, value:{}", key, value);return value;}}/*** 批量写操作* @param keyValueMap 键值对集合*/@ApiOperation("批量写操作(主节点)")public void batchSet(Map<String, Object> keyValueMap) {if (CollectionUtils.isEmpty(keyValueMap)) {throw new IllegalArgumentException("键值对集合不能为空");}masterRedisTemplate.executePipelined((RedisCallback<Void>) connection -> {for (Map.Entry<String, Object> entry : keyValueMap.entrySet()) {connection.set(entry.getKey().getBytes(),JSON.toJSONBytes(entry.getValue()));connection.expire(entry.getKey().getBytes(), 30 * 60);}return null;});log.info("批量写操作完成,共{}个键", keyValueMap.size());}/*** 批量读操作* @param keys 键列表* @return 键值对集合*/@ApiOperation("批量读操作(从节点优先)")public Map<String, Object> batchGet(List<String> keys) {if (CollectionUtils.isEmpty(keys)) {throw new IllegalArgumentException("键列表不能为空");}Map<String, Object> result = Maps.newHashMap();try {// 从从节点批量读取List<Object> values = slaveRedisTemplate.opsForValue().multiGet(keys);for (int i = 0; i < keys.size(); i++) {result.put(keys.get(i), values.get(i));}log.info("从从节点批量读取完成,共{}个键", keys.size());} catch (Exception e) {log.error("从从节点批量读取失败,切换到主节点", e);// 切换到主节点List<Object> values = masterRedisTemplate.opsForValue().multiGet(keys);for (int i = 0; i < keys.size(); i++) {result.put(keys.get(i), values.get(i));}log.info("从主节点批量读取完成,共{}个键", keys.size());}return result;}
}

十、Redis 实战场景

27. 如何使用 Redis 实现分布式限流?

分布式限流是指在分布式系统中,对接口或资源的访问频率进行限制,防止系统被过度请求压垮。Redis 凭借其高性能和原子性操作,是实现分布式限流的理想选择。

常见的限流算法
  1. 固定窗口计数器:在固定时间窗口内限制请求数量
  2. 滑动窗口计数器:将时间窗口划分为更小的区间,平滑限流效果
  3. 漏桶算法:控制请求处理速度,平滑突发流量
  4. 令牌桶算法:允许一定程度的突发流量,灵活性更高
Redis 实现固定窗口计数器
/*** Redis分布式限流服务(固定窗口计数器)* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis分布式限流")
public class RedisRateLimiterService {private final RedisTemplate<String, Object> redisTemplate;/*** 固定窗口计数器限流* @param key 限流键(如接口名、用户ID等)* @param maxCount 窗口内最大请求数* @param windowSeconds 窗口大小(秒)* @return 是否允许访问*/@ApiOperation("固定窗口计数器限流")public boolean fixedWindowLimit(String key, int maxCount, int windowSeconds) {if (!StringUtils.hasText(key) || maxCount <= 0 || windowSeconds <= 0) {throw new IllegalArgumentException("参数非法");}String redisKey = "rate_limit:fixed:" + key;// Lua脚本实现原子性操作String luaScript = "local current = redis.call('incr', KEYS[1]) " +"if current == 1 then " +"    redis.call('expire', KEYS[1], ARGV[2]) " +"end " +"if current > tonumber(ARGV[1]) then " +"    return 0 " +"else " +"    return 1 " +"end";DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);Long result = redisTemplate.execute(script,Collections.singletonList(redisKey),String.valueOf(maxCount),String.valueOf(windowSeconds));boolean allowed = Boolean.TRUE.equals(result != null && result == 1);log.info("固定窗口限流,key:{}, 允许访问:{}", key, allowed);return allowed;}/*** 滑动窗口计数器限流* @param key 限流键* @param maxCount 窗口内最大请求数* @param windowSeconds 窗口大小(秒)* @param segmentSeconds 时间片大小(秒)* @return 是否允许访问*/@ApiOperation("滑动窗口计数器限流")public boolean slidingWindowLimit(String key, int maxCount, int windowSeconds, int segmentSeconds) {if (!StringUtils.hasText(key) || maxCount <= 0 || windowSeconds <= 0 || segmentSeconds <= 0) {throw new IllegalArgumentException("参数非法");}if (segmentSeconds >= windowSeconds) {throw new IllegalArgumentException("时间片大小必须小于窗口大小");}String redisKey = "rate_limit:sliding:" + key;long currentTime = System.currentTimeMillis() / 1000;long windowStart = currentTime - windowSeconds;// Lua脚本实现滑动窗口限流String luaScript = "local currentTime = tonumber(ARGV[1]) " +"local windowStart = tonumber(ARGV[2]) " +"local maxCount = tonumber(ARGV[3]) " +"local segmentSeconds = tonumber(ARGV[4]) " +-- 移除窗口外的时间片"redis.call('zremrangebyscore', KEYS[1], 0, windowStart) " +-- 统计当前窗口内的请求数"local currentCount = redis.call('zcard', KEYS[1]) " +"if currentCount >= maxCount then " +"    return 0 " +"end " +-- 添加当前请求的时间片"local segment = math.floor(currentTime / segmentSeconds) * segmentSeconds " +"redis.call('zadd', KEYS[1], currentTime, segment .. ':' .. redis.call('incr', KEYS[1] .. ':seq')) " +-- 设置过期时间"redis.call('expire', KEYS[1], ARGV[5]) " +"return 1";DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);Long result = redisTemplate.execute(script,Collections.singletonList(redisKey),String.valueOf(currentTime),String.valueOf(windowStart),String.valueOf(maxCount),String.valueOf(segmentSeconds),String.valueOf(windowSeconds + segmentSeconds));boolean allowed = Boolean.TRUE.equals(result != null && result == 1);log.info("滑动窗口限流,key:{}, 允许访问:{}", key, allowed);return allowed;}/*** 令牌桶算法限流* @param key 限流键* @param capacity 令牌桶容量* @param rate 令牌生成速率(个/秒)* @return 是否允许访问*/@ApiOperation("令牌桶算法限流")public boolean tokenBucketLimit(String key, int capacity, double rate) {if (!StringUtils.hasText(key) || capacity <= 0 || rate <= 0) {throw new IllegalArgumentException("参数非法");}String redisKey = "rate_limit:token:" + key;long currentTime = System.currentTimeMillis();// Lua脚本实现令牌桶限流String luaScript = "local currentTime = tonumber(ARGV[1]) " +"local capacity = tonumber(ARGV[2]) " +"local rate = tonumber(ARGV[3]) " +-- 获取当前桶内令牌数和最后更新时间"local bucket = redis.call('hmget', KEYS[1], 'tokens', 'last_time') " +"local tokens = tonumber(bucket[1]) or capacity " +"local lastTime = tonumber(bucket[2]) or currentTime " +-- 计算令牌补充数量"local elapsed = currentTime - lastTime " +"local addTokens = math.floor(elapsed * rate) " +"tokens = math.min(tokens + addTokens, capacity) " +-- 尝试获取令牌"if tokens >= 1 then " +"    tokens = tokens - 1 " +"    redis.call('hmset', KEYS[1], 'tokens', tokens, 'last_time', currentTime) " +"    redis.call('expire', KEYS[1], 3600) " +"    return 1 " +"else " +"    return 0 " +"end";DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);Long result = redisTemplate.execute(script,Collections.singletonList(redisKey),String.valueOf(currentTime),String.valueOf(capacity),String.valueOf(rate));boolean allowed = Boolean.TRUE.equals(result != null && result == 1);log.info("令牌桶限流,key:{}, 允许访问:{}", key, allowed);return allowed;}
}
限流注解实现
/*** 分布式限流注解* @author ken*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisRateLimit {/*** 限流键前缀*/String prefix() default "rate_limit";/*** 限流键(支持EL表达式)*/String key() default "";/*** 限流类型*/LimitType type() default LimitType.FIXED_WINDOW;/*** 最大请求数*/int maxCount();/*** 窗口大小(秒)*/int windowSeconds() default 60;/*** 时间片大小(仅滑动窗口使用)*/int segmentSeconds() default 10;/*** 令牌桶容量(仅令牌桶使用)*/int capacity() default 100;/*** 令牌生成速率(个/秒,仅令牌桶使用)*/double rate() default 10.0;/*** 限流提示信息*/String message() default "请求过于频繁,请稍后再试";/*** 限流类型枚举*/enum LimitType {FIXED_WINDOW,    // 固定窗口SLIDING_WINDOW,  // 滑动窗口TOKEN_BUCKET     // 令牌桶}
}
限流切面实现
/*** 分布式限流切面* @author ken*/
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisRateLimitAspect {private final RedisRateLimiterService rateLimiterService;private final ExpressionParser expressionParser = new SpelExpressionParser();/*** 切入点*/@Pointcut("@annotation(com.example.redis.annotation.RedisRateLimit)")public void rateLimitPointcut() {}/*** 环绕通知*/@Around("rateLimitPointcut() && @annotation(rateLimit)")public Object around(ProceedingJoinPoint joinPoint, RedisRateLimit rateLimit) throws Throwable {// 解析限流键String key = resolveKey(joinPoint, rateLimit);String fullKey = rateLimit.prefix() + ":" + key;// 执行限流检查boolean allowed = false;switch (rateLimit.type()) {case FIXED_WINDOW:allowed = rateLimiterService.fixedWindowLimit(fullKey,rateLimit.maxCount(),rateLimit.windowSeconds());break;case SLIDING_WINDOW:allowed = rateLimiterService.slidingWindowLimit(fullKey,rateLimit.maxCount(),rateLimit.windowSeconds(),rateLimit.segmentSeconds());break;case TOKEN_BUCKET:allowed = rateLimiterService.tokenBucketLimit(fullKey,rateLimit.capacity(),rateLimit.rate());break;}// 限流处理if (!allowed) {log.warn("接口限流触发,key:{}", fullKey);throw new BusinessException(429, rateLimit.message());}// 执行目标方法return joinPoint.proceed();}/*** 解析限流键(支持EL表达式)*/private String resolveKey(ProceedingJoinPoint joinPoint, RedisRateLimit rateLimit) {String key = rateLimit.key();if (!StringUtils.hasText(key)) {// 默认使用类名+方法名MethodSignature signature = (MethodSignature) joinPoint.getSignature();return signature.getDeclaringTypeName() + "." + signature.getName();}// 解析EL表达式MethodSignature signature = (MethodSignature) joinPoint.getSignature();EvaluationContext context = new MethodBasedEvaluationContext(joinPoint.getTarget(),signature.getMethod(),joinPoint.getArgs(),new DefaultParameterNameDiscoverer());return expressionParser.parseExpression(key).getValue(context, String.class);}
}

28. 如何使用 Redis 实现延迟队列?

延迟队列是指消息发送后,不立即被消费,而是在指定的延迟时间后才被消费。Redis 可以通过多种方式实现延迟队列,适用于订单超时取消、定时任务等场景。

Redis 实现延迟队列的几种方式
  1. Sorted Set 方式:利用分数存储时间戳,实现有序性
  2. List + 过期事件方式:结合 List 和 Redis 过期事件通知
  3. Stream 方式:Redis 5.0 + 新增的 Stream 数据结构,支持消费者组和消息确认
Sorted Set 实现延迟队列
/*** Redis延迟队列服务(Sorted Set实现)* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis延迟队列")
public class RedisDelayQueueService {private final RedisTemplate<String, Object> redisTemplate;private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "delay-queue-consumer"));// 延迟队列前缀private static final String QUEUE_PREFIX = "delay_queue:";/*** 启动延迟队列消费者* @param queueName 队列名称* @param interval 轮询间隔(毫秒)*/@PostConstructpublic void startConsumer() {// 启动消费者,轮询间隔100毫秒executor.scheduleAtFixedRate(this::consumeMessage,0,100,TimeUnit.MILLISECONDS);log.info("Redis延迟队列消费者已启动");}/*** 发送延迟消息* @param queueName 队列名称* @param message 消息内容* @param delayMillis 延迟时间(毫秒)* @return 消息ID*/@ApiOperation("发送延迟消息")public String sendDelayMessage(String queueName, Object message, long delayMillis) {if (!StringUtils.hasText(queueName) || ObjectUtils.isEmpty(message) || delayMillis <= 0) {throw new IllegalArgumentException("参数非法");}String queueKey = QUEUE_PREFIX + queueName;String messageId = UUID.randomUUID().toString();long executeTime = System.currentTimeMillis() + delayMillis;// 消息内容序列化String messageJson = JSON.toJSONString(message);// 写入Sorted Set,分数为执行时间戳ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();zSetOps.add(queueKey, messageJson, executeTime);log.info("延迟消息发送成功,queue:{}, messageId:{}, executeTime:{}",queueName, messageId, new Date(executeTime));return messageId;}/*** 消费延迟消息*/private void consumeMessage() {// 获取所有队列Set<String> queueKeys = redisTemplate.keys(QUEUE_PREFIX + "*");if (CollectionUtils.isEmpty(queueKeys)) {return;}long currentTime = System.currentTimeMillis();for (String queueKey : queueKeys) {String queueName = queueKey.substring(QUEUE_PREFIX.length());try {// 批量获取已到期的消息(分数 <= 当前时间戳)ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();Set<ZSetOperations.TypedTuple<Object>> messages = zSetOps.rangeByScoreWithScores(queueKey,0,currentTime,0,10 // 每次最多消费10条消息);if (CollectionUtils.isEmpty(messages)) {continue;}// 消费消息for (ZSetOperations.TypedTuple<Object> message : messages) {String messageJson = (String) message.getValue();double score = message.getScore();try {// 处理消息handleMessage(queueName, messageJson);// 消息处理成功后删除zSetOps.remove(queueKey, messageJson);log.info("延迟消息消费成功,queue:{}, executeTime:{}",queueName, new Date((long) score));} catch (Exception e) {log.error("延迟消息消费失败,queue:{}, message:{}", queueName, messageJson, e);// 消息处理失败,可根据业务需求重试或转移到死信队列handleFailedMessage(queueName, messageJson, e);}}} catch (Exception e) {log.error("消费延迟消息异常,queue:{}", queueName, e);}}}/*** 处理延迟消息* @param queueName 队列名称* @param messageJson 消息内容(JSON格式)*/private void handleMessage(String queueName, String messageJson) {// 根据队列名称路由到对应的消息处理器switch (queueName) {case "order_timeout":handleOrderTimeoutMessage(messageJson);break;case "task_schedule":handleTaskScheduleMessage(messageJson);break;default:log.warn("未知的队列名称:{}", queueName);}}/*** 处理订单超时消息*/private void handleOrderTimeoutMessage(String messageJson) {OrderTimeoutMessage message = JSON.parseObject(messageJson, OrderTimeoutMessage.class);log.info("处理订单超时消息,orderId:{}, userId:{}", message.getOrderId(), message.getUserId());// 实际业务逻辑:取消订单、恢复库存等// orderService.cancelOrder(message.getOrderId());}/*** 处理定时任务消息*/private void handleTaskScheduleMessage(String messageJson) {ScheduleMessage message = JSON.parseObject(messageJson, ScheduleMessage.class);log.info("处理定时任务消息,taskId:{}, taskName:{}", message.getTaskId(), message.getTaskName());// 实际业务逻辑:执行定时任务// taskService.executeTask(message.getTaskId());}/*** 处理消费失败的消息*/private void handleFailedMessage(String queueName, String messageJson, Exception e) {// 简单实现:转移到死信队列String deadLetterKey = "dead_letter_queue:" + queueName;redisTemplate.opsForList().rightPush(deadLetterKey, messageJson);redisTemplate.expire(deadLetterKey, 24, TimeUnit.HOURS);}/*** 停止消费者*/@PreDestroypublic void stopConsumer() {executor.shutdown();try {if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();}log.info("Redis延迟队列消费者已停止");}/*** 订单超时消息实体*/@Datapublic static class OrderTimeoutMessage {private String orderId;private String userId;private BigDecimal amount;}/*** 定时任务消息实体*/@Datapublic static class ScheduleMessage {private String taskId;private String taskName;private Map<String, Object> params;}
}
Stream 实现延迟队列(Redis 5.0+)
/*** Redis延迟队列服务(Stream实现,Redis 5.0+)* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis Stream延迟队列")
public class RedisStreamDelayQueueService {private final RedisTemplate<String, Object> redisTemplate;private static final String STREAM_PREFIX = "stream_delay_queue:";private static final String CONSUMER_GROUP = "delay_group";/*** 初始化消费者组* @param queueName 队列名称*/@ApiOperation("初始化延迟队列消费者组")public void initConsumerGroup(String queueName) {if (!StringUtils.hasText(queueName)) {throw new IllegalArgumentException("队列名称不能为空");}String streamKey = STREAM_PREFIX + queueName;try {// 创建消费者组,不存在则创建redisTemplate.opsForStream().createGroup(streamKey,CONSUMER_GROUP,ReadOffset.latest());log.info("消费者组创建成功,queue:{}, group:{}", queueName, CONSUMER_GROUP);} catch (Exception e) {// 忽略消费者组已存在的异常if (!e.getMessage().contains("BUSYGROUP")) {log.error("创建消费者组失败", e);throw e;}log.info("消费者组已存在,queue:{}, group:{}", queueName, CONSUMER_GROUP);}}/*** 发送延迟消息* @param queueName 队列名称* @param message 消息内容* @param delayMillis 延迟时间(毫秒)* @return 消息ID*/@ApiOperation("发送Stream延迟消息")public String sendStreamDelayMessage(String queueName, Object message, long delayMillis) {if (!StringUtils.hasText(queueName) || ObjectUtils.isEmpty(message) || delayMillis <= 0) {throw new IllegalArgumentException("参数非法");}String streamKey = STREAM_PREFIX + queueName;long executeTime = System.currentTimeMillis() + delayMillis;// 构建消息体,包含执行时间和消息内容Map<String, Object> messageMap = Maps.newHashMap();messageMap.put("execute_time", executeTime);messageMap.put("content", JSON.toJSONString(message));messageMap.put("create_time", System.currentTimeMillis());// 发送消息到StreamRecordId recordId = redisTemplate.opsForStream().add(StreamRecords.newRecord().ofMap(messageMap).withStreamKey(streamKey));String messageId = recordId.getValue();log.info("Stream延迟消息发送成功,queue:{}, messageId:{}, executeTime:{}",queueName, messageId, new Date(executeTime));return messageId;}/*** 消费Stream延迟消息* @param queueName 队列名称* @param consumerName 消费者名称*/@ApiOperation("消费Stream延迟消息")public void consumeStreamMessage(String queueName, String consumerName) {if (!StringUtils.hasText(queueName) || !StringUtils.hasText(consumerName)) {throw new IllegalArgumentException("队列名称和消费者名称不能为空");}// 初始化消费者组initConsumerGroup(queueName);String streamKey = STREAM_PREFIX + queueName;// 启动消费者线程new Thread(() -> {while (!Thread.currentThread().isInterrupted()) {try {// 读取消息,每次最多读取10条,阻塞时间1秒StreamRecords<String, Object> records = redisTemplate.opsForStream().read(Consumer.from(CONSUMER_GROUP, consumerName),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));if (CollectionUtils.isEmpty(records)) {continue;}// 处理消息for (MapRecord<String, String, Object> record : records) {handleStreamMessage(queueName, record);}} catch (Exception e) {log.error("消费Stream消息异常", e);try {Thread.sleep(1000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}, "stream-consumer-" + queueName).start();log.info("Stream延迟队列消费者已启动,queue:{}, consumer:{}", queueName, consumerName);}/*** 处理Stream延迟消息*/private void handleStreamMessage(String queueName, MapRecord<String, String, Object> record) {String streamKey = record.getStream();String messageId = record.getId();Map<String, Object> messageMap = record.getValue();try {// 获取执行时间和消息内容long executeTime = Long.parseLong(messageMap.get("execute_time").toString());String content = messageMap.get("content").toString();long currentTime = System.currentTimeMillis();if (currentTime < executeTime) {// 消息未到期,重新加入队列,等待下次消费redisTemplate.opsForStream().acknowledge(streamKey, CONSUMER_GROUP, messageId);// 重新发送消息(延迟时间为剩余时间)sendStreamDelayMessage(queueName, JSON.parseObject(content), executeTime - currentTime);log.info("消息未到期,重新加入队列,messageId:{}, remainingTime:{}ms",messageId, executeTime - currentTime);return;}// 消息已到期,执行消费逻辑log.info("Stream延迟消息消费,queue:{}, messageId:{}, content:{}",queueName, messageId, content);// 实际业务处理processStreamMessage(queueName, content);// 消息处理成功,确认消费redisTemplate.opsForStream().acknowledge(streamKey, CONSUMER_GROUP, messageId);log.info("Stream延迟消息消费成功,messageId:{}", messageId);// 删除已消费的消息(可选,根据业务需求)redisTemplate.opsForStream().delete(streamKey, messageId);} catch (Exception e) {log.error("处理Stream消息失败,messageId:{}", messageId, e);// 处理失败,根据重试策略处理handleStreamFailedMessage(streamKey, messageId, messageMap);}}/*** 处理Stream消息内容*/private void processStreamMessage(String queueName, String content) {// 根据队列名称处理不同类型的消息switch (queueName) {case "order_timeout":OrderTimeoutMessage orderMessage = JSON.parseObject(content, OrderTimeoutMessage.class);// orderService.cancelOrder(orderMessage.getOrderId());break;case "task_schedule":ScheduleMessage taskMessage = JSON.parseObject(content, ScheduleMessage.class);// taskService.executeTask(taskMessage.getTaskId());break;}}/*** 处理消费失败的Stream消息*/private void handleStreamFailedMessage(String streamKey, String messageId, Map<String, Object> messageMap) {// 获取消息重试次数int retryCount = messageMap.containsKey("retry_count")? Integer.parseInt(messageMap.get("retry_count").toString()) + 1: 1;if (retryCount > 3) {// 超过最大重试次数,转移到死信队列String deadLetterKey = "stream_dead_letter:" + streamKey;messageMap.put("retry_count", retryCount);redisTemplate.opsForStream().add(StreamRecords.newRecord().ofMap(messageMap).withStreamKey(deadLetterKey));// 确认消费,避免重复处理redisTemplate.opsForStream().acknowledge(streamKey, CONSUMER_GROUP, messageId);log.info("消息超过最大重试次数,转移到死信队列,messageId:{}", messageId);return;}// 未超过重试次数,延迟重试messageMap.put("retry_count", retryCount);long delayMillis = (long) (Math.pow(2, retryCount) * 1000); // 指数退避策略redisTemplate.opsForStream().add(StreamRecords.newRecord().ofMap(messageMap).withStreamKey(streamKey));// 确认消费原消息redisTemplate.opsForStream().acknowledge(streamKey, CONSUMER_GROUP, messageId);log.info("消息消费失败,进行重试,messageId:{}, retryCount:{}, delay:{}ms",messageId, retryCount, delayMillis);}
}

29. 如何使用 Redis 实现分布式 Session?

分布式 Session 是指在分布式系统中,将用户 Session 数据存储在 Redis 中,实现多实例共享 Session,解决单点登录和 Session 共享问题。

分布式 Session 实现原理
  1. Session 存储:将用户 Session 数据存储在 Redis 中,以 SessionID 为键
  2. SessionID 传递:通过 Cookie 或 URL 参数传递 SessionID
  3. Session 管理:实现 Session 的创建、读取、更新和删除
  4. 过期处理:设置 Session 过期时间,自动清理过期 Session
Spring Boot 集成 Redis 实现分布式 Session
/*** Redis分布式Session配置* @author ken*/
@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800, // Session过期时间30分钟redisNamespace = "distributed:session", // Redis键前缀redisFlushMode = RedisFlushMode.ON_SAVE, // 保存时刷新到RediscleanupCron = "0 * * * * *" // 定时清理过期Session的Cron表达式
)
@Api(tags = "Redis分布式Session配置")
public class RedisSessionConfig {/*** 配置Redis连接工厂*/@Beanpublic LettuceConnectionFactory redisConnectionFactory() {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();config.setHostName("127.0.0.1");config.setPort(6379);config.setPassword("password");// 连接池配置GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(16);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(4);poolConfig.setMaxWait(Duration.ofSeconds(3));LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig).commandTimeout(Duration.ofSeconds(2)).build();return new LettuceConnectionFactory(config, clientConfig);}/*** 配置SessionRepository*/@Beanpublic RedisOperationsSessionRepository sessionRepository(RedisConnectionFactory connectionFactory) {RedisOperationsSessionRepository repository = new RedisOperationsSessionRepository(connectionFactory);// 设置Session序列化器(默认使用JdkSerializationRedisSerializer,建议替换为JSON序列化)repository.setDefaultSerializer(new GenericJackson2JsonRedisSerializer());// 设置Session过期时间repository.setDefaultMaxInactiveInterval(1800);return repository;}/*** 配置Session过滤器*/@Beanpublic FilterRegistrationBean<SessionRepositoryFilter<?>> sessionRepositoryFilterRegistration(SessionRepositoryFilter<?> filter) {FilterRegistrationBean<SessionRepositoryFilter<?>> registration = new FilterRegistrationBean<>(filter);registration.setOrder(SessionRepositoryFilter.DEFAULT_ORDER);registration.addUrlPatterns("/*");return registration;}
}
分布式 Session 操作服务
/*** 分布式Session操作服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "分布式Session操作")
public class DistributedSessionService {private final HttpSession session;private final RedisOperationsSessionRepository sessionRepository;/*** 设置Session属性* @param key 属性键* @param value 属性值*/@ApiOperation("设置Session属性")public void setAttribute(String key, Object value) {if (!StringUtils.hasText(key)) {throw new IllegalArgumentException("属性键不能为空");}session.setAttribute(key, value);log.info("设置Session属性,key:{}, sessionId:{}", key, session.getId());}/*** 获取Session属性* @param key 属性键* @return 属性值*/@ApiOperation("获取Session属性")public Object getAttribute(String key) {if (!StringUtils.hasText(key)) {return null;}Object value = session.getAttribute(key);log.info("获取Session属性,key:{}, sessionId:{}, value:{}", key, session.getId(), value);return value;}/*** 移除Session属性* @param key 属性键*/@ApiOperation("移除Session属性")public void removeAttribute(String key) {if (StringUtils.hasText(key)) {session.removeAttribute(key);log.info("移除Session属性,key:{}, sessionId:{}", key, session.getId());}}/*** 获取当前SessionID* @return SessionID*/@ApiOperation("获取当前SessionID")public String getSessionId() {String sessionId = session.getId();log.info("获取SessionID:{}", sessionId);return sessionId;}/***  invalidate当前Session*/@ApiOperation("销毁当前Session")public void invalidateSession() {String sessionId = session.getId();session.invalidate();log.info("销毁Session,sessionId:{}", sessionId);}/*** 根据SessionID获取Session信息* @param sessionId SessionID* @return Session信息*/@ApiOperation("根据SessionID获取Session信息")public Map<String, Object> getSessionBySessionId(String sessionId) {if (!StringUtils.hasText(sessionId)) {return Collections.emptyMap();}RedisSession session = (RedisSession) sessionRepository.findById(sessionId);if (session == null || session.isExpired()) {log.warn("Session不存在或已过期,sessionId:{}", sessionId);return Collections.emptyMap();}Map<String, Object> attributes = Maps.newHashMap();session.getAttributeNames().asIterator().forEachRemaining(key -> attributes.put(key, session.getAttribute(key)));// 添加Session基本信息attributes.put("sessionId", sessionId);attributes.put("creationTime", new Date(session.getCreationTime()));attributes.put("lastAccessedTime", new Date(session.getLastAccessedTime()));attributes.put("maxInactiveInterval", session.getMaxInactiveInterval());log.info("根据SessionID获取Session信息,sessionId:{}, attributes:{}", sessionId, attributes.size());return attributes;}/*** 延长Session过期时间* @param sessionId SessionID* @param seconds 延长时间(秒)* @return 是否成功*/@ApiOperation("延长Session过期时间")public boolean extendSessionExpiration(String sessionId, int seconds) {if (!StringUtils.hasText(sessionId) || seconds <= 0) {return false;}RedisSession session = (RedisSession) sessionRepository.findById(sessionId);if (session == null || session.isExpired()) {log.warn("Session不存在或已过期,无法延长有效期,sessionId:{}", sessionId);return false;}// 延长Session过期时间session.setMaxInactiveInterval(Duration.ofSeconds(seconds));sessionRepository.save(session);log.info("延长Session过期时间,sessionId:{}, 新有效期:{}秒", sessionId, seconds);return true;}/*** 获取在线用户数量* @return 在线用户数量*/@ApiOperation("获取在线用户数量")public long getOnlineUserCount() {// 扫描所有Session键String pattern = "distributed:session:sessions:*";Set<String> sessionKeys = redisTemplate.keys(pattern);return sessionKeys != null ? sessionKeys.size() : 0;}
}
分布式 Session 控制器示例
/*** 分布式Session控制器* @author ken*/
@RestController
@RequestMapping("/api/session")
@Api(tags = "分布式Session接口")
@RequiredArgsConstructor
public class SessionController {private final DistributedSessionService sessionService;/*** 用户登录,创建Session*/@PostMapping("/login")@ApiOperation("用户登录")public Result login(@RequestBody LoginRequest request) {// 实际项目中需验证用户名密码if (!"admin".equals(request.getUsername()) || !"123456".equals(request.getPassword())) {return Result.fail("用户名或密码错误");}// 登录成功,设置用户信息到SessionUserInfo userInfo = new UserInfo();userInfo.setUserId("10001");userInfo.setUsername(request.getUsername());userInfo.setNickname("管理员");userInfo.setRole("ADMIN");sessionService.setAttribute("USER_INFO", userInfo);return Result.success(Map.of("sessionId", sessionService.getSessionId(),"userInfo", userInfo));}/*** 获取当前登录用户信息*/@GetMapping("/currentUser")@ApiOperation("获取当前登录用户信息")public Result getCurrentUser() {UserInfo userInfo = (UserInfo) sessionService.getAttribute("USER_INFO");if (userInfo == null) {return Result.fail("未登录");}return Result.success(userInfo);}/*** 用户登出,销毁Session*/@PostMapping("/logout")@ApiOperation("用户登出")public Result logout() {sessionService.invalidateSession();return Result.success("登出成功");}/*** 获取在线用户数量*/@GetMapping("/onlineCount")@ApiOperation("获取在线用户数量")public Result getOnlineCount() {return Result.success(sessionService.getOnlineUserCount());}@Datapublic static class LoginRequest {private String username;private String password;}@Datapublic static class UserInfo {private String userId;private String username;private String nickname;private String role;}
}

30. 如何使用 Redis 实现分布式锁?

分布式锁是分布式系统中控制共享资源访问的一种机制,用于解决多个进程或服务同时操作共享资源导致的数据不一致问题。Redis 凭借其高性能和原子性操作,是实现分布式锁的常用选择。

分布式锁的核心要求
  1. 互斥性:同一时间只能有一个线程持有锁
  2. 安全性:锁只能被持有锁的线程释放
  3. 可用性:即使 Redis 节点故障,也能保证锁的可用性
  4. 幂等性:锁的获取和释放操作具有幂等性
  5. 防死锁:具备自动释放锁的机制,防止死锁
Redis 实现分布式锁(基于 SET 命令)
/*** Redis分布式锁服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redis分布式锁")
public class RedisDistributedLockService {private final RedisTemplate<String, Object> redisTemplate;// 锁的默认过期时间(30秒)private static final long DEFAULT_EXPIRE = 30000;// 锁的前缀private static final String LOCK_PREFIX = "distributed:lock:";// 线程本地存储当前持有的锁private final ThreadLocal<Map<String, Integer>> localLocks = ThreadLocal.withInitial(Maps::newHashMap);/*** 获取分布式锁* @param lockKey 锁的键* @param expire 过期时间(毫秒)* @param timeout 获取锁的超时时间(毫秒)* @return 是否获取成功*/@ApiOperation("获取分布式锁")public boolean tryLock(String lockKey, long expire, long timeout) {if (!StringUtils.hasText(lockKey)) {throw new IllegalArgumentException("锁的键不能为空");}if (expire <= 0) {expire = DEFAULT_EXPIRE;}String realLockKey = LOCK_PREFIX + lockKey;String lockValue = UUID.randomUUID().toString(); // 唯一值,用于标识锁的持有者long startTime = System.currentTimeMillis();// 循环获取锁,直到超时while (true) {// 使用SET命令的NX(不存在则设置)和PX(过期时间毫秒)参数实现原子操作Boolean success = redisTemplate.opsForValue().setIfAbsent(realLockKey, lockValue, expire, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {// 获取锁成功,记录到本地线程Map<String, Integer> locks = localLocks.get();locks.put(realLockKey, locks.getOrDefault(realLockKey, 0) + 1);log.info("获取分布式锁成功,lockKey:{}, value:{}", realLockKey, lockValue);// 启动锁续约线程,防止业务执行时间超过锁过期时间startLockRenewal(realLockKey, lockValue, expire);return true;}// 检查是否超时if (System.currentTimeMillis() - startTime >= timeout) {log.warn("获取分布式锁超时,lockKey:{}", realLockKey);return false;}// 短暂休眠后重试,避免频繁请求Redistry {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}}/*** 获取分布式锁(使用默认参数)* @param lockKey 锁的键* @return 是否获取成功*/@ApiOperation("获取分布式锁(默认参数)")public boolean tryLock(String lockKey) {return tryLock(lockKey, DEFAULT_EXPIRE, 5000);}/*** 释放分布式锁* @param lockKey 锁的键* @return 是否释放成功*/@ApiOperation("释放分布式锁")public boolean unlock(String lockKey) {if (!StringUtils.hasText(lockKey)) {return false;}String realLockKey = LOCK_PREFIX + lockKey;Map<String, Integer> locks = localLocks.get();// 检查当前线程是否持有锁if (!locks.containsKey(realLockKey) || locks.get(realLockKey) <= 0) {log.warn("当前线程未持有锁,无法释放,lockKey:{}", realLockKey);return false;}// 减少重入计数int count = locks.get(realLockKey) - 1;if (count > 0) {locks.put(realLockKey, count);log.info("重入锁计数减少,lockKey:{}, 剩余计数:{}", realLockKey, count);return true;}// 计数为0,真正释放锁locks.remove(realLockKey);// 使用Lua脚本保证释放锁的原子性String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"    return redis.call('del', KEYS[1]) " +"else " +"    return 0 " +"end";String lockValue = (String) redisTemplate.opsForValue().get(realLockKey);DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);Long result = redisTemplate.execute(script, Collections.singletonList(realLockKey), lockValue);boolean success = Boolean.TRUE.equals(result != null && result > 0);if (success) {log.info("释放分布式锁成功,lockKey:{}", realLockKey);// 停止锁续约线程stopLockRenewal(realLockKey);} else {log.warn("释放分布式锁失败,lockKey:{}", realLockKey);}return success;}/*** 锁续约(防止业务执行时间超过锁过期时间)* @param lockKey 锁的键* @param lockValue 锁的值* @param expire 过期时间(毫秒)*/private void startLockRenewal(String lockKey, String lockValue, long expire) {// 续约线程,每隔1/3过期时间执行一次续约ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "lock-renewal-" + lockKey));long period = expire / 3;executor.scheduleAtFixedRate(() -> {try {// 检查锁是否还存在且属于当前线程String currentValue = (String) redisTemplate.opsForValue().get(lockKey);if (lockValue.equals(currentValue)) {// 续约,重置过期时间redisTemplate.expire(lockKey, expire, TimeUnit.MILLISECONDS);log.debug("分布式锁续约成功,lockKey:{}, 剩余时间:{}ms", lockKey, expire);} else {// 锁已释放或被其他线程获取,停止续约stopLockRenewal(lockKey);}} catch (Exception e) {log.error("分布式锁续约失败", e);stopLockRenewal(lockKey);}}, period, period, TimeUnit.MILLISECONDS);// 存储续约线程,用于后续停止redisTemplate.opsForValue().set(lockKey + ":renewal", executor, expire, TimeUnit.MILLISECONDS);}/*** 停止锁续约* @param lockKey 锁的键*/private void stopLockRenewal(String lockKey) {try {Object executorObj = redisTemplate.opsForValue().get(lockKey + ":renewal");if (executorObj instanceof ScheduledExecutorService) {ScheduledExecutorService executor = (ScheduledExecutorService) executorObj;executor.shutdown();redisTemplate.delete(lockKey + ":renewal");log.debug("分布式锁续约已停止,lockKey:{}", lockKey);}} catch (Exception e) {log.error("停止锁续约失败", e);}}/*** 尝试获取锁并执行任务(自动释放锁)* @param lockKey 锁的键* @param task 要执行的任务* @param <T> 任务返回值类型* @return 任务执行结果*/@ApiOperation("尝试获取锁并执行任务")public <T> T tryLockAndExecute(String lockKey, Supplier<T> task) {return tryLockAndExecute(lockKey, DEFAULT_EXPIRE, 5000, task);}/*** 尝试获取锁并执行任务(自动释放锁)* @param lockKey 锁的键* @param expire 锁过期时间* @param timeout 获取锁超时时间* @param task 要执行的任务* @param <T> 任务返回值类型* @return 任务执行结果*/@ApiOperation("尝试获取锁并执行任务(带参数)")public <T> T tryLockAndExecute(String lockKey, long expire, long timeout, Supplier<T> task) {boolean locked = false;try {locked = tryLock(lockKey, expire, timeout);if (locked) {return task.get();} else {throw new BusinessException("获取分布式锁失败,无法执行任务");}} finally {if (locked) {unlock(lockKey);}}}
}
分布式锁使用示例
/*** 分布式锁使用示例控制器* @author ken*/
@RestController
@RequestMapping("/api/lock")
@Api(tags = "分布式锁使用示例")
@RequiredArgsConstructor
public class LockExampleController {private final RedisDistributedLockService lockService;private final OrderService orderService;/*** 秒杀接口(使用分布式锁防止超卖)*/@PostMapping("/seckill")@ApiOperation("秒杀接口")public Result seckill(@RequestParam String productId, @RequestParam String userId) {// 使用商品ID作为锁的键,确保同一商品同一时间只有一个请求处理String lockKey = "seckill:" + productId;return lockService.tryLockAndExecute(lockKey, () -> {// 执行秒杀逻辑boolean success = orderService.createSeckillOrder(productId, userId);if (success) {return Result.success("秒杀成功");} else {return Result.fail("商品已售罄");}});}/*** 库存扣减接口(使用分布式锁保证原子性)*/@PostMapping("/deductStock")@ApiOperation("库存扣减接口")public Result deductStock(@RequestParam String productId, @RequestParam int quantity) {String lockKey = "stock:" + productId;return lockService.tryLockAndExecute(lockKey, 60000, 3000, () -> {// 扣减库存逻辑boolean success = orderService.deductStock(productId, quantity);if (success) {return Result.success("库存扣减成功");} else {return Result.fail("库存不足");}});}
}
Redisson 分布式锁集成(推荐)

Redisson 是一个基于 Redis 的 Java 驻留内存数据网格,提供了更完善的分布式锁实现,支持可重入锁、公平锁、读写锁等高级特性。

/*** Redisson分布式锁配置* @author ken*/
@Configuration
@Api(tags = "Redisson分布式锁配置")
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 单节点配置config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password").setConnectionPoolSize(16).setConnectionMinimumIdleSize(4).setIdleConnectionTimeout(30000).setConnectTimeout(10000).setTimeout(3000);// 集群配置示例/*config.useClusterServers().addNodeAddress("redis://192.168.1.100:6379","redis://192.168.1.101:6379","redis://192.168.1.102:6379").setPassword("password").setScanInterval(2000);*/return Redisson.create(config);}
}
/*** Redisson分布式锁服务* @author ken*/
@Service
@Slf4j
@RequiredArgsConstructor
@Api(tags = "Redisson分布式锁服务")
public class RedissonLockService {private final RedissonClient redissonClient;/*** 获取可重入锁* @param lockKey 锁的键* @return RLock对象*/@ApiOperation("获取可重入锁")public RLock getReentrantLock(String lockKey) {if (!StringUtils.hasText(lockKey)) {throw new IllegalArgumentException("锁的键不能为空");}return redissonClient.getLock("redisson:lock:" + lockKey);}/*** 获取公平锁* @param lockKey 锁的键* @return RLock对象*/@ApiOperation("获取公平锁")public RLock getFairLock(String lockKey) {if (!StringUtils.hasText(lockKey)) {throw new IllegalArgumentException("锁的键不能为空");}return redissonClient.getFairLock("redisson:fair:lock:" + lockKey);}/*** 获取读写锁* @param lockKey 锁的键* @return RReadWriteLock对象*/@ApiOperation("获取读写锁")public RReadWriteLock getReadWriteLock(String lockKey) {if (!StringUtils.hasText(lockKey)) {throw new IllegalArgumentException("锁的键不能为空");}return redissonClient.getReadWriteLock("redisson:rw:lock:" + lockKey);}/*** 使用可重入锁执行任务* @param lockKey 锁的键* @param expire 锁过期时间(秒)* @param task 要执行的任务* @param <T> 任务返回值类型* @return 任务执行结果*/@ApiOperation("使用可重入锁执行任务")public <T> T executeWithLock(String lockKey, int expire, Supplier<T> task) {RLock lock = getReentrantLock(lockKey);boolean locked = false;try {// 尝试获取锁,最多等待3秒,10秒后自动释放locked = lock.tryLock(3, expire, TimeUnit.SECONDS);if (locked) {return task.get();} else {throw new BusinessException("获取Redisson锁失败,无法执行任务");}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new BusinessException("获取锁被中断");} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();log.info("Redisson锁已释放,lockKey:{}", lockKey);}}}/*** 使用读写锁执行读任务* @param lockKey 锁的键* @param task 读任务* @param <T> 任务返回值类型* @return 任务执行结果*/@ApiOperation("使用读写锁执行读任务")public <T> T executeWithReadLock(String lockKey, Supplier<T> task) {RReadWriteLock rwLock = getReadWriteLock(lockKey);RLock readLock = rwLock.readLock();boolean locked = false;try {locked = readLock.tryLock(3, 60, TimeUnit.SECONDS);if (locked) {return task.get();} else {throw new BusinessException("获取读锁失败");}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new BusinessException("获取读锁被中断");} finally {if (locked && readLock.isHeldByCurrentThread()) {readLock.unlock();}}}/*** 使用读写锁执行写任务* @param lockKey 锁的键* @param task 写任务* @param <T> 任务返回值类型* @return 任务执行结果*/@ApiOperation("使用读写锁执行写任务")public <T> T executeWithWriteLock(String lockKey, Supplier<T> task) {RReadWriteLock rwLock = getReadWriteLock(lockKey);RLock writeLock = rwLock.writeLock();boolean locked = false;try {locked = writeLock.tryLock(3, 60, TimeUnit.SECONDS);if (locked) {return task.get();} else {throw new BusinessException("获取写锁失败");}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new BusinessException("获取写锁被中断");} finally {if (locked && writeLock.isHeldByCurrentThread()) {writeLock.unlock();}}}
}

十一、Redis 最佳实践与总结

31. Redis 使用有哪些最佳实践?

Redis 作为高性能的内存数据库,在实际使用中需要遵循一些最佳实践,以确保其稳定性、安全性和高效性。

1. 键设计最佳实践
  • 使用统一的命名规范:采用 “业务:模块:标识” 的格式(如user:info:10001
  • 避免过长的键名:键名不宜过长,会占用更多内存和带宽
  • 合理使用 Hash 存储对象:将相关字段存储在 Hash 中,减少键的数量
  • 避免大量小键:大量小键会增加 Redis 的管理开销,可适当合并
/*** 键设计示例* @author ken*/
public class RedisKeyGenerator {// 用户信息键:user:info:{userId}public static String getUserInfoKey(String userId) {return "user:info:" + userId;}// 商品库存键:product:stock:{productId}public static String getProductStockKey(String productId) {return "product:stock:" + productId;}// 用户订单列表键:user:orders:{userId}public static String getUserOrdersKey(String userId) {return "user:orders:" + userId;}// 热点数据计数器:counter:hot:{key}public static String getHotCounterKey(String key) {return "counter:hot:" + key;}
}
2. 数据结构选择最佳实践
  • String:适合存储单个值、计数器、分布式锁等
  • Hash:适合存储对象类型数据(如用户信息、商品详情)
  • List:适合实现队列、栈、最新列表等
  • Set:适合实现去重、交集、并集等集合操作
  • Sorted Set:适合实现排行榜、延迟队列、范围查询等
  • Bitmap:适合存储二值状态数据(如签到、在线状态)
  • HyperLogLog:适合进行基数统计(如 UV 统计)
3. 命令使用最佳实践
  • 避免使用 O (N) 复杂度命令:如 KEYS、HGETALL、SMEMBERS 等,改用 SCAN 系列命令
  • 使用批量命令减少网络开销:如 MSET、MGET、HMSET 等
  • 合理使用管道(Pipeline):批量执行命令,减少往返次数
  • 避免在事务中执行过多命令:事务会阻塞其他操作,影响性能
  • 谨慎使用 KEYS 命令:生产环境禁止使用,会阻塞 Redis
4. 内存管理最佳实践
  • 设置合理的过期时间:避免内存无限增长
  • 选择合适的内存淘汰策略:根据业务场景选择 LRU/LFU 等策略
  • 定期清理无效数据:通过脚本或定时任务清理过期数据
  • 避免存储大 Value:大 Value 会增加网络传输时间和内存占用
  • 压缩存储大对象:对大对象进行压缩后再存储
/*** 大Value压缩存储示例* @author ken*/
@Service
public class CompressedStorageService {private final RedisTemplate<String, Object> redisTemplate;private final SnappyCompressor compressor = new SnappyCompressor(); // 使用Snappy压缩@Autowiredpublic CompressedStorageService(RedisTemplate<String, Object> redisTemplate) {this.redisTemplate = redisTemplate;}/*** 压缩存储对象*/public void setCompressed(String key, Object value, long expireSeconds) {try {// 序列化对象byte[] data = JSON.toJSONBytes(value);// 压缩数据byte[] compressedData = compressor.compress(data);// 存储压缩后的数据redisTemplate.opsForValue().set(key, compressedData, expireSeconds, TimeUnit.SECONDS);log.info("压缩存储成功,key:{}, 原始大小:{} bytes, 压缩后:{} bytes",key, data.length, compressedData.length);} catch (Exception e) {log.error("压缩存储失败", e);throw new RuntimeException("压缩存储失败", e);}}/*** 获取并解压对象*/public <T> T getCompressed(String key, Class<T> clazz) {try {byte[] compressedData = (byte[]) redisTemplate.opsForValue().get(key);if (compressedData == null) {return null;}// 解压数据byte[] data = compressor.decompress(compressedData);// 反序列化对象return JSON.parseObject(data, clazz);} catch (Exception e) {log.error("解压获取失败", e);throw new RuntimeException("解压获取失败", e);}}
}
5. 高可用最佳实践
  • 使用主从复制:至少一主一从,避免单点故障
  • 部署 Redis Cluster:实现数据分片和自动故障转移
  • 合理配置哨兵:监控主从节点,实现自动故障转移
  • 定期备份数据:结合 RDB 和 AOF,确保数据可恢复
  • 跨机房部署:关键业务采用跨机房部署,提高可用性
6. 安全最佳实践
  • 设置密码认证:通过 requirepass 配置 Redis 密码
  • 限制网络访问:通过防火墙限制 Redis 端口的访问来源
  • 禁用危险命令:通过 rename-command 禁用 FLUSHALL、FLUSHDB 等危险命令
  • 不使用 root 用户运行:使用普通用户运行 Redis,减少安全风险
  • 定期更新 Redis 版本:及时修复已知的安全漏洞
# 安全配置示例
requirepass strong_password_here# 禁用危险命令
rename-command FLUSHALL ""
rename-command FLUSHDB ""
rename-command KEYS ""
rename-command CONFIG ""# 绑定特定IP(生产环境建议只绑定内网IP)
bind 192.168.1.100
7. 监控与运维最佳实践
  • 实时监控关键指标:内存使用、命中率、响应时间、连接数等
  • 设置告警阈值:当指标超过阈值时及时告警
  • 定期性能测试:模拟高并发场景,验证 Redis 性能
  • 记录慢查询日志:通过 slowlog 配置记录慢查询,优化性能
  • 制定应急响应预案:针对 Redis 故障制定恢复流程
# 慢查询日志配置
slowlog-log-slower-than 10000  # 记录执行时间超过10ms的命令
slowlog-max-len 1000           # 最多保存1000条慢查询记录

32. Redis 常见问题与解决方案总结

1. 缓存穿透
  • 问题:查询不存在的数据,导致请求穿透到数据库
  • 解决方案
    • 缓存空值,设置较短的过期时间
    • 使用布隆过滤器过滤不存在的键
    • 接口层实现限流和参数校验
/*** 布隆过滤器解决缓存穿透示例* @author ken*/
@Service
public class BloomFilterService {private final RedisTemplate<String, Object> redisTemplate;private final BloomFilter<String> idBloomFilter;@Autowiredpublic BloomFilterService(RedisTemplate<String, Object> redisTemplate) {this.redisTemplate = redisTemplate;// 初始化布隆过滤器,预计元素100万,误判率0.01this.idBloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 1000000, 0.01);// 预热布隆过滤器(从数据库加载已存在的ID)preloadBloomFilter();}/*** 查询数据(带布隆过滤器校验)*/public Object getData(String id) {// 1. 布隆过滤器校验,如果不存在直接返回if (!idBloomFilter.mightContain(id)) {log.info("ID不存在,布隆过滤器拦截,id:{}", id);return null;}// 2. 查询缓存String key = "data:" + id;Object data = redisTemplate.opsForValue().get(key);if (data != null) {return data;}// 3. 查询数据库data = queryDataFromDB(id);if (data != null) {// 4. 写入缓存redisTemplate.opsForValue().set(key, data, 30, TimeUnit.MINUTES);} else {// 5. 缓存空值,避免缓存穿透redisTemplate.opsForValue().set(key, "", 5, TimeUnit.MINUTES);}return data;}// 从数据库加载已存在的ID到布隆过滤器private void preloadBloomFilter() {// 实际项目中从数据库批量查询IDList<String> existingIds = Arrays.asList("1001", "1002", "1003");existingIds.forEach(idBloomFilter::put);log.info("布隆过滤器预热完成,加载ID数量:{}", existingIds.size());}// 模拟从数据库查询private Object queryDataFromDB(String id) {// 实际项目中实现数据库查询逻辑return "data_" + id;}
}
2. 缓存击穿
  • 问题:热点 key 过期瞬间,大量请求穿透到数据库
  • 解决方案
    • 热点 key 设置永不过期
    • 互斥锁:获取锁的线程更新缓存,其他线程等待
    • 定时任务提前更新过期的热点 key
3. 缓存雪崩
  • 问题:大量缓存同时过期或 Redis 宕机,导致请求全部穿透到数据库
  • 解决方案
    • 缓存过期时间添加随机值,避免同时过期
    • 部署 Redis 集群,提高可用性
    • 服务降级和限流,保护数据库
    • 多级缓存:本地缓存 + 分布式缓存
4. 数据一致性
  • 问题:缓存与数据库数据不一致
  • 解决方案
    • 先更新数据库,再删除缓存(Cache Aside Pattern)
    • 写入缓存时设置合理的过期时间
    • 关键业务采用分布式事务保证一致性
    • 最终一致性:通过消息队列异步更新缓存
5. 大 key 问题
  • 问题:单个 key 存储过大的数据,导致性能问题
  • 解决方案
    • 将大 key 拆分为多个小 key
    • 压缩大 value 后存储
    • 避免使用 HGETALL 等命令获取全部字段
    • 定期清理大 key
/*** 大key拆分示例* @author ken*/
@Service
public class BigKeySplitService {private final RedisTemplate<String, Object> redisTemplate;private static final int SPLIT_SIZE = 1000; // 每个分片的大小@Autowiredpublic BigKeySplitService(RedisTemplate<String, Object> redisTemplate) {this.redisTemplate = redisTemplate;}/*** 存储大列表(拆分存储)*/public void setBigList(String key, List<Object> dataList, long expireSeconds) {if (CollectionUtils.isEmpty(dataList)) {return;}// 计算分片数量int totalSize = dataList.size();int splitCount = (totalSize + SPLIT_SIZE - 1) / SPLIT_SIZE;// 存储分片信息String metaKey = key + ":meta";redisTemplate.opsForValue().set(metaKey, splitCount, expireSeconds, TimeUnit.SECONDS);// 分片存储数据for (int i = 0; i < splitCount; i++) {int start = i * SPLIT_SIZE;int end = Math.min(start + SPLIT_SIZE, totalSize);List<Object> subList = dataList.subList(start, end);String splitKey = key + ":split:" + i;redisTemplate.opsForValue().set(splitKey, subList, expireSeconds, TimeUnit.SECONDS);}log.info("大列表拆分存储完成,key:{}, 总大小:{}, 分片数量:{}", key, totalSize, splitCount);}/*** 获取大列表(合并分片)*/public List<Object> getBigList(String key) {String metaKey = key + ":meta";Integer splitCount = (Integer) redisTemplate.opsForValue().get(metaKey);if (splitCount == null || splitCount <= 0) {return Collections.emptyList();}List<Object> result = Lists.newArrayList();// 合并所有分片for (int i = 0; i < splitCount; i++) {String splitKey = key + ":split:" + i;List<Object> subList = (List<Object>) redisTemplate.opsForValue().get(splitKey);if (CollectionUtils.isNotEmpty(subList)) {result.addAll(subList);}}log.info("大列表合并完成,key:{}, 总大小:{}", key, result.size());return result;}
}
6. 网络问题
  • 问题:Redis 与应用之间的网络延迟或中断
  • 解决方案
    • 使用连接池管理连接,设置合理的超时时间
    • 实现重试机制,处理临时网络故障
    • 部署 Redis 在与应用相同的机房,减少网络延迟
    • 监控网络指标,及时发现网络问题

33. Redis 发展趋势与总结

Redis 发展趋势
  1. 多数据模型增强:Redis 不断丰富数据结构,如 Redis 6.2 引入的 BITFIELD_RO,Redis 7.0 增强的 Stream 功能
  2. 性能持续优化:不断优化内存管理和 IO 模型,提升处理能力
  3. 增强的集群功能:改进分片机制,提高集群稳定性和扩展性
  4. 与云原生融合:更好地支持容器化部署和 Kubernetes 集成
  5. 安全性增强:加强访问控制和数据加密功能
  6. AI 集成:探索在 Redis 中集成机器学习功能,支持实时数据分析
总结

Redis 作为高性能的内存数据库,凭借其丰富的数据结构、卓越的性能和灵活的部署方式,已成为分布式系统中不可或缺的组件。无论是作为缓存、数据库、消息队列还是分布式协调工具,Redis 都能发挥重要作用。

http://www.dtcms.com/a/618304.html

相关文章:

  • component-富文本实现(WangEditor)
  • 烟台城乡住房建设厅网站网站alt标签
  • win11上使用Workbench备份mysql数据库
  • B站评论数据采集:基于Requests的智能爬虫实战
  • 信息学与容斥
  • 网易云音乐评论数据采集:基于Requests的智能爬虫实战
  • 网站空间登录网站建设模式有哪些内容
  • VSCode 中快捷键的使用:(大小写转换快捷键、自动补全函数注释快捷键、代码和注释自动缩进快捷键)
  • 使用 Python 语言 从 0 到 1 搭建完整 Web UI自动化测试学习系列 25--数据驱动--参数化处理 Excel 文件 2
  • SpringCloud微服务笔记
  • 广告公司网站官网安徽网站建设流程
  • 华为OD机试真题2025双机位A卷 --【压缩日志查询】(Python C++ JAVA JS GO)
  • 网站编辑怎么做内容分类手机网站 程序
  • 瑞安建设网站成都vr 网站开发
  • C++多线程【数据共享】之互斥锁
  • Java漏洞集合工具
  • JavaScript 正则表达式详解
  • 【CS创世SD NAND征文】高可靠性数控设备:技术方案与行业展望
  • 深入理解Go语言Slice的append操作:从内存分配到扩容机制
  • Linux---文件控制<fcntl.h> (file control, fcntl)
  • 网站放到服务器珠海市 网站建设
  • 农林科技公司网站模板seo研究中心官网
  • 东莞响应式网站哪家好架设网站开发环境
  • 类似淘宝网站建设有哪些模板wordpress文章图片全屏浏览
  • 技术演进中的开发沉思-194 JavaScript: Prototype 框架
  • Windows MongoDB 安装与配置指南
  • Kafka客户端整合
  • 购物网站建设方案手机建立网站的软件
  • 力扣hot100----1day
  • 二叉树的前序遍历解题思路