当前位置: 首页 > news >正文

Jedis集群管理:深入解析槽位信息的获取与动态更新机制

引言:Redis集群与槽位分片

Redis Cluster通过16384个虚拟槽位实现分布式存储,客户端需维护槽位与节点的映射关系以正确路由请求。作为Java生态最成熟的Redis客户端,Jedis 3.5.2实现了智能的槽位管理机制,本文将结合源码深入剖析其设计精髓。


一、槽位信息获取:从协议层到数据解析

1.1 命令执行链分析

Jedis通过多层封装实现CLUSTER SLOTS命令的发送:

 
 

// 入口方法 public List<Object> clusterSlots() { checkIsInMultiOrPipeline(); client.clusterSlots(); // 委托给Client对象 return client.getObjectMultiBulkReply(); } // 命令构建链 public void clusterSlots() { cluster(Protocol.CLUSTER_SLOTS); // 添加子命令 } public void cluster(final String subcommand) { final byte[][] arg = new byte[1][]; arg[0] = SafeEncoder.encode(subcommand); // 编码为二进制 cluster(arg); } public void cluster(final byte[]... args) { sendCommand(CLUSTER, args); // 核心发送方法 }

1.2 网络层协议实现

sendCommand方法通过Socket连接发送命令:

 
 

public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); // 建立Socket连接 // 协议编码并发送 Protocol.sendCommand(outputStream, cmd, args); } catch (JedisConnectionException ex) { // 异常时尝试读取Redis错误信息 String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } broken = true; // 标记连接损坏 throw ex; } }

​协议层关键点​​:使用Redis序列化协议(RESP)将命令编码为二进制流,错误处理中额外尝试读取错误信息提升可诊断性。

1.3 响应数据结构解析

Redis返回的槽位信息是嵌套列表结构:

 
 

1) 1) 0 // 起始槽位 2) 5460 // 结束槽位 3) 1) "192.168.0.1" // 主节点IP 2) 6379 // 主节点端口 3) "d31d7c55" // 节点ID 4) 1) "192.168.0.2" // 从节点信息 2) 6380 3) "e73b8f41"

解析流程通过递归式类型转换实现:

 
 

List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; // 1. 提取槽位范围 [startSlot, endSlot] List<Integer> slotNums = getAssignedSlotArray(slotInfo); // 2. 遍历节点信息 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); // 构建主机端口对象 HostAndPort targetNode = new HostAndPort( SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue() ); // 3. 仅主节点承担槽位 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } }

​数据处理技巧​​:通过MASTER_NODE_INDEX常量(值为2)定位主节点信息,避开硬编码索引。


二、槽位映射的初始化与动态更新

2.1 集群连接初始化流程

当创建JedisCluster实例时,触发槽位缓存初始化:

 
 

private void initializeSlotsCache(Set<HostAndPort> startNodes, ...) { for (HostAndPort hostAndPort : startNodes) { try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), ...)) { // 认证与客户端设置 if (password != null) jedis.auth(password); if (clientName != null) jedis.clientSetname(clientName); // 关键操作:发现节点与槽位 cache.discoverClusterNodesAndSlots(jedis); return; // 成功即退出 } catch (JedisConnectionException e) { // 尝试下一个种子节点 } } }

​容错设计​​:遍历种子节点直到成功连接,保证即使部分节点不可用仍能初始化。

2.2 槽位分配的并发控制

槽位映射操作通过读写锁保证线程安全:

 
 

public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); // 获取写锁 try { reset(); // 清空旧数据 List<Object> slots = jedis.clusterSlots(); // ... 解析过程 ... } finally { w.unlock(); // 释放锁 } } public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); // 更新槽位映射 } } finally { w.unlock(); } }

​并发策略​​:ReentrantReadWriteLock实现读写分离,操作槽位映射时使用写锁,请求路由时使用读锁。

2.3 异常驱动的动态更新机制

当发生节点故障或集群拓扑变更时,触发槽位刷新:

​MOVED重定向更新​

 
 

catch (JedisMovedDataException jre) { this.connectionHandler.renewSlotCache(connection); releaseConnection(connection); return runWithRetries(slot, attempts - 1, false, jre); }

​连接异常更新​

 
 

catch (JedisConnectionException jce) { releaseConnection(connection); if (attempts <= 1) { this.connectionHandler.renewSlotCache(); // 强制刷新 } return runWithRetries(slot, attempts - 1, true, null); }

2.4 槽位刷新的容错策略

renewClusterSlots方法实现了多级回退机制:

 
 

public void renewClusterSlots(Jedis jedis) { if (!rediscovering) { // 避免重复更新 w.lock(); try { rediscovering = true; // 第一优先级:使用现有连接 if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { /* 忽略 */ } } // 第二优先级:遍历节点池(随机顺序) for (JedisPool jp : getShuffledNodesPool()) { try (Jedis j = jp.getResource()) { discoverClusterSlots(j); return; } catch (Exception e) { /* 忽略继续尝试 */ } } } finally { rediscovering = false; w.unlock(); } } }

​优化亮点​​:通过getShuffledNodesPool()随机化节点访问顺序,避免总是从固定节点获取数据。


三、架构设计哲学与实践启示

3.1 分层抽象设计

Jedis采用清晰的三层架构:

  1. ​应用层​​:JedisCluster提供集群API
  2. ​路由层​​:ConnectionHandler管理槽位映射
  3. ​连接层​​:JedisSlotCache维护节点连接池
 
 

graph TD A[JedisCluster] -->|调用| B[ConnectionHandler] B -->|读写| C[JedisSlotCache] C -->|维护| D[槽位-节点映射] C -->|管理| E[节点连接池]

调用

读写

维护

管理

JedisCluster

ConnectionHandler

JedisSlotCache

槽位-节点映射

节点连接池

3.2 资源管控最佳实践

Jedis严格遵循"借-还"模式:

 
 

try { connection = connectionHandler.getConnectionFromSlot(slot); return execute(connection); } finally { releaseConnection(connection); // 确保释放 }

通过finally块保证任何执行路径下连接都会被归还,防止连接泄漏。

3.3 生产环境调优建议
  1. ​超时参数配置​

     
     

    new JedisPoolConfig().setMaxWaitMillis(500); new JedisCluster(nodes, 2000, 3000, 5, password); // 参数:连接超时/读写超时/最大重试次数

  2. ​监控关键指标​

    • 槽位刷新频率(突增预示集群不稳定)
    • MOVED/ASK重定向比例
    • 节点连接失败率
  3. ​自定义故障转移策略​

     
     

    public class CustomConnectionHandler extends JedisSlotBasedConnectionHandler { @Override public void renewSlotCache() { // 添加自定义节点选择逻辑 } }

##源码

jedis 3.5.2版本
向服务器发送请求,获取服务器槽点信息
public List<Object> clusterSlots() {checkIsInMultiOrPipeline();client.clusterSlots();return client.getObjectMultiBulkReply();}public void clusterSlots() {cluster(Protocol.CLUSTER_SLOTS);}  public void cluster(final String subcommand) {final byte[][] arg = new byte[1][];arg[0] = SafeEncoder.encode(subcommand);cluster(arg);}  public void cluster(final byte[]... args) {sendCommand(CLUSTER, args);}public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {try {connect();Protocol.sendCommand(outputStream, cmd, args);} catch (JedisConnectionException ex) {/** When client send request which formed by invalid protocol, Redis send back error message* before close connection. We try to read it to provide reason of failure.*/try {String errorMessage = Protocol.readErrorLineIfPossible(inputStream);if (errorMessage != null && errorMessage.length() > 0) {ex = new JedisConnectionException(errorMessage, ex.getCause());}} catch (Exception e) {/** Catch any IOException or JedisConnectionException occurred from InputStream#read and just* ignore. This approach is safe because reading error message is optional and connection* will eventually be closed.*/}// Any other exceptions related to connection?broken = true;throw ex;}}   初始化客户端槽点信息  
private void initializeSlotsCache(Set<HostAndPort> startNodes,int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {for (HostAndPort hostAndPort : startNodes) {try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) { if (user != null) {jedis.auth(user, password);} else if (password != null) {jedis.auth(password);}if (clientName != null) {jedis.clientSetname(clientName);}cache.discoverClusterNodesAndSlots(jedis);return;} catch (JedisConnectionException e) {// try next nodes}}}     public void discoverClusterNodesAndSlots(Jedis jedis) {w.lock();try {reset();List<Object> slots = jedis.clusterSlots();for (Object slotInfoObj : slots) {List<Object> slotInfo = (List<Object>) slotInfoObj;if (slotInfo.size() <= MASTER_NODE_INDEX) {continue;}List<Integer> slotNums = getAssignedSlotArray(slotInfo);// hostInfosint size = slotInfo.size();for (int i = MASTER_NODE_INDEX; i < size; i++) {List<Object> hostInfos = (List<Object>) slotInfo.get(i);if (hostInfos.isEmpty()) {continue;}HostAndPort targetNode = generateHostAndPort(hostInfos);setupNodeIfNotExist(targetNode);if (i == MASTER_NODE_INDEX) {assignSlotsToNode(slotNums, targetNode);}}}} finally {w.unlock();}}public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {w.lock();try {JedisPool targetPool = setupNodeIfNotExist(targetNode);for (Integer slot : targetSlots) {slots.put(slot, targetPool);}} finally {w.unlock();}}  服务端redis集群有节点宕机异常,重新请求槽点信息
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {if (attempts <= 0) {throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");}Jedis connection = null;try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());if (redirect instanceof JedisAskDataException) {// TODO: Pipeline asking with the original command to make it faster....connection.asking();}} else {if (tryRandomNode) {connection = connectionHandler.getConnection();} else {connection = connectionHandler.getConnectionFromSlot(slot);}}return execute(connection);} catch (JedisNoReachableClusterNodeException jnrcne) {throw jnrcne;} catch (JedisConnectionException jce) {// release current connection before recursionreleaseConnection(connection);connection = null;if (attempts <= 1) {//We need this because if node is not reachable anymore - we need to finally initiate slots//renewing, or we can stuck with cluster state without one node in opposite case.//But now if maxAttempts = [1 or 2] we will do it too often.//TODO make tracking of successful/unsuccessful operations for node - do renewing only//if there were no successful responses from this node last few secondsthis.connectionHandler.renewSlotCache();}return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);} catch (JedisRedirectionException jre) {// if MOVED redirection occurred,if (jre instanceof JedisMovedDataException) {// it rebuilds cluster's slot cache recommended by Redis cluster specificationthis.connectionHandler.renewSlotCache(connection);}// release current connection before recursionreleaseConnection(connection);connection = null;return runWithRetries(slot, attempts - 1, false, jre);} finally {releaseConnection(connection);}}public void renewSlotCache() {cache.renewClusterSlots(null);}public void renewClusterSlots(Jedis jedis) {//If rediscovering is already in process - no need to start one more same rediscovering, just returnif (!rediscovering) {try {w.lock();if (!rediscovering) {rediscovering = true;try {if (jedis != null) {try {discoverClusterSlots(jedis);return;} catch (JedisException e) {//try nodes from all pools}}for (JedisPool jp : getShuffledNodesPool()) {Jedis j = null;try {j = jp.getResource();discoverClusterSlots(j);return;} catch (JedisConnectionException e) {// try next nodes} finally {if (j != null) {j.close();}}}} finally {rediscovering = false;      }}} finally {w.unlock();}}}  private void discoverClusterSlots(Jedis jedis) {List<Object> slots = jedis.clusterSlots();this.slots.clear();for (Object slotInfoObj : slots) {List<Object> slotInfo = (List<Object>) slotInfoObj;if (slotInfo.size() <= MASTER_NODE_INDEX) {continue;}List<Integer> slotNums = getAssignedSlotArray(slotInfo);// hostInfosList<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);if (hostInfos.isEmpty()) {continue;}// at this time, we just use master, discard slave informationHostAndPort targetNode = generateHostAndPort(hostInfos);assignSlotsToNode(slotNums, targetNode);}}  

相关文章:

  • C++ 对 C 的兼容性
  • Spring注解原理深度解析:从入门到精通
  • 【Linux】Ubuntu 创建应用图标的方式汇总,deb/appimage/通用方法
  • Java高级 | 【实验六】Springboot文件上传和下载
  • 《递推》题集
  • 【C++进阶篇】C++11新特性(下篇)
  • OpenLayers 从后端服务加载 GeoJSON 数据
  • 基于Spring Boot的云音乐平台设计与实现
  • day26-计算机网络-4
  • 新时代AI发展,更好的做自己
  • 8.库制作与原理
  • DDPM优化目标公式推导
  • 【Java开发日记】说一说 SpringBoot 中 CommandLineRunner
  • 【强连通分量 缩点 最长路 拓扑排序】P2656 采蘑菇|普及+
  • 游戏常用运行库合集 | GRLPackage 游戏运行库!
  • 机器学习期末复习
  • Dynamics 365 Finance + Power Automate 自动化凭证审核
  • day029-Shell自动化编程-计算与while循环
  • SOC-ESP32S3部分:33-声学前端模型ESP-SR
  • ViiTor实时翻译 2.4.2 | 完全免费的同声传译软件 实测识别率非常高 可以识别视频生成字幕
  • 网站建设补充协议模板/今日新闻最新头条10条
  • 怎么做网站页面代码搜索/域名交易域名出售
  • 做平台外卖的网站需要什么资质/网站搭建外贸
  • 南充网站建设/qq群推广网站免费
  • 厦门网页制作厦门小程序app/seo管理工具
  • 传媒公司主要做什么/优化网络培训