手写 RPC 框架
一、RPC 是什么?为什么需要 RPC?
在分布式系统中,我们经常需要调用其他服务器上的方法,比如用户服务需要调用订单服务的 “创建订单” 接口。如果直接通过 HTTP 调用,会面临序列化、网络通信、服务发现、负载均衡等一系列问题。RPC(Remote Procedure Call,远程过程调用) 就是为解决这些问题而生的技术 —— 它能让我们像调用本地方法一样调用远程服务,屏蔽分布式通信的复杂细节。
举个通俗的例子:你在公司电脑上想查看家里电脑的文件,不需要自己手动建立网络连接、处理数据传输,只需要双击 “远程桌面” 图标,就能像操作本地电脑一样操作家里的电脑。RPC 就相当于分布式系统中的 “远程桌面”,让跨服务调用变得简单。
1.1 RPC 的核心价值
- 透明化远程调用:调用远程方法和调用本地方法语法一致,开发者无需关注网络通信细节。
- 高性能通信:相比 HTTP,RPC 通常采用更轻量的协议(如自定义 TCP 协议)和更高效的序列化方式(如 Protobuf、FastJSON2),性能更高。
- 服务化支撑:RPC 框架通常集成服务注册发现、负载均衡、容错等能力,是微服务架构的基础。
1.2 RPC 调用的核心流程
为了让大家直观理解 RPC 的工作原理,我们用 Mermaid 流程图展示一次完整的 RPC 调用过程:

二、RPC 框架的核心架构设计
一个成熟的 RPC 框架需要包含 6 大核心组件,各组件职责清晰、协同工作。我们先通过架构图了解整体结构:

各组件的核心职责如下:
- 服务注册中心:存储服务名称与服务地址的映射关系(如 ZooKeeper、Nacos),支持服务注册和发现。
- 服务提供者:暴露本地服务,将服务信息注册到注册中心,接收并处理消费者的远程调用请求。
- 服务消费者:从注册中心获取服务地址,通过动态代理发起远程调用。
- 动态代理层:为消费者生成服务代理对象,将 “调用代理方法” 转为 “远程调用请求”。
- 网络传输层:负责跨节点的二进制数据传输(通常基于 Netty 的 NIO 模型实现)。
- 序列化层:将 Java 对象与二进制数据相互转换(解决 “对象不能跨网络传输” 的问题)。
三、环境搭建:确定技术栈与依赖
在开始编码前,我们先明确技术栈选型,所有组件均采用最新稳定版本,确保兼容性和性能:
| 组件 | 技术选型 | 版本 | 用途说明 |
|---|---|---|---|
| JDK | Oracle JDK | 17 | 基础开发环境 |
| 项目管理 | Maven | 3.9.6 | 依赖管理和项目构建 |
| 网络通信 | Netty | 4.1.100.Final | 高性能 NIO 通信框架,实现 TCP 传输 |
| 服务注册中心 | ZooKeeper | 3.9.2 | 存储服务地址,支持服务发现 |
| 序列化 | FastJSON2 | 2.0.48 | 高效 JSON 序列化 / 反序列化框架 |
| 动态代理 | JDK 动态代理 | 内置 | 生成服务代理对象 |
| 工具类 | Lombok、Spring Utils | Lombok 1.18.30 | 简化代码(@Slf4j)、判空工具等 |
| 接口文档 | Swagger3 | 2.2.0 | 生成 API 文档,方便测试 |
| 持久层(可选) | MyBatis-Plus | 3.5.5 | 若服务需要操作数据库,用于数据访问 |
3.1 核心依赖配置(pom.xml)
创建 Maven 项目,在pom.xml中添加以下依赖,所有版本均为 2024 年最新稳定版:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.ken.rpc</groupId><artifactId>ken-rpc-framework</artifactId><version>1.0.0</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!-- 依赖版本管理 --><netty.version>4.1.100.Final</netty.version><zookeeper.version>3.9.2</zookeeper.version><fastjson2.version>2.0.48</fastjson2.version><lombok.version>1.18.30</lombok.version><spring-context.version>6.1.5</spring-context.version><swagger.version>2.2.0</swagger.version><mybatis-plus.version>3.5.5</mybatis-plus.version><mysql.version>8.0.36</mysql.version></properties><dependencies><!-- 1. 基础工具类 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring-context.version}</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.1.0-jre</version></dependency><!-- 2. 网络通信:Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency><!-- 3. 服务注册中心:ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>${zookeeper.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency><!-- 4. 序列化:FastJSON2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version></dependency><!-- 5. 接口文档:Swagger3 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>${swagger.version}</version></dependency><!-- 6. 持久层:MyBatis-Plus + MySQL(服务实现需操作数据库时使用) --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version><scope>runtime</scope></dependency></dependencies>
</project>
四、核心模块实现:从底层到上层
我们按照 “底层支撑→核心逻辑→上层应用” 的顺序实现 RPC 框架,每个模块都包含 “设计思路 + 可运行代码”,确保读者能直接复用。
4.1 序列化层:解决 “对象跨网络传输” 问题
设计思路:Java 对象不能直接通过网络传输,需要先转为二进制数据(序列化);服务端接收后,再将二进制转成 Java 对象(反序列化)。我们选择 FastJSON2 作为序列化工具(相比 JSON、Protobuf,它兼顾性能和易用性),定义统一的序列化接口,方便后续替换其他序列化方式。
4.1.1 序列化接口定义
package com.ken.rpc.serialize;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.ken.rpc.exception.SerializeException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;/*** 序列化工具类(基于FastJSON2实现)* 负责Java对象与二进制数据的相互转换** @author ken*/
@Slf4j
public class FastJson2Serializer implements Serializer {/*** 序列化:将Java对象转为字节数组** @param obj 待序列化的对象* @return 序列化后的字节数组* @throws SerializeException 序列化失败时抛出*/@Overridepublic byte[] serialize(Object obj) throws SerializeException {if (ObjectUtils.isEmpty(obj)) {log.error("序列化对象为空,无法执行序列化操作");throw new SerializeException("序列化对象不能为空");}try {// FastJSON2序列化:对象→JSON字符串→字节数组(指定UTF-8编码)return JSON.toJSONBytes(obj);} catch (Exception e) {log.error("对象序列化失败,对象类型:{},异常信息:{}", obj.getClass().getName(), e.getMessage(), e);throw new SerializeException("序列化失败:" + e.getMessage());}}/*** 反序列化:将字节数组转为指定类型的Java对象** @param bytes 待反序列化的字节数组* @param clazz 目标对象的Class类型* @param <T> 目标对象的泛型* @return 反序列化后的Java对象* @throws SerializeException 反序列化失败时抛出*/@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) throws SerializeException {if (ObjectUtils.isEmpty(bytes)) {log.error("反序列化字节数组为空,无法执行反序列化操作");throw new SerializeException("反序列化字节数组不能为空");}if (ObjectUtils.isEmpty(clazz)) {log.error("反序列化目标类型为空,无法执行反序列化操作");throw new SerializeException("反序列化目标类型不能为空");}try {// FastJSON2反序列化:字节数组→指定类型对象return JSON.parseObject(bytes, clazz);} catch (Exception e) {log.error("字节数组反序列化失败,目标类型:{},异常信息:{}", clazz.getName(), e.getMessage(), e);throw new SerializeException("反序列化失败:" + e.getMessage());}}/*** 反序列化(支持泛型类型,如List<User>、Map<String, Object>)** @param bytes 待反序列化的字节数组* @param typeReference FastJSON2的TypeReference,用于指定泛型类型* @param <T> 目标对象的泛型* @return 反序列化后的Java对象* @throws SerializeException 反序列化失败时抛出*/@Overridepublic <T> T deserialize(byte[] bytes, TypeReference<T> typeReference) throws SerializeException {if (ObjectUtils.isEmpty(bytes)) {log.error("反序列化字节数组为空,无法执行反序列化操作");throw new SerializeException("反序列化字节数组不能为空");}if (ObjectUtils.isEmpty(typeReference)) {log.error("反序列化泛型类型为空,无法执行反序列化操作");throw new SerializeException("反序列化泛型类型不能为空");}try {// 支持泛型的反序列化(如List<User>)return JSON.parseObject(bytes, typeReference);} catch (Exception e) {log.error("字节数组泛型反序列化失败,异常信息:{}", e.getMessage(), e);throw new SerializeException("泛型反序列化失败:" + e.getMessage());}}
}
4.1.2 序列化异常定义
为了统一异常处理,定义序列化相关的自定义异常:
package com.ken.rpc.exception;/*** 序列化/反序列化异常* 当对象与字节数组转换过程中出现错误时抛出** @author ken*/
public class SerializeException extends RuntimeException {public SerializeException(String message) {super(message);}public SerializeException(String message, Throwable cause) {super(message, cause);}
}
4.2 网络传输层:基于 Netty 实现高性能通信
设计思路:网络传输是 RPC 的核心底层支撑,需要解决 “高并发”“低延迟” 问题。Netty 是基于 NIO 的高性能通信框架,我们用它实现 TCP 服务端(服务提供者)和客户端(服务消费者),并定义统一的 “请求 / 响应” 数据结构,确保数据传输的完整性。
4.2.1 定义 RPC 请求 / 响应实体
首先定义 RPC 调用的 “请求” 和 “响应” 格式,所有网络传输的数据都遵循这个结构:
package com.ken.rpc.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** RPC请求实体* 封装消费者向提供者发送的调用信息** @author ken*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {/*** 请求ID:唯一标识一次RPC调用(用于解决请求与响应的对应问题)*/private String requestId;/*** 服务接口名称(如com.ken.rpc.service.UserService)* 用于服务提供者找到对应的实现类*/private String serviceName;/*** 方法名称(如getUserById)*/private String methodName;/*** 方法参数类型列表(如[java.lang.Long])* 用于服务提供者找到重载的方法*/private Class<?>[] parameterTypes;/*** 方法参数值列表(如[1001])*/private Object[] parameters;/*** 服务版本号(用于处理服务升级,如1.0.0、2.0.0)*/private String serviceVersion;
}
package com.ken.rpc.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** RPC响应实体* 封装服务提供者向消费者返回的结果** @author ken*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse implements Serializable {/*** 请求ID:与RpcRequest的requestId对应,确保响应匹配*/private String requestId;/*** 调用成功:返回的方法执行结果*/private Object result;/*** 调用失败:返回的异常信息*/private Throwable exception;/*** 判断本次RPC调用是否成功** @return true-成功,false-失败*/public boolean isSuccess() {return exception == null;}
}
4.2.2 Netty 服务端实现(服务提供者)
服务提供者需要启动 Netty 服务端,监听指定端口,接收消费者的请求并处理:
package com.ken.rpc.transport.netty;import com.ken.rpc.entity.RpcRequest;
import com.ken.rpc.entity.RpcResponse;
import com.ken.rpc.handler.RpcRequestHandler;
import com.ken.rpc.serialize.FastJson2Serializer;
import com.ken.rpc.serialize.Serializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;import java.net.InetAddress;
import java.net.UnknownHostException;/*** Netty RPC服务端* 负责启动服务、监听端口、接收并处理消费者的RPC请求** @author ken*/
@Slf4j
public class NettyRpcServer {/*** 服务端口号*/private final int port;/*** 服务注册中心地址(如zookeeper://127.0.0.1:2181)*/private final String registryAddress;/*** 序列化工具(默认使用FastJSON2)*/private final Serializer serializer;/*** RPC请求处理器(负责调用本地服务方法)*/private final RpcRequestHandler rpcRequestHandler;public NettyRpcServer(int port, String registryAddress) {this(port, registryAddress, new FastJson2Serializer(), new RpcRequestHandler());}public NettyRpcServer(int port, String registryAddress, Serializer serializer, RpcRequestHandler rpcRequestHandler) {this.port = port;this.registryAddress = registryAddress;this.serializer = serializer;this.rpcRequestHandler = rpcRequestHandler;}/*** 启动Netty服务端*/public void start() {// 1. 校验参数if (port <= 0 || port > 65535) {log.error("服务端口号非法,端口:{}(合法范围:1-65535)", port);throw new IllegalArgumentException("服务端口号非法:" + port);}if (!StringUtils.hasText(registryAddress)) {log.error("服务注册中心地址为空,无法启动服务");throw new IllegalArgumentException("服务注册中心地址不能为空");}// 2. 初始化Netty线程组// BossGroup:负责接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// WorkerGroup:负责处理客户端的IO请求EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 3. 配置服务端启动参数ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 配置服务端通道类型(NIO).channel(NioServerSocketChannel.class)// 日志打印(DEBUG级别,方便调试).handler(new LoggingHandler(LogLevel.DEBUG))// 配置客户端连接的通道初始化器.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// ① 粘包/拆包处理器:基于消息长度字段解决TCP粘包问题// 解码:读取消息时,先读取4字节的长度字段,再读取对应长度的消息体pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));// 编码:发送消息时,在消息体前添加4字节的长度字段pipeline.addLast(new LengthFieldPrepender(4));// ② 序列化/反序列化处理器:将字节数组转为Java对象(请求),将Java对象转为字节数组(响应)pipeline.addLast(new NettySerializerHandler(serializer, RpcRequest.class, RpcResponse.class));// ③ RPC请求处理器:处理请求,调用本地服务方法,生成响应pipeline.addLast(new NettyRpcServerHandler(rpcRequestHandler));}})// 服务端接收连接的队列大小.option(ChannelOption.SO_BACKLOG, 128)// 保持TCP连接(防止连接超时).childOption(ChannelOption.SO_KEEPALIVE, true);// 4. 绑定端口,启动服务(同步阻塞,直到服务启动完成)ChannelFuture channelFuture = serverBootstrap.bind(port).sync();log.info("Netty RPC服务端启动成功,监听端口:{},注册中心地址:{}", port, registryAddress);// 5. 注册服务到注册中心(后续章节实现)registerService();// 6. 阻塞等待通道关闭(服务停止时才会继续执行)channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("Netty RPC服务端启动过程被中断,异常信息:{}", e.getMessage(), e);} finally {// 7. 优雅关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();log.info("Netty RPC服务端已关闭");}}/*** 注册服务到注册中心(后续章节实现服务注册逻辑)*/private void registerService() {try {// 获取本地IP地址(服务提供者的IP)String localIp = InetAddress.getLocalHost().getHostAddress();log.info("准备注册服务到注册中心,服务地址:{}:{}", localIp, port);// TODO:后续章节实现ZooKeeper注册逻辑} catch (UnknownHostException e) {log.error("获取本地IP地址失败,无法注册服务,异常信息:{}", e.getMessage(), e);throw new RuntimeException("获取本地IP地址失败:" + e.getMessage());}}
}
4.2.3 Netty 客户端实现(服务消费者)
服务消费者需要通过 Netty 客户端发送 RPC 请求,并接收响应:
package com.ken.rpc.transport.netty;import com.ken.rpc.entity.RpcRequest;
import com.ken.rpc.entity.RpcResponse;
import com.ken.rpc.serialize.FastJson2Serializer;
import com.ken.rpc.serialize.Serializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** Netty RPC客户端* 负责与服务提供者建立连接,发送RPC请求,接收响应** @author ken*/
@Slf4j
public class NettyRpcClient {/*** 服务端地址(IP:端口)*/private final InetSocketAddress serverAddress;/*** 序列化工具(默认使用FastJSON2)*/private final Serializer serializer;/*** 存储未处理的RPC请求(key:requestId,value:CompletableFuture<RpcResponse>)* 用于将响应与请求匹配(Netty是异步通信,需要通过requestId关联)*/private final ConcurrentMap<String, CompletableFuture<RpcResponse>> unprocessedRequests;/*** Netty客户端通道(与服务端的连接通道)*/private Channel channel;/*** Netty事件循环组(负责处理IO事件)*/private final EventLoopGroup eventLoopGroup;public NettyRpcClient(InetSocketAddress serverAddress) {this(serverAddress, new FastJson2Serializer());}public NettyRpcClient(InetSocketAddress serverAddress, Serializer serializer) {this.serverAddress = serverAddress;this.serializer = serializer;this.unprocessedRequests = new ConcurrentHashMap<>();this.eventLoopGroup = new NioEventLoopGroup();// 初始化客户端连接this.initialize();}/*** 初始化Netty客户端,与服务端建立连接*/private void initialize() {// 1. 校验服务端地址if (ObjectUtils.isEmpty(serverAddress) || StringUtils.isEmpty(serverAddress.getHostName())) {log.error("服务端地址非法,无法建立连接");throw new IllegalArgumentException("服务端地址非法");}try {// 2. 配置客户端启动参数Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup)// 客户端通道类型(NIO).channel(NioSocketChannel.class)// 连接超时时间(3秒).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)// 保持TCP连接.option(ChannelOption.SO_KEEPALIVE, true)// 禁用Nagle算法(减少延迟,适合RPC小数据传输).option(ChannelOption.TCP_NODELAY, true)// 配置通道初始化器.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// ① 粘包/拆包处理器(与服务端对应)pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));// ② 序列化/反序列化处理器(与服务端对应)pipeline.addLast(new NettySerializerHandler(serializer, RpcRequest.class, RpcResponse.class));// ③ 客户端响应处理器(接收服务端的响应,完成CompletableFuture)pipeline.addLast(new NettyRpcClientHandler(unprocessedRequests));}});// 3. 与服务端建立连接(同步阻塞,直到连接建立完成)ChannelFuture channelFuture = bootstrap.connect(serverAddress).sync();log.info("Netty RPC客户端与服务端建立连接成功,服务端地址:{}:{}",serverAddress.getHostName(), serverAddress.getPort());// 4. 保存通道引用,用于后续发送请求channel = channelFuture.channel();// 5. 监听通道关闭事件(连接断开时打印日志)channel.closeFuture().addListener(future -> {log.info("Netty RPC客户端与服务端连接断开,服务端地址:{}:{}",serverAddress.getHostName(), serverAddress.getPort());eventLoopGroup.shutdownGracefully();});} catch (InterruptedException e) {log.error("Netty RPC客户端建立连接过程被中断,服务端地址:{}:{},异常信息:{}",serverAddress.getHostName(), serverAddress.getPort(), e.getMessage(), e);eventLoopGroup.shutdownGracefully();}}/*** 发送RPC请求,返回CompletableFuture(异步获取响应)** @param rpcRequest RPC请求实体* @return 包含RPC响应的CompletableFuture*/public CompletableFuture<RpcResponse> sendRequest(RpcRequest rpcRequest) {// 1. 校验请求和通道if (ObjectUtils.isEmpty(rpcRequest) || StringUtils.isEmpty(rpcRequest.getRequestId())) {log.error("RPC请求非法(请求ID为空),无法发送请求");throw new IllegalArgumentException("RPC请求ID不能为空");}if (ObjectUtils.isEmpty(channel) || !channel.isActive()) {log.error("客户端通道未建立或已关闭,无法发送请求,服务端地址:{}:{}",serverAddress.getHostName(), serverAddress.getPort());throw new RuntimeException("客户端通道不可用,无法发送请求");}// 2. 创建CompletableFuture,用于接收响应CompletableFuture<RpcResponse> resultFuture = new CompletableFuture<>();// 3. 将请求ID与CompletableFuture关联,存入未处理请求MapunprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);// 4. 发送请求(Netty的writeAndFlush是异步操作,不会阻塞)channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {log.info("RPC请求发送成功,请求ID:{},服务名称:{},方法名称:{}",rpcRequest.getRequestId(), rpcRequest.getServiceName(), rpcRequest.getMethodName());} else {// 发送失败:移除未处理请求,完成CompletableFuture(异常)unprocessedRequests.remove(rpcRequest.getRequestId());resultFuture.completeExceptionally(future.cause());log.error("RPC请求发送失败,请求ID:{},异常信息:{}",rpcRequest.getRequestId(), future.cause().getMessage(), future.cause());}});return resultFuture;}/*** 关闭客户端(优雅关闭线程组和通道)*/public void close() {if (!ObjectUtils.isEmpty(channel) && channel.isActive()) {channel.close();}eventLoopGroup.shutdownGracefully();log.info("Netty RPC客户端已关闭,服务端地址:{}:{}",serverAddress.getHostName(), serverAddress.getPort());}
}
4.2.4 序列化 / 反序列化处理器
Netty 的ChannelHandler负责在通道中处理数据,我们需要自定义处理器,将 Netty 的ByteBuf与 RPC 的RpcRequest/RpcResponse相互转换:
package com.ken.rpc.transport.netty;import com.ken.rpc.entity.RpcRequest;
import com.ken.rpc.entity.RpcResponse;
import com.ken.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;import java.util.List;/*** Netty序列化/反序列化处理器* 包含编码器(Request→ByteBuf)和解码器(ByteBuf→Response)** @author ken*/
@Slf4j
public class NettySerializerHandler {/*** RPC请求编码器:将RpcRequest对象转为ByteBuf(发送给服务端)*/public static class RequestEncoder extends MessageToByteEncoder<RpcRequest> {private final Serializer serializer;public RequestEncoder(Serializer serializer) {this.serializer = serializer;}@Overrideprotected void encode(ChannelHandlerContext ctx, RpcRequest msg, ByteBuf out) throws Exception {if (ObjectUtils.isEmpty(msg)) {log.error("RPC请求为空,无法编码");throw new IllegalArgumentException("RPC请求不能为空");}// 1. 序列化RpcRequest为字节数组byte[] bytes = serializer.serialize(msg);// 2. 将字节数组写入ByteBuf(Netty会自动处理后续传输)out.writeBytes(bytes);log.debug("RPC请求编码完成,请求ID:{},字节长度:{}", msg.getRequestId(), bytes.length);}}/*** RPC响应解码器:将ByteBuf转为RpcResponse对象(接收服务端响应)* 使用ReplayingDecoder简化解码逻辑(无需手动处理半包问题)*/public static class ResponseDecoder extends ReplayingDecoder<Void> {private final Serializer serializer;public ResponseDecoder(Serializer serializer) {this.serializer = serializer;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (ObjectUtils.isEmpty(in)) {log.error("RPC响应ByteBuf为空,无法解码");return;}// 1. 读取ByteBuf中的所有字节int length = in.readableBytes();byte[] bytes = new byte[length];in.readBytes(bytes);// 2. 反序列化为RpcResponse对象RpcResponse rpcResponse = serializer.deserialize(bytes, RpcResponse.class);// 3. 将解码后的对象加入输出列表(交给下一个Handler处理)out.add(rpcResponse);log.debug("RPC响应解码完成,请求ID:{},响应是否成功:{}",rpcResponse.getRequestId(), rpcResponse.isSuccess());}}/*** 通用构造方法:创建编码器和解码器** @param serializer 序列化工具* @param requestCls 请求类(RpcRequest)* @param responseCls 响应类(RpcResponse)*/public NettySerializerHandler(Serializer serializer, Class<?> requestCls, Class<?> responseCls) {// 此处仅为统一初始化,实际编码器和解码器已在Server和Client的ChannelPipeline中添加if (!RpcRequest.class.equals(requestCls) || !RpcResponse.class.equals(responseCls)) {log.warn("不支持的请求/响应类型,建议使用默认的RpcRequest和RpcResponse");}}
}
4.3 服务注册中心:基于 ZooKeeper 实现服务发现
设计思路:服务注册中心是 RPC 框架的 “导航地图”,服务提供者启动时将 “服务名称→服务地址” 注册到 ZooKeeper,服务消费者通过服务名称从 ZooKeeper 获取所有可用的服务地址。我们采用 ZooKeeper 的持久节点存储服务根路径,临时节点存储具体的服务地址(服务下线时临时节点自动删除,实现服务健康检测)。
4.3.1 ZooKeeper 节点设计
定义 ZooKeeper 的节点结构,确保服务注册和发现的逻辑清晰:
- 根节点:
/ken-rpc(持久节点,框架的根路径) - 服务节点:
/ken-rpc/com.ken.rpc.service.UserService(持久节点,每个服务对应一个节点,名称为服务接口全限定名) - 服务地址节点:
/ken-rpc/com.ken.rpc.service.UserService/192.168.1.100:8080(临时节点,存储服务提供者的 IP 和端口,服务下线时自动删除)
4.3.2 ZooKeeper 工具类实现
封装 ZooKeeper 的连接、节点创建、节点监听等操作,提供统一的服务注册和发现接口:
package com.ken.rpc.registry.zookeeper;import com.ken.rpc.exception.RegistryException;
import com.ken.rpc.registry.ServiceRegistry;
import com.ken.rpc.util.NetUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 基于ZooKeeper的服务注册与发现实现** @author ken*/
@Slf4j
public class ZookeeperServiceRegistry implements ServiceRegistry, Watcher {/*** ZooKeeper会话超时时间(默认30秒)*/private static final int SESSION_TIMEOUT = 30000;/*** RPC框架在ZooKeeper中的根节点路径*/private static final String RPC_ROOT_PATH = "/ken-rpc";/*** ZooKeeper客户端实例*/private final ZooKeeper zookeeper;/*** 用于等待ZooKeeper连接建立的计数器*/private final CountDownLatch connectLatch;/*** 服务注册中心地址(如127.0.0.1:2181)*/private final String registryAddress;/*** 初始化ZooKeeper客户端并建立连接** @param registryAddress 服务注册中心地址(格式:IP:端口,多个地址用逗号分隔)* @throws RegistryException 注册中心连接失败时抛出*/public ZookeeperServiceRegistry(String registryAddress) throws RegistryException {if (!StringUtils.hasText(registryAddress)) {log.error("ZooKeeper注册中心地址为空,无法初始化客户端");throw new RegistryException("ZooKeeper注册中心地址不能为空");}this.registryAddress = registryAddress;this.connectLatch = new CountDownLatch(1);try {// 初始化ZooKeeper客户端(this为Watcher,监听连接状态)this.zookeeper = new ZooKeeper(registryAddress, SESSION_TIMEOUT, this);// 等待连接建立完成(最多等待10秒)boolean connected = connectLatch.await(10, TimeUnit.SECONDS);if (!connected) {log.error("ZooKeeper连接超时,地址:{},超时时间:10秒", registryAddress);throw new RegistryException("ZooKeeper连接超时:" + registryAddress);}log.info("ZooKeeper客户端初始化成功,注册中心地址:{}", registryAddress);// 检查并创建根节点(/ken-rpc)createRootNode();} catch (IOException e) {log.error("ZooKeeper客户端初始化失败(IO异常),地址:{},异常信息:{}", registryAddress, e.getMessage(), e);throw new RegistryException("ZooKeeper客户端初始化失败:" + e.getMessage());} catch (InterruptedException e) {log.error("ZooKeeper连接等待被中断,地址:{},异常信息:{}", registryAddress, e.getMessage(), e);Thread.currentThread().interrupt();throw new RegistryException("ZooKeeper连接等待被中断:" + e.getMessage());}}/*** 创建RPC根节点(/ken-rpc),持久节点*/private void createRootNode() {try {Stat stat = zookeeper.exists(RPC_ROOT_PATH, false);if (stat == null) {// 创建根节点:持久节点,开放所有权限zookeeper.create(RPC_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("ZooKeeper RPC根节点创建成功,路径:{}", RPC_ROOT_PATH);} else {log.info("ZooKeeper RPC根节点已存在,路径:{}", RPC_ROOT_PATH);}} catch (KeeperException | InterruptedException e) {log.error("ZooKeeper创建根节点失败,路径:{},异常信息:{}", RPC_ROOT_PATH, e.getMessage(), e);throw new RegistryException("创建ZooKeeper根节点失败:" + e.getMessage());}}/*** 服务注册:将服务名称与服务地址注册到ZooKeeper** @param serviceName 服务接口全限定名(如com.ken.rpc.service.UserService)* @param serviceAddress 服务地址(格式:IP:端口,如192.168.1.100:8080)* @throws RegistryException 服务注册失败时抛出*/@Overridepublic void register(String serviceName, String serviceAddress) throws RegistryException {// 1. 校验参数if (!StringUtils.hasText(serviceName)) {log.error("服务名称为空,无法注册服务");throw new RegistryException("服务名称不能为空");}if (!StringUtils.hasText(serviceAddress)) {log.error("服务地址为空,无法注册服务,服务名称:{}", serviceName);throw new RegistryException("服务地址不能为空");}if (!NetUtils.isValidAddress(serviceAddress)) {log.error("服务地址格式非法,无法注册服务,服务名称:{},地址:{}", serviceName, serviceAddress);throw new RegistryException("服务地址格式非法:" + serviceAddress);}try {// 2. 创建服务节点(/ken-rpc/服务名称),持久节点String servicePath = RPC_ROOT_PATH + "/" + serviceName;Stat serviceStat = zookeeper.exists(servicePath, false);if (serviceStat == null) {zookeeper.create(servicePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("ZooKeeper服务节点创建成功,路径:{}", servicePath);} else {log.info("ZooKeeper服务节点已存在,路径:{}", servicePath);}// 3. 创建服务地址节点(/ken-rpc/服务名称/IP:端口),临时节点(服务下线时自动删除)String addressPath = servicePath + "/" + serviceAddress;Stat addressStat = zookeeper.exists(addressPath, false);if (addressStat == null) {zookeeper.create(addressPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);log.info("ZooKeeper服务地址节点创建成功,路径:{},服务名称:{}", addressPath, serviceName);} else {log.info("ZooKeeper服务地址节点已存在,路径:{},服务名称:{}", addressPath, serviceName);}} catch (KeeperException | InterruptedException e) {log.error("服务注册失败,服务名称:{},地址:{},异常信息:{}", serviceName, serviceAddress, e.getMessage(), e);throw new RegistryException("服务注册失败:" + e.getMessage());}}/*** 服务发现:根据服务名称从ZooKeeper获取所有可用的服务地址** @param serviceName 服务接口全限定名* @return 服务地址列表(格式:IP:端口),无可用服务时返回空列表* @throws RegistryException 服务发现失败时抛出*/@Overridepublic List<String> discover(String serviceName) throws RegistryException {// 1. 校验参数if (!StringUtils.hasText(serviceName)) {log.error("服务名称为空,无法发现服务");throw new RegistryException("服务名称不能为空");}try {// 2. 拼接服务节点路径String servicePath = RPC_ROOT_PATH + "/" + serviceName;Stat serviceStat = zookeeper.exists(servicePath, false);if (serviceStat == null) {log.warn("ZooKeeper中不存在该服务节点,服务名称:{},路径:{}", serviceName, servicePath);return new ArrayList<>();}// 3. 获取服务节点下的所有地址节点(临时节点),并监听节点变化List<String> addressList = zookeeper.getChildren(servicePath, this);if (CollectionUtils.isEmpty(addressList)) {log.warn("服务无可用地址,服务名称:{},路径:{}", serviceName, servicePath);return new ArrayList<>();}// 4. 整理服务地址列表(地址节点名称即为服务地址)List<String> serviceAddressList = new ArrayList<>(addressList.size());for (String addressNode : addressList) {serviceAddressList.add(addressNode);}log.info("服务发现成功,服务名称:{},可用地址数量:{},地址列表:{}",serviceName, serviceAddressList.size(), serviceAddressList);return serviceAddressList;} catch (KeeperException | InterruptedException e) {log.error("服务发现失败,服务名称:{},异常信息:{}", serviceName, e.getMessage(), e);throw new RegistryException("服务发现失败:" + e.getMessage());}}/*** ZooKeeper事件监听回调(处理连接状态和节点变化)** @param event ZooKeeper事件*/@Overridepublic void process(WatchedEvent event) {Event.KeeperState state = event.getState();Event.EventType type = event.getType();String path = event.getPath();log.debug("ZooKeeper事件触发,状态:{},类型:{},路径:{}", state, type, path);// 处理连接状态事件if (state == Event.KeeperState.SyncConnected) {// 连接建立成功,倒计时器减1(释放等待线程)if (type == Event.EventType.None) {connectLatch.countDown();log.info("ZooKeeper连接建立成功,状态:{}", state);}} else if (state == Event.KeeperState.Disconnected) {log.warn("ZooKeeper连接已断开,状态:{}", state);} else if (state == Event.KeeperState.Expired) {log.error("ZooKeeper会话已过期,需要重新连接,状态:{}", state);// TODO:会话过期后重新初始化客户端(可选,增强可用性)}// 处理节点变化事件(服务地址新增/删除)if (type == Event.EventType.NodeChildrenChanged && path != null && path.startsWith(RPC_ROOT_PATH)) {log.info("服务地址节点发生变化,路径:{},重新发现服务", path);// 提取服务名称(路径格式:/ken-rpc/服务名称)String serviceName = path.substring(RPC_ROOT_PATH.length() + 1);if (StringUtils.hasText(serviceName)) {// 重新发现服务(更新本地服务地址缓存)discover(serviceName);}}}/*** 关闭ZooKeeper客户端连接*/public void close() {if (zookeeper != null) {try {zookeeper.close();log.info("ZooKeeper客户端已关闭,注册中心地址:{}", registryAddress);} catch (InterruptedException e) {log.error("ZooKeeper客户端关闭失败,注册中心地址:{},异常信息:{}", registryAddress, e.getMessage(), e);Thread.currentThread().interrupt();}}}
}
4.3.3 服务注册异常定义
package com.ken.rpc.exception;/*** 服务注册/发现异常* 当服务与注册中心交互(注册、发现、连接)过程中出现错误时抛出** @author ken*/
public class RegistryException extends RuntimeException {public RegistryException(String message) {super(message);}public RegistryException(String message, Throwable cause) {super(message, cause);}
}
4.4 动态代理层:让远程调用像本地调用一样简单
设计思路:服务消费者不能直接调用远程方法,需要通过 “代理对象” 间接调用。我们使用 JDK 动态代理(基于接口)生成代理对象,当消费者调用代理对象的方法时,代理对象会自动封装 RPC 请求、发送网络请求、接收响应并返回结果,从而实现 “本地调用” 的体验。
4.4.1 动态代理工厂实现
package com.ken.rpc.proxy;import com.ken.rpc.entity.RpcRequest;
import com.ken.rpc.entity.RpcResponse;
import com.ken.rpc.exception.RpcException;
import com.ken.rpc.loadbalance.LoadBalancer;
import com.ken.rpc.loadbalance.impl.RoundRobinLoadBalancer;
import com.ken.rpc.registry.ServiceRegistry;
import com.ken.rpc.registry.zookeeper.ZookeeperServiceRegistry;
import com.ken.rpc.transport.ClientTransport;
import com.ken.rpc.transport.netty.NettyRpcClient;
import com.ken.rpc.util.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;/*** RPC动态代理工厂* 为服务消费者生成代理对象,将本地方法调用转为远程RPC调用** @author ken*/
@Slf4j
public class RpcProxyFactory implements InvocationHandler {/*** 服务接口Class对象*/private final Class<?> serviceClass;/*** 服务版本号(默认1.0.0)*/private final String serviceVersion;/*** 服务注册中心(用于发现服务地址)*/private final ServiceRegistry serviceRegistry;/*** 负载均衡器(默认轮询算法)*/private final LoadBalancer loadBalancer;/*** 客户端传输层(用于发送RPC请求)*/private ClientTransport clientTransport;/*** 构造方法(默认使用ZooKeeper注册中心和轮询负载均衡)** @param serviceClass 服务接口Class对象* @param serviceVersion 服务版本号* @param registryAddress 注册中心地址*/public RpcProxyFactory(Class<?> serviceClass, String serviceVersion, String registryAddress) {this(serviceClass, serviceVersion,new ZookeeperServiceRegistry(registryAddress),new RoundRobinLoadBalancer());}public RpcProxyFactory(Class<?> serviceClass, String serviceVersion,ServiceRegistry serviceRegistry, LoadBalancer loadBalancer) {// 1. 校验参数if (ObjectUtils.isEmpty(serviceClass) || !serviceClass.isInterface()) {log.error("服务类非法,必须是接口类型,类名:{}", serviceClass);throw new IllegalArgumentException("服务类必须是接口类型:" + serviceClass);}this.serviceClass = serviceClass;// 2. 服务版本号默认1.0.0this.serviceVersion = StringUtils.hasText(serviceVersion) ? serviceVersion : "1.0.0";// 3. 校验注册中心和负载均衡器this.serviceRegistry = ObjectUtils.isEmpty(serviceRegistry) ? new ZookeeperServiceRegistry("127.0.0.1:2181") : serviceRegistry;this.loadBalancer = ObjectUtils.isEmpty(loadBalancer) ? new RoundRobinLoadBalancer() : loadBalancer;}/*** 创建代理对象** @return 服务接口的代理对象*/@SuppressWarnings("unchecked")public <T> T getProxy() {// 使用JDK动态代理生成代理对象return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),new Class[]{serviceClass},this);}/*** 代理对象方法调用回调(核心逻辑)* 当消费者调用代理对象的方法时,会执行此方法** @param proxy 代理对象* @param method 被调用的方法* @param args 方法参数* @return 远程方法执行结果* @throws Throwable 调用过程中出现的异常*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {log.info("开始执行RPC代理调用,服务接口:{},方法名称:{},参数数量:{}",serviceClass.getName(), method.getName(), args == null ? 0 : args.length);try {// 1. 从注册中心发现服务地址String serviceName = serviceClass.getName();List<String> serviceAddressList = serviceRegistry.discover(serviceName);if (ObjectUtils.isEmpty(serviceAddressList)) {log.error("服务无可用地址,无法执行RPC调用,服务名称:{}", serviceName);throw new RpcException("服务无可用地址:" + serviceName);}// 2. 负载均衡:选择一个服务地址String selectedAddress = loadBalancer.select(serviceAddressList, serviceName);log.info("负载均衡选择服务地址,服务名称:{},选中地址:{}", serviceName, selectedAddress);// 3. 解析服务地址为IP和端口String[] addressParts = selectedAddress.split(":");if (addressParts.length != 2) {log.error("服务地址格式非法,无法解析,地址:{}", selectedAddress);throw new RpcException("服务地址格式非法:" + selectedAddress);}String serverIp = addressParts[0];int serverPort = Integer.parseInt(addressParts[1]);InetSocketAddress serverAddress = new InetSocketAddress(serverIp, serverPort);// 4. 创建客户端传输层(Netty客户端)if (ObjectUtils.isEmpty(clientTransport) || !clientTransport.isConnected()) {clientTransport = new NettyRpcClient(serverAddress);log.info("创建Netty客户端连接,服务地址:{}:{}", serverIp, serverPort);}// 5. 构建RPC请求对象RpcRequest rpcRequest = RpcRequest.builder().requestId(IdUtils.generateRequestId()) // 生成唯一请求ID.serviceName(serviceName) // 服务接口名称.methodName(method.getName()) // 方法名称.parameterTypes(method.getParameterTypes()) // 方法参数类型.parameters(args) // 方法参数值.serviceVersion(serviceVersion) // 服务版本号.build();// 6. 发送RPC请求,获取响应(异步转同步,等待响应结果)CompletableFuture<RpcResponse> responseFuture = clientTransport.sendRequest(rpcRequest);RpcResponse rpcResponse = responseFuture.get(); // 阻塞等待响应// 7. 处理响应结果if (ObjectUtils.isEmpty(rpcResponse)) {log.error("RPC响应为空,请求ID:{}", rpcRequest.getRequestId());throw new RpcException("RPC响应为空,请求ID:" + rpcRequest.getRequestId());}if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {log.error("RPC请求与响应ID不匹配,请求ID:{},响应ID:{}",rpcRequest.getRequestId(), rpcResponse.getRequestId());throw new RpcException("请求与响应ID不匹配:" + rpcRequest.getRequestId() + " vs " + rpcResponse.getRequestId());}// 8. 判断调用是否成功,返回结果或抛出异常if (rpcResponse.isSuccess()) {log.info("RPC调用成功,请求ID:{},服务接口:{},方法名称:{}",rpcRequest.getRequestId(), serviceName, method.getName());return rpcResponse.getResult();} else {log.error("RPC调用失败,请求ID:{},异常信息:{}",rpcRequest.getRequestId(), rpcResponse.getException().getMessage());throw new RpcException("RPC调用失败:" + rpcResponse.getException().getMessage(), rpcResponse.getException());}} catch (Exception e) {log.error("RPC代理调用异常,服务接口:{},方法名称:{},异常信息:{}",serviceClass.getName(), method.getName(), e.getMessage(), e);throw new RpcException("RPC调用异常:" + e.getMessage(), e);}}/*** 关闭客户端传输层连接*/public void close() {if (!ObjectUtils.isEmpty(clientTransport)) {clientTransport.close();log.info("RPC代理客户端连接已关闭,服务接口:{}", serviceClass.getName());}}
}
4.4.2 工具类:生成唯一请求 ID
为了确保 RPC 请求与响应的对应(Netty 异步通信),需要生成唯一的requestId,这里使用 UUID 简化实现:
package com.ken.rpc.util;import java.util.UUID;/*** ID生成工具类* 生成RPC请求ID、服务ID等唯一标识** @author ken*/
public class IdUtils {/*** 生成RPC请求ID(基于UUID,去除横杠)** @return 唯一请求ID(32位字符串)*/public static String generateRequestId() {// UUID格式:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx,去除横杠后为32位return UUID.randomUUID().toString().replace("-", "");}
}
4.5 负载均衡层:实现服务调用的负载分发
设计思路:当一个服务有多个提供者(如 UserService 部署在 3 台服务器上)时,消费者需要选择其中一台调用,这就是负载均衡的作用。我们定义负载均衡接口,实现常见的负载均衡算法(轮询、随机、加权随机),让框架支持灵活切换。
4.5.1 负载均衡接口定义
package com.ken.rpc.loadbalance;import com.ken.rpc.exception.LoadBalanceException;
import org.springframework.util.CollectionUtils;import java.util.List;/*** 负载均衡接口* 定义从服务地址列表中选择一个地址的方法** @author ken*/
public interface LoadBalancer {/*** 从服务地址列表中选择一个地址** @param serviceAddressList 服务地址列表(格式:IP:端口)* @param serviceName 服务名称(用于区分不同服务的负载均衡状态)* @return 选中的服务地址* @throws LoadBalanceException 负载均衡失败时抛出*/String select(List<String> serviceAddressList, String serviceName) throws LoadBalanceException;/*** 校验服务地址列表(通用方法,所有实现类可复用)** @param serviceAddressList 服务地址列表* @param serviceName 服务名称* @throws LoadBalanceException 地址列表为空时抛出*/default void validateAddressList(List<String> serviceAddressList, String serviceName) throws LoadBalanceException {if (CollectionUtils.isEmpty(serviceAddressList)) {throw new LoadBalanceException("服务无可用地址,无法执行负载均衡,服务名称:" + serviceName);}}
}
4.5.2 轮询算法实现(最常用)
轮询算法:按顺序依次选择服务地址,如地址列表为 [A,B,C],则调用顺序为 A→B→C→A→B→C...
package com.ken.rpc.loadbalance.impl;import com.ken.rpc.exception.LoadBalanceException;
import com.ken.rpc.loadbalance.LoadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;/*** 轮询负载均衡算法* 按顺序依次选择服务地址,适用于服务节点性能相近的场景** @author ken*/
@Slf4j
public class RoundRobinLoadBalancer implements LoadBalancer {/*** 每个服务的轮询计数器(key:服务名称,value:当前计数)* 使用AtomicInteger保证线程安全*/private final ConcurrentMap<String, AtomicInteger> countMap = new ConcurrentHashMap<>();@Overridepublic String select(List<String> serviceAddressList, String serviceName) throws LoadBalanceException {// 1. 校验地址列表validateAddressList(serviceAddressList, serviceName);int addressCount = serviceAddressList.size();if (addressCount == 1) {// 只有一个地址,直接返回String address = serviceAddressList.get(0);log.debug("轮询负载均衡:服务只有一个地址,直接返回,服务名称:{},地址:{}", serviceName, address);return address;}// 2. 获取当前服务的轮询计数器(不存在则初始化)AtomicInteger count = countMap.computeIfAbsent(serviceName, k -> new AtomicInteger(0));// 3. 原子递增计数,并取模(避免计数溢出)int currentCount = count.getAndIncrement();// 取模确保选择的索引在地址列表范围内int selectedIndex = currentCount % addressCount;// 4. 处理计数溢出(当计数超过Integer.MAX_VALUE时重置为0)if (currentCount > Integer.MAX_VALUE - 1000) {count.set(0);log.debug("轮询计数器即将溢出,重置为0,服务名称:{},当前计数:{}", serviceName, currentCount);}// 5. 返回选中的地址String selectedAddress = serviceAddressList.get(selectedIndex);log.debug("轮询负载均衡选择完成,服务名称:{},地址列表大小:{},当前计数:{},选中索引:{},选中地址:{}",serviceName, addressCount, currentCount, selectedIndex, selectedAddress);return selectedAddress;}
}
4.5.3 负载均衡异常定义
package com.ken.rpc.exception;/*** 负载均衡异常* 当从服务地址列表选择地址过程中出现错误时抛出** @author ken*/
public class LoadBalanceException extends RuntimeException {public LoadBalanceException(String message) {super(message);}public LoadBalanceException(String message, Throwable cause) {super(message, cause);}
}
4.6 服务提供者与消费者:注解驱动开发
设计思路:为了简化框架使用,我们采用注解驱动开发:服务提供者用@RpcService注解标记服务实现类,框架自动扫描并注册服务;服务消费者用@RpcReference注解标记服务接口,框架自动生成代理对象,消费者直接注入使用。
4.6.1 服务提供者注解:@RpcService
package com.ken.rpc.annotation;import org.springframework.stereotype.Component;import java.lang.annotation.*;/*** RPC服务提供者注解* 标记服务实现类,框架自动扫描并注册到注册中心** @author ken*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component // 继承Spring的@Component,让Spring能扫描到该类
public @interface RpcService {/*** 服务接口Class对象(默认当前类实现的第一个接口)*/Class<?> interfaceClass() default void.class;/*** 服务接口名称(与interfaceClass二选一,优先使用interfaceClass)*/String interfaceName() default "";/*** 服务版本号(默认1.0.0)*/String serviceVersion() default "1.0.0";
}
4.6.2 服务消费者注解:@RpcReference
package com.ken.rpc.annotation;import org.springframework.beans.factory.annotation.Autowired;import java.lang.annotation.*;/*** RPC服务消费者注解* 标记服务接口字段,框架自动生成代理对象并注入到Spring Bean中** @author ken*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired // 结合Spring自动注入机制,触发代理对象创建
public @interface RpcReference {/*** 服务接口Class对象(默认取字段的类型)*/Class<?> interfaceClass() default void.class;/*** 服务版本号(默认1.0.0,需与服务提供者版本一致)*/String serviceVersion() default "1.0.0";/*** 注册中心地址(默认从全局配置读取,可单独指定)*/String registryAddress() default "";
}
4.6.3 服务扫描与注册实现(核心)
需要实现两个核心逻辑:服务提供者扫描(识别@RpcService注解并注册服务)和服务消费者注入(识别@RpcReference注解并注入代理对象)。我们借助 Spring 的BeanPostProcessor机制,在 Bean 初始化阶段完成这些操作。
4.6.3.1 服务映射器:存储服务接口与实现的对应关系
服务端接收 RPC 请求后,需要根据 “服务名称 + 版本号” 找到对应的实现类,因此需要一个线程安全的服务映射器:
package com.ken.rpc.service;import com.ken.rpc.exception.RpcException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** RPC服务映射器* 存储“服务接口+版本号”与服务实现类的对应关系,供服务端处理请求时查询** @author ken*/
@Slf4j
public class RpcServiceMapper {/*** 服务映射表:key=服务接口全限定名:版本号(如com.ken.rpc.service.UserService:1.0.0),value=服务实现类实例*/private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();/*** 注册服务到映射表** @param serviceInterface 服务接口Class* @param serviceVersion 服务版本号* @param serviceImpl 服务实现类实例*/public void registerService(Class<?> serviceInterface, String serviceVersion, Object serviceImpl) {// 1. 校验参数if (ObjectUtils.isEmpty(serviceInterface) || !serviceInterface.isInterface()) {log.error("服务接口非法,必须是接口类型,接口类:{}", serviceInterface);throw new RpcException("服务接口必须是接口类型:" + serviceInterface);}if (ObjectUtils.isEmpty(serviceImpl)) {log.error("服务实现类实例为空,接口类:{},版本号:{}", serviceInterface.getName(), serviceVersion);throw new RpcException("服务实现类实例不能为空");}if (!serviceInterface.isAssignableFrom(serviceImpl.getClass())) {log.error("服务实现类未实现指定接口,接口类:{},实现类:{}", serviceInterface.getName(), serviceImpl.getClass().getName());throw new RpcException("服务实现类未实现接口:" + serviceInterface.getName());}// 2. 构建服务key(接口名:版本号)String serviceKey = buildServiceKey(serviceInterface.getName(), serviceVersion);// 3. 注册服务(若已存在则覆盖,支持服务热更新)if (serviceMap.containsKey(serviceKey)) {log.warn("服务已存在,将覆盖旧服务,服务key:{},旧实现类:{},新实现类:{}",serviceKey, serviceMap.get(serviceKey).getClass().getName(), serviceImpl.getClass().getName());}serviceMap.put(serviceKey, serviceImpl);log.info("服务注册到映射表成功,服务key:{},接口类:{},实现类:{},版本号:{}",serviceKey, serviceInterface.getName(), serviceImpl.getClass().getName(), serviceVersion);}/*** 根据服务接口名和版本号获取服务实现类** @param serviceName 服务接口全限定名* @param serviceVersion 服务版本号* @return 服务实现类实例* @throws RpcException 服务不存在时抛出*/public Object getService(String serviceName, String serviceVersion) {// 1. 校验参数if (ObjectUtils.isEmpty(serviceName)) {log.error("服务名称为空,无法获取服务实现");throw new RpcException("服务名称不能为空");}String serviceKey = buildServiceKey(serviceName, serviceVersion);// 2. 查询服务Object serviceImpl = serviceMap.get(serviceKey);if (ObjectUtils.isEmpty(serviceImpl)) {log.error("服务不存在,服务key:{},可用服务列表:{}", serviceKey, serviceMap.keySet());throw new RpcException("服务不存在:" + serviceKey);}log.debug("获取服务实现成功,服务key:{},实现类:{}", serviceKey, serviceImpl.getClass().getName());return serviceImpl;}/*** 构建服务key(接口名:版本号)* 版本号为空时,默认使用"1.0.0"** @param serviceName 服务接口名* @param serviceVersion 服务版本号* @return 服务key*/private String buildServiceKey(String serviceName, String serviceVersion) {String version = ObjectUtils.isEmpty(serviceVersion) ? "1.0.0" : serviceVersion;return serviceName + ":" + version;}/*** 获取所有已注册的服务key** @return 服务key列表*/public Map<String, Object> getServiceMap() {return serviceMap;}
}
4.6.3.2 服务提供者扫描处理器
实现BeanPostProcessor,在 Spring 初始化@RpcService标记的 Bean 后,将其注册到RpcServiceMapper和 ZooKeeper:
package com.ken.rpc.processor;import com.ken.rpc.annotation.RpcService;
import com.ken.rpc.registry.ServiceRegistry;
import com.ken.rpc.service.RpcServiceMapper;
import com.ken.rpc.transport.netty.NettyRpcServer;
import com.ken.rpc.util.NetUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.lang.reflect.Method;/*** RPC服务提供者扫描处理器* 扫描带有@RpcService注解的Bean,完成服务注册(映射表+注册中心)并启动Netty服务端** @author ken*/
@Slf4j
@Component
public class RpcServiceBeanPostProcessor implements BeanPostProcessor {/*** 服务映射器(注入Spring容器)*/private final RpcServiceMapper rpcServiceMapper;/*** 服务注册中心(注入Spring容器,默认ZooKeeper实现)*/private final ServiceRegistry serviceRegistry;/*** Netty服务端端口(从配置文件读取,默认8080)*/@Value("${rpc.server.port:8080}")private int serverPort;/*** 注册中心地址(从配置文件读取,默认127.0.0.1:2181)*/@Value("${rpc.registry.address:127.0.0.1:2181}")private String registryAddress;/*** Netty服务端实例(用于启动和关闭)*/private NettyRpcServer nettyRpcServer;public RpcServiceBeanPostProcessor(RpcServiceMapper rpcServiceMapper, ServiceRegistry serviceRegistry) {this.rpcServiceMapper = rpcServiceMapper;this.serviceRegistry = serviceRegistry;}/*** Bean初始化后执行:处理@RpcService注解的Bean** @param bean Bean实例* @param beanName Bean名称* @return 原Bean实例(不修改Bean)* @throws BeansException Bean处理异常*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 1. 判断Bean是否带有@RpcService注解RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);if (ObjectUtils.isEmpty(rpcService)) {return bean; // 非RPC服务Bean,直接返回}// 2. 解析@RpcService注解的属性Class<?> serviceInterface = rpcService.interfaceClass();String serviceInterfaceName = rpcService.interfaceName();String serviceVersion = rpcService.serviceVersion();// 3. 确定服务接口(优先使用interfaceClass,若未指定则取Bean实现的第一个接口)if (serviceInterface == void.class) {Class<?>[] interfaces = bean.getClass().getInterfaces();if (ObjectUtils.isEmpty(interfaces)) {log.error("服务实现类未实现任何接口,无法注册服务,Bean名称:{},实现类:{}",beanName, bean.getClass().getName());throw new BeansException("服务实现类未实现接口:" + bean.getClass().getName()) {};}// 默认取第一个实现的接口serviceInterface = interfaces[0];log.warn("未指定服务接口(interfaceClass),默认使用实现类的第一个接口,Bean名称:{},接口类:{}",beanName, serviceInterface.getName());}// 4. 确定服务接口名(优先使用interfaceName,若未指定则取interfaceClass的全限定名)if (!StringUtils.hasText(serviceInterfaceName)) {serviceInterfaceName = serviceInterface.getName();}// 5. 注册服务到RpcServiceMapper(本地映射)rpcServiceMapper.registerService(serviceInterface, serviceVersion, bean);// 6. 注册服务到注册中心(ZooKeeper)String serviceAddress = NetUtils.getLocalIp() + ":" + serverPort;try {serviceRegistry.register(serviceInterfaceName, serviceAddress);log.info("服务注册到注册中心成功,服务接口:{},版本号:{},服务地址:{},Bean名称:{}",serviceInterfaceName, serviceVersion, serviceAddress, beanName);} catch (Exception e) {log.error("服务注册到注册中心失败,服务接口:{},版本号:{},服务地址:{},异常信息:{}",serviceInterfaceName, serviceVersion, serviceAddress, e.getMessage(), e);throw new BeansException("服务注册失败:" + serviceInterfaceName) {};}return bean;}/*** 初始化方法:启动Netty服务端(在所有Bean初始化前执行)*/@PostConstructpublic void startNettyServer() {if (!ObjectUtils.isEmpty(nettyRpcServer)) {log.warn("Netty服务端已启动,无需重复启动,端口:{}", serverPort);return;}// 启动Netty服务端(单独线程启动,避免阻塞Spring上下文初始化)new Thread(() -> {nettyRpcServer = new NettyRpcServer(serverPort, registryAddress);nettyRpcServer.start(); // 启动服务端(阻塞直到服务关闭)}, "netty-rpc-server-thread-" + serverPort).start();log.info("Netty服务端启动线程已创建,端口:{},注册中心地址:{}", serverPort, registryAddress);}/*** 销毁方法:关闭Netty服务端和注册中心连接(Spring上下文关闭时执行)*/@PreDestroypublic void stopNettyServer() {// 1. 关闭Netty服务端if (!ObjectUtils.isEmpty(nettyRpcServer)) {nettyRpcServer.close(); // 自定义close方法,优雅关闭Netty线程组log.info("Netty服务端已关闭,端口:{}", serverPort);}// 2. 关闭注册中心连接(若ZooKeeper实现,需调用close方法)if (serviceRegistry instanceof AutoCloseable) {try {((AutoCloseable) serviceRegistry).close();log.info("服务注册中心连接已关闭,地址:{}", registryAddress);} catch (Exception e) {log.error("服务注册中心连接关闭失败,地址:{},异常信息:{}",registryAddress, e.getMessage(), e);}}}/*** 关闭Netty服务端(供外部调用)*/public void close() {stopNettyServer();}
}
4.6.3.3 服务消费者注入处理器
实现BeanPostProcessor,在 Spring 初始化带有@RpcReference字段的 Bean 时,生成代理对象并注入字段:
package com.ken.rpc.processor;import com.ken.rpc.annotation.RpcReference;
import com.ken.rpc.proxy.RpcProxyFactory;
import com.ken.rpc.registry.ServiceRegistry;
import com.ken.rpc.registry.zookeeper.ZookeeperServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** RPC服务消费者注入处理器* 扫描Bean中带有@RpcReference注解的字段,生成代理对象并注入** @author ken*/
@Slf4j
@Component
public class RpcReferenceBeanPostProcessor implements BeanPostProcessor {/*** 代理对象缓存:key=服务接口名:版本号,value=代理对象(避免重复生成)*/private final ConcurrentMap<String, Object> proxyCache = new ConcurrentHashMap<>();/*** 全局注册中心地址(从配置文件读取,默认127.0.0.1:2181)*/@Value("${rpc.registry.address:127.0.0.1:2181}")private String globalRegistryAddress;/*** Bean初始化后执行:处理@RpcReference注解的字段** @param bean Bean实例* @param beanName Bean名称* @return 原Bean实例(已注入代理对象)* @throws BeansException Bean处理异常*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 1. 获取Bean的所有字段Field[] fields = bean.getClass().getDeclaredFields();if (ObjectUtils.isEmpty(fields)) {return bean; // 无字段,直接返回}// 2. 遍历字段,处理带有@RpcReference注解的字段for (Field field : fields) {RpcReference rpcReference = field.getAnnotation(RpcReference.class);if (ObjectUtils.isEmpty(rpcReference)) {continue; // 非RPC引用字段,跳过}// 3. 解析@RpcReference注解的属性Class<?> serviceInterface = rpcReference.interfaceClass();String serviceVersion = rpcReference.serviceVersion();String registryAddress = rpcReference.registryAddress();// 4. 确定服务接口(优先使用interfaceClass,若未指定则取字段类型)if (serviceInterface == void.class) {serviceInterface = field.getType();log.warn("未指定服务接口(interfaceClass),默认使用字段类型,Bean名称:{},字段名:{},接口类:{}",beanName, field.getName(), serviceInterface.getName());}// 5. 确定注册中心地址(优先使用注解的registryAddress,若未指定则使用全局配置)if (!StringUtils.hasText(registryAddress)) {registryAddress = globalRegistryAddress;}// 6. 生成或获取代理对象(从缓存获取,避免重复生成)String proxyKey = buildProxyKey(serviceInterface.getName(), serviceVersion);Object proxy = proxyCache.computeIfAbsent(proxyKey, k -> {log.info("生成RPC代理对象,服务接口:{},版本号:{},注册中心地址:{}",serviceInterface.getName(), serviceVersion, registryAddress);// 创建服务注册中心实例(ZooKeeper)ServiceRegistry serviceRegistry = new ZookeeperServiceRegistry(registryAddress);// 创建代理工厂并生成代理对象RpcProxyFactory proxyFactory = new RpcProxyFactory(serviceInterface, serviceVersion, serviceRegistry);return proxyFactory.getProxy();});// 7. 注入代理对象到字段(设置字段可访问,突破private修饰符)ReflectionUtils.makeAccessible(field);try {field.set(bean, proxy);log.info("RPC代理对象注入成功,Bean名称:{},字段名:{},服务接口:{},版本号:{}",beanName, field.getName(), serviceInterface.getName(), serviceVersion);} catch (IllegalAccessException e) {log.error("RPC代理对象注入失败,Bean名称:{},字段名:{},服务接口:{},异常信息:{}",beanName, field.getName(), serviceInterface.getName(), e.getMessage(), e);throw new BeansException("代理对象注入失败:" + field.getName()) {};}}return bean;}/*** 构建代理对象缓存key(服务接口名:版本号)** @param serviceName 服务接口名* @param serviceVersion 服务版本号* @return 代理缓存key*/private String buildProxyKey(String serviceName, String serviceVersion) {String version = ObjectUtils.isEmpty(serviceVersion) ? "1.0.0" : serviceVersion;return serviceName + ":" + version;}
}
4.6.4 框架自动配置类(简化用户配置)
为了让用户无需手动配置 Bean,我们使用 Spring Boot 的自动配置机制,通过@Configuration和@ConditionalOnMissingBean自动注册核心 Bean:
package com.ken.rpc.config;import com.ken.rpc.processor.RpcReferenceBeanPostProcessor;
import com.ken.rpc.processor.RpcServiceBeanPostProcessor;
import com.ken.rpc.registry.ServiceRegistry;
import com.ken.rpc.registry.zookeeper.ZookeeperServiceRegistry;
import com.ken.rpc.service.RpcServiceMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RPC框架自动配置类* 自动注册核心Bean,用户无需手动配置** @author ken*/
@Configuration
public class RpcAutoConfiguration {/*** 注册服务映射器(RpcServiceMapper)* 若用户未自定义,则使用默认实现*/@Bean@ConditionalOnMissingBeanpublic RpcServiceMapper rpcServiceMapper() {return new RpcServiceMapper();}/*** 注册服务注册中心(默认ZooKeeper实现)* 若用户未自定义,则使用ZookeeperServiceRegistry*/@Bean@ConditionalOnMissingBeanpublic ServiceRegistry serviceRegistry() {// 默认注册中心地址:127.0.0.1:2181(用户可通过配置文件覆盖)return new ZookeeperServiceRegistry("127.0.0.1:2181");}/*** 注册服务提供者扫描处理器*/@Bean@ConditionalOnMissingBeanpublic RpcServiceBeanPostProcessor rpcServiceBeanPostProcessor(RpcServiceMapper rpcServiceMapper, ServiceRegistry serviceRegistry) {return new RpcServiceBeanPostProcessor(rpcServiceMapper, serviceRegistry);}/*** 注册服务消费者注入处理器*/@Bean@ConditionalOnMissingBeanpublic RpcReferenceBeanPostProcessor rpcReferenceBeanPostProcessor() {return new RpcReferenceBeanPostProcessor();}
}
4.6.5 完整使用示例(可直接运行)
为了让用户直观理解框架使用流程,我们实现一个 “用户服务” 示例,包含服务接口、服务提供者、服务消费者三部分。
4.6.5.1 1. 定义服务接口(公共模块)
创建user-api模块,定义服务接口(服务提供者和消费者都需依赖此模块):
package com.ken.rpc.service;import com.ken.rpc.entity.User;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;/*** 用户服务RPC接口** @author ken*/
@Tag(name = "UserService", description = "用户服务RPC接口,提供用户查询、创建等功能")
public interface UserService {/*** 根据用户ID查询用户信息** @param userId 用户ID(非负整数)* @return 用户信息(userId不存在时返回null)*/@Operation(summary = "查询用户", description = "根据用户ID查询用户详情,支持ID范围:1-10000")User getUserById(@Parameter(description = "用户ID", required = true, example = "1001") Long userId);/*** 创建新用户** @param user 用户信息(username和age为必填字段)* @return 创建成功的用户ID(自增主键)*/@Operation(summary = "创建用户", description = "创建新用户,返回自增用户ID")Long createUser(@Parameter(description = "用户信息", required = true) User user);
}
用户实体类(需实现Serializable,支持序列化):
package com.ken.rpc.entity;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** 用户实体类** @author ken*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "用户实体类,存储用户基本信息")
public class User implements Serializable {private static final long serialVersionUID = 1L;/*** 用户ID(自增主键)*/@Schema(description = "用户ID(自增主键)", example = "1001")private Long userId;/*** 用户名(非空,长度1-20)*/@Schema(description = "用户名", required = true, example = "zhangsan")private String username;/*** 用户年龄(1-120)*/@Schema(description = "用户年龄", required = true, example = "25")private Integer age;/*** 用户邮箱(可选,格式需符合邮箱规范)*/@Schema(description = "用户邮箱", example = "zhangsan@example.com")private String email;
}
4.6.5.2 2. 服务提供者实现(user-provider 模块)
创建user-provider模块,依赖user-api,实现UserService并通过@RpcService暴露服务。
(1)服务实现类(集成 MyBatis-Plus 操作数据库)
package com.ken.rpc.provider.service.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ken.rpc.annotation.RpcService;
import com.ken.rpc.entity.User;
import com.ken.rpc.mapper.UserMapper;
import com.ken.rpc.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;import javax.annotation.Resource;/*** 用户服务实现类(RPC服务提供者)** @author ken*/
@Slf4j
@Service
@RpcService(interfaceClass = UserService.class, serviceVersion = "1.0.0")
public class UserServiceImpl implements UserService {@Resourceprivate UserMapper userMapper;@Overridepublic User getUserById(Long userId) {// 1. 校验参数if (ObjectUtils.isEmpty(userId) || userId <= 0) {log.error("查询用户失败:用户ID非法,userId:{}", userId);throw new IllegalArgumentException("用户ID必须为正整数:" + userId);}// 2. MyBatis-Plus查询用户LambdaQueryWrapper<User> queryWrapper = Wrappers.lambdaQuery(User.class).eq(User::getUserId, userId);User user = userMapper.selectOne(queryWrapper);// 3. 日志记录if (ObjectUtils.isEmpty(user)) {log.warn("查询用户不存在,userId:{}", userId);} else {log.info("查询用户成功,userId:{},username:{}", userId, user.getUsername());}return user;}@Overridepublic Long createUser(User user) {// 1. 校验参数if (ObjectUtils.isEmpty(user)) {log.error("创建用户失败:用户信息为空");throw new IllegalArgumentException("用户信息不能为空");}if (!StringUtils.hasText(user.getUsername()) || user.getUsername().length() > 20) {log.error("创建用户失败:用户名非法,username:{}", user.getUsername());throw new IllegalArgumentException("用户名必须为1-20位非空字符串:" + user.getUsername());}if (ObjectUtils.isEmpty(user.getAge()) || user.getAge() < 1 || user.getAge() > 120) {log.error("创建用户失败:年龄非法,age:{}", user.getAge());throw new IllegalArgumentException("年龄必须为1-120的整数:" + user.getAge());}// 2. MyBatis-Plus插入用户(自增主键会自动回写)int insertCount = userMapper.insert(user);if (insertCount != 1) {log.error("创建用户失败:插入数据库失败,影响行数:{},用户信息:{}", insertCount, user);throw new RuntimeException("创建用户失败:数据库插入异常");}// 3. 日志记录log.info("创建用户成功,userId:{},username:{}", user.getUserId(), user.getUsername());return user.getUserId();}
}
(2)MyBatis-Plus Mapper 接口
package com.ken.rpc.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ken.rpc.entity.User;
import org.apache.ibatis.annotations.Mapper;/*** 用户Mapper接口(MyBatis-Plus)** @author ken*/
@Mapper
public interface UserMapper extends BaseMapper<User> {
}
(3)数据库表结构(MySQL 8.0)
-- 创建用户表
CREATE TABLE `user` (`user_id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID(自增主键)',`username` varchar(20) NOT NULL COMMENT '用户名',`age` int NOT NULL COMMENT '用户年龄',`email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`user_id`),UNIQUE KEY `uk_username` (`username`) COMMENT '用户名唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';-- 插入测试数据
INSERT INTO `user` (`username`, `age`, `email`) VALUES
('zhangsan', 25, 'zhangsan@example.com'),
('lisi', 30, 'lisi@example.com');
(4)服务提供者配置文件(application.yml)
# Spring配置
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/rpc_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8username: rootpassword: 123456# MyBatis-Plus配置
mybatis-plus:mapper-locations: classpath:mapper/**/*.xmltype-aliases-package: com.ken.rpc.entityconfiguration:map-underscore-to-camel-case: true # 下划线转驼峰log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl # 日志实现# RPC框架配置
rpc:server:port: 8080 # Netty服务端端口registry:address: 127.0.0.1:2181 # ZooKeeper注册中心地址# Swagger3配置
springdoc:api-docs:path: /v3/api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: methodpackages-to-scan: com.ken.rpc.provider.controller
(5)服务提供者启动类
package com.ken.rpc.provider;import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;/*** RPC服务提供者启动类(用户服务)** @author ken*/
@SpringBootApplication(scanBasePackages = "com.ken.rpc")
@MapperScan("com.ken.rpc.mapper") // 扫描MyBatis Mapper
@EntityScan("com.ken.rpc.entity") // 扫描实体类
@OpenAPIDefinition(info = @Info(title = "User Provider API", version = "1.0", description = "用户服务提供者接口文档"))
public class UserProviderApplication {public static void main(String[] args) {SpringApplication.run(UserProviderApplication.class, args);}
}
4.6.5.3 3. 服务消费者实现(user-consumer 模块)
创建user-consumer模块,依赖user-api,通过@RpcReference注入UserService代理对象,提供 HTTP 接口供测试。
(1)消费者控制器(集成 Swagger3)
package com.ken.rpc.consumer.controller;import com.ken.rpc.annotation.RpcReference;
import com.ken.rpc.entity.User;
import com.ken.rpc.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.util.ObjectUtils;/*** 用户服务消费者控制器(HTTP接口)** @author ken*/
@Slf4j
@RestController
@RequestMapping("/user")
@Tag(name = "UserConsumerController", description = "用户服务消费者接口,通过RPC调用用户服务")
public class UserConsumerController {/*** 注入UserService RPC代理对象(通过@RpcReference自动生成)*/@RpcReference(interfaceClass = UserService.class, serviceVersion = "1.0.0", registryAddress = "127.0.0.1:2181")private UserService userService;/*** HTTP接口:根据用户ID查询用户(调用RPC服务)** @param userId 用户ID* @return 用户信息*/@GetMapping("/{userId}")@Operation(summary = "查询用户", description = "通过HTTP接口调用RPC服务查询用户")public String getUserById(@Parameter(description = "用户ID", required = true, example = "1001")@PathVariable Long userId) {try {log.info("消费者接收查询用户请求,userId:{}", userId);User user = userService.getUserById(userId);if (ObjectUtils.isEmpty(user)) {return "查询结果:用户不存在,userId:" + userId;}return "查询结果:" + user.toString();} catch (Exception e) {log.error("查询用户失败,userId:{},异常信息:{}", userId, e.getMessage(), e);return "查询失败:" + e.getMessage();}}/*** HTTP接口:创建用户(调用RPC服务)** @param user 用户信息* @return 创建结果*/@PostMapping("/create")@Operation(summary = "创建用户", description = "通过HTTP接口调用RPC服务创建用户")public String createUser(@Parameter(description = "用户信息", required = true)@RequestBody User user) {try {log.info("消费者接收创建用户请求,用户信息:{}", user);Long userId = userService.createUser(user);return "创建成功:新用户ID=" + userId;} catch (Exception e) {log.error("创建用户失败,用户信息:{},异常信息:{}", user, e.getMessage(), e);return "创建失败:" + e.getMessage();}}
}
(2)服务消费者配置文件(application.yml)
# Spring配置
spring:application:name: user-consumer# RPC框架配置
rpc:registry:address: 127.0.0.1:2181 # ZooKeeper注册中心地址(全局配置)# 服务器端口(避免与服务提供者冲突)
server:port: 8081# Swagger3配置
springdoc:api-docs:path: /v3/api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: methodpackages-to-scan: com.ken.rpc.consumer.controller
(3)服务消费者启动类
package com.ken.rpc.consumer;import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;/*** RPC服务消费者启动类(用户服务)** @author ken*/
@SpringBootApplication(scanBasePackages = "com.ken.rpc")
@EntityScan("com.ken.rpc.entity") // 扫描实体类
@OpenAPIDefinition(info = @Info(title = "User Consumer API", version = "1.0", description = "用户服务消费者接口文档"))
public class UserConsumerApplication {public static void main(String[] args) {SpringApplication.run(UserConsumerApplication.class, args);}
}
5. 框架测试与验证
5.1 环境准备
- 启动 ZooKeeper:下载 ZooKeeper 3.9.2,解压后修改
conf/zoo.cfg,设置dataDir,执行bin/zkServer.sh start(Linux)或bin/zkServer.cmd start(Windows)。 - 创建数据库:在 MySQL 8.0 中创建
rpc_db数据库,执行 4.6.5.2 中的 SQL 脚本创建user表并插入测试数据。 - 构建项目:在父项目根目录执行
mvn clean install,确保所有模块编译通过。
5.2 启动与测试步骤
步骤 1:启动服务提供者
- 运行
UserProviderApplication的main方法。 - 查看日志,确认以下信息:
- Netty 服务端启动成功(监听端口 8080)。
- 服务注册到 ZooKeeper 成功(服务 key:
com.ken.rpc.service.UserService:1.0.0)。
步骤 2:启动服务消费者
- 运行
UserConsumerApplication的main方法。 - 查看日志,确认以下信息:
- RPC 代理对象生成成功(
UserService代理)。 - 代理对象注入到
UserConsumerController成功。
- RPC 代理对象生成成功(
步骤 3:通过 Swagger3 测试接口
- 访问消费者 Swagger 地址:
http://localhost:8081/swagger-ui.html。 - 测试 “查询用户” 接口:
- 展开
/user/{userId}接口,输入userId=1001,点击 “Try it out”。 - 预期结果:返回
zhangsan的用户信息。
- 展开
- 测试 “创建用户” 接口:
- 展开
/user/create接口,输入请求体:json
{"username": "wangwu","age": 28,"email": "wangwu@example.com" } - 点击 “Try it out”,预期结果:返回新用户 ID(如 1003)。
- 展开
- 验证数据库:查询
user表,确认wangwu的记录已插入。
5.3 测试结果分析
- 正确性:RPC 调用成功,服务消费者能通过代理对象调用远程服务,数据交互正确。
- 透明性:消费者无需关注网络通信、序列化、服务发现等细节,像调用本地方法一样调用远程服务。
- 可用性:服务提供者下线后,ZooKeeper 会自动删除临时节点,消费者后续调用会选择其他可用节点(需部署多个提供者测试)。
6. 框架优化与扩展方向
当前实现的 RPC 框架已具备核心功能,但在生产环境中还需优化和扩展,以下是常用方向:
6.1 容错机制
- 重试机制:调用失败时自动重试(如网络抖动),可通过
Guava-Retryer实现,支持重试次数、重试间隔配置。 - 服务降级:当服务不可用时,返回默认值或友好提示(如
Resilience4j的降级功能)。 - 熔断机制:服务连续失败次数达到阈值时,暂时停止调用,避免雪崩效应(如
Resilience4j的熔断功能)。
6.2 性能优化
- 序列化优化:替换 FastJSON2 为 Protobuf,Protobuf 是二进制序列化协议,体积更小、速度更快,适合高性能场景。
- 连接池优化:Netty 客户端使用连接池,避免频繁创建和关闭连接,减少开销。
- 异步调用:支持 CompletableFuture 异步调用,避免同步调用阻塞线程,提高并发量。
6.3 功能扩展
- 多注册中心支持:实现 Nacos、Etcd 等注册中心的
ServiceRegistry接口,用户可通过配置切换。 - 服务监控:集成 SkyWalking 或 Prometheus,监控 RPC 调用耗时、成功率、并发量等指标。
- 权限控制:在 RPC 请求中添加 token,服务端验证 token 合法性,防止非法调用。
- 服务文档自动生成:基于服务接口注解,自动生成 RPC 服务文档(类似 Swagger)。
6.4 代码示例:添加重试机制
以RpcProxyFactory为例,集成Guava-Retryer实现重试:
// 添加Guava Retryer依赖
<dependency><groupId>com.github.rholder</groupId><artifactId>guava-retrying</artifactId><version>2.0.0</version>
</dependency>// 在RpcProxyFactory的invoke方法中添加重试逻辑
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder().retryIfExceptionOfType(NetworkException.class) // 网络异常重试.retryIfResult(response -> !response.isSuccess()) // 调用失败重试.withStopStrategy(StopStrategies.stopAfterAttempt(3)) // 最多重试3次.withWaitStrategy(WaitStrategies.fixedWait(100, TimeUnit.MILLISECONDS)) // 重试间隔100ms.build();try {// 带重试的调用RpcResponse rpcResponse = retryer.call(() -> clientTransport.sendRequest(rpcRequest).get());
} catch (ExecutionException e) {// 重试失败处理throw new RpcException("RPC调用重试失败:" + e.getCause().getMessage(), e.getCause());
}
7. 总结
本文从 0 到 1 实现了一个可运行的 RPC 框架,核心包含序列化层、网络传输层、服务注册中心、动态代理层、负载均衡层、注解驱动层六大模块,通过示例验证了框架的正确性和可用性。
- 核心逻辑:通过动态代理屏蔽远程调用细节,Netty 实现高性能通信,ZooKeeper 实现服务发现,负载均衡实现服务分发,注解驱动简化用户使用。
- 关键收获:理解 RPC 的底层原理(如 TCP 粘包拆包、序列化、服务发现),掌握框架设计的 “接口化” 思想(如
Serializer、ServiceRegistry),便于后续扩展。 - 生产落地:若需在生产环境使用,需补充容错、监控、权限等功能,并进行性能测试和压测,确保满足业务需求。
