当前位置: 首页 > news >正文

Flink TaskManager之间数据传输(NetworkManager)

NetworkManager可以看到只存在于TaskManager中,用于TaskManager之间的数据传输。是基于netty实现的。

初始化

NetworkManager是在TaskManager中运行的一个服务。所以会在TaskManager初始化过程中。入口是TaskManager的main方法。

NettyConnectionManager初始化

NettyConnectionManager 是 Flink 里管理网络连接的组件,借助 Netty 框架处理通信。它能初始化 Netty 服务器和客户端,管理缓冲区池,创建分区请求客户端工厂。可启动和关闭服务,创建与管理分区请求客户端,还能统计活动连接数量。

方法入口:

最终创建NettyConnectionManager

NettyConnectionManager创建流程图

主要变量:

  1. NettyServer server

作用:代表 Netty 服务器实例,负责监听网络端口,接收来自其他节点的连接请求,处理入站的网络数据。

  1. NettyClient client

作用:代表 Netty 客户端实例,用于主动连接到其他节点,发送请求并接收响应,处理出站的网络数据。

  1. NettyBufferPool bufferPool

作用:管理网络通信时所需的缓冲区,负责分配和回收直接内存缓冲区,提高内存使用效率,减少内存分配和回收的开销。

  1. PartitionRequestClientFactory partitionRequestClientFactory

作用:用于创建 PartitionRequestClient 实例,这些实例负责向远程节点请求结果分区数据,管理客户端连接的创建和复用。

  1. NettyProtocol nettyProtocol

作用:定义了 Netty 通信的协议,包含了如何处理请求和响应的逻辑,确保不同节点之间能够正确地进行数据交互。

netty服务端初始化

创建ServerBootstrap并初始化

优先使用epoll模式,其次使用nio

设置channel

可以看到channel的信息在NettyProtocol的getServerChannelHandlers方法获取。

可以看到设置了4个ChannelHandler。

各自功能如下:

  • messageEncoder:NettyMessage.NettyMessageEncoder 实例,负责将消息编码为适合网络传输的格式。
  • new NettyMessage.NettyMessageDecoder():NettyMessage.NettyMessageDecoder 实例,用于将接收到的网络数据解码为消息对象。
  • serverHandler:PartitionRequestServerHandler 实例,处理客户端的分区请求和任务事件请求。
  • queueOfPartitionQueues:PartitionRequestQueue 实例,管理分区请求队列,确保请求按顺序处理。

这些处理器共同构成了 Netty 服务器的处理链,实现消息的编码、解码、请求处理和队列管理。

启动server

netty客户端初始化

跟netty服务端类似。

创建Bootstrap并初始化

设置channel

client初始化的时候并没有设置channel,因为client此时并不知道要请求哪个server的数据,同时client也会请求多个server数据的情况。所以在需要请求server数据的时候才会进行channel设置。

client的channel信息也在NettyProtocol中。

包含三个处理器,各自功能如下:

  • messageEncoder:用于将消息编码为适合在网络中传输的格式,方便数据在网络上发送。
  • new NettyMessageClientDecoderDelegate(networkClientHandler):NettyMessageClientDecoderDelegate 实例,它是一个解码器委托类,借助传入的 networkClientHandler 来处理解码后的消息。
  • networkClientHandler:NetworkClientHandler 实例,负责处理客户端的网络请求和响应,是客户端网络操作的核心处理逻辑。

这些处理器共同构成了 Netty 客户端的处理链,实现消息的编码、解码和客户端网络操作的处理。

其中CreditBasedPartitionRequestClientHandler 通过信用机制实现了客户端与服务器之间的数据请求和传输控制,确保数据的高效、稳定传输,同时支持背压处理,避免系统过载。

绑定server

创建client

是调用NettyConnectionManager的createPartitionRequestClient来创建client,其中参数connectionId中包含了远端server的ip和端口。

首先对maxNumberOfConnections取余,创建一个新的connectionId。这个是为了复用tcp连接,将多个逻辑连接映射到少量的物理连接上,达到复用连接的作用,这样也便于节省资源,控制流量。

maxNumberOfConnections默认是1

connectWithRetries是重试连接server,创建client。

最后返回一个NettyPartitionRequestClient,这个是flink对nettyClient的高级封装,处理业务逻辑。

最后调用的是nettyClient.connect,设置channel,绑定server。

客户端请求数据

在上面客户端创建,最后返回NettyPartitionRequestClient实例。

请求数据是requestSubpartition方法,请求对应分区的数据。

首先在handler中注册inputChannel,再创建PartitionRequest,最后调用tcpChannel的writeAndFlush发送消息。

服务端处理数据

server处理PartitionRequest消息是在PartitionRequestServerHandler中。

生成reader用来读取数据,将reader注册到PartitionRequestQueue中。

PartitionRequestQueue负责写数据的是writeAndFlushNextMessageIfPossible。

循环获取已经注册的reader,从reader中获取buffer(包含读取的数据),创建BufferResponse并发送。要是reader后面还有数据就加回队列。

客户端接收数据

CreditBasedPartitionRequestClientHandler接收数据,用来处理BufferResponse消息。

获取当初注册的inputChannel,调用decodeBufferOrEvent来处理BufferResponse消息。

调用RemoteInputChannel的onBuffer方法,在onBuffer中将收到的buffer缓存到receivedBuffers中。后续从receivedBuffers读取数据。

读取receivedBuffers是从getNextBuffer方法进入。

相关文章:

  • MySQL进阶 面试速记
  • 基于 docker 的 LLaMA-Factory 全流程部署指南
  • 开发体育直播即时比分系统:赛事收藏功能的技术实现方案
  • vscode集成deepseek实现辅助编程(银河麒麟系统)【详细自用版】
  • Android学习总结之Kotlin 协程
  • presto行转列
  • Vulnhub-zico2靶机打靶记录
  • ML 聚类算法 dbscan|| OPTICS|mean-shift
  • C语言基础系列【32】指针进阶5:指针与常量
  • 上市电子制造企业如何实现合规的质量文件管理?
  • 0301-组件基础-react-仿低代码平台项目
  • 【AI微信小程序开发】AI减脂菜谱小程序项目代码:根据用户身高/体重等信息定制菜谱(含完整前端+后端代码)
  • 爱心计算器:用 Python 创建带有动态爱心效果的计算器
  • Ubuntu20.04安装OpenVINO环境以及YOLOv8 C++部署测试
  • Android里面内存优化
  • 【Redis】基础1:基本概念,基本数据结构
  • Git操作指南
  • Python数据类型-int
  • JavaScript基础-触屏事件
  • 加密钱包助记词丢失后的一天
  • 韩代总统李周浩履职
  • 杭州挂牌临平区两宗住宅用地,起始总价约11.02亿元
  • 王毅:时代不容倒退,公道自在人心
  • 邮储银行一季度净赚超252亿降逾2%,营收微降
  • 王毅:为改革完善全球治理作出金砖贡献
  • 大学男生被捉奸后将女生推下高楼?桂林理工大学辟谣