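Jedis Cluster Management: How Slot Information Is Fetched and Dynamically Updated
Introduction: Redis Cluster and Slot Sharding
Redis Cluster distributes data across 16384 hash slots. A client must therefore keep a mapping from slots to nodes so that it can route each request to the right node. Jedis is one of the most widely used Redis clients in the Java ecosystem, and version 3.5.2 implements this slot management itself. This article walks through the relevant source code to show how the design works.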
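The routing step works as follows. Redis Cluster maps a key to a slot with CRC16(key) mod 16384. When the key contains a non-empty `{...}` hash tag, only the part inside the first such tag is hashed. Jedis ships its own implementation of this rule (`JedisClusterCRC16`). The class below is an independent, minimal re-implementation for illustration only, and the name `KeySlot` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the Redis Cluster key-slot rule:
// HASH_SLOT = CRC16(key) mod 16384, honoring {hash tags}. Not Jedis code.
final class KeySlot {
    static final int SLOT_COUNT = 16384;

    // CRC16-CCITT (XModem variant): polynomial 0x1021, initial value 0, no reflection.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x8000) != 0) {
                    crc = ((crc << 1) ^ 0x1021) & 0xFFFF;
                } else {
                    crc = (crc << 1) & 0xFFFF;
                }
            }
        }
        return crc;
    }

    // If the key contains "{...}" with a non-empty body, only that body is hashed;
    // otherwise (no tag, or "{}") the whole key is hashed.
    static int slotOf(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end != -1 && end != start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```

For example, `{user1000}.following` and `{user1000}.followers` both hash only `user1000`, so they land in the same slot and are served by the same node.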
1. Fetching Slot Information: From the Protocol Layer to Parsing
1.1 The Command Call Chain
Jedis sends the `CLUSTER SLOTS` command through several layers of delegation:

    // Entry point
    public List<Object> clusterSlots() {
        checkIsInMultiOrPipeline();              // not allowed inside MULTI or a pipeline
        client.clusterSlots();                   // delegate to the Client object
        return client.getObjectMultiBulkReply();
    }

    // Command-building chain (Client side)
    public void clusterSlots() {
        cluster(Protocol.CLUSTER_SLOTS);         // add the SLOTS subcommand
    }

    public void cluster(final String subcommand) {
        final byte[][] arg = new byte[1][];
        arg[0] = SafeEncoder.encode(subcommand); // encode to bytes
        cluster(arg);
    }

    public void cluster(final byte[]... args) {
        sendCommand(CLUSTER, args);              // core send method
    }
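1.2 The Network Layer
The `sendCommand` method writes the command to the socket connection:

    public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
        try {
            connect();                                      // open the socket if not connected yet
            Protocol.sendCommand(outputStream, cmd, args);  // RESP-encode and write
        } catch (JedisConnectionException ex) {
            try {
                // Redis may have sent an error reply before closing; try to read it
                String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
                if (errorMessage != null && errorMessage.length() > 0) {
                    ex = new JedisConnectionException(errorMessage, ex.getCause());
                }
            } catch (Exception e) {
                // reading the error message is optional; ignore failures here
            }
            broken = true;                                  // mark the connection as broken
            throw ex;
        }
    }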
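Protocol-layer notes: the command is encoded into a byte stream with the Redis Serialization Protocol (RESP). On a connection failure, the client also tries to read any error line that Redis sent before closing the connection. This makes the cause of the failure easier to diagnose. The connection is then marked as broken so that the pool discards it instead of reusing it.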
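The sketch below illustrates what "encoding the command with RESP" means. It builds the request bytes for a command: an array header `*<count>`, followed by one bulk string `$<byte length>` per argument. It is a standalone helper, and the name `RespEncoder` is hypothetical. Jedis's `Protocol.sendCommand` writes the same framing directly to its output stream rather than building a string first.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of RESP request framing: a command is sent as an array
// of bulk strings, e.g. CLUSTER SLOTS -> "*2", "$7 CLUSTER", "$5 SLOTS".
final class RespEncoder {
    static byte[] encode(String... args) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(args.length).append("\r\n");      // array header
        for (String arg : args) {
            byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
            // Bulk-string length is the UTF-8 byte length, not the char count.
            sb.append('$').append(bytes.length).append("\r\n")
              .append(arg).append("\r\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```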
1.3 Parsing the Response
Redis returns the slot information as a nested list:

    1) 1) 0                  // start slot
       2) 5460               // end slot
       3) 1) "192.168.0.1"   // master IP
          2) 6379            // master port
          3) "d31d7c55"      // node ID
       4) 1) "192.168.0.2"   // replica information
          2) 6380
          3) "e73b8f41"
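Parsing casts each nesting level of this reply to its concrete type (`List<Object>`, `Long`, `byte[]`):

    List<Object> slots = jedis.clusterSlots();
    for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;
        if (slotInfo.size() <= MASTER_NODE_INDEX) {
            continue;                            // no node entries for this range
        }
        // 1. Extract the slot range [startSlot, endSlot]
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);
        // 2. Walk the node entries: master first, then replicas
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
            List<Object> hostInfos = (List<Object>) slotInfo.get(i);
            // Build the host/port object
            HostAndPort targetNode = new HostAndPort(
                SafeEncoder.encode((byte[]) hostInfos.get(0)),
                ((Long) hostInfos.get(1)).intValue());
            setupNodeIfNotExist(targetNode);     // every node gets a connection pool
            // 3. Only the master owns the slots
            if (i == MASTER_NODE_INDEX) {
                assignSlotsToNode(slotNums, targetNode);
            }
        }
    }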
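Parsing detail: the constant `MASTER_NODE_INDEX` (value 2) locates the master entry, so the code avoids a hard-coded index. Indices 0 and 1 hold the slot range, and any entries after the master describe replicas.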
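You can try the parsing logic without a live cluster. The sketch below models the reply as described above: nested `List<Object>`, with `Long` for integers and `byte[]` for strings. It assigns each inclusive range only to the entry at `MASTER_NODE_INDEX`. For simplicity it keys nodes by a plain `host:port` string instead of `HostAndPort`/`JedisPool`. `SlotsParser` and its helper methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Jedis-free sketch of parsing a CLUSTER SLOTS-shaped reply.
final class SlotsParser {
    static final int MASTER_NODE_INDEX = 2;

    @SuppressWarnings("unchecked")
    static Map<Integer, String> parse(List<Object> reply) {
        Map<Integer, String> slotToNode = new HashMap<>();
        for (Object slotInfoObj : reply) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;
            if (slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;                                   // range without node entries
            }
            int start = ((Long) slotInfo.get(0)).intValue();
            int end = ((Long) slotInfo.get(1)).intValue();
            List<Object> master = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
            if (master.isEmpty()) {
                continue;
            }
            String host = new String((byte[]) master.get(0), StandardCharsets.UTF_8);
            int port = ((Long) master.get(1)).intValue();
            for (int slot = start; slot <= end; slot++) {   // inclusive on both ends
                slotToNode.put(slot, host + ":" + port);
            }
            // Entries after MASTER_NODE_INDEX are replicas and receive no slots.
        }
        return slotToNode;
    }

    // Test helper: one node entry [host, port, id].
    static List<Object> node(String host, long port, String id) {
        List<Object> n = new ArrayList<>();
        n.add(host.getBytes(StandardCharsets.UTF_8));
        n.add(port);
        n.add(id.getBytes(StandardCharsets.UTF_8));
        return n;
    }

    // Test helper: one range entry [start, end, master, replicas...].
    static List<Object> range(long start, long end, Object... nodes) {
        List<Object> r = new ArrayList<>();
        r.add(start);
        r.add(end);
        for (Object n : nodes) {
            r.add(n);
        }
        return r;
    }
}
```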
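2. Initializing and Dynamically Updating the Slot Mapping
2.1 Cluster Connection Initialization
Creating a `JedisCluster` instance triggers initialization of the slot cache:

    private void initializeSlotsCache(Set<HostAndPort> startNodes, ...) {
        for (HostAndPort hostAndPort : startNodes) {
            try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), ...)) {
                // Authentication and client name
                if (user != null) {
                    jedis.auth(user, password);
                } else if (password != null) {
                    jedis.auth(password);
                }
                if (clientName != null) {
                    jedis.clientSetname(clientName);
                }
                // Key step: discover nodes and slots
                cache.discoverClusterNodesAndSlots(jedis);
                return;                          // stop at the first seed that succeeds
            } catch (JedisConnectionException e) {
                // try the next seed node
            }
        }
    }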
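Fault tolerance: the seed nodes are tried in turn until one succeeds, so initialization works even when some of them are unreachable. Only `JedisConnectionException` is caught. Other exceptions propagate, and if every seed fails with a connection error, the loop simply ends without throwing.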
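The seed-node loop reduces to "first success wins." In this sketch, the discovery step is passed in as a function, and `ConnectionFailure` stands in for `JedisConnectionException`; both are hypothetical. As in the source, only the connection failure is swallowed. When every seed fails, the method returns without throwing, which the sketch signals with `null`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of trying seed nodes in order until one answers.
final class SeedBootstrap {
    // Stand-in for JedisConnectionException (hypothetical).
    static final class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String msg) {
            super(msg);
        }
    }

    // 'discover' stands in for "connect, authenticate, run CLUSTER SLOTS".
    // Returns the seed that succeeded, or null if all seeds failed to connect.
    static String initialize(List<String> seeds, Consumer<String> discover) {
        for (String seed : seeds) {
            try {
                discover.accept(seed);
                return seed;                 // first success wins
            } catch (ConnectionFailure e) {
                // connection problem: try the next seed
            }
            // any other RuntimeException propagates to the caller
        }
        return null;
    }
}
```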
2.2 Concurrency Control for Slot Assignment
Updates to the slot mapping are protected by a read-write lock:

    public void discoverClusterNodesAndSlots(Jedis jedis) {
        w.lock();                                // acquire the write lock
        try {
            reset();                             // drop the old nodes and slots
            List<Object> slots = jedis.clusterSlots();
            // ... parsing ...
        } finally {
            w.unlock();                          // release the lock
        }
    }

    public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
        w.lock();                                // reentrant: the caller may already hold it
        try {
            JedisPool targetPool = setupNodeIfNotExist(targetNode);
            for (Integer slot : targetSlots) {
                slots.put(slot, targetPool);     // update the slot mapping
            }
        } finally {
            w.unlock();
        }
    }
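Concurrency strategy: a `ReentrantReadWriteLock` separates readers from writers. Updates to the slot mapping take the write lock, and request routing takes the read lock. As a result, lookups run in parallel and block only while the mapping is being rewritten. Because the write lock is reentrant, `assignSlotsToNode` can lock again when it is called from `discoverClusterNodesAndSlots`, which already holds the lock.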
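A stripped-down cache shows this locking pattern on its own. Lookups take the read lock, and updates take the write lock. A full rebuild calls the assignment method while already holding the write lock, which works only because the lock is reentrant. `SlotCacheSketch` is hypothetical and uses string node names; it is not `JedisClusterInfoCache`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified slot cache illustrating read/write lock separation.
final class SlotCacheSketch {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();
    private final Map<Integer, String> slots = new HashMap<>();

    // Read path: many routing threads may hold the read lock at once.
    String nodeForSlot(int slot) {
        r.lock();
        try {
            return slots.get(slot);
        } finally {
            r.unlock();
        }
    }

    // Write path: exclusive while the mapping changes.
    void assign(int fromSlot, int toSlot, String node) {
        w.lock();
        try {
            for (int s = fromSlot; s <= toSlot; s++) {
                slots.put(s, node);
            }
        } finally {
            w.unlock();
        }
    }

    // Full rebuild: holds the write lock and then calls assign(), which locks again.
    // This nested acquisition succeeds because the write lock is reentrant.
    void rebuild(Map<String, int[]> ranges) {
        w.lock();
        try {
            slots.clear();
            for (Map.Entry<String, int[]> e : ranges.entrySet()) {
                assign(e.getValue()[0], e.getValue()[1], e.getKey());
            }
        } finally {
            w.unlock();
        }
    }
}
```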
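2.3 Exception-Driven Dynamic Updates
When a node fails or the cluster topology changes, the slot cache is refreshed in response to exceptions. A MOVED reply triggers an immediate refresh that uses the connection which received the redirect. A connection error triggers a refresh only when the request is on its last remaining attempt. An ASK redirect never triggers a refresh.
MOVED redirection:

    catch (JedisRedirectionException jre) {
        if (jre instanceof JedisMovedDataException) {
            // MOVED: rebuild the slot cache, as the cluster specification recommends
            this.connectionHandler.renewSlotCache(connection);
        }
        // ASK: redirect this request only, without refreshing
        releaseConnection(connection);
        connection = null;
        return runWithRetries(slot, attempts - 1, false, jre); // retry against the redirect target
    }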
Connection errors:

    catch (JedisConnectionException jce) {
        releaseConnection(connection);
        connection = null;
        if (attempts <= 1) {
            this.connectionHandler.renewSlotCache();  // forced refresh on the last attempt
        }
        return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    }
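These refresh rules can be summarized as a small model. Each attempt uses up one unit of `maxAttempts`. MOVED refreshes immediately, ASK never refreshes, and a connection error refreshes only on the last remaining attempt. The `RetryModel` class and its `Outcome` enum are hypothetical. The loop replaces the source's recursion, and `IllegalStateException` stands in for `JedisClusterMaxAttemptsException`.

```java
import java.util.Iterator;

// Hypothetical model of the retry/refresh decisions in runWithRetries (not Jedis code).
final class RetryModel {
    enum Outcome { OK, MOVED, ASK, CONN_ERROR }

    int renewals = 0;       // how many times the slot cache would be refreshed
    int usedAttempts = 0;   // how many attempts were consumed

    // Returns true on success; throws once all attempts are used up.
    boolean run(int maxAttempts, Iterator<Outcome> outcomes) {
        for (int attempts = maxAttempts; attempts > 0; attempts--) {
            usedAttempts++;
            switch (outcomes.next()) {
                case OK:
                    return true;
                case MOVED:
                    renewals++;          // MOVED: refresh, then follow the redirect
                    break;
                case ASK:
                    break;               // ASK: one-off redirect, no refresh
                case CONN_ERROR:
                    if (attempts <= 1) {
                        renewals++;      // refresh only on the last remaining attempt
                    }
                    break;
            }
        }
        throw new IllegalStateException("No more cluster attempts left.");
    }
}
```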
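2.4 Fault Tolerance During Slot Refresh
`renewClusterSlots` falls back through several sources of slot information:

    public void renewClusterSlots(Jedis jedis) {
        if (!rediscovering) {                    // skip if a refresh is already running
            w.lock();
            try {
                if (!rediscovering) {            // re-check under the lock
                    rediscovering = true;
                    try {
                        // First choice: the connection that was passed in
                        if (jedis != null) {
                            try {
                                discoverClusterSlots(jedis);
                                return;
                            } catch (JedisException e) {
                                // fall back to the node pools
                            }
                        }
                        // Second choice: the node pools, in random order
                        for (JedisPool jp : getShuffledNodesPool()) {
                            try (Jedis j = jp.getResource()) {
                                discoverClusterSlots(j);
                                return;
                            } catch (JedisConnectionException e) {
                                // try the next node
                            }
                        }
                    } finally {
                        rediscovering = false;
                    }
                }
            } finally {
                w.unlock();
            }
        }
    }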
Optimization: `getShuffledNodesPool()` randomizes the order in which nodes are tried, so refresh requests are not always served by the same node.
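The guard-plus-fallback structure of `renewClusterSlots` can be sketched without Jedis. A flag lets callers skip a refresh that is already in progress, and the write lock serializes the refresh itself. The preferred node is tried first, and then all nodes are tried in shuffled order. `RefreshGuard` and the `tryDiscover` predicate are hypothetical; the predicate returns `false` where the source would catch a `JedisConnectionException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

// Hypothetical sketch of a guarded slot refresh with shuffled fallback (not Jedis code).
final class RefreshGuard {
    private final ReentrantReadWriteLock.WriteLock w = new ReentrantReadWriteLock().writeLock();
    private volatile boolean rediscovering = false;   // read outside the lock

    // Returns the node that served the refresh, or null if none succeeded
    // (or if another thread was already refreshing).
    String renew(String preferred, List<String> allNodes, Predicate<String> tryDiscover) {
        if (rediscovering) {
            return null;                               // a refresh is already running
        }
        w.lock();
        try {
            rediscovering = true;
            if (preferred != null && tryDiscover.test(preferred)) {
                return preferred;                      // first choice: the given connection
            }
            List<String> shuffled = new ArrayList<>(allNodes);
            Collections.shuffle(shuffled);             // spread refresh load across nodes
            for (String node : shuffled) {
                if (tryDiscover.test(node)) {
                    return node;
                }
            }
            return null;
        } finally {
            rediscovering = false;
            w.unlock();
        }
    }
}
```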
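3. Design Philosophy and Practical Takeaways
3.1 Layered Abstraction
Jedis organizes its cluster support into three layers:

- Application layer: `JedisCluster` exposes the cluster API
- Routing layer: the connection handler (`JedisSlotBasedConnectionHandler`, a subclass of `JedisClusterConnectionHandler`) hands out connections by slot, by node, or at random
- Connection layer: `JedisClusterInfoCache` holds the slot-to-node mapping and maintains the per-node connection pools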
    graph TD
        A[JedisCluster] -->|calls| B[JedisClusterConnectionHandler]
        B -->|reads/writes| C[JedisClusterInfoCache]
        C -->|maintains| D[slot-to-node mapping]
        C -->|manages| E[node connection pools]
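3.2 Resource Management Best Practice
Jedis strictly follows a borrow-and-return pattern:

    Jedis connection = null;
    try {
        connection = connectionHandler.getConnectionFromSlot(slot);
        return execute(connection);
    } finally {
        releaseConnection(connection);           // always returned, whatever happens
    }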
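Because the connection is released in a `finally` block, it is returned on every execution path, including exceptions. This prevents connection leaks.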
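A counter makes the borrow-and-return guarantee easy to check. The hypothetical `BorrowReturn` class below only tracks outstanding loans. Its release method tolerates `null`, in case borrowing itself failed. The source relies on the same null-tolerance when it calls `releaseConnection(connection)` in `finally` after setting `connection = null`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical pool that only counts outstanding loans, to show that a
// release in finally runs on success and on exception alike.
final class BorrowReturn {
    final AtomicInteger borrowed = new AtomicInteger();

    String borrow() {
        borrowed.incrementAndGet();
        return "conn";
    }

    void release(String conn) {
        if (conn != null) {              // the borrow itself may have failed
            borrowed.decrementAndGet();
        }
    }

    <T> T withConnection(Function<String, T> action) {
        String connection = null;
        try {
            connection = borrow();
            return action.apply(connection);
        } finally {
            release(connection);         // always returned, even if action throws
        }
    }
}
```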
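3.3 Tuning Suggestions for Production
- Timeout settings:

    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxWaitMillis(500);            // max wait when borrowing a pooled connection
    // Arguments: connection timeout / socket (read/write) timeout / max attempts / password / pool config
    JedisCluster cluster = new JedisCluster(nodes, 2000, 3000, 5, password, poolConfig);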
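- Monitor key metrics:
  - Slot refresh frequency (a sudden spike suggests an unstable cluster)
  - Ratio of MOVED/ASK redirects
  - Node connection failure rate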
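- Custom failover strategy:

    public class CustomConnectionHandler extends JedisSlotBasedConnectionHandler {
        public CustomConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig<Jedis> poolConfig, int timeout) {
            super(nodes, poolConfig, timeout);
        }

        @Override
        public void renewSlotCache() {
            // add custom node-selection logic here
            super.renewSlotCache();  // keep the default refresh; an empty override would disable it
        }
    }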
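One way to track the metrics listed above is with a set of counters that your application increments wherever it observes the corresponding events. The class and method names below are hypothetical. Where to call them from (for example, a wrapper around your cluster calls) is left open. `LongAdder` is used because it handles many concurrent increments cheaply.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counters for cluster-client health metrics.
final class ClusterClientMetrics {
    final LongAdder requests = new LongAdder();
    final LongAdder redirects = new LongAdder();          // MOVED + ASK
    final LongAdder slotRefreshes = new LongAdder();
    final LongAdder connectionFailures = new LongAdder();

    void onRequest() { requests.increment(); }
    void onRedirect() { redirects.increment(); }
    void onSlotRefresh() { slotRefreshes.increment(); }
    void onConnectionFailure() { connectionFailures.increment(); }

    // Fraction of requests that hit a MOVED/ASK redirect.
    double redirectRatio() {
        long total = requests.sum();
        return total == 0 ? 0.0 : (double) redirects.sum() / total;
    }

    // Fraction of requests that ended in a connection failure.
    double connectionFailureRate() {
        long total = requests.sum();
        return total == 0 ? 0.0 : (double) connectionFailures.sum() / total;
    }
}
```

Sampling `slotRefreshes` at a fixed interval and comparing consecutive values gives the refresh frequency. A sudden jump in that value is the instability signal mentioned above.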
## Source Code
Jedis version 3.5.2
Sending the request to the server to fetch the slot information:

    public List<Object> clusterSlots() {
        checkIsInMultiOrPipeline();
        client.clusterSlots();
        return client.getObjectMultiBulkReply();
    }

    public void clusterSlots() {
        cluster(Protocol.CLUSTER_SLOTS);
    }

    public void cluster(final String subcommand) {
        final byte[][] arg = new byte[1][];
        arg[0] = SafeEncoder.encode(subcommand);
        cluster(arg);
    }

    public void cluster(final byte[]... args) {
        sendCommand(CLUSTER, args);
    }

    public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
        try {
            connect();
            Protocol.sendCommand(outputStream, cmd, args);
        } catch (JedisConnectionException ex) {
            /*
             * When client send request which formed by invalid protocol, Redis send back error message
             * before close connection. We try to read it to provide reason of failure.
             */
            try {
                String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
                if (errorMessage != null && errorMessage.length() > 0) {
                    ex = new JedisConnectionException(errorMessage, ex.getCause());
                }
            } catch (Exception e) {
                /*
                 * Catch any IOException or JedisConnectionException occurred from InputStream#read and just
                 * ignore. This approach is safe because reading error message is optional and connection
                 * will eventually be closed.
                 */
            }
            // Any other exceptions related to connection?
            broken = true;
            throw ex;
        }
    }

Initializing the client-side slot information:
    private void initializeSlotsCache(Set<HostAndPort> startNodes,
            int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
            boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
        for (HostAndPort hostAndPort : startNodes) {
            try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout,
                    infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {
                if (user != null) {
                    jedis.auth(user, password);
                } else if (password != null) {
                    jedis.auth(password);
                }
                if (clientName != null) {
                    jedis.clientSetname(clientName);
                }
                cache.discoverClusterNodesAndSlots(jedis);
                return;
            } catch (JedisConnectionException e) {
                // try next nodes
            }
        }
    }

    public void discoverClusterNodesAndSlots(Jedis jedis) {
        w.lock();
        try {
            reset();
            List<Object> slots = jedis.clusterSlots();
            for (Object slotInfoObj : slots) {
                List<Object> slotInfo = (List<Object>) slotInfoObj;
                if (slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                // hostInfos
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List<Object> hostInfos = (List<Object>) slotInfo.get(i);
                    if (hostInfos.isEmpty()) {
                        continue;
                    }
                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } finally {
            w.unlock();
        }
    }

    public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
        w.lock();
        try {
            JedisPool targetPool = setupNodeIfNotExist(targetNode);
            for (Integer slot : targetSlots) {
                slots.put(slot, targetPool);
            }
        } finally {
            w.unlock();
        }
    }

Re-requesting the slot information when a node in the Redis cluster goes down:

    private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
        if (attempts <= 0) {
            throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
        }

        Jedis connection = null;
        try {
            if (redirect != null) {
                connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
                if (redirect instanceof JedisAskDataException) {
                    // TODO: Pipeline asking with the original command to make it faster....
                    connection.asking();
                }
            } else {
                if (tryRandomNode) {
                    connection = connectionHandler.getConnection();
                } else {
                    connection = connectionHandler.getConnectionFromSlot(slot);
                }
            }

            return execute(connection);

        } catch (JedisNoReachableClusterNodeException jnrcne) {
            throw jnrcne;
        } catch (JedisConnectionException jce) {
            // release current connection before recursion
            releaseConnection(connection);
            connection = null;

            if (attempts <= 1) {
                //We need this because if node is not reachable anymore - we need to finally initiate slots
                //renewing, or we can stuck with cluster state without one node in opposite case.
                //But now if maxAttempts = [1 or 2] we will do it too often.
                //TODO make tracking of successful/unsuccessful operations for node - do renewing only
                //if there were no successful responses from this node last few seconds
                this.connectionHandler.renewSlotCache();
            }

            return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
        } catch (JedisRedirectionException jre) {
            // if MOVED redirection occurred,
            if (jre instanceof JedisMovedDataException) {
                // it rebuilds cluster's slot cache recommended by Redis cluster specification
                this.connectionHandler.renewSlotCache(connection);
            }

            // release current connection before recursion
            releaseConnection(connection);
            connection = null;

            return runWithRetries(slot, attempts - 1, false, jre);
        } finally {
            releaseConnection(connection);
        }
    }

    public void renewSlotCache() {
        cache.renewClusterSlots(null);
    }

    public void renewClusterSlots(Jedis jedis) {
        //If rediscovering is already in process - no need to start one more same rediscovering, just return
        if (!rediscovering) {
            try {
                w.lock();
                if (!rediscovering) {
                    rediscovering = true;

                    try {
                        if (jedis != null) {
                            try {
                                discoverClusterSlots(jedis);
                                return;
                            } catch (JedisException e) {
                                //try nodes from all pools
                            }
                        }

                        for (JedisPool jp : getShuffledNodesPool()) {
                            Jedis j = null;
                            try {
                                j = jp.getResource();
                                discoverClusterSlots(j);
                                return;
                            } catch (JedisConnectionException e) {
                                // try next nodes
                            } finally {
                                if (j != null) {
                                    j.close();
                                }
                            }
                        }
                    } finally {
                        rediscovering = false;
                    }
                }
            } finally {
                w.unlock();
            }
        }
    }

    private void discoverClusterSlots(Jedis jedis) {
        List<Object> slots = jedis.clusterSlots();
        this.slots.clear();

        for (Object slotInfoObj : slots) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;

            if (slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }

            List<Integer> slotNums = getAssignedSlotArray(slotInfo);

            // hostInfos
            List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
            if (hostInfos.isEmpty()) {
                continue;
            }

            // at this time, we just use master, discard slave information
            HostAndPort targetNode = generateHostAndPort(hostInfos);
            assignSlotsToNode(slotNums, targetNode);
        }
    }