Redis高级篇之最佳实践
摘要:本文围绕 Redis 高级实践展开,从键值设计、批处理优化、服务端优化(涵盖持久化、慢查询、命令安全、内存配置)到集群优化等多个关键方面,详细阐述了 Redis 在实际应用中的最佳实践方案。旨在帮助用户规范 Redis 使用,规避安全风险,提升 Redis 的性能、稳定性与可用性,确保其在生产环境中高效、安全地运行。
文章超详细思维导图
1、Redis键值设计
1.1、优雅的key结构
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
-
遵循基本格式:[业务名称]:[数据名]:[id]
-
长度不超过44字节
-
不包含特殊字符
例如:我们的登录业务,保存用户信息,其key可以设计成如下格式:
这样设计的好处:
-
可读性强
-
避免key冲突
-
方便管理
-
更节省内存
Redis 的 String 类型键值,其底层根据值的大小和内容采用三种编码格式:int
、embstr
和 raw
。
embstr
编码: 当字符串值长度小于等于 44 字节时使用。其最大优势在于 Redis Object (robj) 和对应的 SDS (Simple Dynamic String) 结构体被分配在一块连续的内存空间中。这种布局不仅内存占用更小(减少了单独分配 SDS 头的开销),而且得益于内存连续性,数据访问效率更高。
raw
编码: 当字符串值长度超过 44 字节时,会自动转换为 raw
编码。在此编码下,Redis Object 和 SDS 结构体会被分配在两块独立的内存空间中。Redis Object 内部仅存储一个指向 SDS 内存块的指针。这种非连续存储方式带来了两方面影响:
-
访问性能降低: 读取数据需要额外的指针跳转(两次内存访问),不如
embstr
直接。 -
潜在内存碎片: SDS 空间单独分配,增大了内存分配器的管理负担,易产生内存碎片。
1.2、拒绝BigKey
BigKey通常以Key的大小和Key中成员的数量来综合判定,例如:
-
Key本身的数据量过大:一个String类型的Key,它的值为5 MB
-
Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个
-
Key中成员的数据量过大:一个Hash类型的Key,其成员的Value(值)总大小为100 MB
推荐值:
-
单个key的value小于10KB
-
对于集合类型的key,建议元素数量小于1000
BigKey危害
1.2.2、如何发现BigKey
①redis-cli --bigkeys
利用redis-cli提供的--bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的big key
#命令
redis-cli -a 密码 --bigkeys
②scan扫描
自己编程,利用scan扫描Redis中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)
import com.heima.jedis.util.JedisConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;import java.util.HashMap;
import java.util.List;
import java.util.Map;public class JedisTest {private Jedis jedis;@BeforeEachvoid setUp() {// 1.建立连接// jedis = new Jedis("192.168.150.101", 6379);jedis = JedisConnectionFactory.getJedis();// 2.设置密码jedis.auth("123321");// 3.选择库jedis.select(0);}final static int STR_MAX_LEN = 10 * 1024;final static int HASH_MAX_LEN = 500;@Testvoid testScan() {int maxLen = 0;long len = 0;String cursor = "0";do {// 扫描并获取一部分keyScanResult<String> result = jedis.scan(cursor);// 记录cursorcursor = result.getCursor();List<String> list = result.getResult();if (list == null || list.isEmpty()) {break;}// 遍历for (String key : list) {// 判断key的类型String type = jedis.type(key);switch (type) {case "string":len = jedis.strlen(key);maxLen = STR_MAX_LEN;break;case "hash":len = jedis.hlen(key);maxLen = HASH_MAX_LEN;break;case "list":len = jedis.llen(key);maxLen = HASH_MAX_LEN;break;case "set":len = jedis.scard(key);maxLen = HASH_MAX_LEN;break;case "zset":len = jedis.zcard(key);maxLen = HASH_MAX_LEN;break;default:break;}//打印BigKeyif (len >= maxLen) {System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len);}}} while (!cursor.equals("0"));}@AfterEachvoid tearDown() {if (jedis != null) {jedis.close();}}}
③第三方工具
-
利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况
-
https://github.com/sripathikrishnan/redis-rdb-tools
④网络监控
-
自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
-
一般阿里云搭建的云服务器就有相关监控页面
1.2.3、如何删除BigKey
BigKey内存占用较多,即便时删除这样的key也需要耗费很长时间,导致Redis主线程阻塞,引发一系列问题。
- redis 3.0 及以下版本:
- 如果是集合类型,则遍历BigKey的元素,先逐个删除子元素,最后删除BigKey
- Redis 4.0以后:
-
Redis在4.0后提供了异步删除的命令:unlink
1.3、恰当的数据类型
例1:比如存储一个User对象,我们有三种存储方式:
①方式一:json字符串
{user:1 | {"name": "Jack", "age": 21}}
优点:实现简单粗暴
缺点:数据耦合,不够灵活
②方式二:字段打散
user:1:name | Jack |
---|---|
user:1:age | 21 |
优点:可以灵活访问对象任意字段
缺点:占用空间大、没办法做统一控制
③方式三:hash(推荐)
user:1 | name | jack |
age | 21 |
优点:底层使用ziplist,空间占用小,可以灵活访问对象的任意字段
缺点:代码相对复杂
例2:假如有hash类型的key,其中有100万对field和value,field是自增id,这个key存在什么问题?如何优化?
key | field | value |
someKey | id:0 | value0 |
..... | ..... | |
id:999999 | value999999 |
存在的问题:
-
hash的entry数量超过500时,会使用哈希表而不是ZipList,内存占用较多
-
可以通过hash-max-ziplist-entries配置entry上限,但是如果entry过多就会导致BigKey问题
-
方案一:拆分为string类型
key | value |
id:0 | value0 |
..... | ..... |
id:999999 | value999999 |
存在的问题:
-
string结构底层没有太多内存优化,内存占用较多
-
想要批量获取这些数据比较麻烦
方案二:拆分为小的hash,将 id / 100 作为key, 将id % 100 作为field
key | field | value |
key:0 | id:00 | value0 |
..... | ..... | |
id:99 | value99 | |
key:1 | id:00 | value100 |
..... | ..... | |
id:99 | value199 | |
.... | ||
key:9999 | id:00 | value999900 |
..... | ..... | |
id:99 | value999999 |
代码实现:
package com.heima.test;
import com.heima.jedis.util.JedisConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanResult;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JedisTest {private Jedis jedis;
@BeforeEachvoid setUp() {// 1.建立连接// jedis = new Jedis("192.168.150.101", 6379);jedis = JedisConnectionFactory.getJedis();// 2.设置密码jedis.auth("123321");// 3.选择库jedis.select(0);}
@Testvoid testSetBigKey() {Map<String, String> map = new HashMap<>();for (int i = 1; i <= 650; i++) {map.put("hello_" + i, "world!");}jedis.hmset("m2", map);}
@Testvoid testBigHash() {Map<String, String> map = new HashMap<>();for (int i = 1; i <= 100000; i++) {map.put("key_" + i, "value_" + i);}jedis.hmset("test:big:hash", map);}
@Testvoid testBigString() {for (int i = 1; i <= 100000; i++) {jedis.set("test:str:key_" + i, "value_" + i);}}
@Testvoid testSmallHash() {int hashSize = 100;Map<String, String> map = new HashMap<>(hashSize);for (int i = 1; i <= 100000; i++) {int k = (i - 1) / hashSize;int v = i % hashSize;map.put("key_" + v, "value_" + v);if (v == 0) {jedis.hmset("test:small:hash_" + k, map);}}}
@AfterEachvoid tearDown() {if (jedis != null) {jedis.close();}}
}
2、批处理优化
2.1、Pipeline
2.1.1、我们的客户端与redis服务器是这样交互的
单个命令的执行流程
N条命令的执行流程
多条指令批量的传输流程
2.1.2、MSet
利用mset批量插入10万条数据
@Test
void testMxx() {String[] arr = new String[2000];int j;long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {j = (i % 1000) << 1;arr[j] = "test:key_" + i;arr[j + 1] = "value_" + i;if (j == 0) {jedis.mset(arr);}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
}
注意:不要在一次处理中传输太多命令,否则单次命令占用宽带过多,会导致网络阻塞
2.1.3、Pipeline
MSET虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用Pipeline
@Test
void testPipeline() {// 创建管道Pipeline pipeline = jedis.pipelined();long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {// 放入命令到管道pipeline.set("test:key_" + i, "value_" + i);if (i % 1000 == 0) {// 每放入1000条命令,批量执行pipeline.sync();}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
}
2.2、集群下的批处理
Redis 集群要求批处理命令(如 MSET
或包含多 Key 的 Pipeline)中的所有 Key 必须位于同一个哈希槽 (Slot),否则会执行失败。这在实际应用中是个挑战,因为批处理的数据 Key 天然可能分布在不同的 Slot 和节点上。
针对此限制,主要有四种解决方案:
-
串行单命令执行:
-
做法: 将批处理拆分成独立的单条命令,依次发送执行。
-
优点: 实现极其简单。
-
缺点: 网络往返次数最多,耗时最长,性能最差,通常没有实用价值。
-
-
按 Slot 分组串行执行:
-
做法: 客户端预先计算每个 Key 所属的 Slot。将相同 Slot 的 Key 分组。对每个分组分别执行 Pipeline 批处理。各组命令串行执行(即一组执行完再执行下一组)。
-
优点: 比方案 1 大幅减少网络往返次数(每组内 Pipeline 优化),耗时显著降低。
-
缺点: 实现比方案 1 复杂。不同 Slot 组的命令仍需等待,存在串行等待时间,非最优。
-
-
按 Slot 分组并行执行:
-
做法: 同样是预先按 Slot 分组 Key。关键区别在于,不同 Slot 组的 Pipeline 命令是并行发送执行的(例如利用多线程或异步 I/O)。
-
优点: 在方案 2 的基础上,消除了不同 Slot 组间的串行等待时间,耗时最短(仅受最慢的那个 Slot 组执行时间影响)。
-
缺点: 实现最复杂,需要客户端管理并发和结果聚合。
-
-
使用 Hash Tag:
-
做法: 设计 Key 时加入
{hashtag}
。Redis 集群仅根据{
和}
之间的字符串计算 Slot。确保相关 Key 使用相同的{hashtag}
部分,它们就会落在同一个 Slot。 -
优点: 实现相对简单(主要在 Key 设计)。一次 Pipeline 即可完成所有操作,网络开销最小,耗时最低。
-
缺点: 强依赖 Key 设计。如果大量 Key 使用相同的
{hashtag}
,会导致数据分布不均(数据倾斜/热点问题),影响集群扩展性和性能。
-
总结与推荐:
-
方案 4 (Hash Tag) 性能最优且实现较简单,是首选方案,但务必谨慎设计
{hashtag}
以避免严重的数据倾斜(例如,使用用户 ID 后几位作为 tag 的一部分)。 -
方案 3 (按 Slot 分组并行执行) 在无法使用 Hash Tag 或必须避免潜在倾斜风险时,是实现高性能批处理的推荐方案。它克服了方案 2 的串行瓶颈,性能接近方案 4。(最为推荐)
-
方案 2 (按 Slot 分组串行执行) 是方案 3 的简化版,在并行实现困难时可作为过渡方案,但其串行特性限制了性能上限。
-
方案 1 (串行单命令) 性能过低,不推荐。
2.2.1 串行化执行代码实践
public class JedisClusterTest {private JedisCluster jedisCluster;@BeforeEachvoid setUp() {// 配置连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);poolConfig.setMaxWaitMillis(1000);HashSet<HostAndPort> nodes = new HashSet<>();nodes.add(new HostAndPort("192.168.150.101", 7001));nodes.add(new HostAndPort("192.168.150.101", 7002));nodes.add(new HostAndPort("192.168.150.101", 7003));nodes.add(new HostAndPort("192.168.150.101", 8001));nodes.add(new HostAndPort("192.168.150.101", 8002));nodes.add(new HostAndPort("192.168.150.101", 8003));jedisCluster = new JedisCluster(nodes, poolConfig);}@Testvoid testMSet() {jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");}@Testvoid testMSet2() {Map<String, String> map = new HashMap<>(3);map.put("name", "Jack");map.put("age", "21");map.put("sex", "Male");//对Map数据进行分组。根据相同的slot放在一个分组//key就是slot,value就是一个组Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet().stream().collect(Collectors.groupingBy(entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())));//串行的去执行mset的逻辑for (List<Map.Entry<String, String>> list : result.values()) {String[] arr = new String[list.size() * 2];int j = 0;for (int i = 0; i < list.size(); i++) {j = i<<2;Map.Entry<String, String> e = list.get(0);arr[j] = e.getKey();arr[j + 1] = e.getValue();}jedisCluster.mset(arr);}}@AfterEachvoid tearDown() {if (jedisCluster != null) {jedisCluster.close();}}
}
2.2.2 Spring集群环境下批处理代码
@Testvoid testMSetInCluster() {Map<String, String> map = new HashMap<>(3);map.put("name", "Rose");map.put("age", "21");map.put("sex", "Female");stringRedisTemplate.opsForValue().multiSet(map);List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));strings.forEach(System.out::println);}
原理分析
在 RedisAdvancedClusterAsyncCommandsImpl
类中的核心流程:
-
按 Slot 分组数据
根据 key 的哈希槽(slot)将数据分区到Map<Integer, ...>
中:-
Key → 目标 slot 值
-
Value → 相同 slot 的所有键值对集合
-
-
异步批量操作
通过RedisFuture<String> mset = super.mset(op)
将分组后的数据异步发送到对应 slot 所在的集群节点执行。
3、服务器端优化-持久化配置
4、服务器端优化-慢查询优化
4.1 什么是慢查询
慢查询的危害:由于Redis是单线程的,所以当客户端发出指令后,他们都会进入到redis底层的queue来执行,如果此时有一些慢查询的数据,就会导致大量请求阻塞,从而引起报错,所以我们需要解决慢查询问题。
慢查询的阈值可以通过配置指定:
slowlog-log-slower-than:慢查询阈值,单位是微秒。默认是10000,建议1000
slowlog-max-len:慢查询日志(本质是一个队列)的长度。默认是128,建议1000
慢查询会被放入慢查询日志中,日志的长度有上限,可以通过配置指定:
4.2 如何查看慢查询
知道了以上内容之后,那么咱们如何去查看慢查询日志列表呢:
-
slowlog len:查询慢查询日志长度
-
slowlog get [n]:读取n条慢查询日志
-
slowlog reset:清空慢查询列表
也可使用redis客户端进行查看
5、服务器端优化-命令及安全配置
安全可以说是服务器端一个非常重要的话题,如果安全出现了问题,那么一旦这个漏洞被一些坏人知道了之后,并且进行攻击,那么这就会给咱们的系统带来很多的损失,所以我们这节课就来解决这个问题。
进入
6、服务器端优化-Redis内存划分和内存配置
接下来我们看到了这些配置,最关键的缓存区内存如何定位和解决呢?
7、服务器端集群优化-集群还是主从
问题1:集群完整性问题
问题2:集群带宽问题
那我们到底是集群还是主从
单体Redis(主从Redis)已经能达到万级别的QPS,并且也具备很强的高可用特性。如果主从能满足业务需求的情况下,所以如果不是在万不得已的情况下,尽量不搭建Redis集群
至此,大功告成!🎉🎉🎉