当前位置: 首页 > news >正文

Flink面试题及详细答案100道(21-40)- 基础概念与架构

前后端面试题》专栏集合了前后端各个知识模块的面试题,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。

前后端面试题-专栏总目录

在这里插入图片描述

文章目录

  • 一、本文面试题目录
      • 21. Flink中常见的窗口类型有哪些(如滚动窗口、滑动窗口、会话窗口等)?各自的适用场景是什么?
      • 22. 如何自定义Flink窗口的触发器(Trigger)和驱逐器(Evictor)?
        • 自定义Trigger
        • 自定义Evictor
      • 23. 解释Flink的“KeyedStream”和“Non-KeyedStream”的区别,哪些算子需要基于KeyedStream?
      • 24. Flink中的“Reduce”和“Aggregate”算子有何区别?分别适用于什么场景?
        • Reduce算子
        • Aggregate算子
      • 25. 什么是Flink的“CoGroup”和“Join”算子?两者的实现原理有何不同?
      • 26. Flink的“Connect”和“Union”算子有什么区别?
      • 27. 如何使用Flink实现双流Join(如内连接、左连接)?需要注意哪些问题?
        • 实现方式
        • 注意事项
      • 28. 解释Flink的“ProcessFunction”,它与普通算子相比有哪些优势?
      • 29. Flink中“FlatMapFunction”“MapFunction”“FilterFunction”的区别是什么?
      • 30. 如何实现Flink的“自定义Source”和“自定义Sink”?请举例说明。
        • 自定义Source
        • 自定义Sink
      • 31. Flink的“Side Output”是什么?如何使用?
      • 32. 什么是Flink的“分流”和“合流”?分别有哪些实现方式?
        • 分流的实现方式
        • 合流的实现方式
      • 33. Flink中的“Broadcast State”是什么?适用于什么场景?
      • 34. 如何使用Flink处理迟到数据?有哪些策略?
      • 35. 解释Flink的“Window Join”和“Interval Join”的区别。
      • 36. Flink的“KeyBy”算子的实现原理是什么?需要注意哪些数据类型问题?
        • 实现原理
        • 数据类型注意事项
      • 37. 什么是Flink的“状态后端(State Backend)”?有哪几种类型,各有什么特点?
      • 38. Flink中“RichFunction”与普通Function有何区别?何时需要使用RichFunction?
      • 39. 如何在Flink中实现数据的“去重”操作?有哪些方法?
      • 40. 解释Flink的“Cep(Complex Event Processing)”库,它能解决什么问题?
  • 二、100道Flink 面试题目录列表

一、本文面试题目录

21. Flink中常见的窗口类型有哪些(如滚动窗口、滑动窗口、会话窗口等)?各自的适用场景是什么?

Flink中常见的窗口类型按划分方式可分为以下几类:

  1. 时间窗口(Time Window)

    • 滚动时间窗口(Tumbling Time Window)

      • 特点:窗口大小固定,无重叠,按时间间隔连续划分(如每10分钟一个窗口)。
      • 适用场景:固定周期的统计分析(如每小时的订单总量、日活用户统计)。
    • 滑动时间窗口(Sliding Time Window)

      • 特点:窗口大小固定,但有重叠部分(如窗口大小30分钟,滑动步长10分钟)。
      • 适用场景:需要高频更新统计结果的场景(如每10分钟更新一次最近30分钟的流量数据)。
    • 会话窗口(Session Window)

      • 特点:基于用户活动间隙划分,当一定时间内无新数据则窗口关闭(如会话超时时间30分钟)。
      • 适用场景:用户行为分析(如电商网站的用户会话跟踪、APP的单次使用行为分析)。
  2. 计数窗口(Count Window)

    • 滚动计数窗口(Tumbling Count Window)

      • 特点:元素数量达到阈值时触发(如每100个元素一个窗口)。
      • 适用场景:固定数量的批处理(如每收集1000条日志就进行一次分析)。
    • 滑动计数窗口(Sliding Count Window)

      • 特点:每新增N个元素滑动一次,窗口包含M个元素(M > N)。
      • 适用场景:需要频繁更新统计结果的场景(如每新增10个元素,统计最近50个元素的平均值)。

示例:滚动和滑动窗口的实现

// 滚动时间窗口(10分钟)
dataStream.keyBy(...).timeWindow(Time.minutes(10)).sum(1);// 滑动时间窗口(30分钟窗口,10分钟滑动一次)
dataStream.keyBy(...).timeWindow(Time.minutes(30), Time.minutes(10)).sum(1);// 会话窗口(超时时间30分钟)
dataStream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.minutes(30))).sum(1);

22. 如何自定义Flink窗口的触发器(Trigger)和驱逐器(Evictor)?

触发器(Trigger) 决定窗口何时触发计算,驱逐器(Evictor) 决定触发后保留或移除窗口中的哪些元素。

自定义Trigger

需继承Trigger<IN, W extends Window>,实现以下方法:

  • onElement():新元素进入窗口时调用
  • onProcessingTime():处理时间定时器触发时调用
  • onEventTime():事件时间定时器触发时调用
  • clear():窗口关闭时清理资源

示例:自定义计数触发器(元素数达到5时触发)

public class CountTrigger extends Trigger<Tuple2<String, Integer>, TimeWindow> {private int count = 0;private final int threshold;public CountTrigger(int threshold) {this.threshold = threshold;}@Overridepublic TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) {count++;if (count >= threshold) {count = 0; // 重置计数return TriggerResult.FIRE_AND_PURGE; // 触发并清空窗口}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) {count = 0;}
}// 使用自定义触发器
dataStream.keyBy(0).timeWindow(Time.minutes(10)).trigger(new CountTrigger(5)) // 每5个元素触发一次.sum(1);
自定义Evictor

需继承Evictor<IN, W extends Window>,实现evictBefore()(触发前移除元素)和evictAfter()(触发后移除元素)。

示例:保留窗口中最后3个元素

public class LastNEvictor extends Evictor<Tuple2<String, Integer>, TimeWindow> {private final int n;public LastNEvictor(int n) {this.n = n;}@Overridepublic void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements, int size, TimeWindow window, EvictorContext ctx) {// 触发前不做处理}@Overridepublic void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements, int size, TimeWindow window, EvictorContext ctx) {List<TimestampedValue<Tuple2<String, Integer>>> list = Lists.newArrayList(elements);if (list.size() > n) {// 移除前size-n个元素,保留最后n个for (int i = 0; i < list.size() - n; i++) {list.remove(0);}}}
}// 使用自定义驱逐器
dataStream.keyBy(0).timeWindow(Time.minutes(10)).evictor(new LastNEvictor(3)) // 保留最后3个元素.sum(1);

23. 解释Flink的“KeyedStream”和“Non-KeyedStream”的区别,哪些算子需要基于KeyedStream?

  • KeyedStream:按指定Key分组后的数据流,相同Key的元素会被分配到同一个并行任务处理,支持状态隔离和按Key的聚合操作。
  • Non-KeyedStream:未分组的数据流,所有元素在多个并行任务间随机分配,不支持按Key的状态管理。

两者核心区别:

特性KeyedStreamNon-KeyedStream
状态管理支持按Key隔离的状态仅支持算子级别的全局状态
并行处理相同Key的元素在同一任务处理元素随机分配到不同任务
聚合操作支持(如sum、reduce)需通过windowAll等算子(并行度1)
时间特性支持基于Key的窗口和定时器窗口操作并行度为1

需要基于KeyedStream的算子:

  • 状态相关算子:mapWithStateflatMapWithState
  • 聚合算子:reducesumminmaxaggregate
  • 窗口算子:window(非windowAll)、countWindow
  • 定时器相关:ProcessFunction中的registerEventTimeTimer

示例:KeyedStream与Non-KeyedStream的使用

DataStream<Tuple2<String, Integer>> dataStream = ...;// KeyedStream:按第一个字段分组
KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(t -> t.f0);
keyedStream.sum(1).print(); // 支持按Key聚合// Non-KeyedStream:全局聚合(并行度为1)
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5))).sum(1).print();

24. Flink中的“Reduce”和“Aggregate”算子有何区别?分别适用于什么场景?

ReduceAggregate都是Flink中的聚合算子,但适用场景和功能不同:

特性ReduceAggregate
输入输出类型必须相同(IN → IN)可不同(IN → ACC → OUT)
中间状态仅保留上一次聚合结果可自定义累加器(Accumulator)
灵活性较低,仅支持二元操作较高,支持复杂聚合逻辑
适用场景简单聚合(如求和、求最值)复杂聚合(如平均值、加权求和)
Reduce算子

接收两个相同类型的元素,返回一个同类型的元素,适合简单的累加操作。

示例:使用Reduce计算总和

DataStream<Tuple2<String, Integer>> dataStream = ...;dataStream.keyBy(t -> t.f0).timeWindow(Time.minutes(5)).reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1)) // 累加第二个字段.print();
Aggregate算子

通过自定义累加器实现复杂聚合,包含三个阶段:

  1. createAccumulator():初始化累加器
  2. add():将元素添加到累加器
  3. getResult():从累加器提取结果
  4. merge():合并多个累加器(用于窗口合并)

示例:使用Aggregate计算平均值

// 自定义AggregateFunction:计算平均值
public class AverageAggregate implements AggregateFunction<Tuple2<String, Integer>,       // 输入类型Tuple2<Integer, Integer>,      // 累加器类型(总和,计数)Tuple2<String, Double>> {      // 输出类型(Key,平均值)private String key;@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return new Tuple2<>(0, 0); // (sum, count)}@Overridepublic Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> accumulator) {this.key = value.f0;return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);}@Overridepublic Tuple2<String, Double> getResult(Tuple2<Integer, Integer> accumulator) {return new Tuple2<>(key, (double) accumulator.f0 / accumulator.f1);}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}// 使用Aggregate算子
dataStream.keyBy(t -> t.f0).timeWindow(Time.minutes(5)).aggregate(new AverageAggregate()).print();

25. 什么是Flink的“CoGroup”和“Join”算子?两者的实现原理有何不同?

CoGroupJoin都是用于关联两个数据流的算子,但实现方式和功能不同:

  • Join算子:基于指定条件关联两个流,仅输出满足条件的匹配对,类似SQL的内连接。
  • CoGroup算子:将两个流中满足条件的元素分组后进行联合处理,可实现内连接、左连接、右连接等,灵活性更高。

实现原理区别:

  1. Join

    • 将两个流的数据按Key分组并放入窗口。
    • 对窗口内的元素进行笛卡尔积,过滤出满足条件的记录。
    • 仅输出匹配的记录对。
  2. CoGroup

    • 同样按Key分组并放入窗口。
    • 将两个流中相同Key的元素分别收集到两个迭代器中。
    • 提供自定义逻辑处理这两个迭代器(可输出匹配或不匹配的记录)。

示例:Join与CoGroup的对比

DataStream<Tuple2<String, Integer>> stream1 = ...; // (name, age)
DataStream<Tuple2<String, String>> stream2 = ...;  // (name, city)// Join算子:内连接(仅输出匹配的记录)
stream1.join(stream2).where(t1 -> t1.f0) // stream1的Key.equalTo(t2 -> t2.f0) // stream2的Key.window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1)) // (name, age, city).print();// CoGroup算子:左连接(输出所有stream1的记录,匹配不到则补null)
stream1.coGroup(stream2).where(t1 -> t1.f0).equalTo(t2 -> t2.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((iter1, iter2, out) -> {List<Tuple2<String, String>> list2 = Lists.newArrayList(iter2);for (Tuple2<String, Integer> t1 : iter1) {if (list2.isEmpty()) {out.collect(new Tuple3<>(t1.f0, t1.f1, null));} else {for (Tuple2<String, String> t2 : list2) {out.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}}}}).print();

26. Flink的“Connect”和“Union”算子有什么区别?

ConnectUnion都用于合并数据流,但适用场景和功能有显著区别:

特性ConnectUnion
输入流类型可处理不同类型的两个流必须处理相同类型的多个流
输出流类型保留两个流的类型(ConnectedStream)与输入流类型一致
合并逻辑可对两个流应用不同处理逻辑仅简单合并,对所有流处理逻辑相同
并行度保持原有两个流的并行度总并行度为所有输入流并行度之和
支持流数量仅支持两个流支持多个流(>=2)

示例:Connect与Union的对比

// 不同类型的流
DataStream<String> streamA = ...;
DataStream<Integer> streamB = ...;
DataStream<String> streamC = ...;// Connect:合并不同类型的流
ConnectedStreams<String, Integer> connectedStream = streamA.connect(streamB);
DataStream<Object> result = connectedStream.map(str -> str.toUpperCase(), // 处理streamAnum -> num * 2            // 处理streamB);// Union:合并相同类型的流(streamA和streamC都是String类型)
DataStream<String> unionStream = streamA.union(streamC);
unionStream.map(str -> str.toLowerCase()).print();

27. 如何使用Flink实现双流Join(如内连接、左连接)?需要注意哪些问题?

Flink中双流Join通常基于窗口实现,确保数据在有限时间范围内关联。常见类型包括内连接、左连接和右连接。

实现方式
  1. 内连接(Inner Join):仅输出两个流中匹配的记录。
  2. 左连接(Left Join):输出左流所有记录,右流匹配不到则补空。
  3. 右连接(Right Join):输出右流所有记录,左流匹配不到则补空。

示例:基于窗口的双流Join

// 定义两个流
DataStream<Order> orders = ...; // 订单流 (orderId, userId, amount, timestamp)
DataStream<User> users = ...;  // 用户流 (userId, name, timestamp)// 1. 内连接
orders.join(users).where(Order::getUserId).equalTo(User::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((order, user) -> new OrderWithUserName(order.getOrderId(),user.getName(),order.getAmount())).print("Inner Join Result:");// 2. 左连接(使用CoGroup实现)
orders.coGroup(users).where(Order::getUserId).equalTo(User::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((ordersIter, usersIter, out) -> {List<User> userList = Lists.newArrayList(usersIter);for (Order order : ordersIter) {if (userList.isEmpty()) {// 左流记录,无匹配的右流记录out.collect(new OrderWithUserName(order.getOrderId(), "Unknown", order.getAmount()));} else {for (User user : userList) {out.collect(new OrderWithUserName(order.getOrderId(), user.getName(), order.getAmount()));}}}}).print("Left Join Result:");
注意事项
  1. 窗口大小选择:窗口过小可能导致匹配率低,过大则增加状态存储压力。
  2. 数据乱序处理:需合理设置Watermark延迟时间,确保关联数据能在窗口内到达。
  3. 状态管理:Join会在窗口内缓存大量数据,需配置合适的状态后端(如RocksDB)。
  4. 数据倾斜:若某类Key的数据过多,会导致该窗口处理压力过大,需提前进行数据均衡。
  5. 性能优化:可通过预过滤减少窗口内数据量,或使用Interval Join替代窗口Join优化性能。

28. 解释Flink的“ProcessFunction”,它与普通算子相比有哪些优势?

ProcessFunction是Flink中最灵活的处理算子,允许访问事件时间、处理时间、状态和定时器,是实现复杂业务逻辑的核心工具。

与普通算子(如map、filter)相比,其优势在于:

  1. 访问时间特性:可直接获取事件时间和处理时间。
  2. 状态管理:支持Keyed State和定时器状态。
  3. 定时器功能:可注册事件时间或处理时间定时器,实现延迟操作。
  4. 侧输出流:可将不同类型的数据发送到侧输出流,实现分流。
  5. 细粒度控制:可自定义数据处理逻辑,适合复杂业务场景。

ProcessFunction家族包括:

  • ProcessFunction:处理Non-KeyedStream
  • KeyedProcessFunction:处理KeyedStream(最常用)
  • CoProcessFunction:处理ConnectedStreams
  • ProcessWindowFunction:窗口处理的增强版

示例:使用KeyedProcessFunction实现超时检测

// 检测用户会话超时(5秒内无活动则输出超时信息)
public class SessionTimeoutProcessFunction extends KeyedProcessFunction<String, UserEvent, String> {private transient ValueState<Long> lastActivityTimeState;@Overridepublic void open(Configuration parameters) {// 初始化状态:存储最后活动时间ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastActivity", Long.class);lastActivityTimeState = getRuntimeContext().getState(descriptor);}@Overridepublic void processElement(UserEvent event, Context ctx, Collector<String> out) throws Exception {Long lastTime = lastActivityTimeState.value();long currentTime = ctx.timestamp(); // 获取事件时间if (lastTime == null) {// 首次活动,注册5秒后超时定时器ctx.timerService().registerEventTimeTimer(currentTime + 5000);} else {// 有新活动,删除旧定时器,注册新定时器ctx.timerService().deleteEventTimeTimer(lastTime + 5000);ctx.timerService().registerEventTimeTimer(currentTime + 5000);}// 更新最后活动时间lastActivityTimeState.update(currentTime);out.collect(event.getUserId() + " 活动时间: " + new Date(currentTime));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 定时器触发,输出超时信息out.collect(ctx.getCurrentKey() + " 会话超时! 最后活动时间: " + new Date(lastActivityTimeState.value()));lastActivityTimeState.clear();}
}// 使用ProcessFunction
DataStream<UserEvent> userEvents = ...;
userEvents.keyBy(UserEvent::getUserId).process(new SessionTimeoutProcessFunction()).print();

29. Flink中“FlatMapFunction”“MapFunction”“FilterFunction”的区别是什么?

这三个都是Flink的基础转换算子,但功能和适用场景不同:

算子输入输出关系功能描述适用场景
MapFunction一对一(1:1)将一个元素转换为另一个元素简单转换(如类型转换、字段提取)
FlatMapFunction一对多(1:N,N≥0)将一个元素转换为零个、一个或多个元素拆分数据(如字符串分割、JSON数组解析)
FilterFunction一对一或零(1:0或1:1)根据条件保留或过滤元素数据筛选(如保留符合条件的记录)

示例:三种算子的对比

DataStream<String> input = env.fromElements("apple,banana", "orange", "grape,melon");// MapFunction:将字符串转换为长度
DataStream<Integer> mapResult = input.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {return value.length();}
});// FlatMapFunction:将逗号分隔的字符串拆分为单个单词
DataStream<String> flatMapResult = input.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) {for (String word : value.split(",")) {out.collect(word);}}
});// FilterFunction:保留长度大于5的单词
DataStream<String> filterResult = flatMapResult.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) {return value.length() > 5;}
});// 输出结果:
// mapResult: 10, 6, 10
// flatMapResult: apple, banana, orange, grape, melon
// filterResult: banana, orange, melon

30. 如何实现Flink的“自定义Source”和“自定义Sink”?请举例说明。

Flink允许通过自定义Source和Sink连接到特定的数据存储或服务,满足个性化的数据输入输出需求。

自定义Source

需实现SourceFunction(非并行)或ParallelSourceFunction(并行),重写run()cancel()方法。

示例:自定义并行Source(生成随机整数)

public class RandomNumberSource implements ParallelSourceFunction<Integer> {private boolean isRunning = true;private Random random = new Random();@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (isRunning) {// 生成1-100的随机整数int num = random.nextInt(100) + 1;ctx.collect(num);Thread.sleep(100); // 每100ms生成一个}}@Overridepublic void cancel() {isRunning = false;}
}// 使用自定义Source
DataStream<Integer> randomNumbers = env.addSource(new RandomNumberSource()).setParallelism(2);
randomNumbers.print();
自定义Sink

需实现SinkFunction,重写invoke()方法处理输出数据,或使用更灵活的RichSinkFunction(可访问生命周期方法)。

示例:自定义Sink(将数据写入文件)

public class FileSink extends RichSinkFunction<Integer> {private FileWriter writer;private String filePath;public FileSink(String filePath) {this.filePath = filePath;}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化文件写入器(每个并行实例写入不同文件)int index = getRuntimeContext().getIndexOfThisSubtask();writer = new FileWriter(filePath + "_" + index + ".txt", true);}@Overridepublic void invoke(Integer value, Context context) throws Exception {// 写入数据writer.write(value.toString() + "\n");writer.flush();}@Overridepublic void close() throws Exception {// 关闭资源if (writer != null) {writer.close();}}
}// 使用自定义Sink
randomNumbers.addSink(new FileSink("/tmp/random_numbers")).setParallelism(2);

31. Flink的“Side Output”是什么?如何使用?

Side Output(侧输出流)是Flink中处理分流的机制,允许从一个算子中输出多种类型的数据到不同的流中,解决了普通算子只能输出单一类型数据流的限制。

使用场景:

  • 分离正常数据和异常数据(如格式错误的记录)
  • 按不同条件拆分数据流(如按优先级划分)
  • 提取特定特征的数据(如超过阈值的异常值)

使用步骤:

  1. 定义OutputTag标识侧输出流(需指定类型)。
  2. 在算子中使用Context.output()将数据发送到侧输出流。
  3. 通过getSideOutput()获取侧输出流。

示例:使用Side Output分离正常和异常数据

// 1. 定义侧输出流标签(异常数据)
OutputTag<String> errorTag = new OutputTag<String>("error-data") {};// 2. 主处理流程(分离正常和异常数据)
SingleOutputStreamOperator<Integer> mainStream = inputStream.process(new ProcessFunction<String, Integer>() {@Overridepublic void processElement(String value, Context ctx, Collector<Integer> out) {try {// 尝试将字符串转换为整数(正常数据)int num = Integer.parseInt(value);out.collect(num);} catch (NumberFormatException e) {// 转换失败,发送到侧输出流(异常数据)ctx.output(errorTag, "Invalid number: " + value);}}});// 3. 获取主输出流和侧输出流
mainStream.print("Normal Data:");
DataStream<String> errorStream = mainStream.getSideOutput(errorTag);
errorStream.print("Error Data:");

32. 什么是Flink的“分流”和“合流”?分别有哪些实现方式?

  • 分流:将一个数据流拆分为多个子数据流,每个子数据流包含不同特征的数据。
  • 合流:将多个数据流合并为一个数据流,实现数据的汇总处理。
分流的实现方式
  1. Side Output:最常用方式,支持输出多种类型数据(见31题)。
  2. Filter:对原始流多次应用filter算子,得到不同子流(效率较低,数据会被多次处理)。

示例:使用Filter分流

DataStream<Integer> stream = ...;// 分流为偶数流和奇数流
DataStream<Integer> evenStream = stream.filter(num -> num % 2 == 0);
DataStream<Integer> oddStream = stream.filter(num -> num % 2 != 0);
合流的实现方式
  1. Union:合并多个同类型数据流(见26题)。
  2. Connect:合并两个不同类型数据流(见26题)。
  3. Join/CoGroup:基于Key关联两个数据流(见25题)。

示例:多种合流方式对比

DataStream<String> streamA = ...;
DataStream<String> streamB = ...;
DataStream<Integer> streamC = ...;// Union:合并同类型流
DataStream<String> unionStream = streamA.union(streamB);// Connect:合并不同类型流
ConnectedStreams<String, Integer> connectedStream = streamA.connect(streamC);// Join:基于Key合并
streamA.map(str -> new Tuple2<>(str, 1)).join(streamB.map(str -> new Tuple2<>(str, 2))).where(t -> t.f0).equalTo(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(5))).apply((t1, t2) -> t1.f0 + ": " + t1.f1 + "," + t2.f1);

33. Flink中的“Broadcast State”是什么?适用于什么场景?

Broadcast State是Flink中用于在多个并行任务间共享只读数据的机制,允许将一个流(广播流)的数据广播到另一个流(非广播流)的所有并行实例中,实现动态配置更新等功能。

核心特性:

  • 广播流(BroadcastStream):包含需共享的数据(如配置、规则),会被发送到所有并行任务。
  • 非广播流:普通数据流,每个并行任务可访问广播流中的完整数据。
  • 只读性:非广播流的任务只能读取广播状态,不能修改(保证一致性)。

适用场景:

  • 动态规则更新(如实时风控规则调整)
  • 配置参数下发(如动态修改阈值)
  • 小表关联(如将字典表广播到所有任务)

示例:使用Broadcast State实现动态过滤

// 1. 定义广播数据(过滤规则)和普通数据(待处理记录)
DataStream<FilterRule> ruleStream = ...; // 规则流:(keyword, action)
DataStream<String> dataStream = ...;    // 数据记录流// 2. 定义广播状态描述符
MapStateDescriptor<String, FilterRule> ruleStateDescriptor = new MapStateDescriptor<>("filterRules",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<FilterRule>() {})
);// 3. 广播规则流
BroadcastStream<FilterRule> broadcastRuleStream = ruleStream.broadcast(ruleStateDescriptor);// 4. 连接广播流和普通流,应用动态规则
DataStream<String> resultStream = dataStream.connect(broadcastRuleStream).process(new BroadcastProcessFunction<String, FilterRule, String>() {@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 读取广播状态中的规则Iterable<Map.Entry<String, FilterRule>> rules = ctx.getBroadcastState(ruleStateDescriptor).entries();boolean shouldFilter = false;for (Map.Entry<String, FilterRule> rule : rules) {if (value.contains(rule.getKey()) && rule.getValue().getAction().equals("block")) {shouldFilter = true;break;}}if (!shouldFilter) {out.collect(value);}}@Overridepublic void processBroadcastElement(FilterRule rule, Context ctx, Collector<String> out) throws Exception {// 更新广播状态ctx.getBroadcastState(ruleStateDescriptor).put(rule.getKeyword(), rule);}});

34. 如何使用Flink处理迟到数据?有哪些策略?

迟到数据指在Watermark已超过窗口结束时间后到达的数据。Flink提供多种处理策略:

  1. 允许窗口延迟关闭(Allowed Lateness)

    • 为窗口设置延迟时间,窗口在结束时间后继续等待该时长,期间接收的迟到数据仍会被处理。
    • 适用于大部分迟到数据在短时间内到达的场景。
  2. 侧输出流(Side Output)

    • 将超过允许延迟时间的迟到数据发送到侧输出流,单独处理。
    • 适用于需要完整保留迟到数据并后续分析的场景。
  3. 调整Watermark生成策略

    • 增大Watermark的延迟时间(如BoundedOutOfOrdernessTimestampExtractor的参数),给迟到数据更多到达时间。
    • 适用于数据乱序程度较高的场景,但会增加处理延迟。
  4. 使用会话窗口(Session Window)

    • 基于活动间隙划分窗口,避免固定窗口对迟到数据的严格限制。
    • 适用于用户行为等非周期性数据。

示例:综合处理迟到数据

// 1. 定义事件时间和Watermark(允许5秒乱序)
DataStream<Event> stream = ...;
DataStream<Event> withWatermark = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {@Overridepublic long extractTimestamp(Event element) {return element.getTimestamp();}}
);// 2. 定义侧输出流标签(接收迟到数据)
OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data") {};// 3. 窗口处理(允许3秒延迟关闭)
SingleOutputStreamOperator<Result> result = withWatermark.keyBy(Event::getKey).timeWindow(Time.minutes(10)).allowedLateness(Time.seconds(3)) // 窗口额外等待3秒.sideOutputLateData(lateDataTag) // 超过延迟时间的数据进入侧输出.apply(new WindowFunction<Event, Result, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<Event> input, Collector<Result> out) {// 窗口处理逻辑}});// 4. 处理侧输出流中的迟到数据
DataStream<Event> lateData = result.getSideOutput(lateDataTag);
lateData.print("Late Data:");

35. 解释Flink的“Window Join”和“Interval Join”的区别。

Window JoinInterval Join都是Flink中用于关联两个数据流的算子,但适用场景和实现方式不同:

特性Window JoinInterval Join
时间范围定义基于固定窗口(如每10分钟)基于左流元素时间的相对区间(如[-5, +5]秒)
数据关联方式窗口内两个流的元素笛卡尔积匹配右流元素时间在左流元素时间区间内则匹配
状态存储缓存窗口内所有元素仅缓存右流在时间区间内的元素
适用场景周期性批量关联实时性要求高的连续关联
性能窗口结束时集中处理,可能有延迟元素到达即处理,实时性更好

示例:Window Join与Interval Join对比

DataStream<Order> orders = ...; // (orderId, userId, timestamp)
DataStream<Payment> payments = ...; // (paymentId, orderId, timestamp)// Window Join:10分钟窗口内关联订单和支付
orders.join(payments).where(Order::getOrderId).equalTo(Payment::getOrderId).window(TumblingEventTimeWindows.of(Time.minutes(10))).apply((order, payment) -> new OrderPayment(order, payment)).print("Window Join:");// Interval Join:支付时间在订单时间的[-5, +10]秒区间内
orders.keyBy(Order::getOrderId).intervalJoin(payments.keyBy(Payment::getOrderId)).between(Time.seconds(-5), Time.seconds(10)) // 支付可在订单前5秒到后10秒.process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {@Overridepublic void processElement(Order order, Payment payment, Context ctx, Collector<OrderPayment> out) {out.collect(new OrderPayment(order, payment));}}).print("Interval Join:");

36. Flink的“KeyBy”算子的实现原理是什么?需要注意哪些数据类型问题?

KeyBy算子通过将数据流按指定Key分组,使相同Key的元素被分配到同一个并行任务处理,是实现状态隔离和聚合操作的基础。

实现原理
  1. Key提取:根据用户指定的Key选择器(如字段索引、Lambda表达式)从元素中提取Key。
  2. 哈希分区:对提取的Key计算哈希值,然后通过hashCode % 并行度确定目标并行任务索引。
  3. 数据重分区:将元素发送到目标任务,确保相同Key的元素进入同一任务。
数据类型注意事项
  1. 不可变类型:Key应使用不可变类型(如String、Integer、Tuple),避免Key值在处理过程中被修改导致哈希不一致。
  2. POJO类型:若使用POJO作为Key,需确保:
    • 重写hashCode()equals()方法,保证哈希一致性。
    • 字段不可变或修改后需重新KeyBy(否则可能导致相同Key的元素分到不同任务)。
  3. 不支持的类型
    • 基本类型数组(如int[]):KeyBy会按引用而非内容计算哈希,需转为List或自定义类型。
    • 某些特殊类型(如ArrayList、HashMap):其hashCode()可能随内容变化,不适合作为Key。

示例:正确使用KeyBy的方式

// 1. 使用Tuple的字段作为Key(推荐)
DataStream<Tuple2<String, Integer>> stream = ...;
KeyedStream<Tuple2<String, Integer>, String> keyedByField = stream.keyBy(t -> t.f0);// 2. 使用POJO作为Key(需重写hashCode和equals)
public class UserKey {private String id;private String name;@Overridepublic int hashCode() {return Objects.hash(id, name); // 基于id和name计算哈希}@Overridepublic boolean equals(Object obj) {if (this == obj) return true;if (obj == null || getClass() != obj.getClass()) return false;UserKey other = (UserKey) obj;return Objects.equals(id, other.id) && Objects.equals(name, other.name);}
}DataStream<User> userStream = ...;
KeyedStream<User, UserKey> keyedByPojo = userStream.keyBy(user -> new UserKey(user.getId(), user.getName())
);

37. 什么是Flink的“状态后端(State Backend)”?有哪几种类型,各有什么特点?

状态后端(State Backend) 是Flink中负责管理状态存储、Checkpoint和恢复的组件,决定了状态的存储位置和格式。

Flink提供三种核心状态后端:

  1. MemoryStateBackend

    • 存储位置:JVM堆内存
    • 特点:
      • 读写速度快(内存操作)
      • 状态大小受限于JVM内存,不适合大规模状态
      • Checkpoint数据存储在JobManager内存中,可靠性低
    • 适用场景:开发测试、无状态或小状态作业
  2. FsStateBackend

    • 存储位置:
      • 工作状态:TaskManager堆内存
      • Checkpoint:文件系统(本地文件、HDFS等)
    • 特点:
      • 工作状态仍在内存,读写较快
      • Checkpoint持久化到文件系统,可靠性高
      • 状态大小受限于TaskManager内存,适合中等规模状态
    • 适用场景:生产环境中的中小规模状态作业
  3. RocksDBStateBackend

    • 存储位置:
      • 工作状态:TaskManager本地的RocksDB(嵌入式KV数据库,磁盘存储)
      • Checkpoint:文件系统
    • 特点:
      • 状态存储在磁盘,支持大规模状态(TB级)
      • 读写速度较内存慢,但通过内存缓存优化
      • 支持状态增量Checkpoint,减少IO开销
    • 适用场景:生产环境中的大规模状态作业(如长时间运行的有状态流处理)

配置方式:

// 代码中配置
env.setStateBackend(new MemoryStateBackend());
// 或
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// 或
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));// 配置文件中配置(flink-conf.yaml)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints

38. Flink中“RichFunction”与普通Function有何区别?何时需要使用RichFunction?

RichFunction是Flink中带生命周期管理的函数接口,继承自普通Function,提供了更多高级功能。

与普通Function的区别:

特性RichFunction普通Function
生命周期方法提供open()、close()等方法无生命周期方法
状态访问可通过getRuntimeContext()访问状态不支持状态访问
并行度信息可获取任务索引、并行度等信息无法获取执行环境信息
配置参数可通过open()方法获取配置参数无配置参数访问能力

需要使用RichFunction的场景:

  1. 访问状态:需要使用Keyed State或Operator State时。
  2. 资源管理:需要在函数初始化时创建资源(如数据库连接),并在结束时释放。
  3. 获取执行信息:需要获取并行度、任务索引等运行时信息时。
  4. 配置参数传递:需要从外部接收配置参数时。

示例:使用RichMapFunction管理数据库连接

public class DatabaseRichMapFunction extends RichMapFunction<String, Result> {private Connection conn;private PreparedStatement stmt;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化资源:创建数据库连接super.open(parameters);String url = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("db.url");conn = DriverManager.getConnection(url);stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?");}@Overridepublic Result map(String userId) throws Exception {// 使用数据库连接查询数据stmt.setString(1, userId);ResultSet rs = stmt.executeQuery();if (rs.next()) {return new Result(rs.getString("id"), rs.getString("name"));}return new Result(userId, "Unknown");}@Overridepublic void close() throws Exception {// 释放资源if (stmt != null) stmt.close();if (conn != null) conn.close();super.close();}
}// 使用RichFunction
Configuration config = new Configuration();
config.setString("db.url", "jdbc:mysql://localhost:3306/test");
env.getConfig().setGlobalJobParameters(config);DataStream<String> userIds = ...;
userIds.map(new DatabaseRichMapFunction()).print();

39. 如何在Flink中实现数据的“去重”操作?有哪些方法?

Flink中数据去重指移除流中重复的元素,确保每个元素只被处理一次。常用方法包括:

  1. 基于状态的去重

    • 原理:使用ValueStateListState存储已处理的唯一标识(如ID),新元素到来时先检查状态,不存在则处理并更新状态。
    • 适用场景:有唯一标识的流数据,需精确去重。

    示例:基于状态去重

    public class DeduplicationFunction extends KeyedProcessFunction<String, Event, Event> {private transient ValueState<Boolean> seenState;@Overridepublic void open(Configuration parameters) {seenState = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));}@Overridepublic void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {// 检查是否已处理过该事件(按eventId去重)if (seenState.value() == null) {out.collect(event); // 输出未重复的事件seenState.update(true); // 标记为已处理// 设置状态过期时间(避免状态无限增长)ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 86400000); // 24小时后过期}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {// 清除过期状态seenState.clear();}
    }// 使用去重函数(按eventId分组)
    DataStream<Event> stream = ...;
    stream.keyBy(Event::getEventId).process(new DeduplicationFunction()).print();
    
  2. 基于窗口的去重

    • 原理:在窗口内使用distinct()方法去重,只保证窗口内数据不重复。
    • 适用场景:允许一定时间范围内的重复,无需全局去重。

    示例:窗口内去重

    stream.keyBy(...).timeWindow(Time.hours(1)).distinct("eventId") // 按eventId字段去重.print();
    
  3. 基于外部系统的去重

    • 原理:利用外部KV存储(如Redis)记录已处理的ID,新元素到来时先查询外部系统。
    • 适用场景:状态过大无法本地存储,或需要跨作业去重。

    示例:基于Redis去重

    public class RedisDeduplicationFunction extends RichMapFunction<Event, Event> {private Jedis jedis;private String redisHost;@Overridepublic void open(Configuration parameters) {redisHost = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("redis.host");jedis = new Jedis(redisHost);}@Overridepublic Event map(Event event) {String key = "seen:" + event.getEventId();if (jedis.set(key, "1", "NX", "EX", 86400) != null) {// 不存在该key,返回事件return event;} else {// 已存在,返回null(后续可过滤)return null;}}@Overridepublic void close() {jedis.close();}
    }// 使用Redis去重
    stream.map(new RedisDeduplicationFunction()).filter(Objects::nonNull) // 过滤重复元素.print();
    

40. 解释Flink的“Cep(Complex Event Processing)”库,它能解决什么问题?

Flink CEP是Flink的复杂事件处理库,用于从连续的数据流中检测特定的事件模式(如序列、聚合、条件组合等),并触发相应动作。

核心概念:

  • 事件(Event):流中的单个数据记录。
  • 模式(Pattern):定义需要检测的事件序列或条件(如"A followed by B within 10 seconds")。
  • 模式匹配(Pattern Matching):CEP引擎按模式规则扫描数据流,识别符合条件的事件序列。
  • 结果处理:对匹配到的事件序列进行处理(如告警、聚合)。

能解决的问题:

  1. 实时监控与告警:如检测异常登录序列(多次失败后成功)。
  2. 业务流程分析:如监测电商下单-支付-发货的完整流程,识别异常中断。
  3. 用户行为分析:如检测用户在APP中的特定操作路径(浏览-加购-购买)。
  4. 故障诊断:如根据系统日志序列诊断故障原因。

示例:使用Flink CEP检测异常登录模式

// 1. 定义事件类型
public class LoginEvent {private String userId;private String ip;private boolean success;private long timestamp;// getters and setters
}// 2. 定义模式:3次失败登录后跟随1次成功登录
Pattern<LoginEvent, ?> loginPattern = Pattern.<LoginEvent>begin("firstFail").where(event -> !event.isSuccess()) // 第一次失败.next("secondFail").where(event -> !event.isSuccess()) // 第二次失败.next("thirdFail").where(event -> !event.isSuccess()) // 第三次失败.next("success").where(event -> event.isSuccess()) // 成功登录.within(Time.minutes(5)); // 5分钟内完成// 3. 将模式应用到数据流
DataStream<LoginEvent> loginStream = ...;
PatternStream<LoginEvent> patternStream = CEP.pattern(loginStream.keyBy(LoginEvent::getUserId), // 按用户ID分组loginPattern
);// 4. 处理匹配结果(输出告警)
DataStream<String> alertStream = patternStream.select(pattern -> {LoginEvent firstFail = pattern.get("firstFail").get(0);LoginEvent success = pattern.get("success").get(0);return "异常登录告警: 用户 " + firstFail.getUserId() + " 在 " + firstFail.getTimestamp() + " 到 " + success.getTimestamp() + " 期间经历3次失败后登录成功,IP: " + success.getIp();
});alertStream.print();

二、100道Flink 面试题目录列表

文章序号Flink 100道
1Flink面试题及详细答案100道(01-20)
2Flink面试题及详细答案100道(21-40)
3Flink面试题及详细答案100道(41-60)
4Flink面试题及详细答案100道(61-80)
5Flink面试题及详细答案100道(81-100)

文章转载自:

http://sFsx86yY.tqqhm.cn
http://CkrJZcdn.tqqhm.cn
http://f6yJL0kl.tqqhm.cn
http://mdE0zRPP.tqqhm.cn
http://uNNduQwM.tqqhm.cn
http://53QqqRFO.tqqhm.cn
http://u1qb8wOV.tqqhm.cn
http://HO1Lphuc.tqqhm.cn
http://iarSXgMJ.tqqhm.cn
http://d89M5VNN.tqqhm.cn
http://GmKgGxLl.tqqhm.cn
http://r2suriSN.tqqhm.cn
http://T8rzLuxp.tqqhm.cn
http://MhSHT0Ti.tqqhm.cn
http://j3aiOKk6.tqqhm.cn
http://y87o3BTd.tqqhm.cn
http://8Um4CH7B.tqqhm.cn
http://ilxoDu6i.tqqhm.cn
http://ALUCcdqq.tqqhm.cn
http://7SpyXYsO.tqqhm.cn
http://zHPrpIvF.tqqhm.cn
http://XNIWrrhm.tqqhm.cn
http://iGr2y29y.tqqhm.cn
http://gyrBsA3I.tqqhm.cn
http://obj6qpse.tqqhm.cn
http://3n1U5bci.tqqhm.cn
http://mllUBPT0.tqqhm.cn
http://wUp1fmWN.tqqhm.cn
http://9pfLpg1o.tqqhm.cn
http://OAZUucnA.tqqhm.cn
http://www.dtcms.com/a/379846.html

相关文章:

  • 用Python打造专业级老照片修复工具:让时光倒流的数字魔法
  • 第八章:移动端着色器的优化-Mobile Shader Adjustment《Unity Shaders and Effets Cookbook》
  • 前端性能优化:Webpack Tree Shaking 的实践与踩坑前端性能优化:Webpack Tree Shaking 的实践与踩坑
  • 国产凝思debian系Linux离线安装rabbitmq教程步骤
  • how to setup k3s on an offline ubuntu
  • RabbitMQ对接MQTT消息发布指南
  • ⸢ 肆-Ⅰ⸥ ⤳ 默认安全建设方案:d.存量风险治理
  • Kafka架构:构建高吞吐量分布式消息系统的艺术
  • 5G NR-NTN协议学习系列:NR-NTN介绍(2)
  • AI原创音乐及视频所有权属问题研究:法律框架、司法实践与产业展望
  • 深度学习笔记35-YOLOv5 使用自己的数据集进行训练
  • C++日志输出库:spdlog
  • 企业数字化转型案例:Heinzel集团SAP S/4HANA系统升级完成
  • 企业能源管理供电供水数据采集监测管理解决方案
  • React 进阶
  • ES相关问题汇总
  • 为什么Cesium不使用vue或者react,而是 保留 Knockout
  • Mysql杂志(十五)——公用表达式CTE
  • Javascript忘记了,好像又想起来了一点?
  • AI + 制造:NebulaAI 场景实践来了!
  • mosdns缓存dns服务器配置记录
  • android14 硬键盘ESC改BACK按键返回无效问题
  • 代码随想录算法训练营第62天 | Floyd 算法精讲、A * 算法精讲 (A star算法)、最短路算法总结篇、图论总结
  • 教程:用免费 Google Translate API 在 VSCode 中实现中文注释自动翻译英文
  • 数据储存方式
  • Java生态圈核心组件深度解析:Spring技术栈与分布式系统实战
  • 解决Ubuntu中apt-get -y安装时弹出交互提示的问题
  • 硅基计划3.0 Map类Set类
  • Ubuntu20.04手动安装中文输入法
  • 算法训练营DAY60 第十一章:图论part11