Flink on YARN Deployment Mode
YARN Deployment Process
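When deploying on YARN, the client submits the Flink application to YARN's ResourceManager, which then obtains containers on YARN's NodeManagers. Flink deploys JobManager and TaskManager instances in these containers to start the cluster, and it allocates TaskManager resources dynamically according to the number of slots required by the jobs running on the JobManager.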
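To make the last sentence concrete, here is a back-of-the-envelope sketch of how many TaskManager containers a job needs. It assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism, and a fixed number of slots per TaskManager as configured by `taskmanager.numberOfTaskSlots`. The class and method names are hypothetical, and this is plain Java, not Flink's actual allocation code.

```java
/** Hypothetical helper: estimates how many TaskManager containers a job needs on YARN. */
class TaskManagerEstimate {
    /** With default slot sharing, one slot can hold one parallel instance of every operator,
     *  so the job needs as many slots as its maximum operator parallelism. */
    static int requiredSlots(int[] operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** slotsPerTaskManager plays the role of taskmanager.numberOfTaskSlots (assumed fixed). */
    static int taskManagersNeeded(int requiredSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        // Ceiling division: a partially used TaskManager still occupies a whole container.
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int slots = requiredSlots(new int[] {1, 4, 4, 2});
        System.out.println(slots + " slots -> " + taskManagersNeeded(slots, 3) + " TaskManagers");
        // prints "4 slots -> 2 TaskManagers"
    }
}
```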
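Abstract Flink Job Submission Flow
- Typically, the client (App) submits the job to the JobManager through the REST interface provided by the Dispatcher;
- The Dispatcher starts a JobMaster and hands the job (including its JobGraph) to it;
- The JobMaster converts the JobGraph into an executable ExecutionGraph, works out how many resources it needs, and requests those resources (slots) from the ResourceManager;
- The ResourceManager checks whether enough slots are currently available; if not, it starts new TaskManagers;
- Once started, each TaskManager registers its available task slots with the ResourceManager;
- The TaskManagers connect to the corresponding JobMaster and offer it their slots;
- The JobMaster distributes the tasks to be executed to the TaskManagers;
- The TaskManagers execute the tasks and exchange data with one another.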
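Steps 4 to 6 can be pictured with a toy, single-threaded model. Everything in it (`SlotHandshakeModel`, its nested classes, and the fixed `SLOTS_PER_TM` value) is hypothetical and heavily simplified: real Flink components talk to each other asynchronously over RPC, and TaskManagers offer their slots to the JobMaster directly rather than through a shared counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the slot request / registration / offer handshake (not Flink's real classes). */
class SlotHandshakeModel {
    static final int SLOTS_PER_TM = 2; // hypothetical value of taskmanager.numberOfTaskSlots

    /** A simplified TaskManager that owns a fixed number of slots. */
    static final class TaskManager {
        final String id;
        final int slots;

        TaskManager(String id, int slots) {
            this.id = id;
            this.slots = slots;
        }
    }

    /** A simplified ResourceManager that tracks free slots and starts TaskManagers on demand. */
    static final class ResourceManager {
        final List<TaskManager> registered = new ArrayList<>();
        int freeSlots = 0;
        int started = 0;

        /** Step 4: check for free slots and start TaskManagers until there are enough. */
        void requestSlots(int needed) {
            while (freeSlots < needed) {
                register(new TaskManager("tm-" + (++started), SLOTS_PER_TM));
            }
            // Step 6 (simplified): the slots are handed to the JobMaster and become occupied.
            freeSlots -= needed;
        }

        /** Step 5: a newly started TaskManager registers its slots. */
        void register(TaskManager tm) {
            registered.add(tm);
            freeSlots += tm.slots;
        }
    }

    public static void main(String[] args) {
        ResourceManager rm = new ResourceManager();
        rm.requestSlots(3); // step 3: the JobMaster asks for 3 slots
        System.out.println(rm.started + " TaskManagers started, " + rm.freeSlots + " slot(s) left free");
        // prints "2 TaskManagers started, 1 slot(s) left free"
    }
}
```

A second request for one slot would be served from the leftover free slot without starting another TaskManager.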
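Dataflow Graph
Every Flink program can be broken down into three parts: Source, Transformation, and Sink.
Source is the "source operator", which reads data from the data source.
Transformation covers the "transformation operators", which process and reshape the data.
Sink is the "sink operator", which writes the results out.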
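The three-part structure can be illustrated in plain Java. This is an analogy, not the Flink API: a Flink job expresses the same shape with calls such as `env.fromElements(...)`, `.map(...)` and `.print()`. The `ThreePartPipeline` class and its `run` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java analogy of Source -> Transformation -> Sink (hypothetical, not Flink code). */
class ThreePartPipeline {
    static <I, O> void run(Iterable<I> source, Function<I, O> transformation, Consumer<O> sink) {
        for (I record : source) {                     // Source: read each record
            O result = transformation.apply(record);  // Transformation: process it
            sink.accept(result);                      // Sink: write the result out
        }
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        run(List.of("flink", "yarn", "sink"), String::length, collected::add);
        System.out.println(collected); // prints [5, 4, 4]
    }
}
```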
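Flink Execution Graphs
In the order in which they are generated, the graphs form four layers:
logical stream graph (StreamGraph) -> job graph (JobGraph) -> execution graph (ExecutionGraph) -> physical graph (Physical Graph)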
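A rough way to see how the first three layers differ in size is the toy model below, which covers only a linear pipeline. It assumes a simplified chaining rule: two adjacent operators are merged into one JobGraph vertex when they have equal parallelism and a forward connection. Real Flink checks more conditions (slot sharing group, chaining strategy, and others). Each JobGraph vertex is then expanded into one ExecutionGraph vertex per parallel subtask. `GraphLayersModel` and `Operator` are hypothetical names, and the sketch stops at the ExecutionGraph.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of StreamGraph -> JobGraph -> ExecutionGraph for a linear pipeline (hypothetical). */
class GraphLayersModel {
    /** One StreamGraph node. */
    static final class Operator {
        final String name;
        final int parallelism;
        final boolean forwardFromPrevious; // true if the edge from the previous operator is forward

        Operator(String name, int parallelism, boolean forwardFromPrevious) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardFromPrevious = forwardFromPrevious;
        }
    }

    /** StreamGraph -> JobGraph: merge chainable neighbours; each chain becomes one JobVertex. */
    static List<List<Operator>> chain(List<Operator> pipeline) {
        List<List<Operator>> chains = new ArrayList<>();
        for (Operator op : pipeline) {
            if (!chains.isEmpty()) {
                List<Operator> last = chains.get(chains.size() - 1);
                Operator prev = last.get(last.size() - 1);
                if (op.forwardFromPrevious && op.parallelism == prev.parallelism) {
                    last.add(op); // simplified chaining rule
                    continue;
                }
            }
            List<Operator> newChain = new ArrayList<>();
            newChain.add(op);
            chains.add(newChain);
        }
        return chains;
    }

    /** JobGraph -> ExecutionGraph: one ExecutionVertex per parallel subtask of each JobVertex. */
    static int executionVertices(List<List<Operator>> chains) {
        int total = 0;
        for (List<Operator> c : chains) {
            total += c.get(0).parallelism; // all operators in a chain share one parallelism
        }
        return total;
    }

    public static void main(String[] args) {
        List<Operator> pipeline = List.of(
                new Operator("source", 1, false),
                new Operator("map", 2, false), // parallelism changes, so this edge is not forward
                new Operator("filter", 2, true),
                new Operator("sink", 2, true));
        List<List<Operator>> jobVertices = chain(pipeline);
        System.out.println(jobVertices.size() + " JobVertices, "
                + executionVertices(jobVertices) + " ExecutionVertices");
        // prints "2 JobVertices, 3 ExecutionVertices"
    }
}
```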
Watermarks
A marker used to measure the progress of event time is called a "watermark".
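In Flink's semantics, a watermark with timestamp t declares that no more events with timestamp <= t are expected. An event-time window [start, end) has a largest timestamp of end - 1, and Flink's default event-time trigger fires the window once the watermark reaches that value. The plain-Java sketch below (the `WatermarkWindowSketch` class is hypothetical, not Flink code) captures just that comparison.

```java
/** Plain-Java sketch of how a watermark decides when an event-time window may fire (hypothetical). */
class WatermarkWindowSketch {
    /** A window [start, end) is complete once the watermark reaches end - 1, its largest timestamp. */
    static boolean windowCanFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [0 ms, 10 000 ms)
        System.out.println(windowCanFire(windowEnd, 9_998L)); // prints false
        System.out.println(windowCanFire(windowEnd, 9_999L)); // prints true
    }
}
```

This is also why the generators below emit `timestamp - 1`: a watermark of 9 999 ms says every event up to and including 9 999 ms has arrived, which closes the [0, 10 000) window.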
Custom Watermark Code
Generating Watermarks Periodically
import com.shirun.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Custom watermark generation
public class CustomPeriodicWatermarkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ClickSource: assumed to be a SourceFunction<Event> defined elsewhere in the project
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp; // tells Flink which field of the record holds the event timestamp
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomBoundedOutOfOrdernessGenerator();
        }
    }

    public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
        private final long delayTime = 5000L; // maximum expected out-of-orderness, in ms
        // largest timestamp seen so far; the initial value keeps maxTs - delayTime - 1 from underflowing
        private long maxTs = -Long.MAX_VALUE + delayTime + 1L;

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            // called once for every incoming record
            maxTs = Math.max(event.timestamp, maxTs); // update the largest timestamp
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark; Flink calls this every 200 ms by default
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}
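The generator's arithmetic can be checked without a Flink cluster. The plain-Java class below is a hypothetical re-creation of `CustomBoundedOutOfOrdernessGenerator`'s logic, with a periodic emit simulated after each event; in a real job Flink calls `onPeriodicEmit` on its own timer. For this common case Flink also ships a built-in strategy, `WatermarkStrategy.forBoundedOutOfOrderness(Duration)`, which follows the same idea.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java re-creation of the bounded-out-of-orderness arithmetic (hypothetical, no Flink classes). */
class BoundedOutOfOrdernessSimulation {
    private final long delay;
    private long maxTs;

    BoundedOutOfOrdernessSimulation(long delay) {
        this.delay = delay;
        // Same initial value as the generator above, so maxTs - delay - 1 cannot underflow.
        this.maxTs = -Long.MAX_VALUE + delay + 1L;
    }

    /** Mirrors onEvent: remember the largest timestamp seen so far. */
    void onEvent(long timestamp) {
        maxTs = Math.max(maxTs, timestamp);
    }

    /** Mirrors onPeriodicEmit: the watermark trails the max timestamp by the delay, minus 1 ms. */
    long currentWatermark() {
        return maxTs - delay - 1L;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSimulation gen = new BoundedOutOfOrdernessSimulation(5_000L);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) { // 11 000 arrives out of order
            gen.onEvent(ts);
            watermarks.add(gen.currentWatermark()); // simulate a periodic emit after each event
        }
        System.out.println(watermarks); // prints [4999, 6999, 6999]
    }
}
```

The late event at 11 000 ms does not move the watermark backwards, because only the maximum timestamp is tracked.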
Punctuated Watermark Generator
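import com.shirun.bean.Event;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class PunctuatedGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // emit a watermark only when an event from the specific user "Mary" arrives
        if ("Mary".equals(r.user)) {
            output.emitWatermark(new Watermark(r.timestamp - 1));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do here: watermarks are already emitted in onEvent
    }
}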
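As a quick check of the punctuated behavior, the plain-Java sketch below replays a few events and records which watermarks the `"Mary"` condition would produce. `PunctuatedSimulation` and its `Click` stand-in for `Event` are hypothetical; only the marker condition and the `timestamp - 1` rule come from the generator above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of punctuated watermarking: only marker events produce a watermark (hypothetical). */
class PunctuatedSimulation {
    /** Minimal stand-in for Event: only the fields the generator reads. */
    static final class Click {
        final String user;
        final long timestamp;

        Click(String user, long timestamp) {
            this.user = user;
            this.timestamp = timestamp;
        }
    }

    /** Returns the watermarks emitted while processing the events in arrival order. */
    static List<Long> watermarksFor(List<Click> events) {
        List<Long> emitted = new ArrayList<>();
        for (Click c : events) {
            if ("Mary".equals(c.user)) {      // same marker condition as PunctuatedGenerator
                emitted.add(c.timestamp - 1); // watermark just below the marker's timestamp
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Click> events = List.of(
                new Click("Bob", 1_000L), new Click("Mary", 2_000L),
                new Click("Alice", 3_000L), new Click("Mary", 4_000L));
        System.out.println(watermarksFor(events)); // prints [1999, 3999]
    }
}
```

Unlike the periodic generator, nothing is emitted between markers, so if the marker user stops appearing, event time stops advancing.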
Emitting Watermarks from a Custom Source
import com.shirun.bean.Event;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.Random;

public class EmitWatermarkInSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the source assigns timestamps and watermarks itself, so no assignTimestampsAndWatermarks() call is needed
        env.addSource(new ClickSource()).print();
        env.execute();
    }

    // the type parameter is the type of the records the source produces
    public static class ClickSource implements SourceFunction<Event> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = System.currentTimeMillis(); // timestamp in milliseconds
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // emit the record with collectWithTimestamp, specifying its event timestamp
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // emit a watermark
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}