当前位置: 首页 > news >正文

flink如何基于Pekko实现RPC调用

摘要

通过阅读flink源码,了解flink是如何基于Pekko实现远程RPC调用的

Pekko实现远程调用

Flink 的 RPC 框架底层是构建在 Pekko 的 actor 模型之上的,了解Pekko如何使用,对后续源码的阅读有帮助。

Apache Pekko(原为 Akka 的一个分支)是一个强大的工具包,用于构建并发、分布式和可扩展的系统。它基于经典的 Actor 模型,提供了一种事件驱动、非阻塞的编程范式,使开发者能够更轻松地构建容错性强、模块化清晰的分布式应用。

引入依赖

确保你使用的是 Apache Pekko 的 Maven 依赖:

<dependencies><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-actor_2.13</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-remote_2.13</artifactId><version>1.0.2</version></dependency>
</dependencies>

定义消息类(RPC 通信协议)

public class HelloRequest implements java.io.Serializable {  public final String message;  public HelloRequest(String message) {  this.message = message;  }  
}
public class HelloResponse implements java.io.Serializable {  public final String reply;  public HelloResponse(String reply) {  this.reply = reply;  }  @Override  public String toString() {  return reply;  }  
}

HelloRequestHelloResponse 是在使用 Pekko远程通信 时的消息协议类,也就是你定义的“请求消息”和“响应消息”。它们是通过网络在客户端与服务端之间传输的,所以必须满足可序列化(Serializable)的要求。

服务端代码(远程服务)

HelloActor.java

public class HelloActor extends AbstractActor {  @Override  public Receive createReceive() {  return receiveBuilder()  .match(HelloRequest.class, req -> {  System.out.println("服务端收到消息: " + req.message);  // 回复客户端  getSender().tell(new HelloResponse("你好,客户端,我收到了:" + req.message), getSelf());  })  .build();  }  public static Props props() {  return Props.create(HelloActor.class);  }  
}

在 Pekko 中,HelloActor 相当于传统 RPC 框架中的服务实现类,但其处理逻辑是基于 消息驱动模型 而非方法调用。Pekko 的核心设计理念是:Actor 只对接收到的消息做出反应,并保持自身状态独立和可并发执行

以下是关键点说明:

  • createReceive() 方法 定义了该 Actor 支持的消息类型和对应的处理逻辑。使用 receiveBuilder().match(...).build() 来设置“消息类型 → 响应处理”的映射。
  • getSender().tell(...) 表示将处理结果异步返回给消息发送者,它等价于传统 RPC 中的“返回值”机制,只不过是通过消息的方式返回。
  • Props.create(...) 返回一个 Props 实例,描述了如何构造该 Actor。这类似于构造函数的封装工厂。Props 是 Actor 的构造“配方”,用于 ActorSystem.actorOf(...) 创建真正的 Actor 实例。

ServerApp.java

public class ServerApp {  public static void main(String[] args) {  // 使用硬编码配置启动远程 ActorSystem        Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 25520            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ServerSystem", config);  // 启动 HelloActor,名字是 helloActor,供客户端远程访问  ActorRef actorRef = system.actorOf(HelloActor.props(), "helloActor");  System.out.println("服务端已启动,等待远程调用...");  }  
}

代码说明

  • 用 Java 代码动态构造 Pekko 配置(替代 application.conf 文件)
  • pekko.actor.serialize-messages = on 强制所有 Actor 之间发送的消息都走序列化流程(即使是本地通信)
  • ActorSystem.create(...) 创建了一个名为 ServerSystem 的远程 Actor 系统。
  • 指定 IP 和端口为 127.0.0.1:25520,就像传统 RPC 服务绑定地址。
  • 启动一个名为 helloActor 的 actor,客户端稍后通过这个名字进行访问。

客户端代码

ClientApp.java

public class ClientApp {  public static void main(String[] args) throws Exception {  // 使用硬编码配置启动客户端 ActorSystem,端口 0 表示随机  Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 0            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ClientSystem", config);  // 远程 actor 路径,相当于 RPC 服务地址 + 接口名  String remotePath = "pekko://ServerSystem@127.0.0.1:25520/user/helloActor";  // 选择远程 actor,相当于创建客户端 stub        ActorSelection selection = system.actorSelection(remotePath);  // 使用 ask 模式发送消息,并接收响应(模拟同步 RPC 调用)  CompletionStage<Object> future =  Patterns.ask(selection, new HelloRequest("这是来自客户端的问候"), Duration.ofSeconds(10));  // 等待响应结果(阻塞)  future.thenApply(response -> {  if (response instanceof HelloResponse helloResponse) {  return "客户端收到回复: " + helloResponse.reply;  } else {  return "收到未知回复: " + response;  }  })  .exceptionally(ex -> "调用失败: " + ex.getMessage())  .thenAccept(System.out::println).toCompletableFuture().join();  system.terminate();  }  
}

代码说明:

  • ActorSelection是一种 actor地址定位方式,它类似于 DNS 查询,可以根据路径去“找”一个远程 actor
  • Patterns.ask(...) 就像传统 RPC 的同步调用,它封装了发送、等待响应的过程。Duration.ofSeconds(3) 指定超时时间。.get() 阻塞等待结果,实际底层是异步实现。
    Pekko(或 Akka)中,如果你不需要请求-响应(ask),而只是发送消息给 Actorfire-and-forget),你可以直接使用 ActorRef.tell(...) 方法。
// 从 ActorSystem 中选择一个路径为 "/user/helloActor" 的 Actor(可能还没拿到真实引用)
// 注意:这个路径必须匹配一个已存在的 Actor,否则会 resolve 失败
ActorSelection selection = actorSystem.actorSelection("/user/helloActor");// 异步解析 selection,尝试获取对应 Actor 的真正引用 ActorRef(带超时)
CompletionStage<ActorRef> futureRef = selection.resolveOne(Duration.ofSeconds(3));// 当成功获取 ActorRef 后,使用 tell 发送一条消息,不需要返回(fire-and-forget)
futureRef.thenAccept(ref -> ref.tell("你好", ActorRef.noSender()));

flink的RPC框架如何使用

Flink 基于Pekko实现了自己RPC框架。当需要组件间需要使用RPC服务时,只需要定义接口、编写服务端接口逻辑即可。FlinkRpc框架自己会完成接收远程请求、调度线程、安全并发、处理生命周期等工作,让你像写本地对象一样写分布式服务。

本来想直接使用flinkrpc模块创建一个简单的demo项目来说明的,但是由于Flink使用了自定义的类加载器(如 SubmoduleClassLoader)来隔离不同模块(尤其是用户代码、插件、RPC 的动态 proxy 等)导致类不可见的问题

org.flink.MyServiceGateway referenced from a method is not visible from class loader: org.apache.flink.core.classloading.SubmoduleClassLoader

所以找了flink其中一个rpc服务来进行说明

Dispatcher组件

Dispatcher集群调度的中枢组件,它的作用相当于一个集群控制器,负责接收作业、分配作业、启动作业执行组件、以及监控作业生命周期。虽然Dispatcher只是在JobManager内使用,类似
伪分布式一样,但其创建与使用流程和真正的远程RPC组件是一样的。

DIspatcher在集群启动的时候,通过DispatcherFactory创建,StandaloneSession模式下,工厂实现类为SessionDispatcherFactory

下面以Dispatcher组件为例进行说明如何基于flinkrpc框架实现一个rpc服务。

rpc框架使用流程

使用流程大致如下:

  1. 定义 RpcGateway 接口作为rpc协议
  2. 继承 RpcEndpoint或者FencedRpcEndpoint 并实现RpcGateWay接口
  3. 使用 RpcService 注册服务(启动服务端)
  4. 使用RpcService连接服务端(获取client)

步骤1.定义 RpcGateway 接口

在这里插入图片描述

Dispatcher的RPC接口类是DispatcherGateway, FencedRpcGatewayRpcGateway的子接口。 rpc方法的返回值必须是 CompletableFuture<T> 类型,这是 Flink RPC 框架的设计要求

步骤2. 实现服务端

StandaloneSession模式下,Dispatcher的实现类是StandaloneDispatcher,该类是Dispacher的子类。Dispatcher类继承FencedRpcEndpoint类并实现DispatcherGateway接口
在这里插入图片描述

RpcEndpointFlink自研RPC`框架中用于实现远程服务端逻辑的抽象类,它帮你处理 RPC 生命周期、消息分发、线程安全调度等问题,其子类只需专注于“我要提供什么服务”即可。

步骤3. 启动服务

通过工厂创建了Dispatcher对象后,调用其start()方法启动服务
在这里插入图片描述

步骤4. 远程调用

提交job的时候,会调用dispatchersubmitJob启动并调度该作业。
在这里插入图片描述

gateway是一个DispatcherGateway对象,通过下面的代码获得到的,相当于Client。
在这里插入图片描述

通过该对象调用接口方法即可发起远程调用。由于Dispatcher的客户端代码从创建到使用的代码分的太散了,不方便说明,下面通过一个简单的示例来描述Client的创建流程。

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);MyServiceGateway gateway = gatewayFuture.get();gateway.sayHello("Flink").thenAccept(System.out::println);

MyServiceGateway.class就是定义的RpcGateway接口, gateway是一个远程代理对象了,调用它就等于远程 RPC 调用!

Client是如何发送消息的

已知flink底层是利用Pekko来实现rpc调用的,再次回顾flink rpc示例代码中可以想到

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);
MyServiceGateway gateway = gatewayFuture.get();
gateway.sayHello("Flink").thenAccept(System.out::println);

该gateway对象发起远程调用,本质上应该是使用了类似下面的代码来发送消息的

CompletionStage<Object> future =  Patterns.ask(selection, "Flink", Duration.ofSeconds(3));

这个gateway对象是由rpcService.connect返回的. rpcService是一个RpcService接口对象,其实现就4个,排除掉测试用的就剩一个 PekkoRpcService了。

connect方法的源码
继续看connect方法的源码,首先会先调用resolveActorAddress解析入参的rpc地址"akka://flink@127.0.0.1:6123/user/myService"得到一个ActorRef对象

private CompletableFuture<ActorRef> resolveActorAddress(String address) {  final ActorSelection actorSel = actorSystem.actorSelection(address);  return actorSel.resolveOne(configuration.getTimeout())  .toCompletableFuture()  .exceptionally(  error -> {  throw new CompletionException(  new RpcConnectionException(  String.format(  "Could not connect to rpc endpoint under address %s.",  address),  error));  });  
}

获取到ActorRef后,使用 Java 的 动态代理机制创建一个实现了MyServiceGateway接口的代理对象 proxy
在这里插入图片描述

既然是动态代理,那就得看Handler方法里面的逻辑了,创建Handler的invocationHandlerFactory代码如下:
在这里插入图片描述

查看对应的invoke方法会看到实际发消息的是invokeRpc
在这里插入图片描述

所以最终actor.tell是在这里被调用的

转换成rpc参数的逻辑如下,只是将被调用方法所需的参数与信息封装成MethodInvocation对象
在这里插入图片描述

Server是接收处理消息的

前面的代码已经知道了client通过Pekko的actor发送了消息,现在要看Server这边是怎么处理的了(找到Actor处理RpcInvocation消息)。

服务端需要继承RpcEndpoint类,并在构建的时候传递rpcService对象`

final RpcService rpcService = ...; // 通常通过 PekkoRpcServiceUtils 创建
final String endpointId = "myService";MyServiceEndpoint endpoint = new MyServiceEndpoint(rpcService, endpointId);
endpoint.start();  // 启动 RPC 服务端

查看RpcEndpoint的构造函数,可以看到利用rpcService对象启动了一个rpcServer
在这里插入图片描述

继续往下看前需要了解Actor是如何处理消息的:
​ActorSystem​​ 是Pekko应用的中央控制枢纽,作为单例容器管理所有Actor的层级结构和生命周期。当发送消息给远程Actor时,ActorSystem会自动将消息序列化并通过网络传输到目标节点,在远程节点反序列化后放入目标ActorMailbox队列,最终由目标节点的ActorSystem调度线程执行消息处理,整个过程对开发者完全透明,如同本地调用一般。

可以粗略的认为:一个Actor等同于一个Server端(轻量级),Actor内有一个队列,当有新的消息从客户端发送过来就放到该队列中。然后有一个线程不断从队列中取消息,然后调用该 Actor 的 createReceive() 所定义的行为处理消息。

了解的Actor是如何接收信息后,继续看PekkoRpcServicestartServer方法,其中调用下面的方法,通知另一个Actor来创建本RpcEndpoin对应的Actor
在这里插入图片描述

那么就要找出负责创建Actor的这个supervisor(Actor)在哪里,才能继续往下看了。

很容易就可以看到PekkoRpcService对象它的构造函数中调用下面的函数找到对应的Actor的具体类型
在这里插入图片描述

查看SupervisroActor类的createReceive()就可以看到真正创建actor的逻辑了

@Override  
public Receive createReceive() {  return receiveBuilder()  .match(StartRpcActor.class, this::createStartRpcActorMessage)  .matchAny(this::handleUnknownMessage)  .build();  
}

flink rpc框架中,所有RpcEndpoint对应的Actor的类型都是PekkoRpcActor, 只是名字不一样而已。在PekkoRpcActorCreateReceive()可以看到与Client发送过来的RPC消息相对应的处理逻辑。
在这里插入图片描述

在这里插入图片描述

通过反射调用方法,此处的rpcEndpoint就是我们继承了RcpEndpoint的对象
在这里插入图片描述

到此,我们就知道了服务端的业务方法是如何被调用了。

RpcService的作用

在前面介绍 Flink 中 Client 与 Server 如何工作的过程中,我们可以看到其底层是通过 Pekko实现远程通信的。但在调用流程中,业务代码中并没有直接与 ActorSystemActorRef 等 Pekko 原生类打交道。这是因为 Flink 通过一层抽象 RpcService优雅地屏蔽了底层通信实现的细节

// 1. 创建 RpcService(基于 Pekko 实现)
RpcService rpcService = ...;// 2. 实例化 Dispatcher(继承自 RpcEndpoint)
StandaloneDispatcher dispatcher = new StandaloneDispatcher(rpcService, ...);// 3. 注册服务端
DispatcherGateway dispatcherGateway = rpcService.startServer(dispatcher);// 4. 客户端连接(可在其他进程中执行)
rpcService.connect("pekko://flink@host:6123/user/dispatcher", DispatcherGateway.class);

如果没有 RpcService 这一层抽象,Flink 的组件(如 Dispatcher、JobMaster)之间想要通信,就必须直接操作 Pekko 的底层 API,比如:

  • 使用 ActorSystem 创建 ActorRef
  • 使用 tell()ask() 发送消息;
  • 管理消息序列化和远程地址;
  • 处理超时、线程调度等复杂细节。

这会导致:

  • Actor 概念侵入业务逻辑,开发就需要学习Actor相关的知识;
  • 接口强耦合通信实现,未来若切换通信框架非常困难;
  • 本地调用与远程调用流程不统一,维护复杂。

相关文章:

  • openKylin适配RISC-V高性能服务器芯片,携手睿思芯科共拓智算新蓝海
  • ROS学习之动作通信
  • LangChain4j入门学习项目
  • 解决Vue再浏览器的控制台中更新属性不生效
  • Zephyr boot
  • 电池自动点焊机:技术革新下的电池制造核心引擎
  • FastMCP框架进行MCP开发:(一)基础环境搭建及测试
  • 新生活的开启:从 Trae AI 离开后的三个月
  • 如何在 Windows 上实时显示键盘操作?
  • C++ 面向对象特性详解:继承机制
  • Oracle EBS R12.1.3无法打开WEBADI界面
  • WHAT - JavaScript bind vs call vs apply(包括在箭头函数中vs在普通函数中)
  • Windows 下 C++ 线程同步与异步有哪些方式
  • 优化 Python 爬虫性能:异步爬取新浪财经大数据
  • 苍穹外卖-2025 完成基础配置环节(详细图解)
  • Cursor Rules 的核心定位与作用 DevOps是
  • 代理 AI 时代的隐私重构:从边界控制到信任博弈
  • C#上位机通过WebApi访问WinCC
  • C++11 智能指针weak_ptr、shared_ptr与 unique_ptr
  • 跟着AI学习C# Day22
  • 动态网页的文件扩展名/企业网站推广优化
  • 西安市建设委员会的网站/seo页面排名优化
  • 公司网站开发怎么入账/网络推广代理
  • 网站建设流程知乎/seo排名优化软件免费
  • 做网站建设公司网易互客/搜索引擎的营销方法有哪些
  • 行业网站的特点/小学生摘抄新闻2024