当前位置: 首页 > news >正文

Redis高级篇之最佳实践

摘要:本文围绕 Redis 高级实践展开,从键值设计、批处理优化、服务端优化(涵盖持久化、慢查询、命令安全、内存配置)到集群优化等多个关键方面,详细阐述了 Redis 在实际应用中的最佳实践方案。旨在帮助用户规范 Redis 使用,规避安全风险,提升 Redis 的性能、稳定性与可用性,确保其在生产环境中高效、安全地运行。                                   


文章超详细思维导图

1、Redis键值设计

1.1、优雅的key结构

Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:

  • 遵循基本格式:[业务名称]:[数据名]:[id]

  • 长度不超过44字节

  • 不包含特殊字符

例如:我们的登录业务,保存用户信息,其key可以设计成如下格式:

这样设计的好处:

  • 可读性强

  • 避免key冲突

  • 方便管理

  • 更节省内存

Redis 的 String 类型键值,其底层根据值的大小和内容采用三种编码格式:intembstr 和 raw

embstr 编码 当字符串值长度小于等于 44 字节时使用。其最大优势在于 Redis Object (robj) 和对应的 SDS (Simple Dynamic String) 结构体被分配在一块连续的内存空间中。这种布局不仅内存占用更小(减少了单独分配 SDS 头的开销),而且得益于内存连续性,数据访问效率更高

raw 编码: 当字符串值长度超过 44 字节时,会自动转换为 raw 编码。在此编码下,Redis Object 和 SDS 结构体会被分配在两块独立的内存空间中。Redis Object 内部仅存储一个指向 SDS 内存块的指针。这种非连续存储方式带来了两方面影响:

  • 访问性能降低: 读取数据需要额外的指针跳转(两次内存访问),不如 embstr 直接。

  • 潜在内存碎片: SDS 空间单独分配,增大了内存分配器的管理负担,易产生内存碎片。

1.2、拒绝BigKey

BigKey通常以Key的大小和Key中成员的数量来综合判定,例如:

  • Key本身的数据量过大:一个String类型的Key,它的值为5 MB

  • Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个

  • Key中成员的数据量过大:一个Hash类型的Key,其成员的Value(值)总大小为100 MB

推荐值:

  • 单个key的value小于10KB

  • 对于集合类型的key,建议元素数量小于1000

BigKey危害

1.2.2、如何发现BigKey


①redis-cli --bigkeys

利用redis-cli提供的--bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的big key

#命令
redis-cli -a 密码 --bigkeys


②scan扫描

自己编程,利用scan扫描Redis中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)

import com.heima.jedis.util.JedisConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;import java.util.HashMap;
import java.util.List;
import java.util.Map;public class JedisTest {private Jedis jedis;@BeforeEachvoid setUp() {// 1.建立连接// jedis = new Jedis("192.168.150.101", 6379);jedis = JedisConnectionFactory.getJedis();// 2.设置密码jedis.auth("123321");// 3.选择库jedis.select(0);}final static int STR_MAX_LEN = 10 * 1024;final static int HASH_MAX_LEN = 500;@Testvoid testScan() {int maxLen = 0;long len = 0;String cursor = "0";do {// 扫描并获取一部分keyScanResult<String> result = jedis.scan(cursor);// 记录cursorcursor = result.getCursor();List<String> list = result.getResult();if (list == null || list.isEmpty()) {break;}// 遍历for (String key : list) {// 判断key的类型String type = jedis.type(key);switch (type) {case "string":len = jedis.strlen(key);maxLen = STR_MAX_LEN;break;case "hash":len = jedis.hlen(key);maxLen = HASH_MAX_LEN;break;case "list":len = jedis.llen(key);maxLen = HASH_MAX_LEN;break;case "set":len = jedis.scard(key);maxLen = HASH_MAX_LEN;break;case "zset":len = jedis.zcard(key);maxLen = HASH_MAX_LEN;break;default:break;}//打印BigKeyif (len >= maxLen) {System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len);}}} while (!cursor.equals("0"));}@AfterEachvoid tearDown() {if (jedis != null) {jedis.close();}}}

③第三方工具
  • 利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况

  • https://github.com/sripathikrishnan/redis-rdb-tools


④网络监控
  • 自定义工具,监控进出Redis的网络数据,超出预警值时主动告警

  • 一般阿里云搭建的云服务器就有相关监控页面

1.2.3、如何删除BigKey

BigKey内存占用较多,即便时删除这样的key也需要耗费很长时间,导致Redis主线程阻塞,引发一系列问题。

  •  redis 3.0 及以下版本:
  • 如果是集合类型,则遍历BigKey的元素,先逐个删除子元素,最后删除BigKey
  • Redis 4.0以后:
  • Redis在4.0后提供了异步删除的命令:unlink


1.3、恰当的数据类型

例1:比如存储一个User对象,我们有三种存储方式:

①方式一:json字符串
{user:1 | {"name": "Jack", "age": 21}}

优点:实现简单粗暴

缺点:数据耦合,不够灵活


②方式二:字段打散
user:1:nameJack
user:1:age21

优点:可以灵活访问对象任意字段

缺点:占用空间大、没办法做统一控制


③方式三:hash(推荐)
user:1namejack
age21

优点:底层使用ziplist,空间占用小,可以灵活访问对象的任意字段

缺点:代码相对复杂


例2:假如有hash类型的key,其中有100万对field和value,field是自增id,这个key存在什么问题?如何优化?

keyfieldvalue
someKeyid:0value0
..........
id:999999value999999

​存在的问题:

  • hash的entry数量超过500时,会使用哈希表而不是ZipList,内存占用较多

  • 可以通过hash-max-ziplist-entries配置entry上限,但是如果entry过多就会导致BigKey问题


方案一:拆分为string类型

keyvalue
id:0value0
..........
id:999999value999999

​存在的问题:

  • string结构底层没有太多内存优化,内存占用较多

  • 想要批量获取这些数据比较麻烦


    方案二:拆分为小的hash,将 id / 100 作为key, 将id % 100 作为field

    keyfieldvalue
    key:0id:00value0
    ..........
    id:99value99
    key:1id:00value100
    ..........
    id:99value199
    ....
    key:9999id:00value999900
    ..........
    id:99value999999

    ​代码实现:

    package com.heima.test;
    ​
    import com.heima.jedis.util.JedisConnectionFactory;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;
    import redis.clients.jedis.ScanResult;
    ​
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    ​
    public class JedisTest {private Jedis jedis;
    ​@BeforeEachvoid setUp() {// 1.建立连接// jedis = new Jedis("192.168.150.101", 6379);jedis = JedisConnectionFactory.getJedis();// 2.设置密码jedis.auth("123321");// 3.选择库jedis.select(0);}
    ​@Testvoid testSetBigKey() {Map<String, String> map = new HashMap<>();for (int i = 1; i <= 650; i++) {map.put("hello_" + i, "world!");}jedis.hmset("m2", map);}
    ​@Testvoid testBigHash() {Map<String, String> map = new HashMap<>();for (int i = 1; i <= 100000; i++) {map.put("key_" + i, "value_" + i);}jedis.hmset("test:big:hash", map);}
    ​@Testvoid testBigString() {for (int i = 1; i <= 100000; i++) {jedis.set("test:str:key_" + i, "value_" + i);}}
    ​@Testvoid testSmallHash() {int hashSize = 100;Map<String, String> map = new HashMap<>(hashSize);for (int i = 1; i <= 100000; i++) {int k = (i - 1) / hashSize;int v = i % hashSize;map.put("key_" + v, "value_" + v);if (v == 0) {jedis.hmset("test:small:hash_" + k, map);}}}
    ​@AfterEachvoid tearDown() {if (jedis != null) {jedis.close();}}
    }

    2、批处理优化

    2.1、Pipeline

    2.1.1、我们的客户端与redis服务器是这样交互的

    单个命令的执行流程 

    N条命令的执行流程

    多条指令批量的传输流程

    2.1.2、MSet

    利用mset批量插入10万条数据

    @Test
    void testMxx() {String[] arr = new String[2000];int j;long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {j = (i % 1000) << 1;arr[j] = "test:key_" + i;arr[j + 1] = "value_" + i;if (j == 0) {jedis.mset(arr);}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
    }

    注意:不要在一次处理中传输太多命令,否则单次命令占用宽带过多,会导致网络阻塞


    2.1.3、Pipeline

    MSET虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用Pipeline

    @Test
    void testPipeline() {// 创建管道Pipeline pipeline = jedis.pipelined();long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {// 放入命令到管道pipeline.set("test:key_" + i, "value_" + i);if (i % 1000 == 0) {// 每放入1000条命令,批量执行pipeline.sync();}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
    }

    2.2、集群下的批处理

    Redis 集群要求批处理命令(如 MSET 或包含多 Key 的 Pipeline)中的所有 Key 必须位于同一个哈希槽 (Slot),否则会执行失败。这在实际应用中是个挑战,因为批处理的数据 Key 天然可能分布在不同的 Slot 和节点上

    针对此限制,主要有四种解决方案

    1. 串行单命令执行:

      • 做法: 将批处理拆分成独立的单条命令,依次发送执行。

      • 优点: 实现极其简单。

      • 缺点: 网络往返次数最多,耗时最长,性能最差,通常没有实用价值。

    2. 按 Slot 分组串行执行:

      • 做法: 客户端预先计算每个 Key 所属的 Slot。将相同 Slot 的 Key 分组。对每个分组分别执行 Pipeline 批处理。各组命令串行执行(即一组执行完再执行下一组)。

      • 优点: 比方案 1 大幅减少网络往返次数(每组内 Pipeline 优化),耗时显著降低。

      • 缺点: 实现比方案 1 复杂。不同 Slot 组的命令仍需等待,存在串行等待时间,非最优。

    3. 按 Slot 分组并行执行:

      • 做法: 同样是预先按 Slot 分组 Key。关键区别在于,不同 Slot 组的 Pipeline 命令是并行发送执行的(例如利用多线程或异步 I/O)。

      • 优点: 在方案 2 的基础上,消除了不同 Slot 组间的串行等待时间,耗时最短(仅受最慢的那个 Slot 组执行时间影响)。

      • 缺点: 实现最复杂,需要客户端管理并发和结果聚合。

    4. 使用 Hash Tag:

      • 做法: 设计 Key 时加入 {hashtag}。Redis 集群仅根据 { 和 } 之间的字符串计算 Slot。确保相关 Key 使用相同的 {hashtag} 部分,它们就会落在同一个 Slot。

      • 优点: 实现相对简单(主要在 Key 设计)。一次 Pipeline 即可完成所有操作,网络开销最小,耗时最低。

      • 缺点: 强依赖 Key 设计。如果大量 Key 使用相同的 {hashtag},会导致数据分布不均(数据倾斜/热点问题),影响集群扩展性和性能。

    总结与推荐:

    • 方案 4 (Hash Tag) 性能最优且实现较简单,是首选方案,但务必谨慎设计 {hashtag} 以避免严重的数据倾斜(例如,使用用户 ID 后几位作为 tag 的一部分)。

    • 方案 3 (按 Slot 分组并行执行) 在无法使用 Hash Tag 或必须避免潜在倾斜风险时,是实现高性能批处理的推荐方案。它克服了方案 2 的串行瓶颈,性能接近方案 4。(最为推荐)

    • 方案 2 (按 Slot 分组串行执行) 是方案 3 的简化版,在并行实现困难时可作为过渡方案,但其串行特性限制了性能上限。

    • 方案 1 (串行单命令) 性能过低,不推荐

    2.2.1 串行化执行代码实践
    public class JedisClusterTest {private JedisCluster jedisCluster;@BeforeEachvoid setUp() {// 配置连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);poolConfig.setMaxWaitMillis(1000);HashSet<HostAndPort> nodes = new HashSet<>();nodes.add(new HostAndPort("192.168.150.101", 7001));nodes.add(new HostAndPort("192.168.150.101", 7002));nodes.add(new HostAndPort("192.168.150.101", 7003));nodes.add(new HostAndPort("192.168.150.101", 8001));nodes.add(new HostAndPort("192.168.150.101", 8002));nodes.add(new HostAndPort("192.168.150.101", 8003));jedisCluster = new JedisCluster(nodes, poolConfig);}@Testvoid testMSet() {jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");}@Testvoid testMSet2() {Map<String, String> map = new HashMap<>(3);map.put("name", "Jack");map.put("age", "21");map.put("sex", "Male");//对Map数据进行分组。根据相同的slot放在一个分组//key就是slot,value就是一个组Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet().stream().collect(Collectors.groupingBy(entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())));//串行的去执行mset的逻辑for (List<Map.Entry<String, String>> list : result.values()) {String[] arr = new String[list.size() * 2];int j = 0;for (int i = 0; i < list.size(); i++) {j = i<<2;Map.Entry<String, String> e = list.get(0);arr[j] = e.getKey();arr[j + 1] = e.getValue();}jedisCluster.mset(arr);}}@AfterEachvoid tearDown() {if (jedisCluster != null) {jedisCluster.close();}}
    }
    2.2.2 Spring集群环境下批处理代码
       @Testvoid testMSetInCluster() {Map<String, String> map = new HashMap<>(3);map.put("name", "Rose");map.put("age", "21");map.put("sex", "Female");stringRedisTemplate.opsForValue().multiSet(map);List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));strings.forEach(System.out::println);}

    原理分析

     RedisAdvancedClusterAsyncCommandsImpl 类中的核心流程:

    1. 按 Slot 分组数据
      根据 key 的哈希槽(slot)将数据分区到 Map<Integer, ...> 中:

      • Key → 目标 slot 值

      • Value → 相同 slot 的所有键值对集合

    2. 异步批量操作
      通过 RedisFuture<String> mset = super.mset(op) 将分组后的数据异步发送到对应 slot 所在的集群节点执行。


    3、服务器端优化-持久化配置

    4、服务器端优化-慢查询优化

    4.1 什么是慢查询

    慢查询的危害:由于Redis是单线程的,所以当客户端发出指令后,他们都会进入到redis底层的queue来执行,如果此时有一些慢查询的数据,就会导致大量请求阻塞,从而引起报错,所以我们需要解决慢查询问题。

    慢查询的阈值可以通过配置指定:

    slowlog-log-slower-than:慢查询阈值,单位是微秒。默认是10000,建议1000

    slowlog-max-len:慢查询日志(本质是一个队列)的长度。默认是128,建议1000

    慢查询会被放入慢查询日志中,日志的长度有上限,可以通过配置指定:

    4.2 如何查看慢查询

    知道了以上内容之后,那么咱们如何去查看慢查询日志列表呢:

    • slowlog len:查询慢查询日志长度

    • slowlog get [n]:读取n条慢查询日志

    • slowlog reset:清空慢查询列表

    也可使用redis客户端进行查看

    5、服务器端优化-命令及安全配置

    安全可以说是服务器端一个非常重要的话题,如果安全出现了问题,那么一旦这个漏洞被一些坏人知道了之后,并且进行攻击,那么这就会给咱们的系统带来很多的损失,所以我们这节课就来解决这个问题。

    进入

    6、服务器端优化-Redis内存划分和内存配置

      接下来我们看到了这些配置,最关键的缓存区内存如何定位和解决呢?

      7、服务器端集群优化-集群还是主从

      问题1:集群完整性问题

      问题2:集群带宽问题

      那我们到底是集群还是主从

      单体Redis(主从Redis)已经能达到万级别的QPS,并且也具备很强的高可用特性。如果主从能满足业务需求的情况下,所以如果不是在万不得已的情况下,尽量不搭建Redis集群


      至此,大功告成!🎉🎉🎉

      http://www.dtcms.com/a/292232.html

      相关文章:

    • VUE 中父级组件使用JSON.stringify 序列化子组件传递循环引用错误
    • TDengine时序数据库 详解
    • 扣子Coze智能体实战:自动化拆解抖音对标账号,输出完整分析报告(喂饭级教程)
    • STM32-SPI全双工同步通信
    • 什么是分布式事务,分布式事务的解决方案有哪些?
    • PyTorch 模型开发全栈指南:从定义、修改到保存的完整闭环
    • 自编码器表征学习:重构误差与隐空间拓扑结构的深度解析
    • vue2.0 + elementui + i18n:实现多语言功能
    • 智能Agent场景实战指南 Day 18:Agent决策树与规划能力
    • SpringBoot+Mybatis+MySQL+Vue+ElementUI前后端分离版:权限管理(三)
    • Class10简洁实现
    • 图解Spring的循环依赖
    • 2025茶吧机语音控制集成方案
    • 深入解析Hadoop中的推测执行:原理、算法与策略
    • 【华为机试】684. 冗余连接
    • Python编程进阶知识之第三课处理数据(numpy)
    • LSTM+Transformer炸裂创新 精准度至95.65%
    • 【C++】复习重点-汇总2-面向对象(三大特性、类/对象、构造函数、继承与派生、多态、抽象类、this/对象指针、友元、运算符重载、static、类/结构体)
    • vscode gdb调试c语言过程
    • IDEA-自动格式化代码
    • IDEA全局Maven配置
    • 【IDEA】如何在IDEA中通过git创建项目?
    • 【C++】nlohmann/json
    • 哔哩哔哩视觉算法面试30问全景精解
    • Kafka单条消息长度限制详解及Java实战指南
    • 新品如何通过广告投放精准获取流量实现快速增长
    • 【RAG优化】PDF复杂表格解析问题分析
    • 北宋政治模拟(deepseek)
    • 力扣面试150题--寻找峰值
    • 如何为每个参数案例自动执行当前数据集