当前位置: 首页 > wzjs >正文

金塔凯元建设集团有限公司官方网站百度客服人工在线咨询电话

金塔凯元建设集团有限公司官方网站,百度客服人工在线咨询电话,济南科技市场做网站,网站设计与开发的基本步骤包括哪些引言:Redis集群与槽位分片 Redis Cluster通过16384个虚拟槽位实现分布式存储,客户端需维护槽位与节点的映射关系以正确路由请求。作为Java生态最成熟的Redis客户端,Jedis 3.5.2实现了智能的槽位管理机制,本文将结合源码深入剖析其…

引言:Redis集群与槽位分片

Redis Cluster通过16384个虚拟槽位实现分布式存储,客户端需维护槽位与节点的映射关系以正确路由请求。作为Java生态最成熟的Redis客户端,Jedis 3.5.2实现了智能的槽位管理机制,本文将结合源码深入剖析其设计精髓。


一、槽位信息获取:从协议层到数据解析

1.1 命令执行链分析

Jedis通过多层封装实现CLUSTER SLOTS命令的发送:

 
 

// 入口方法 public List<Object> clusterSlots() { checkIsInMultiOrPipeline(); client.clusterSlots(); // 委托给Client对象 return client.getObjectMultiBulkReply(); } // 命令构建链 public void clusterSlots() { cluster(Protocol.CLUSTER_SLOTS); // 添加子命令 } public void cluster(final String subcommand) { final byte[][] arg = new byte[1][]; arg[0] = SafeEncoder.encode(subcommand); // 编码为二进制 cluster(arg); } public void cluster(final byte[]... args) { sendCommand(CLUSTER, args); // 核心发送方法 }

1.2 网络层协议实现

sendCommand方法通过Socket连接发送命令:

 
 

public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { connect(); // 建立Socket连接 // 协议编码并发送 Protocol.sendCommand(outputStream, cmd, args); } catch (JedisConnectionException ex) { // 异常时尝试读取Redis错误信息 String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } broken = true; // 标记连接损坏 throw ex; } }

​协议层关键点​​:使用Redis序列化协议(RESP)将命令编码为二进制流,错误处理中额外尝试读取错误信息提升可诊断性。

1.3 响应数据结构解析

Redis返回的槽位信息是嵌套列表结构:

 
 

1) 1) 0 // 起始槽位 2) 5460 // 结束槽位 3) 1) "192.168.0.1" // 主节点IP 2) 6379 // 主节点端口 3) "d31d7c55" // 节点ID 4) 1) "192.168.0.2" // 从节点信息 2) 6380 3) "e73b8f41"

解析流程通过递归式类型转换实现:

 
 

List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; // 1. 提取槽位范围 [startSlot, endSlot] List<Integer> slotNums = getAssignedSlotArray(slotInfo); // 2. 遍历节点信息 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); // 构建主机端口对象 HostAndPort targetNode = new HostAndPort( SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue() ); // 3. 仅主节点承担槽位 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } }

​数据处理技巧​​:通过MASTER_NODE_INDEX常量(值为2)定位主节点信息,避开硬编码索引。


二、槽位映射的初始化与动态更新

2.1 集群连接初始化流程

当创建JedisCluster实例时,触发槽位缓存初始化:

 
 

private void initializeSlotsCache(Set<HostAndPort> startNodes, ...) { for (HostAndPort hostAndPort : startNodes) { try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), ...)) { // 认证与客户端设置 if (password != null) jedis.auth(password); if (clientName != null) jedis.clientSetname(clientName); // 关键操作:发现节点与槽位 cache.discoverClusterNodesAndSlots(jedis); return; // 成功即退出 } catch (JedisConnectionException e) { // 尝试下一个种子节点 } } }

​容错设计​​:遍历种子节点直到成功连接,保证即使部分节点不可用仍能初始化。

2.2 槽位分配的并发控制

槽位映射操作通过读写锁保证线程安全:

 
 

public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); // 获取写锁 try { reset(); // 清空旧数据 List<Object> slots = jedis.clusterSlots(); // ... 解析过程 ... } finally { w.unlock(); // 释放锁 } } public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); // 更新槽位映射 } } finally { w.unlock(); } }

​并发策略​​:ReentrantReadWriteLock实现读写分离,操作槽位映射时使用写锁,请求路由时使用读锁。

2.3 异常驱动的动态更新机制

当发生节点故障或集群拓扑变更时,触发槽位刷新:

​MOVED重定向更新​

 
 

catch (JedisMovedDataException jre) { this.connectionHandler.renewSlotCache(connection); releaseConnection(connection); return runWithRetries(slot, attempts - 1, false, jre); }

​连接异常更新​

 
 

catch (JedisConnectionException jce) { releaseConnection(connection); if (attempts <= 1) { this.connectionHandler.renewSlotCache(); // 强制刷新 } return runWithRetries(slot, attempts - 1, true, null); }

2.4 槽位刷新的容错策略

renewClusterSlots方法实现了多级回退机制:

 
 

public void renewClusterSlots(Jedis jedis) { if (!rediscovering) { // 避免重复更新 w.lock(); try { rediscovering = true; // 第一优先级:使用现有连接 if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { /* 忽略 */ } } // 第二优先级:遍历节点池(随机顺序) for (JedisPool jp : getShuffledNodesPool()) { try (Jedis j = jp.getResource()) { discoverClusterSlots(j); return; } catch (Exception e) { /* 忽略继续尝试 */ } } } finally { rediscovering = false; w.unlock(); } } }

​优化亮点​​:通过getShuffledNodesPool()随机化节点访问顺序,避免总是从固定节点获取数据。


三、架构设计哲学与实践启示

3.1 分层抽象设计

Jedis采用清晰的三层架构:

  1. ​应用层​​:JedisCluster提供集群API
  2. ​路由层​​:ConnectionHandler管理槽位映射
  3. ​连接层​​:JedisSlotCache维护节点连接池
 
 

graph TD A[JedisCluster] -->|调用| B[ConnectionHandler] B -->|读写| C[JedisSlotCache] C -->|维护| D[槽位-节点映射] C -->|管理| E[节点连接池]

调用

读写

维护

管理

JedisCluster

ConnectionHandler

JedisSlotCache

槽位-节点映射

节点连接池

3.2 资源管控最佳实践

Jedis严格遵循"借-还"模式:

 
 

try { connection = connectionHandler.getConnectionFromSlot(slot); return execute(connection); } finally { releaseConnection(connection); // 确保释放 }

通过finally块保证任何执行路径下连接都会被归还,防止连接泄漏。

3.3 生产环境调优建议
  1. ​超时参数配置​

     
     

    new JedisPoolConfig().setMaxWaitMillis(500); new JedisCluster(nodes, 2000, 3000, 5, password); // 参数:连接超时/读写超时/最大重试次数

  2. ​监控关键指标​

    • 槽位刷新频率(突增预示集群不稳定)
    • MOVED/ASK重定向比例
    • 节点连接失败率
  3. ​自定义故障转移策略​

     
     

    public class CustomConnectionHandler extends JedisSlotBasedConnectionHandler { @Override public void renewSlotCache() { // 添加自定义节点选择逻辑 } }

##源码

jedis 3.5.2版本
向服务器发送请求,获取服务器槽点信息
public List<Object> clusterSlots() {checkIsInMultiOrPipeline();client.clusterSlots();return client.getObjectMultiBulkReply();}public void clusterSlots() {cluster(Protocol.CLUSTER_SLOTS);}  public void cluster(final String subcommand) {final byte[][] arg = new byte[1][];arg[0] = SafeEncoder.encode(subcommand);cluster(arg);}  public void cluster(final byte[]... args) {sendCommand(CLUSTER, args);}public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {try {connect();Protocol.sendCommand(outputStream, cmd, args);} catch (JedisConnectionException ex) {/** When client send request which formed by invalid protocol, Redis send back error message* before close connection. We try to read it to provide reason of failure.*/try {String errorMessage = Protocol.readErrorLineIfPossible(inputStream);if (errorMessage != null && errorMessage.length() > 0) {ex = new JedisConnectionException(errorMessage, ex.getCause());}} catch (Exception e) {/** Catch any IOException or JedisConnectionException occurred from InputStream#read and just* ignore. This approach is safe because reading error message is optional and connection* will eventually be closed.*/}// Any other exceptions related to connection?broken = true;throw ex;}}   初始化客户端槽点信息  
private void initializeSlotsCache(Set<HostAndPort> startNodes,int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {for (HostAndPort hostAndPort : startNodes) {try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) { if (user != null) {jedis.auth(user, password);} else if (password != null) {jedis.auth(password);}if (clientName != null) {jedis.clientSetname(clientName);}cache.discoverClusterNodesAndSlots(jedis);return;} catch (JedisConnectionException e) {// try next nodes}}}     public void discoverClusterNodesAndSlots(Jedis jedis) {w.lock();try {reset();List<Object> slots = jedis.clusterSlots();for (Object slotInfoObj : slots) {List<Object> slotInfo = (List<Object>) slotInfoObj;if (slotInfo.size() <= MASTER_NODE_INDEX) {continue;}List<Integer> slotNums = getAssignedSlotArray(slotInfo);// hostInfosint size = slotInfo.size();for (int i = MASTER_NODE_INDEX; i < size; i++) {List<Object> hostInfos = (List<Object>) slotInfo.get(i);if (hostInfos.isEmpty()) {continue;}HostAndPort targetNode = generateHostAndPort(hostInfos);setupNodeIfNotExist(targetNode);if (i == MASTER_NODE_INDEX) {assignSlotsToNode(slotNums, targetNode);}}}} finally {w.unlock();}}public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {w.lock();try {JedisPool targetPool = setupNodeIfNotExist(targetNode);for (Integer slot : targetSlots) {slots.put(slot, targetPool);}} finally {w.unlock();}}  服务端redis集群有节点宕机异常,重新请求槽点信息
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {if (attempts <= 0) {throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");}Jedis connection = null;try {if (redirect != null) {connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());if (redirect instanceof JedisAskDataException) {// TODO: Pipeline asking with the original command to make it faster....connection.asking();}} else {if (tryRandomNode) {connection = connectionHandler.getConnection();} else {connection = connectionHandler.getConnectionFromSlot(slot);}}return execute(connection);} catch (JedisNoReachableClusterNodeException jnrcne) {throw jnrcne;} catch (JedisConnectionException jce) {// release current connection before recursionreleaseConnection(connection);connection = null;if (attempts <= 1) {//We need this because if node is not reachable anymore - we need to finally initiate slots//renewing, or we can stuck with cluster state without one node in opposite case.//But now if maxAttempts = [1 or 2] we will do it too often.//TODO make tracking of successful/unsuccessful operations for node - do renewing only//if there were no successful responses from this node last few secondsthis.connectionHandler.renewSlotCache();}return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);} catch (JedisRedirectionException jre) {// if MOVED redirection occurred,if (jre instanceof JedisMovedDataException) {// it rebuilds cluster's slot cache recommended by Redis cluster specificationthis.connectionHandler.renewSlotCache(connection);}// release current connection before recursionreleaseConnection(connection);connection = null;return runWithRetries(slot, attempts - 1, false, jre);} finally {releaseConnection(connection);}}public void renewSlotCache() {cache.renewClusterSlots(null);}public void renewClusterSlots(Jedis jedis) {//If rediscovering is already in process - no need to start one more same rediscovering, just returnif (!rediscovering) {try {w.lock();if (!rediscovering) {rediscovering = true;try {if (jedis != null) {try {discoverClusterSlots(jedis);return;} catch (JedisException e) {//try nodes from all pools}}for (JedisPool jp : getShuffledNodesPool()) {Jedis j = null;try {j = jp.getResource();discoverClusterSlots(j);return;} catch (JedisConnectionException e) {// try next nodes} finally {if (j != null) {j.close();}}}} finally {rediscovering = false;      }}} finally {w.unlock();}}}  private void discoverClusterSlots(Jedis jedis) {List<Object> slots = jedis.clusterSlots();this.slots.clear();for (Object slotInfoObj : slots) {List<Object> slotInfo = (List<Object>) slotInfoObj;if (slotInfo.size() <= MASTER_NODE_INDEX) {continue;}List<Integer> slotNums = getAssignedSlotArray(slotInfo);// hostInfosList<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);if (hostInfos.isEmpty()) {continue;}// at this time, we just use master, discard slave informationHostAndPort targetNode = generateHostAndPort(hostInfos);assignSlotsToNode(slotNums, targetNode);}}  

http://www.dtcms.com/wzjs/28381.html

相关文章:

  • yy直播赚钱吗优化什么
  • 企业内部网站建设网站网络推广公司运作
  • 重庆seo网站推广费用电脑培训学校网站
  • 设计网站导航大全怎么自己做一个网站
  • 电子商务网站建设也管理搜索app下载安装
  • 网站内容及实现方式谷歌自然排名优化
  • 做设计一般用的素材网站是什么意思百度推广开户怎么开
  • 为什么几年前做的网站视频看不了哪家培训机构好
  • 做网站的人属于什么行业佛山抖音seo
  • 淄博网站网站建设网站页面布局和样式设计
  • 专业商城网站建设报价百度app下载官方免费下载最新版
  • 标准物质网站建设seo入门培训教程
  • 成都高端网站制作公司云南网络营销seo
  • 做旅游网站多少钱网络营销的推广
  • 网站中的文章可以做排名吗厦门搜索引擎优化
  • 杂志网站建设google搜索引擎入口 镜像
  • 深鑫辉网站建设seo网站优化排名
  • 修文县生态文明建设局网站昆明seo建站
  • 婚纱网站推广app佣金平台正规
  • 蔡甸做网站创意营销案例
  • 住房和城乡建设部建造师网站我是新手如何做电商
  • 骨骼型的网站成功的网络营销案例有哪些
  • 视频营销发布平台包括鞍山seo优化
  • 自己做网站微商今日广东头条新闻
  • 黄页推广网页深圳百度搜索排名优化
  • 网站建设 上海福州seo招聘
  • 荔湾网站建设公司网络运营培训课程
  • 广告设计用的软件广州seo网站
  • 国外真人做爰直播聊天平台网站网站是怎么做的
  • 免费装修设计图福州百度推广优化排名