项目学习总结(5)
文章目录
- 概述
- 公用接口及类:
- 轮询算法及其实现:
- 一致性hash算法及其实现
- 心跳检测的实现
- 总结
概述
今天项目主要完成负载均衡算法(在客户端发起请求时进行负载均衡)的处理,实现了一致性hash,轮询,最短响应时间
公用接口及类:
1. LoadBalancer 接口:负载均衡的顶层规范
public interface LoadBalancer {/*** 根据服务名获取一个可用的服务地址* @param serviceName 服务名(如"com.htrpc.UserService")* @return 服务地址(IP+端口,InetSocketAddress类型)*/InetSocketAddress selectServiceAddress(String serviceName);
}
核心职责:定义 “根据服务名选择服务地址” 的顶层规范,不关心具体用什么算法(轮询、随机等),只要求最终返回一个可用的服务地址。
为什么这样设计:RPC 客户端需要的是 “给定服务名,拿到一个能调用的服务地址”,至于用什么算法选,由实现类决定,符合 “面向接口编程” 思想。
2. Selector 接口:具体选择算法的规范
public interface Selector {/*** 根据服务列表执行算法,获取下一个服务节点* @return 服务节点(InetSocketAddress)*/InetSocketAddress getNext();/*** 服务动态上下线时,重新平衡(如服务列表变化后重新初始化算法状态)*/void reBalance();
}
核心职责:规范 “具体选择算法” 的行为 ——getNext() 负责用特定算法(如轮询)从服务列表中选一个节点;reBalance() 用于服务列表变化时(如服务上线 / 下线)重新调整算法状态(今天还没做)。
为什么与 LoadBalancer 分离:一个服务名可能对应多个服务节点(如集群部署),Selector 专注于 “对同一服务的多个节点做选择”,而 LoadBalancer 专注于 “管理不同服务的选择器”,职责分离更清晰。
二、AbstractLoadBalancer:抽象类的 “模板 + 缓存” 作用
这个抽象类是整个设计的 “中间层”,实现了 LoadBalancer 接口,并封装了通用逻辑(如缓存管理),把 “具体算法的创建” 延迟到子类实现(模板方法设计模式)。
- 核心属性:cache 缓存服务名与选择器的映射
// 键:服务名(如"com.htrpc.UserService"),值:该服务对应的选择器(Selector)
private Map<String, Selector> cache = new ConcurrentHashMap<>(8);
缓存的意义:同一个服务名的服务列表可能被多次查询,缓存 Selector 避免重复创建(比如每次调用都重新从注册中心拉取服务列表、重新初始化选择器),提升性能。
2. 核心方法:selectServiceAddress 实现负载均衡的整体流程
@Override
public InetSocketAddress selectServiceAddress(String serviceName) {// 1. 从缓存中获取该服务名对应的选择器Selector selector = cache.get(serviceName);// 2. 如果缓存中没有,创建选择器并缓存if (selector == null) {// 2.1 从注册中心查询该服务名对应的所有可用服务节点List<InetSocketAddress> serviceList = htrpcBootstrap.getInstance().getRegistry().lookup(serviceName);// 2.2 调用抽象方法getSelector,由子类(如轮询算法)创建具体的选择器selector = getSelector(serviceList);// 2.3 存入缓存,下次直接使用cache.put(serviceName, selector);}// 3. 调用选择器的getNext()方法,获取最终的服务地址return selector.getNext();
}
流程拆解:
① 先查缓存,有则直接用已有的 Selector;
② 无则从注册中心拉取服务列表(lookup(serviceName)),再由子类创建对应算法的 Selector(如轮询的 RoundRobinSelector);
③ 最终通过 Selector 的 getNext() 拿到具体服务地址。
模板方法模式体现:getSelector 是抽象方法(protected abstract Selector getSelector(…)),由子类实现具体算法的选择器创建,父类控制整体流程,子类负责填充细节。
轮询算法及其实现:
有了上面的一些类之后就可以方便我们在选择负载均衡算法时进行扩展,下面我们先来看下最简单的轮询算法:
public class RoundRobinLoadBalancer extends AbstractLoadBalancer {@Overrideprotected Selector getSelector(List<InetSocketAddress> serviceList) {return new RoundRobinSelector(serviceList);}private static class RoundRobinSelector implements Selector{private List<InetSocketAddress> serviceList;private AtomicInteger index;public RoundRobinSelector(List<InetSocketAddress> serviceList) {this.serviceList = serviceList;this.index = new AtomicInteger(0);}@Overridepublic InetSocketAddress getNext() {if(serviceList == null || serviceList.size() == 0){log.error("进行负载均衡选取节点时发现服务列表为空");throw new LoadBalancerExecption();}InetSocketAddress address = serviceList.get(index.get());if(index.get() == serviceList.size() - 1){index.set(0);}else{index.incrementAndGet();}return address;}@Overridepublic void reBalance() {}}
}
下面来看下其具体流程:
1. getSelector 方法:创建轮询选择器
@Override
protected Selector getSelector(List<InetSocketAddress> serviceList) {return new RoundRobinSelector(serviceList);
}
作用:将 “服务列表” 传递给轮询选择器,由选择器负责后续的轮询逻辑,实现 “算法与负载均衡器” 的解耦。
2. 内部类 RoundRobinSelector:轮询算法的核心
这是轮询逻辑的具体实现,维护服务列表和一个计数器,通过 “计数器递增 + 边界重置” 实现轮流选择。
(1)核心属性
private List<InetSocketAddress> serviceList; // 服务节点列表(如[192.168.1.1:8080, 192.168.1.2:8080])
private AtomicInteger index; // 原子计数器(多线程安全,记录当前选择到的索引)
AtomicInteger index:为什么用原子类?因为 RPC 客户端可能多线程并发调用 getNext(),AtomicInteger 保证 index 的递增和重置是线程安全的(避免两个线程同时读取到相同的 index,导致选择同一个服务节点)。
(2)getNext 方法:轮询选择的核心逻辑
@Override
public InetSocketAddress getNext() {// 边界检查:服务列表为空时抛异常if (serviceList == null || serviceList.size() == 0) {log.error("进行负载均衡选取节点时发现服务列表为空");throw new LoadBalancerExecption();}// 1. 根据当前index获取服务节点InetSocketAddress address = serviceList.get(index.get());// 2. 更新index:如果是最后一个节点,重置为0;否则自增1if (index.get() == serviceList.size() - 1) {index.set(0);} else {index.incrementAndGet();}return address;
}
这样轮询算法就实现完了,简单来说就是在可选的服务列表中挨个选就好了,遇到边界就回到开头
一致性hash算法及其实现
首先简单来看下一致性hash算法的大概实现:
1.构建哈希环:将所有可能的哈希值(0~2³²-1)想象成一个环形空间(哈希环)。
2.节点映射:将服务节点(或其虚拟节点)通过哈希计算映射到环上的某个位置。
3.请求映射:将请求(如 RPC 的请求 ID)通过同样的哈希算法映射到环上,然后顺时针查找最近的服务节点作为目标节点。
优势:当服务节点上下线时,只有环上该节点附近的请求会受影响,大部分请求的目标节点不变,稳定性更高。
下面来看下具体代码:
@Slf4j
public class ConsisentHashBalancer extends AbstractLoadBalancer {@Overrideprotected Selector getSelector(List<InetSocketAddress> serviceList) {return new ConsisentHashSelector(serviceList,128);}/*** 一致性hash算法的具体实现*/private static class ConsisentHashSelector implements Selector{//hash环用来存储服务器节点private SortedMap<Integer,InetSocketAddress> circle = new TreeMap<>();private int virtualNodes;public ConsisentHashSelector(List<InetSocketAddress> serviceList,int virtualNodes) {//将节点转为虚拟节点进行挂载this.virtualNodes = virtualNodes;for (InetSocketAddress inetSocketAddress : serviceList) {//把每个节点传入到hash环中addNodeToCircle(inetSocketAddress);}}@Overridepublic InetSocketAddress getNext() {htrpcRequest htrpcRequest = htrpcBootstrap.REQUEST_THREAD_LOCAL.get();String requestId = Long.toString(htrpcRequest.getRequestId());int hash = hash(requestId);// 判断该hash值是否能直接落在一个服务器上if(!circle.containsKey(hash)){//寻找最近的节点SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();}return circle.get(hash);}/*** 将每个节点挂载到hash环上* @param inetSocketAddress 节点地址*/private void addNodeToCircle(InetSocketAddress inetSocketAddress) {//为每个节点生成匹配的虚拟节点进行挂载for (int i = 0; i < virtualNodes; i++) {int hash = hash(inetSocketAddress.toString() + "-" + i);circle.put(hash,inetSocketAddress);if(log.isDebugEnabled()){log.debug("hash为【{}】的节点已经挂载到环上",hash);}}}private int hash(String s){MessageDigest md;try{md = MessageDigest.getInstance("MD5");}catch (NoSuchAlgorithmException e){throw new RuntimeException(e);}byte[] digest = md.digest(s.getBytes());int res = 0;for (int i = 0; i < 4; i++) {res = res << 8;if(digest[i] < 0){res = res | (digest[i] & 255);}else{res = res | digest[i];}}return res;}private void removeNodeFromCircle(InetSocketAddress inetSocketAddress) {//为每个节点生成匹配的虚拟节点进行挂载for (int i = 0; i < virtualNodes; i++) {int hash = hash(inetSocketAddress.toString() + "-" + i);circle.remove(hash);}}
接下来我们重点看下每一步具体是怎么实现的:
1. 核心属性与初始化
// 用TreeMap作为哈希环(有序Map,key是哈希值,value是服务节点,方便快速查找)
private SortedMap<Integer, InetSocketAddress> circle = new TreeMap<>();
// 每个真实节点对应的虚拟节点数量
private int virtualNodes;// 构造方法:初始化时将所有服务节点(含虚拟节点)挂载到哈希环
public ConsisentHashSelector(List<InetSocketAddress> serviceList, int virtualNodes) {this.virtualNodes = virtualNodes;for (InetSocketAddress node : serviceList) {addNodeToCircle(node); // 挂载节点(含虚拟节点)}
}
TreeMap 的作用:SortedMap 提供了 tailMap 方法,能快速找到 “大于等于某个哈希值的所有节点”,是实现 “顺时针查找最近节点” 的关键。
虚拟节点(virtualNodes):解决 “真实节点少导致的哈希环分布不均” 问题。例如 1 个真实节点对应 128 个虚拟节点,这些虚拟节点在环上分散分布,让请求分配更均匀。
2. 挂载节点到哈希环(addNodeToCircle)
private void addNodeToCircle(InetSocketAddress node) {// 为每个真实节点生成virtualNodes个虚拟节点for (int i = 0; i < virtualNodes; i++) {// 虚拟节点标识:真实节点地址 + "-" + 序号(如"192.168.1.1:8080-0")String virtualNodeKey = node.toString() + "-" + i;// 计算虚拟节点的哈希值int hash = hash(virtualNodeKey);// 将哈希值与真实节点绑定,放入哈希环circle.put(hash, node);}
}
每个虚拟节点通过唯一标识(真实节点地址 + 序号)计算哈希,最终映射到环上,但实际指向的还是同一个真实节点。
例如:真实节点 A 对应 128 个虚拟节点,这些虚拟节点在环上的 128 个位置都会指向 A,提高 A 被请求命中的概率(概率与虚拟节点数量正相关)。
3. 选择目标服务节点(getNext)
@Override
public InetSocketAddress getNext() {// 1. 获取当前请求(从ThreadLocal中,确保线程安全)htrpcRequest request = htrpcBootstrap.REQUEST_THREAD_LOCAL.get();// 2. 用请求ID作为哈希计算的key(确保同一请求始终映射到同一节点)String requestId = Long.toString(request.getRequestId());int hash = hash(requestId); // 计算请求的哈希值// 3. 查找环上匹配的节点if (!circle.containsKey(hash)) { // 3.1 若哈希环上没有该哈希值,找“大于等于该哈希值”的子环SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);// 3.2 若子环为空(说明请求哈希值在环的末尾),取环的第一个节点;否则取子环的第一个节点hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();}// 4. 返回哈希值对应的服务节点return circle.get(hash);
}
核心逻辑:请求哈希值在环上 “找不到精确匹配时,顺时针找最近的节点”。例如:
哈希环上有节点 A(hash=100)、B(hash=200)、C(hash=300),请求哈希 = 150,则顺时针找到最近的 B(200)。
三、哈希值计算方法(hash 方法)
这是代码的核心细节,作用是将字符串(如虚拟节点标识、请求 ID)转换为一个 32 位整数(哈希值),对应哈希环上的位置。步骤拆解:
private int hash(String s) {MessageDigest md;try {md = MessageDigest.getInstance("MD5"); // 1. 使用MD5算法生成摘要} catch (NoSuchAlgorithmException e) {throw new RuntimeException(e);}byte[] digest = md.digest(s.getBytes()); // 2. 计算字符串的MD5摘要(16字节数组)int res = 0;for (int i = 0; i < 4; i++) { // 3. 取前4字节,转换为32位整数res = res << 8; // 左移8位,为新字节腾出位置if (digest[i] < 0) {res = res | (digest[i] & 255); // 处理负数字节} else {res = res | digest[i]; // 处理正数字节}}return res;
}
分步解析哈希计算:
MD5 摘要生成:
MessageDigest.getInstance(“MD5”) 获取 MD5 算法实例,md.digest(s.getBytes()) 对输入字符串的字节数组计算 MD5 摘要,得到一个 16 字节(128 位)的字节数组。MD5 的特点是输出固定长度,且分布均匀,适合作为哈希算法。
截取前 4 字节转换为 32 位整数:
16 字节的 MD5 摘要太长,我们只需要 32 位整数(对应哈希环的 0~2³²-1 范围),因此取前 4 字节(32 位)转换为 int:
循环处理 4 个字节:
res = res << 8:每次将结果左移 8 位(腾出一个字节的位置)。
res = res | …:将当前字节的值 “或” 到结果中,拼接成 32 位整数。
为什么处理负数字节?:
Java 中byte是有符号的(范围 - 128~127),例如字节0xFF在 Java 中是 - 1。通过 digest[i] & 255(255 是 0xFF),将有符号 byte 转为无符号 int(范围 0~255)。例如:
若digest[i] = -1(二进制11111111),-1 & 255 = 255(二进制00000000 00000000 00000000 11111111)。
举例:
假设前 4 字节是 [0x12, 0x34, 0x56, 0x78]:
第一次循环(i=0):res = 0 << 8 | 0x12 → 0x12
第二次循环(i=1):res = 0x12 << 8 | 0x34 → 0x1234
第三次循环(i=2):res = 0x1234 << 8 | 0x56 → 0x123456
第四次循环(i=3):res = 0x123456 << 8 | 0x78 → 0x12345678
最终得到哈希值 0x12345678(十进制对应 190887432)。
心跳检测的实现
在实现最短响应时间算法前我们需要先通过心跳检测得到每个服务节点的响应时间,由于之前的项目中已经实现过发请求了,心跳检测就只需要构造一个消息体为空的请求发出即可,注意在编解码器中做非空判断就好。同时我们不希望主线程每隔几秒发一次检测,因此新开了一个定时线程执行任务。代码如下:
public static void detectHeartbeat(String ServiceName){//1.从注册中心拉去列表并建立连接Registry registry = htrpcBootstrap.getInstance().getRegistry();List<InetSocketAddress> addresses = registry.lookup(ServiceName);//2.对连接进行缓存for (InetSocketAddress address : addresses) {try {if(!htrpcBootstrap.CHANNEL_CACHE.containsKey(address)) {Channel channel = NettyBootstrapInitilizer.getBootstrap().connect(address).sync().channel();htrpcBootstrap.CHANNEL_CACHE.put(address,channel);}} catch (InterruptedException e) {throw new RuntimeException(e);}}//3.定期发送消息Thread thread = new Thread(() ->new Timer().scheduleAtFixedRate(new MyTimerTask(), 0, 2000),"HeartbeatDetector-thread");thread.setDaemon(true);thread.start();}private static class MyTimerTask extends TimerTask{@Overridepublic void run() {htrpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();//遍历所有channelMap<InetSocketAddress, Channel> cache = htrpcBootstrap.CHANNEL_CACHE;for (Map.Entry<InetSocketAddress, Channel> entry : cache.entrySet()) {Channel channel = entry.getValue();long start = System.currentTimeMillis();//构建一个心跳请求htrpcRequest htrpcrequest = htrpcRequest.builder().requestId(RequestType.HEART_BEAT.getId()).compressType(CompressorFactory.getCompressor(htrpcBootstrap.COMPRESS_TYPE).getCode()).requestType(RequestType.HEART_BEAT.getId()).serializeType(SerializerFactory.getSerializer(htrpcBootstrap.SERIALIZE_TYPE).getCode()).timeStamp(start).build();CompletableFuture<Object> completableFuture = new CompletableFuture<>();// 需要将completabaFuture暴露出去htrpcBootstrap.PENDING_REQUEST.put(htrpcrequest.getRequestId(),completableFuture);channel.writeAndFlush(htrpcrequest).addListener((ChannelFutureListener) promise -> {if (!promise.isSuccess()) {completableFuture.completeExceptionally(promise.cause());}});Long endTime = 0L;try {completableFuture.get();endTime = System.currentTimeMillis();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}Long time = endTime - start;htrpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time,channel);log.debug("和服务器的响应时间为" + time);}}}
接下来由于我们使用的是TreeMap进行存储(可排序),因此只需取出其中第一个进行处理即可:
public InetSocketAddress getNext() {Map.Entry<Long, Channel> entry = htrpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.firstEntry();if(entry != null){return (InetSocketAddress) entry.getValue().remoteAddress();}Channel channel = (Channel)htrpcBootstrap.CHANNEL_CACHE.values().toArray()[0];return (InetSocketAddress) channel.remoteAddress();}
总结
今天主要完成了负载均衡以及心跳检测,但服务动态上下线感知还未实现