Flink面试题及详细答案100道(21-40)- 基础概念与架构
《前后端面试题
》专栏集合了前后端各个知识模块的面试题,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。
文章目录
- 一、本文面试题目录
- 21. Flink中常见的窗口类型有哪些(如滚动窗口、滑动窗口、会话窗口等)?各自的适用场景是什么?
- 22. 如何自定义Flink窗口的触发器(Trigger)和驱逐器(Evictor)?
- 自定义Trigger
- 自定义Evictor
- 23. 解释Flink的“KeyedStream”和“Non-KeyedStream”的区别,哪些算子需要基于KeyedStream?
- 24. Flink中的“Reduce”和“Aggregate”算子有何区别?分别适用于什么场景?
- Reduce算子
- Aggregate算子
- 25. 什么是Flink的“CoGroup”和“Join”算子?两者的实现原理有何不同?
- 26. Flink的“Connect”和“Union”算子有什么区别?
- 27. 如何使用Flink实现双流Join(如内连接、左连接)?需要注意哪些问题?
- 实现方式
- 注意事项
- 28. 解释Flink的“ProcessFunction”,它与普通算子相比有哪些优势?
- 29. Flink中“FlatMapFunction”“MapFunction”“FilterFunction”的区别是什么?
- 30. 如何实现Flink的“自定义Source”和“自定义Sink”?请举例说明。
- 自定义Source
- 自定义Sink
- 31. Flink的“Side Output”是什么?如何使用?
- 32. 什么是Flink的“分流”和“合流”?分别有哪些实现方式?
- 分流的实现方式
- 合流的实现方式
- 33. Flink中的“Broadcast State”是什么?适用于什么场景?
- 34. 如何使用Flink处理迟到数据?有哪些策略?
- 35. 解释Flink的“Window Join”和“Interval Join”的区别。
- 36. Flink的“KeyBy”算子的实现原理是什么?需要注意哪些数据类型问题?
- 实现原理
- 数据类型注意事项
- 37. 什么是Flink的“状态后端(State Backend)”?有哪几种类型,各有什么特点?
- 38. Flink中“RichFunction”与普通Function有何区别?何时需要使用RichFunction?
- 39. 如何在Flink中实现数据的“去重”操作?有哪些方法?
- 40. 解释Flink的“Cep(Complex Event Processing)”库,它能解决什么问题?
- 二、100道Flink 面试题目录列表
一、本文面试题目录
21. Flink中常见的窗口类型有哪些(如滚动窗口、滑动窗口、会话窗口等)?各自的适用场景是什么?
Flink中常见的窗口类型按划分方式可分为以下几类:
-
时间窗口(Time Window)
-
滚动时间窗口(Tumbling Time Window)
- 特点:窗口大小固定,无重叠,按时间间隔连续划分(如每10分钟一个窗口)。
- 适用场景:固定周期的统计分析(如每小时的订单总量、日活用户统计)。
-
滑动时间窗口(Sliding Time Window)
- 特点:窗口大小固定,但有重叠部分(如窗口大小30分钟,滑动步长10分钟)。
- 适用场景:需要高频更新统计结果的场景(如每10分钟更新一次最近30分钟的流量数据)。
-
会话窗口(Session Window)
- 特点:基于用户活动间隙划分,当一定时间内无新数据则窗口关闭(如会话超时时间30分钟)。
- 适用场景:用户行为分析(如电商网站的用户会话跟踪、APP的单次使用行为分析)。
-
-
计数窗口(Count Window)
-
滚动计数窗口(Tumbling Count Window)
- 特点:元素数量达到阈值时触发(如每100个元素一个窗口)。
- 适用场景:固定数量的批处理(如每收集1000条日志就进行一次分析)。
-
滑动计数窗口(Sliding Count Window)
- 特点:每新增N个元素滑动一次,窗口包含M个元素(M > N)。
- 适用场景:需要频繁更新统计结果的场景(如每新增10个元素,统计最近50个元素的平均值)。
-
示例:滚动和滑动窗口的实现
// 滚动时间窗口(10分钟)
dataStream.keyBy(...).timeWindow(Time.minutes(10)).sum(1);// 滑动时间窗口(30分钟窗口,10分钟滑动一次)
dataStream.keyBy(...).timeWindow(Time.minutes(30), Time.minutes(10)).sum(1);// 会话窗口(超时时间30分钟)
dataStream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.minutes(30))).sum(1);
22. 如何自定义Flink窗口的触发器(Trigger)和驱逐器(Evictor)?
触发器(Trigger) 决定窗口何时触发计算,驱逐器(Evictor) 决定触发后保留或移除窗口中的哪些元素。
自定义Trigger
需继承Trigger<IN, W extends Window>
,实现以下方法:
onElement()
:新元素进入窗口时调用onProcessingTime()
:处理时间定时器触发时调用onEventTime()
:事件时间定时器触发时调用clear()
:窗口关闭时清理资源
示例:自定义计数触发器(元素数达到5时触发)
public class CountTrigger extends Trigger<Tuple2<String, Integer>, TimeWindow> {private int count = 0;private final int threshold;public CountTrigger(int threshold) {this.threshold = threshold;}@Overridepublic TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) {count++;if (count >= threshold) {count = 0; // 重置计数return TriggerResult.FIRE_AND_PURGE; // 触发并清空窗口}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) {count = 0;}
}// 使用自定义触发器
dataStream.keyBy(0).timeWindow(Time.minutes(10)).trigger(new CountTrigger(5)) // 每5个元素触发一次.sum(1);
自定义Evictor
需继承Evictor<IN, W extends Window>
,实现evictBefore()
(触发前移除元素)和evictAfter()
(触发后移除元素)。
示例:保留窗口中最后3个元素
public class LastNEvictor extends Evictor<Tuple2<String, Integer>, TimeWindow> {private final int n;public LastNEvictor(int n) {this.n = n;}@Overridepublic void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements, int size, TimeWindow window, EvictorContext ctx) {// 触发前不做处理}@Overridepublic void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements, int size, TimeWindow window, EvictorContext ctx) {List<TimestampedValue<Tuple2<String, Integer>>> list = Lists.newArrayList(elements);if (list.size() > n) {// 移除前size-n个元素,保留最后n个for (int i = 0; i < list.size() - n; i++) {list.remove(0);}}}
}// 使用自定义驱逐器
dataStream.keyBy(0).timeWindow(Time.minutes(10)).evictor(new LastNEvictor(3)) // 保留最后3个元素.sum(1);
23. 解释Flink的“KeyedStream”和“Non-KeyedStream”的区别,哪些算子需要基于KeyedStream?
- KeyedStream:按指定Key分组后的数据流,相同Key的元素会被分配到同一个并行任务处理,支持状态隔离和按Key的聚合操作。
- Non-KeyedStream:未分组的数据流,所有元素在多个并行任务间随机分配,不支持按Key的状态管理。
两者核心区别:
特性 | KeyedStream | Non-KeyedStream |
---|---|---|
状态管理 | 支持按Key隔离的状态 | 仅支持算子级别的全局状态 |
并行处理 | 相同Key的元素在同一任务处理 | 元素随机分配到不同任务 |
聚合操作 | 支持(如sum、reduce) | 需通过windowAll等算子(并行度1) |
时间特性 | 支持基于Key的窗口和定时器 | 窗口操作并行度为1 |
需要基于KeyedStream的算子:
- 状态相关算子:
mapWithState
、flatMapWithState
- 聚合算子:
reduce
、sum
、min
、max
、aggregate
- 窗口算子:
window
(非windowAll)、countWindow
- 定时器相关:
ProcessFunction
中的registerEventTimeTimer
示例:KeyedStream与Non-KeyedStream的使用
DataStream<Tuple2<String, Integer>> dataStream = ...;// KeyedStream:按第一个字段分组
KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(t -> t.f0);
keyedStream.sum(1).print(); // 支持按Key聚合// Non-KeyedStream:全局聚合(并行度为1)
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5))).sum(1).print();
24. Flink中的“Reduce”和“Aggregate”算子有何区别?分别适用于什么场景?
Reduce和Aggregate都是Flink中的聚合算子,但适用场景和功能不同:
特性 | Reduce | Aggregate |
---|---|---|
输入输出类型 | 必须相同(IN → IN) | 可不同(IN → ACC → OUT) |
中间状态 | 仅保留上一次聚合结果 | 可自定义累加器(Accumulator) |
灵活性 | 较低,仅支持二元操作 | 较高,支持复杂聚合逻辑 |
适用场景 | 简单聚合(如求和、求最值) | 复杂聚合(如平均值、加权求和) |
Reduce算子
接收两个相同类型的元素,返回一个同类型的元素,适合简单的累加操作。
示例:使用Reduce计算总和
DataStream<Tuple2<String, Integer>> dataStream = ...;dataStream.keyBy(t -> t.f0).timeWindow(Time.minutes(5)).reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1)) // 累加第二个字段.print();
Aggregate算子
通过自定义累加器实现复杂聚合,包含三个阶段:
createAccumulator()
:初始化累加器add()
:将元素添加到累加器getResult()
:从累加器提取结果merge()
:合并多个累加器(用于窗口合并)
示例:使用Aggregate计算平均值
// 自定义AggregateFunction:计算平均值
public class AverageAggregate implements AggregateFunction<Tuple2<String, Integer>, // 输入类型Tuple2<Integer, Integer>, // 累加器类型(总和,计数)Tuple2<String, Double>> { // 输出类型(Key,平均值)private String key;@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return new Tuple2<>(0, 0); // (sum, count)}@Overridepublic Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> accumulator) {this.key = value.f0;return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);}@Overridepublic Tuple2<String, Double> getResult(Tuple2<Integer, Integer> accumulator) {return new Tuple2<>(key, (double) accumulator.f0 / accumulator.f1);}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}// 使用Aggregate算子
dataStream.keyBy(t -> t.f0).timeWindow(Time.minutes(5)).aggregate(new AverageAggregate()).print();
25. 什么是Flink的“CoGroup”和“Join”算子?两者的实现原理有何不同?
CoGroup和Join都是用于关联两个数据流的算子,但实现方式和功能不同:
- Join算子:基于指定条件关联两个流,仅输出满足条件的匹配对,类似SQL的内连接。
- CoGroup算子:将两个流中满足条件的元素分组后进行联合处理,可实现内连接、左连接、右连接等,灵活性更高。
实现原理区别:
-
Join:
- 将两个流的数据按Key分组并放入窗口。
- 对窗口内的元素进行笛卡尔积,过滤出满足条件的记录。
- 仅输出匹配的记录对。
-
CoGroup:
- 同样按Key分组并放入窗口。
- 将两个流中相同Key的元素分别收集到两个迭代器中。
- 提供自定义逻辑处理这两个迭代器(可输出匹配或不匹配的记录)。
示例:Join与CoGroup的对比
DataStream<Tuple2<String, Integer>> stream1 = ...; // (name, age)
DataStream<Tuple2<String, String>> stream2 = ...; // (name, city)// Join算子:内连接(仅输出匹配的记录)
stream1.join(stream2).where(t1 -> t1.f0) // stream1的Key.equalTo(t2 -> t2.f0) // stream2的Key.window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1)) // (name, age, city).print();// CoGroup算子:左连接(输出所有stream1的记录,匹配不到则补null)
stream1.coGroup(stream2).where(t1 -> t1.f0).equalTo(t2 -> t2.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((iter1, iter2, out) -> {List<Tuple2<String, String>> list2 = Lists.newArrayList(iter2);for (Tuple2<String, Integer> t1 : iter1) {if (list2.isEmpty()) {out.collect(new Tuple3<>(t1.f0, t1.f1, null));} else {for (Tuple2<String, String> t2 : list2) {out.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}}}}).print();
26. Flink的“Connect”和“Union”算子有什么区别?
Connect和Union都用于合并数据流,但适用场景和功能有显著区别:
特性 | Connect | Union |
---|---|---|
输入流类型 | 可处理不同类型的两个流 | 必须处理相同类型的多个流 |
输出流类型 | 保留两个流的类型(ConnectedStream) | 与输入流类型一致 |
合并逻辑 | 可对两个流应用不同处理逻辑 | 仅简单合并,对所有流处理逻辑相同 |
并行度 | 保持原有两个流的并行度 | 总并行度为所有输入流并行度之和 |
支持流数量 | 仅支持两个流 | 支持多个流(>=2) |
示例:Connect与Union的对比
// 不同类型的流
DataStream<String> streamA = ...;
DataStream<Integer> streamB = ...;
DataStream<String> streamC = ...;// Connect:合并不同类型的流
ConnectedStreams<String, Integer> connectedStream = streamA.connect(streamB);
DataStream<Object> result = connectedStream.map(str -> str.toUpperCase(), // 处理streamAnum -> num * 2 // 处理streamB);// Union:合并相同类型的流(streamA和streamC都是String类型)
DataStream<String> unionStream = streamA.union(streamC);
unionStream.map(str -> str.toLowerCase()).print();
27. 如何使用Flink实现双流Join(如内连接、左连接)?需要注意哪些问题?
Flink中双流Join通常基于窗口实现,确保数据在有限时间范围内关联。常见类型包括内连接、左连接和右连接。
实现方式
- 内连接(Inner Join):仅输出两个流中匹配的记录。
- 左连接(Left Join):输出左流所有记录,右流匹配不到则补空。
- 右连接(Right Join):输出右流所有记录,左流匹配不到则补空。
示例:基于窗口的双流Join
// 定义两个流
DataStream<Order> orders = ...; // 订单流 (orderId, userId, amount, timestamp)
DataStream<User> users = ...; // 用户流 (userId, name, timestamp)// 1. 内连接
orders.join(users).where(Order::getUserId).equalTo(User::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((order, user) -> new OrderWithUserName(order.getOrderId(),user.getName(),order.getAmount())).print("Inner Join Result:");// 2. 左连接(使用CoGroup实现)
orders.coGroup(users).where(Order::getUserId).equalTo(User::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((ordersIter, usersIter, out) -> {List<User> userList = Lists.newArrayList(usersIter);for (Order order : ordersIter) {if (userList.isEmpty()) {// 左流记录,无匹配的右流记录out.collect(new OrderWithUserName(order.getOrderId(), "Unknown", order.getAmount()));} else {for (User user : userList) {out.collect(new OrderWithUserName(order.getOrderId(), user.getName(), order.getAmount()));}}}}).print("Left Join Result:");
注意事项
- 窗口大小选择:窗口过小可能导致匹配率低,过大则增加状态存储压力。
- 数据乱序处理:需合理设置Watermark延迟时间,确保关联数据能在窗口内到达。
- 状态管理:Join会在窗口内缓存大量数据,需配置合适的状态后端(如RocksDB)。
- 数据倾斜:若某类Key的数据过多,会导致该窗口处理压力过大,需提前进行数据均衡。
- 性能优化:可通过预过滤减少窗口内数据量,或使用Interval Join替代窗口Join优化性能。
28. 解释Flink的“ProcessFunction”,它与普通算子相比有哪些优势?
ProcessFunction是Flink中最灵活的处理算子,允许访问事件时间、处理时间、状态和定时器,是实现复杂业务逻辑的核心工具。
与普通算子(如map、filter)相比,其优势在于:
- 访问时间特性:可直接获取事件时间和处理时间。
- 状态管理:支持Keyed State和定时器状态。
- 定时器功能:可注册事件时间或处理时间定时器,实现延迟操作。
- 侧输出流:可将不同类型的数据发送到侧输出流,实现分流。
- 细粒度控制:可自定义数据处理逻辑,适合复杂业务场景。
ProcessFunction家族包括:
ProcessFunction
:处理Non-KeyedStreamKeyedProcessFunction
:处理KeyedStream(最常用)CoProcessFunction
:处理ConnectedStreamsProcessWindowFunction
:窗口处理的增强版
示例:使用KeyedProcessFunction实现超时检测
// 检测用户会话超时(5秒内无活动则输出超时信息)
public class SessionTimeoutProcessFunction extends KeyedProcessFunction<String, UserEvent, String> {private transient ValueState<Long> lastActivityTimeState;@Overridepublic void open(Configuration parameters) {// 初始化状态:存储最后活动时间ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastActivity", Long.class);lastActivityTimeState = getRuntimeContext().getState(descriptor);}@Overridepublic void processElement(UserEvent event, Context ctx, Collector<String> out) throws Exception {Long lastTime = lastActivityTimeState.value();long currentTime = ctx.timestamp(); // 获取事件时间if (lastTime == null) {// 首次活动,注册5秒后超时定时器ctx.timerService().registerEventTimeTimer(currentTime + 5000);} else {// 有新活动,删除旧定时器,注册新定时器ctx.timerService().deleteEventTimeTimer(lastTime + 5000);ctx.timerService().registerEventTimeTimer(currentTime + 5000);}// 更新最后活动时间lastActivityTimeState.update(currentTime);out.collect(event.getUserId() + " 活动时间: " + new Date(currentTime));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 定时器触发,输出超时信息out.collect(ctx.getCurrentKey() + " 会话超时! 最后活动时间: " + new Date(lastActivityTimeState.value()));lastActivityTimeState.clear();}
}// 使用ProcessFunction
DataStream<UserEvent> userEvents = ...;
userEvents.keyBy(UserEvent::getUserId).process(new SessionTimeoutProcessFunction()).print();
29. Flink中“FlatMapFunction”“MapFunction”“FilterFunction”的区别是什么?
这三个都是Flink的基础转换算子,但功能和适用场景不同:
算子 | 输入输出关系 | 功能描述 | 适用场景 |
---|---|---|---|
MapFunction | 一对一(1:1) | 将一个元素转换为另一个元素 | 简单转换(如类型转换、字段提取) |
FlatMapFunction | 一对多(1:N,N≥0) | 将一个元素转换为零个、一个或多个元素 | 拆分数据(如字符串分割、JSON数组解析) |
FilterFunction | 一对一或零(1:0或1:1) | 根据条件保留或过滤元素 | 数据筛选(如保留符合条件的记录) |
示例:三种算子的对比
DataStream<String> input = env.fromElements("apple,banana", "orange", "grape,melon");// MapFunction:将字符串转换为长度
DataStream<Integer> mapResult = input.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {return value.length();}
});// FlatMapFunction:将逗号分隔的字符串拆分为单个单词
DataStream<String> flatMapResult = input.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) {for (String word : value.split(",")) {out.collect(word);}}
});// FilterFunction:保留长度大于5的单词
DataStream<String> filterResult = flatMapResult.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) {return value.length() > 5;}
});// 输出结果:
// mapResult: 10, 6, 10
// flatMapResult: apple, banana, orange, grape, melon
// filterResult: banana, orange, melon
30. 如何实现Flink的“自定义Source”和“自定义Sink”?请举例说明。
Flink允许通过自定义Source和Sink连接到特定的数据存储或服务,满足个性化的数据输入输出需求。
自定义Source
需实现SourceFunction
(非并行)或ParallelSourceFunction
(并行),重写run()
和cancel()
方法。
示例:自定义并行Source(生成随机整数)
public class RandomNumberSource implements ParallelSourceFunction<Integer> {private boolean isRunning = true;private Random random = new Random();@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (isRunning) {// 生成1-100的随机整数int num = random.nextInt(100) + 1;ctx.collect(num);Thread.sleep(100); // 每100ms生成一个}}@Overridepublic void cancel() {isRunning = false;}
}// 使用自定义Source
DataStream<Integer> randomNumbers = env.addSource(new RandomNumberSource()).setParallelism(2);
randomNumbers.print();
自定义Sink
需实现SinkFunction
,重写invoke()
方法处理输出数据,或使用更灵活的RichSinkFunction
(可访问生命周期方法)。
示例:自定义Sink(将数据写入文件)
public class FileSink extends RichSinkFunction<Integer> {private FileWriter writer;private String filePath;public FileSink(String filePath) {this.filePath = filePath;}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化文件写入器(每个并行实例写入不同文件)int index = getRuntimeContext().getIndexOfThisSubtask();writer = new FileWriter(filePath + "_" + index + ".txt", true);}@Overridepublic void invoke(Integer value, Context context) throws Exception {// 写入数据writer.write(value.toString() + "\n");writer.flush();}@Overridepublic void close() throws Exception {// 关闭资源if (writer != null) {writer.close();}}
}// 使用自定义Sink
randomNumbers.addSink(new FileSink("/tmp/random_numbers")).setParallelism(2);
31. Flink的“Side Output”是什么?如何使用?
Side Output(侧输出流)是Flink中处理分流的机制,允许从一个算子中输出多种类型的数据到不同的流中,解决了普通算子只能输出单一类型数据流的限制。
使用场景:
- 分离正常数据和异常数据(如格式错误的记录)
- 按不同条件拆分数据流(如按优先级划分)
- 提取特定特征的数据(如超过阈值的异常值)
使用步骤:
- 定义
OutputTag
标识侧输出流(需指定类型)。 - 在算子中使用
Context.output()
将数据发送到侧输出流。 - 通过
getSideOutput()
获取侧输出流。
示例:使用Side Output分离正常和异常数据
// 1. 定义侧输出流标签(异常数据)
OutputTag<String> errorTag = new OutputTag<String>("error-data") {};// 2. 主处理流程(分离正常和异常数据)
SingleOutputStreamOperator<Integer> mainStream = inputStream.process(new ProcessFunction<String, Integer>() {@Overridepublic void processElement(String value, Context ctx, Collector<Integer> out) {try {// 尝试将字符串转换为整数(正常数据)int num = Integer.parseInt(value);out.collect(num);} catch (NumberFormatException e) {// 转换失败,发送到侧输出流(异常数据)ctx.output(errorTag, "Invalid number: " + value);}}});// 3. 获取主输出流和侧输出流
mainStream.print("Normal Data:");
DataStream<String> errorStream = mainStream.getSideOutput(errorTag);
errorStream.print("Error Data:");
32. 什么是Flink的“分流”和“合流”?分别有哪些实现方式?
- 分流:将一个数据流拆分为多个子数据流,每个子数据流包含不同特征的数据。
- 合流:将多个数据流合并为一个数据流,实现数据的汇总处理。
分流的实现方式
- Side Output:最常用方式,支持输出多种类型数据(见31题)。
- Filter:对原始流多次应用filter算子,得到不同子流(效率较低,数据会被多次处理)。
示例:使用Filter分流
DataStream<Integer> stream = ...;// 分流为偶数流和奇数流
DataStream<Integer> evenStream = stream.filter(num -> num % 2 == 0);
DataStream<Integer> oddStream = stream.filter(num -> num % 2 != 0);
合流的实现方式
- Union:合并多个同类型数据流(见26题)。
- Connect:合并两个不同类型数据流(见26题)。
- Join/CoGroup:基于Key关联两个数据流(见25题)。
示例:多种合流方式对比
DataStream<String> streamA = ...;
DataStream<String> streamB = ...;
DataStream<Integer> streamC = ...;// Union:合并同类型流
DataStream<String> unionStream = streamA.union(streamB);// Connect:合并不同类型流
ConnectedStreams<String, Integer> connectedStream = streamA.connect(streamC);// Join:基于Key合并
streamA.map(str -> new Tuple2<>(str, 1)).join(streamB.map(str -> new Tuple2<>(str, 2))).where(t -> t.f0).equalTo(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((t1, t2) -> t1.f0 + ": " + t1.f1 + "," + t2.f1);
33. Flink中的“Broadcast State”是什么?适用于什么场景?
Broadcast State是Flink中用于在多个并行任务间共享只读数据的机制,允许将一个流(广播流)的数据广播到另一个流(非广播流)的所有并行实例中,实现动态配置更新等功能。
核心特性:
- 广播流(BroadcastStream):包含需共享的数据(如配置、规则),会被发送到所有并行任务。
- 非广播流:普通数据流,每个并行任务可访问广播流中的完整数据。
- 只读性:非广播流的任务只能读取广播状态,不能修改(保证一致性)。
适用场景:
- 动态规则更新(如实时风控规则调整)
- 配置参数下发(如动态修改阈值)
- 小表关联(如将字典表广播到所有任务)
示例:使用Broadcast State实现动态过滤
// 1. 定义广播数据(过滤规则)和普通数据(待处理记录)
DataStream<FilterRule> ruleStream = ...; // 规则流:(keyword, action)
DataStream<String> dataStream = ...; // 数据记录流// 2. 定义广播状态描述符
MapStateDescriptor<String, FilterRule> ruleStateDescriptor = new MapStateDescriptor<>("filterRules",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<FilterRule>() {})
);// 3. 广播规则流
BroadcastStream<FilterRule> broadcastRuleStream = ruleStream.broadcast(ruleStateDescriptor);// 4. 连接广播流和普通流,应用动态规则
DataStream<String> resultStream = dataStream.connect(broadcastRuleStream).process(new BroadcastProcessFunction<String, FilterRule, String>() {@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 读取广播状态中的规则Iterable<Map.Entry<String, FilterRule>> rules = ctx.getBroadcastState(ruleStateDescriptor).entries();boolean shouldFilter = false;for (Map.Entry<String, FilterRule> rule : rules) {if (value.contains(rule.getKey()) && rule.getValue().getAction().equals("block")) {shouldFilter = true;break;}}if (!shouldFilter) {out.collect(value);}}@Overridepublic void processBroadcastElement(FilterRule rule, Context ctx, Collector<String> out) throws Exception {// 更新广播状态ctx.getBroadcastState(ruleStateDescriptor).put(rule.getKeyword(), rule);}});
34. 如何使用Flink处理迟到数据?有哪些策略?
迟到数据指在Watermark已超过窗口结束时间后到达的数据。Flink提供多种处理策略:
-
允许窗口延迟关闭(Allowed Lateness)
- 为窗口设置延迟时间,窗口在结束时间后继续等待该时长,期间接收的迟到数据仍会被处理。
- 适用于大部分迟到数据在短时间内到达的场景。
-
侧输出流(Side Output)
- 将超过允许延迟时间的迟到数据发送到侧输出流,单独处理。
- 适用于需要完整保留迟到数据并后续分析的场景。
-
调整Watermark生成策略
- 增大Watermark的延迟时间(如
BoundedOutOfOrdernessTimestampExtractor
的参数),给迟到数据更多到达时间。 - 适用于数据乱序程度较高的场景,但会增加处理延迟。
- 增大Watermark的延迟时间(如
-
使用会话窗口(Session Window)
- 基于活动间隙划分窗口,避免固定窗口对迟到数据的严格限制。
- 适用于用户行为等非周期性数据。
示例:综合处理迟到数据
// 1. 定义事件时间和Watermark(允许5秒乱序)
DataStream<Event> stream = ...;
DataStream<Event> withWatermark = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {@Overridepublic long extractTimestamp(Event element) {return element.getTimestamp();}}
);// 2. 定义侧输出流标签(接收迟到数据)
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data") {};// 3. 窗口处理(允许3秒延迟关闭)
SingleOutputStreamOperator<Result> result = withWatermark.keyBy(Event::getKey).timeWindow(Time.minutes(10)).allowedLateness(Time.seconds(3)) // 窗口额外等待3秒.sideOutputLateData(lateDataTag) // 超过延迟时间的数据进入侧输出.apply(new WindowFunction<Event, Result, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<Event> input, Collector<Result> out) {// 窗口处理逻辑}});// 4. 处理侧输出流中的迟到数据
DataStream<Event> lateData = result.getSideOutput(lateDataTag);
lateData.print("Late Data:");
35. 解释Flink的“Window Join”和“Interval Join”的区别。
Window Join和Interval Join都是Flink中用于关联两个数据流的算子,但适用场景和实现方式不同:
特性 | Window Join | Interval Join |
---|---|---|
时间范围定义 | 基于固定窗口(如每10分钟) | 基于左流元素时间的相对区间(如[-5, +5]秒) |
数据关联方式 | 窗口内两个流的元素笛卡尔积匹配 | 右流元素时间在左流元素时间区间内则匹配 |
状态存储 | 缓存窗口内所有元素 | 仅缓存右流在时间区间内的元素 |
适用场景 | 周期性批量关联 | 实时性要求高的连续关联 |
性能 | 窗口结束时集中处理,可能有延迟 | 元素到达即处理,实时性更好 |
示例:Window Join与Interval Join对比
DataStream<Order> orders = ...; // (orderId, userId, timestamp)
DataStream<Payment> payments = ...; // (paymentId, orderId, timestamp)// Window Join:10分钟窗口内关联订单和支付
orders.join(payments).where(Order::getOrderId).equalTo(Payment::getOrderId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((order, payment) -> new OrderPayment(order, payment)).print("Window Join:");// Interval Join:支付时间在订单时间的[-5, +10]秒区间内
orders.keyBy(Order::getOrderId).intervalJoin(payments.keyBy(Payment::getOrderId)).between(Time.seconds(-5), Time.seconds(10)) // 支付可在订单前5秒到后10秒.process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {@Overridepublic void processElement(Order order, Payment payment, Context ctx, Collector<OrderPayment> out) {out.collect(new OrderPayment(order, payment));}}).print("Interval Join:");
36. Flink的“KeyBy”算子的实现原理是什么?需要注意哪些数据类型问题?
KeyBy算子通过将数据流按指定Key分组,使相同Key的元素被分配到同一个并行任务处理,是实现状态隔离和聚合操作的基础。
实现原理
- Key提取:根据用户指定的Key选择器(如字段索引、Lambda表达式)从元素中提取Key。
- 哈希分区:对提取的Key计算哈希值,然后通过
hashCode % 并行度
确定目标并行任务索引。 - 数据重分区:将元素发送到目标任务,确保相同Key的元素进入同一任务。
数据类型注意事项
- 不可变类型:Key应使用不可变类型(如String、Integer、Tuple),避免Key值在处理过程中被修改导致哈希不一致。
- POJO类型:若使用POJO作为Key,需确保:
- 重写
hashCode()
和equals()
方法,保证哈希一致性。 - 字段不可变或修改后需重新KeyBy(否则可能导致相同Key的元素分到不同任务)。
- 重写
- 不支持的类型:
- 基本类型数组(如int[]):KeyBy会按引用而非内容计算哈希,需转为List或自定义类型。
- 某些特殊类型(如ArrayList、HashMap):其
hashCode()
可能随内容变化,不适合作为Key。
示例:正确使用KeyBy的方式
// 1. 使用Tuple的字段作为Key(推荐)
DataStream<Tuple2<String, Integer>> stream = ...;
KeyedStream<Tuple2<String, Integer>, String> keyedByField = stream.keyBy(t -> t.f0);// 2. 使用POJO作为Key(需重写hashCode和equals)
public class UserKey {private String id;private String name;@Overridepublic int hashCode() {return Objects.hash(id, name); // 基于id和name计算哈希}@Overridepublic boolean equals(Object obj) {if (this == obj) return true;if (obj == null || getClass() != obj.getClass()) return false;UserKey other = (UserKey) obj;return Objects.equals(id, other.id) && Objects.equals(name, other.name);}
}DataStream<User> userStream = ...;
KeyedStream<User, UserKey> keyedByPojo = userStream.keyBy(user -> new UserKey(user.getId(), user.getName())
);
37. 什么是Flink的“状态后端(State Backend)”?有哪几种类型,各有什么特点?
状态后端(State Backend) 是Flink中负责管理状态存储、Checkpoint和恢复的组件,决定了状态的存储位置和格式。
Flink提供三种核心状态后端:
-
MemoryStateBackend
- 存储位置:JVM堆内存
- 特点:
- 读写速度快(内存操作)
- 状态大小受限于JVM内存,不适合大规模状态
- Checkpoint数据存储在JobManager内存中,可靠性低
- 适用场景:开发测试、无状态或小状态作业
-
FsStateBackend
- 存储位置:
- 工作状态:TaskManager堆内存
- Checkpoint:文件系统(本地文件、HDFS等)
- 特点:
- 工作状态仍在内存,读写较快
- Checkpoint持久化到文件系统,可靠性高
- 状态大小受限于TaskManager内存,适合中等规模状态
- 适用场景:生产环境中的中小规模状态作业
- 存储位置:
-
RocksDBStateBackend
- 存储位置:
- 工作状态:TaskManager本地的RocksDB(嵌入式KV数据库,磁盘存储)
- Checkpoint:文件系统
- 特点:
- 状态存储在磁盘,支持大规模状态(TB级)
- 读写速度较内存慢,但通过内存缓存优化
- 支持状态增量Checkpoint,减少IO开销
- 适用场景:生产环境中的大规模状态作业(如长时间运行的有状态流处理)
- 存储位置:
配置方式:
// 代码中配置
env.setStateBackend(new MemoryStateBackend());
// 或
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// 或
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));// 配置文件中配置(flink-conf.yaml)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
38. Flink中“RichFunction”与普通Function有何区别?何时需要使用RichFunction?
RichFunction是Flink中带生命周期管理的函数接口,继承自普通Function,提供了更多高级功能。
与普通Function的区别:
特性 | RichFunction | 普通Function |
---|---|---|
生命周期方法 | 提供open()、close()等方法 | 无生命周期方法 |
状态访问 | 可通过getRuntimeContext()访问状态 | 不支持状态访问 |
并行度信息 | 可获取任务索引、并行度等信息 | 无法获取执行环境信息 |
配置参数 | 可通过open()方法获取配置参数 | 无配置参数访问能力 |
需要使用RichFunction的场景:
- 访问状态:需要使用Keyed State或Operator State时。
- 资源管理:需要在函数初始化时创建资源(如数据库连接),并在结束时释放。
- 获取执行信息:需要获取并行度、任务索引等运行时信息时。
- 配置参数传递:需要从外部接收配置参数时。
示例:使用RichMapFunction管理数据库连接
public class DatabaseRichMapFunction extends RichMapFunction<String, Result> {private Connection conn;private PreparedStatement stmt;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化资源:创建数据库连接super.open(parameters);String url = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("db.url");conn = DriverManager.getConnection(url);stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?");}@Overridepublic Result map(String userId) throws Exception {// 使用数据库连接查询数据stmt.setString(1, userId);ResultSet rs = stmt.executeQuery();if (rs.next()) {return new Result(rs.getString("id"), rs.getString("name"));}return new Result(userId, "Unknown");}@Overridepublic void close() throws Exception {// 释放资源if (stmt != null) stmt.close();if (conn != null) conn.close();super.close();}
}// 使用RichFunction
Configuration config = new Configuration();
config.setString("db.url", "jdbc:mysql://localhost:3306/test");
env.getConfig().setGlobalJobParameters(config);DataStream<String> userIds = ...;
userIds.map(new DatabaseRichMapFunction()).print();
39. 如何在Flink中实现数据的“去重”操作?有哪些方法?
Flink中数据去重指移除流中重复的元素,确保每个元素只被处理一次。常用方法包括:
-
基于状态的去重
- 原理:使用
ValueState
或ListState
存储已处理的唯一标识(如ID),新元素到来时先检查状态,不存在则处理并更新状态。 - 适用场景:有唯一标识的流数据,需精确去重。
示例:基于状态去重
public class DeduplicationFunction extends KeyedProcessFunction<String, Event, Event> {private transient ValueState<Boolean> seenState;@Overridepublic void open(Configuration parameters) {seenState = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));}@Overridepublic void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {// 检查是否已处理过该事件(按eventId去重)if (seenState.value() == null) {out.collect(event); // 输出未重复的事件seenState.update(true); // 标记为已处理// 设置状态过期时间(避免状态无限增长)ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 86400000); // 24小时后过期}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {// 清除过期状态seenState.clear();} }// 使用去重函数(按eventId分组) DataStream<Event> stream = ...; stream.keyBy(Event::getEventId).process(new DeduplicationFunction()).print();
- 原理:使用
-
基于窗口的去重
- 原理:在窗口内使用
distinct()
方法去重,只保证窗口内数据不重复。 - 适用场景:允许一定时间范围内的重复,无需全局去重。
示例:窗口内去重
stream.keyBy(...).timeWindow(Time.hours(1)).distinct("eventId") // 按eventId字段去重.print();
- 原理:在窗口内使用
-
基于外部系统的去重
- 原理:利用外部KV存储(如Redis)记录已处理的ID,新元素到来时先查询外部系统。
- 适用场景:状态过大无法本地存储,或需要跨作业去重。
示例:基于Redis去重
public class RedisDeduplicationFunction extends RichMapFunction<Event, Event> {private Jedis jedis;private String redisHost;@Overridepublic void open(Configuration parameters) {redisHost = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("redis.host");jedis = new Jedis(redisHost);}@Overridepublic Event map(Event event) {String key = "seen:" + event.getEventId();if (jedis.set(key, "1", "NX", "EX", 86400) != null) {// 不存在该key,返回事件return event;} else {// 已存在,返回null(后续可过滤)return null;}}@Overridepublic void close() {jedis.close();} }// 使用Redis去重 stream.map(new RedisDeduplicationFunction()).filter(Objects::nonNull) // 过滤重复元素.print();
40. 解释Flink的“Cep(Complex Event Processing)”库,它能解决什么问题?
Flink CEP是Flink的复杂事件处理库,用于从连续的数据流中检测特定的事件模式(如序列、聚合、条件组合等),并触发相应动作。
核心概念:
- 事件(Event):流中的单个数据记录。
- 模式(Pattern):定义需要检测的事件序列或条件(如"A followed by B within 10 seconds")。
- 模式匹配(Pattern Matching):CEP引擎按模式规则扫描数据流,识别符合条件的事件序列。
- 结果处理:对匹配到的事件序列进行处理(如告警、聚合)。
能解决的问题:
- 实时监控与告警:如检测异常登录序列(多次失败后成功)。
- 业务流程分析:如监测电商下单-支付-发货的完整流程,识别异常中断。
- 用户行为分析:如检测用户在APP中的特定操作路径(浏览-加购-购买)。
- 故障诊断:如根据系统日志序列诊断故障原因。
示例:使用Flink CEP检测异常登录模式
// 1. 定义事件类型
public class LoginEvent {private String userId;private String ip;private boolean success;private long timestamp;// getters and setters
}// 2. 定义模式:3次失败登录后跟随1次成功登录
Pattern<LoginEvent, ?> loginPattern = Pattern.<LoginEvent>begin("firstFail").where(event -> !event.isSuccess()) // 第一次失败.next("secondFail").where(event -> !event.isSuccess()) // 第二次失败.next("thirdFail").where(event -> !event.isSuccess()) // 第三次失败.next("success").where(event -> event.isSuccess()) // 成功登录.within(Time.minutes(5)); // 5分钟内完成// 3. 将模式应用到数据流
DataStream<LoginEvent> loginStream = ...;
PatternStream<LoginEvent> patternStream = CEP.pattern(loginStream.keyBy(LoginEvent::getUserId), // 按用户ID分组loginPattern
);// 4. 处理匹配结果(输出告警)
DataStream<String> alertStream = patternStream.select(pattern -> {LoginEvent firstFail = pattern.get("firstFail").get(0);LoginEvent success = pattern.get("success").get(0);return "异常登录告警: 用户 " + firstFail.getUserId() + " 在 " + firstFail.getTimestamp() + " 到 " + success.getTimestamp() + " 期间经历3次失败后登录成功,IP: " + success.getIp();
});alertStream.print();
二、100道Flink 面试题目录列表
文章序号 | Flink 100道 |
---|---|
1 | Flink面试题及详细答案100道(01-20) |
2 | Flink面试题及详细答案100道(21-40) |
3 | Flink面试题及详细答案100道(41-60) |
4 | Flink面试题及详细答案100道(61-80) |
5 | Flink面试题及详细答案100道(81-100) |