当前位置: 首页 > news >正文

从零开始实现一个简单的 RPC 框架(Java 版)

一、什么是 RPC?

RPC(Remote Procedure Call,远程过程调用)是一种进程间通信技术,允许程序像调用本地函数一样调用远程服务器上的方法。在分布式系统中,RPC 是非常常见且核心的技术之一,它隐藏了底层网络通信的复杂性,使得开发人员可以专注于业务逻辑的设计与实现。

常见的 RPC 框架:

  • Dubbo:阿里巴巴开源的高性能、轻量级 RPC 框架。
  • gRPC:Google 推出的基于 HTTP/2 的高性能 RPC 框架。
  • Thrift:Facebook 开源的跨语言服务框架。
  • Spring Cloud Feign / OpenFeign:集成在 Spring 生态中的声明式 REST 客户端。

这些成熟的框架功能强大,但如果我们想理解其背后的原理,最好的方式就是自己动手实现一个简单的版本。


二、目标与设计思路

我们要实现的是一个最简版的 RPC 框架,具备以下基本功能:

  1. 服务提供者注册服务接口
  2. 服务消费者调用远程方法
  3. 使用 Netty 或 Socket 进行网络通信
  4. 使用序列化机制传输数据(如 JSON 或 JDK 序列化)
  5. 支持同步调用

我们将采用经典的客户端-服务端架构,整体流程如下:

[客户端] --> 发送请求 --> [服务端]
[服务端] --> 处理请求并返回结果 --> [客户端]

三、项目结构设计

为了便于组织代码,我们按照模块划分如下:

simple-rpc/
├── simple-rpc-api/         # 公共接口定义
├── simple-rpc-server/      # 服务提供方
├── simple-rpc-client/      # 服务消费方
└── simple-rpc-common/      # 公共工具类、协议、序列化等

四、公共接口定义(simple-rpc-api)

首先,我们定义一个服务接口,供服务提供方和消费方共同依赖。

// HelloService.java
public interface HelloService {String sayHello(String name);
}

这个接口会在服务端被实现,在客户端被调用。


五、服务提供方实现(simple-rpc-server)

1. 服务实现类

// HelloServiceImpl.java
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {System.out.println("收到请求:" + name);return "Hello, " + name;}
}

2. 启动服务端

我们使用 Netty 来搭建 TCP 服务器,监听客户端请求。

添加 Maven 依赖(Netty):
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.96.Final</version>
</dependency>
编写服务启动类:
// RpcServer.java
public class RpcServer {private final int port;public RpcServer(int port) {this.port = port;}public void start() throws Exception {ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcDecoder())     // 解码请求.addLast(new RpcEncoder())     // 编码响应.addLast(new RpcServerHandler());}});ChannelFuture future = bootstrap.bind(port).sync();System.out.println("RPC 服务已启动,监听端口:" + port);future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new RpcServer(8080).start();}
}

3. 请求处理器(RpcServerHandler)

// RpcServerHandler.java
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {private final Map<String, Object> serviceMap = new HashMap<>();public void addService(String serviceName, Object service) {serviceMap.put(serviceName, service);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {String serviceName = request.getClassName();Object service = serviceMap.get(serviceName);if (service == null) {RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setException(new RuntimeException("找不到对应的服务:" + serviceName));ctx.writeAndFlush(response);return;}try {Method method = service.getClass().getMethod(request.getMethodName(),request.getParameterTypes());Object result = method.invoke(service, request.getParameters());RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setResult(result);ctx.writeAndFlush(response);} catch (Exception e) {RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setException(e);ctx.writeAndFlush(response);}}
}

六、服务消费方实现(simple-rpc-client)

1. 动态代理调用远程服务

我们通过 Java 动态代理来生成远程调用对象:

// RpcProxy.java
public class RpcProxy {private final String host;private final int port;public RpcProxy(String host, int port) {this.host = host;this.port = port;}public <T> T getProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(getClass().getClassLoader(),new Class<?>[]{serviceClass},new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RpcRequest request = new RpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);RpcClient client = new RpcClient(host, port);RpcResponse response = client.send(request);if (response.getException() != null) {throw response.getException();}return response.getResult();}});}
}

2. 网络客户端(RpcClient)

// RpcClient.java
public class RpcClient {private final String host;private final int port;public RpcClient(String host, int port) {this.host = host;this.port = port;}public RpcResponse send(RpcRequest request) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcEncoder()).addLast(new RpcDecoder()).addLast(new RpcClientHandler());}});ChannelFuture future = bootstrap.connect(host, port).sync();future.channel().writeAndFlush(request).sync();RpcClientHandler handler = (RpcClientHandler) future.channel().pipeline().last();RpcResponse response = handler.getResponse();return response;} finally {group.shutdownGracefully();}}
}

3. 客户端处理器(RpcClientHandler)

// RpcClientHandler.java
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {private RpcResponse response;public RpcResponse getResponse() {return response;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) {this.response = msg;}
}

七、通用组件(simple-rpc-common)

1. 协议定义

我们自定义一个简单的 RPC 协议,包含请求和响应结构。

请求体(RpcRequest):
// RpcRequest.java
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;// getter/setter
}
响应体(RpcResponse):
// RpcResponse.java
public class RpcResponse {private String requestId;private Object result;private Throwable exception;// getter/setter
}

2. 序列化与反序列化(JSON 示例)

我们可以使用 Jackson 来进行 JSON 序列化。

// JsonUtil.java
public class JsonUtil {private static final ObjectMapper mapper = new ObjectMapper();public static byte[] serialize(Object obj) {try {return mapper.writeValueAsBytes(obj);} catch (Exception e) {throw new RuntimeException(e);}}public static <T> T deserialize(byte[] data, Class<T> clazz) {try {return mapper.readValue(data, clazz);} catch (Exception e) {throw new RuntimeException(e);}}
}

3. 编解码器(Netty Handler)

编码器(RpcEncoder):
// RpcEncoder.java
public class RpcEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {byte[] data = JsonUtil.serialize(msg);out.writeInt(data.length);out.writeBytes(data);}
}
解码器(RpcDecoder):
// RpcDecoder.java
public class RpcDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int length = in.readInt();if (in.readableBytes() < length) {in.resetReaderIndex();return;}byte[] data = new byte[length];in.readBytes(data);try {Object obj = JsonUtil.deserialize(data, RpcRequest.class);out.add(obj);} catch (Exception e) {throw new RuntimeException(e);}}
}

八、测试运行

1. 启动服务端

// 启动服务端
public class ServerMain {public static void main(String[] args) throws Exception {RpcServer server = new RpcServer(8080);HelloService helloService = new HelloServiceImpl();server.addHandler(HelloService.class.getName(), helloService);server.start();}
}

2. 调用远程服务

// 客户端调用
public class ClientMain {public static void main(String[] args) {RpcProxy proxy = new RpcProxy("127.0.0.1", 8080);HelloService service = proxy.getProxy(HelloService.class);String result = service.sayHello("World");System.out.println("服务端返回结果:" + result);}
}

九、总结与优化建议

当前实现的优点:

  • 结构清晰,易于理解和扩展。
  • 使用 Netty 提高通信性能。
  • 支持同步调用,满足基础需求。

可以进一步优化的方向:

  1. 异步调用支持:增加 Future/Promise 机制。
  2. 服务注册中心:引入 Zookeeper、Eureka、Consul 等注册中心。
  3. 负载均衡策略:多个服务实例时选择合适的调用节点。
  4. 异常处理增强:超时重试、熔断机制。
  5. 协议扩展:支持 Protobuf、Thrift 等更高效的序列化格式。
  6. 日志与监控:添加调用链追踪、性能统计等功能。

http://www.dtcms.com/a/284366.html

相关文章:

  • uniapp运行鸿蒙报错整理
  • 25年7月最新版本利用标准算法库对医保服务平台js逆向之signData进行分析
  • SiLM6000S:高集成智能光伏关断器,集成SunSpec PLC接收,助力安全合规
  • python Flask 框架入门
  • Kotlin 属性委托 observable 的实现原理
  • 使用Leaflet实现地图高亮点标记功能 渲染本地icon图片
  • 集成算法学习学习
  • Qt 监控串口设备热插拔的方法
  • javaweb学习开发代码_HTML-CSS-JS
  • [RAG] 文档格式化 | 知识库摄入 | VectorDB.faiss | BM25索引.pkl
  • 松材线虫检测仪在林业的作用
  • 【Lua】题目小练1
  • 九学王资源apk应用名称整理
  • 【机器学习实战【七】】机器学习特征选定与评估
  • ELN:生物医药科研的数字化引擎——衍因科技引领高效创新
  • 多线程(一) --- 线程的基础知识
  • 使用位运算优化 Vue.js 应用:高效状态管理技巧
  • Oracle 19.28 RU 升级最佳实践指南
  • 装饰器模式及优化
  • 大模型Agent应用开发实战:从框架选型到行业落地
  • 十六进制与嵌入式系统及通信系统
  • yolo8+ASR+NLP+TTS(视觉语音助手)
  • 基于Rust Softplus 函数实践方法
  • 【通识】网络的基础知识
  • 学习日志预告
  • 【测试100问】为什么要做接口测试?
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | GoodCheapFast(Good - Cheap - Fast三选二开关)
  • 区块链之Casper协议背景、演变发展、运作机制和潜在风险
  • 周志华《机器学习导论》第8章 集成学习 Ensemble Learning
  • 2025开源组件安全工具推荐OpenSCA