当前位置: 首页 > news >正文

Beam2.61.0版本消费kafka重复问题排查

1.问题出现过程

在测试环境测试flink的job的任务消费kafka的情况,通过往job任务发送一条消息,然后flink web ui上消费出现了两条。然后通过重启JobManager和TaskManager后,任务从checkpoint恢复后就会出现重复消费。当任务不从checkpoint恢复的时候,任务不会出现重复消费的情况。由此可见是beam从checkpoint恢复的时候出现了重复消费的问题。

2.任务排查过程

由于我们beam使用的是FlinkRunner,所以Beam消费Kafka会基于Flink的Source的规范实现相关的Source。

Flink中的Source实现的几个重要的类:Source:工厂类负责实例化以下的几个组件SourceSplit:封装数据源的逻辑分片(如文件块、Kafka 分区区间)。SplitEnumerator:负责分片发现与分配逻辑。SourceReader:处理分片数据读取与反序列化。

在Beam中分别实现的Flink的KafkaSource是以下这几个类:

FlinkUnboundedSource
FlinkSourceSplit
FlinkSourceSplitEnumerator
FlinkSourceReaderBase <- FlinkUnboundedSourceReader

其中在Flink中Source算子的执行和SourceOpearator和SourceCoordinator这两个类有关,他们的执行顺序如下:

  1. 初始化阶段

    • SourceCoordinator 优先启动:在 JobMaster(JobManager)启动时,SourceCoordinator 作为独立组件被创建,并负责初始化 SplitEnumerator(分片枚举器)。

    • SourceOperator 后续启动:在 TaskManager 上,每个并行任务实例(Task)启动时,会初始化 SourceOperator,并在其open()方法中创建 SourceReader(数据读取器)。

  2. 运行时协作

    • 分片分配:SourceCoordinator 的 SplitEnumerator 通过 RPC 响应 SourceOperator 的分片请求(如AddSplitEvent),动态分配分片(Split)。

    • 数据读取:SourceOperator 将分配到的分片交给内部的 SourceReader,通过pollNext()方法读取数据并发送到下游。

SourceOperator类逻辑

@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>implements OperatorEventHandler,PushingAsyncDataInput<OUT>,TimestampsAndWatermarks.WatermarkUpdateListener {
​/** The state that holds the currently assigned splits. */// 状态存储当前被分配的分片信息private ListState<SplitT> readerState;@Overridepublic void open() throws Exception {// 初始化Reader操作initReader();
​// in the future when we this one is migrated to the "eager initialization" operator// (StreamOperatorV2), then we should evaluate this during operator construction.if (emitProgressiveWatermarks) {eventTimeLogic =TimestampsAndWatermarks.createProgressiveEventTimeLogic(watermarkStrategy,sourceMetricGroup,getProcessingTimeService(),getExecutionConfig().getAutoWatermarkInterval());} else {eventTimeLogic =TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);}
​// restore the state if necessary.// 从checkpoint状态中恢复出上一次被分配的分片信息final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());if (!splits.isEmpty()) {LOG.info("Restoring state for {} split(s) to reader.", splits.size());// 然后把分片信息添加到Reader中sourceReader.addSplits(splits);}
​// Register the reader to the coordinator.registerReader();
​sourceMetricGroup.idlingStarted();// Start the reader after registration, sending messages in start is allowed.sourceReader.start();
​eventTimeLogic.startPeriodicWatermarkEmits();}// SourceOperator处理算子的对应事件public void handleOperatorEvent(OperatorEvent event) {if (event instanceof WatermarkAlignmentEvent) {updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);checkWatermarkAlignment();checkSplitWatermarkAlignment();} else if (event instanceof AddSplitEvent) {// 处理新增分片的事件:对应任务第一次消费,或者有心的分片增加了(对应到kafka中就是分区数增加了)handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));} else if (event instanceof SourceEventWrapper) {sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());} else if (event instanceof NoMoreSplitsEvent) {sourceReader.notifyNoMoreSplits();} else if (event instanceof IsProcessingBacklogEvent) {if (eventTimeLogic != null) {eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());}output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog()).build());} else {throw new IllegalStateException("Received unexpected operator event " + event);}}private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {try {List<SplitT> newSplits = event.splits(splitSerializer);numSplits += newSplits.size();if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {// For splits arrived before the main output is initialized, store them into the// pending list. Outputs of these splits will be created once the main output is// ready.outputPendingSplits.addAll(newSplits);} else {// Create output directly for new splits if the main output is already initialized.createOutputForSplits(newSplits);}// 将新增的分片信息添加到reader中。sourceReader.addSplits(newSplits);} catch (IOException e) {throw new FlinkRuntimeException("Failed to deserialize the splits.", e);}}
}

以上可以看到在SourceOperator中,SourceReader新增分片的地方有两个:Open()函数中从checkpoint中恢复的和handleAddSplitsEvent()中添加的分片信息,然后继续看看sourceReader.addSplits(newSplits)中调用的是FlinkSourceReaderBase#addSplits(newSplits)方法。

由于Beam中kafka的FlinkSourceReader分别对应有界和无界,所以中间有一个抽象的类FlinkSourceReaderBase

FlinkSourceReaderBase类

public abstract class FlinkSourceReaderBase<T, OutputT>implements SourceReader<OutputT, FlinkSourceSplit<T>> {// 这是一个队列,存储的是分片信息 private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();@Overridepublic void addSplits(List<FlinkSourceSplit<T>> splits) {checkExceptionAndMaybeThrow();LOG.info("Adding splits {}", splits);// 往队列中添加了分片信息sourceSplits.addAll(splits);waitingForSplitChangeFuture.get().complete(null);}protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {// 从队列中消费分片 FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();if (sourceSplit != null) {// 然后根据分片创建对应的Reader,进行消费Kafka的数据。Source.Reader<T> reader = createReader(sourceSplit);ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);return Optional.of(readerAndOutput);}return Optional.empty();}
}

所以看到以上的代码其实很清楚了,消费kafka重复很有可能是因为分片被重复添加导致的,由于在Kafka中KafkaConsumer在指定分区和Offset的情况下,是可以多个消费者在同一个消费者组中消费同一个分区的。

接下来使用arthas去监控sourceReader.addSplits(newSplits)的地方的调用情况:

// 监控SourceOperator#open()方法
watch org.apache.flink.util.CollectionUtil iterableToList '{params,returnObj,throwExp}'  -n 5  -x 3 
​
// 监控SourceOperator#handleAddSplitsEvent()方法
watch org.apache.flink.streaming.api.operators.SourceOperator handleAddSplitsEvent '{params,returnObj,throwExp}'  -n 5  -x 3 

最终观察到这两个地方都被调用了,所以问题就是因为checkpoint恢复的时候添加了分片信息,而从SourceCoordinator中调用FlinkSourceSplitEnumerator()计算分片的地方又添加了一次导致最终kafka消费重复了。

FlinkSourceSplitEnumerator类

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;private final Source<T> beamSource;private final PipelineOptions pipelineOptions;private final int numSplits;private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;// 这里标识split计算是否被初始化过private boolean splitsInitialized;  public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits) {this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);// 这里看到永远都是false,所以无论有没有从checkpoint恢复过,这里都会执行过一次。 this.splitsInitialized = false;}@Overridepublic void start() {context.callAsync(() -> {// 执行分片计算的操作,计算哪些kafka分区被分配给哪个并行度try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);// 这里标识设置为true了 splitsInitialized = true;// 将分配好的分片信息通过rpc发送给SourceOpeartor,对应并行度的task取对应并行度的分片信息。sendPendingSplitsToSourceReaders();}});}}

以上看到FlinkSourceSplitEnumerator被初始化的时候splitsInitialized被设置为false,然后接着看实例化FlinkSourceSplitEnumerator的FlinkSource中的逻辑。

public abstract class FlinkSource<T, OutputT>implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {// 这里是没有checkpoint的时候执行的 @Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);}
​// 这里是从checkppoint中恢复的地方@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 在这里实例化了FlinkSourceSplitEnumeratorFlinkSourceSplitEnumerator<T> enumerator =new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}}

以上看到在实例化FlinkSourceSplitEnumerator的地方,只要是从checkpoint中恢复的时候,将标识splitsInitialized设置为true,那么就不会从checkpoint中恢复的时候,去重复计算和添加分片从而导致重复消费了。

3.问题解决

后来在Beam的2.64.0版本中,发现这个bug已经被修复了,FlinkSource中restoreEnumerator的地方已经加上了判断逻辑了。

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
​@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 这里将splitInitialized标识设置为了trueSplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =createEnumerator(enumContext, true);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)throws Exception {
​if (boundedness == Boundedness.BOUNDED) {return new LazyFlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);} else {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);}}}
​
​
public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits,boolean splitsInitialized) {
​this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);this.splitsInitialized = splitsInitialized;}
​@Overridepublic void start() {// 这里加上了判断逻辑了,为true不会执行了if (!splitsInitialized) {initializeSplits();}}
​private void initializeSplits() {context.callAsync(() -> {try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);splitsInitialized = true;sendPendingSplitsToSourceReaders();}});}
}

4.其他问题

从上可以看到Beam的KafkaSource实际上对比Flink原生的KafkaSource其实还有很多功能上的不足,比如说:

1.Beam中KafkaSource当从checkpoint恢复任务时,且这时候手动增加了Kafka的分区数实际上是不会被消费到的。

2.Beam中KafkaSource没有动态分区发现的功能,既不能在不手动重启任务且不从checkpoint恢复的情况下下消费到新分区的。

相关文章:

  • 招聘网站做专题的目的电工培训课程
  • 网站开发手机版有道搜索
  • 做网站最好用的软件搜索引擎优化排名工具
  • 做产品推广哪个网站好链接制作
  • 中国深圳航空公司官网免费发seo外链平台
  • 我的校园网站制作n127网推广
  • SQL SERVER存储过程
  • SQL重置自增
  • Solidity学习 - 认识Solidity合约结构
  • Windows命令连接符的安全风险分析与防御策略
  • [附源码+数据库+毕业论文+开题报告]基于Spring+MyBatis+MySQL+Maven+jsp实现的宠物领养管理系统,推荐!
  • 无人机关键算法分析 ( MPU6050 ,PID,滤波,四元数,欧拉角,IMU 姿态解算)
  • vue3中使用vue-grid-layout来实现可拖动的仪表盘面板
  • 深度学习在智能物流中的创新应用与未来趋势
  • 在统信UOS(Linux)中构建SQLite3桌面应用笔记
  • C++之string类的实现代码及其详解(上)
  • 0 数学习题本
  • 【stm32】HAL库开发——Cube配置基本定时器
  • Llama 3 + Qwen2双模型实战:单张3090构建企业级多模态知识库(2025精解版)
  • 关于 ARM64 汇编:调用流程与栈帧结构解析
  • Jenkins与Kubernetes深度整合实践
  • GitLab 18.1 高级 SAST 已支持 PHP,可升级体验!
  • OSPF 路由协议详细笔记
  • Python-7-读取/写入文件数据
  • mb_bootloop_le.elf是使用microblaze默认的elf文件,这个文件包括哪些内容?
  • n8n智能体新境界:MCP服务器简化复杂自动化