当前位置: 首页 > news >正文

Flink-Yarn运行模式

 Yarn的部署过程

        Yarn上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器,在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群,Flink会根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源。

Flink抽象作业提交流程

  1. 一般情况下,由客户端(App)通过分发器提供的REST接口,将作业提交给JobManager;
  2. 由分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster;
  3. JobMaster将JobMaster解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
  4. 资源管理器判断当前是否由足够的可用资源,如果没有,启动新的TaskManager;
  5. TaskManager启动后,向ResouceManager注册自己的可用任务槽(slots);
  6. TaskManager连接到对应的JobMaster,提供slots
  7. JobMaster将需要执行的任务分发给TaskManager
  8. TaskManager执行任务,互相之间交换数据

数据流图

        所有的Flink程序都可以归纳为三部分构成:Source、Transformation和Sink

        Source表示“源算子”,负责读取数据源

        Transformation表示“转换算子”,利用各种算子进行处理加工

        Sink表示“下沉算子”,负责数据的输出

Flink任务执行图

        按照生成数据可以分为四层:

        逻辑流图(StreamGraph)->作业图(JobGraph)->执行图(ExecutionGraph)->物理图(Physical Graph)

水位线

        用来衡量事件时间进展的标记,就被称为“水位线”

自定义水位线代码

周期性生成水位线

import com.shirun.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}

断点式水位线生成器        

public class PunctuatedGenerator implements WatermarkGenerator<Event> {@Overridepublic void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的itemId时,才发出水位线if (r.user.equals("Mary")) {output.emitWatermark(new Watermark(r.timestamp - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在onEvent方法中发射了水位线}
}

自定义数据源发送水位线

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;public class EmitWatermarkInSourceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).print();env.execute();}// 泛型是数据源中的类型public static class ClickSource implements SourceFunction<Event> {private boolean running = true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {Random random = new Random();String[] userArr = {"Mary", "Bob", "Alice"};String[] urlArr  = {"./home", "./cart", "./prod?id=1"};while (running) {long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳String username = userArr[random.nextInt(userArr.length)];String url      = urlArr[random.nextInt(urlArr.length)];Event event = new Event(username, url, currTs);// 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段sourceContext.collectWithTimestamp(event, event.timestamp);// 发送水位线sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));Thread.sleep(1000L);}}@Overridepublic void cancel() {running = false;}}
}

相关文章:

  • C++ 中,派生类什么时候可以不定义构造函数,什么时候必须定义构造函数?
  • Flink 核心概念解析:流数据、并行处理与状态
  • TDengine 运维—容量规划
  • leetcode 25. Reverse Nodes in k-Group
  • 鸿蒙HarmonyOS最新的组件间通信的装饰器与状态组件详解
  • SpringMVC 通过ajax 实现文件的上传
  • 关于光谱相机的灵敏度
  • naive-ui切换主题
  • 实验分享|基于千眼狼sCMOS科学相机的流式细胞仪细胞核成像实验
  • 【marked与katex结合】渲染公式
  • Vue3 Element Plus el-table-column Sortable 排序失效
  • 2025最新obs31.0.x版本编译办法,windows系统
  • 数据湖和数据仓库的区别
  • ES的倒排索引和正排索引的区别及适用场景?为什么倒排索引适合全文搜索?
  • vue3 threejs 物体发光描边
  • 电力设备制造企业数字化转型路径研究:从生产优化到生态重构
  • WordPress_Madara 本地文件包含漏洞复现(CVE-2025-4524)
  • k8s-ServiceAccount 配置
  • GPT 等decoder系列常见的下游任务
  • Vite + Vue 工程中,为什么需要关注 `postcss.config.ts`?
  • 家里做网站买什么服务器好/做网络推广需要多少钱
  • 注册成立一个公司需要多少钱/福州百度seo
  • 动态网站开发加载图片/搜索引擎优化的流程
  • 一个新的网站怎么做SEO优化/近期时事新闻10条
  • 开封+网站建设+网络推广/南宁seo优化
  • 个人建什么样的网站好/爱站小工具圣经