当前位置: 首页 > news >正文

Flink-Source算子点位提交问题(Earliest)

背景

最近在做 Flink 任务数据源切换时遇到 offset 消费问题,遂写篇文章记录下来。

切换时只修改了 source 算子的 topic,uid 等其他信息保持不变:

  1. 发布时,发现算子的消费者点位重置为earliest,导致消息积压。
  2. 消息积压后,打算通过时间戳重置点位到发布前,但是发现点位重置失效。

原因分析

source算子点位初始化模式

source算子点位初始化有两种方式:1)消费者组偏移量:setStartFromGroupOffsets;2)时间戳:setStartFromTimestamp。

消费组偏移量(FromGroupOffsets)

该方式会将 startupMode 初始化为 StartupMode.GROUP_OFFSETS:

startupMode枚举:

时间戳(FromTimestamp)

该方式会将 startupMode 初始化为 StartupMode.TIMESTAMP:

source 算子初始化

示例代码:

public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");configuration.setString("execution.savepoint.path", "xxx");configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 启用checkpointenv.enableCheckpointing(5000);env.setParallelism(1);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);ParameterTool argTools = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(argTools);// 添加数据源// "old_topic", "new_topic"FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();consumer.setStartFromGroupOffsets();DataStream<String> stream = env.addSource(consumer).uid("kafka-source").name("kafka-source");SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {@Overridepublic HeartEntity map(String value) throws Exception {HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);return heartEntity;}}).uid("map-heart").name("map-heart");// 使用状态计数DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo).map(new RichMapFunction<HeartEntity, Long>() {private transient ValueState<Long> countState;private long count = 0;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(HeartEntity value) throws Exception {count++;countState.update(count);return countState.value();}}).uid("count-map").name("count-map");// 打印计数结果countStream.print().uid("print").name("print");// 启动Flink任务env.execute("Flink Kafka State Count Example");}
从状态启动
initializeState

状态初始化时,FlinkKafkaConsumerBase 执行 initializeState 方法中:当 source topic 从 sg_lock_heart_msg_topic 切换为 sg_tw_common_com_lock_heart_report_topic 时,可以看到新 topic 绑定的 source 算子仍然是从老 topic 的算子状态启动的,因为 uid 没变。

initializeState 往下走可以看到,restoreState 的是老 topic 分区的状态;

open

算子初始化时,如果状态不为空且 topic 分区不在状态中,那么就会把新的 topic 分区加入到状态中,并设置算子消费新分区的 startupMode 为 EARLIEST_OFFSET,即从最早的消息开始消费。

老的 topic 分区不会再消费,会被移除订阅。

订阅的 topic 分区

从指定时间戳启动

setStartFromTimestamp 设置启动模式为时间戳

然而在算子初始化时,由于从状态启动,新 topic分区 仍然会从 earliest 消费:

也就是说,checkpoint/savepoint 中存储的 source 点位状态在恢复时大于设置的时间戳。

解决方案

尝试一(修改 uid)

从 source 算子初始化的 open 过程可知,既然从状态启动时会将已存在 source 算子(uid在状态中)的新 topic 点位设置为最早,那么如果将新 topic 的 uid 改成与老 topic 的 uid 不一致,是否就能避免从 earliest 恢复:因为从状态恢复时新的 uid 并不在状态中,那么就不会走 open 中将新 topic 点位置为 earliest 的流程。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");

可以看到在状态初始化阶段(initializeState),source 算子的状态 (restoreState)被置为空集合,而不是 null。为什么?

当在算子初始化时,因为 restoreState 不为 null,仍然会进入点位重置的流程:

可以看到这里将新 topic 分区放入了 restoreState 中,且点位置为 earliest(StarupMode 枚举中,EARLIEST_OFFSET = -915623761775L)。

再往下走,restoreState 会将其中的新 topic 分区放入订阅的分区中

从此,新 topic 又从最早开始消费😓。那么方案尝试一是失败的!

在线上实际操作时,消费点位确实被重置到了 earliest,又导致积压了😦。

尝试二(修改消费者组)

有没有办法让 restoreState 置为 null 呢,那就真的不会走到点位重置的流程了🎊

突然看到 restoreState 的注释:

如果消费者从状态恢复,就会设置 restoreState。那怎么让消费者不从状态恢复?无状态启动肯定是不行的,不能让其他算子的状态丢了。那我直接换个消费组名!试一试呢

Properties props = new Properties();props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");props.put("group.id", "flink-label-engine-new");

还是不行,直到目前发现只要从状态启动,context 上下文会让代码走进给 restoreState 赋值的位置。

isRestored分析

isRestore分析

尝试三(新增拓扑图)

根据算子状态恢复可知,只要新增的 source 算子跟其他已有算子形成了算子链,如果以状态启动,那么 source 的点位就会被置为 earliest。

  1. 新增一个新 topic 的 source 算子和 sink 算子(要保证新增的算子与已有算子隔离,不会形成算子链),然后修改老 source 算子的 uid 和 topic 与新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new).uid("kafka-source-new").name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");

由于从状态启动,且新加入的算子与其他算子隔离,老 source 算子的点位从状态启动;新 source 算子的点位被置为 GROUP_OFFSET。

1. 暂停并保存状态;

2. 修改老 source 算子的 uid 和 topic 与 新算子保持一致,同时删除新算子;

3. 然后从状态启动(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子点位从状态恢复:

4. 下游 “count-map”的状态是否正常:发送测试消息,可以看出状态没丢失

http://www.dtcms.com/a/267419.html

相关文章:

  • 力扣 hot100 Day35
  • STM32中实现shell控制台(命令解析实现)
  • MySQL回表查询深度解析:原理、影响与优化实战
  • 从UI设计到数字孪生实战部署:构建智慧城市的智慧照明系统
  • 【项目笔记】高并发内存池项目剖析(三)
  • NX二次开发——NX二次开发-检查点是否在面上或者体上
  • MPLS 多协议标签交换
  • Python实例题:基于 Python 的简单聊天机器人
  • springsecurity5配置之后启动项目报错:authenticationManager cannot be null
  • LangChain4j 框架模仿豆包实现智能对话系统:架构与功能详解
  • windows 安装 wsl
  • 基于matlab卡尔曼滤波器消除噪声
  • 点击方块挑战小游戏流量主微信小程序开源
  • Java+Vue开发的进销存ERP系统,集采购、销售、库存管理,助力企业数字化运营
  • 浏览器与服务器的交互
  • 深度学习图像分类数据集—百种鸟类识别分类
  • STM32中实现shell控制台(shell窗口输入实现)
  • 结构型智能科技的关键可行性——信息型智能向结构型智能的转变(修改提纲)
  • rk3128 emmc显示剩余容量为0
  • kubectl exec 遇到 unable to upgrade connection Forbidden 的解决办法
  • 浅度解读-(未完成版)浅层神经网络-多个隐层神经元
  • 解决el-select数据类型相同但是显示数字的问题
  • Python-函数、参数及参数解构-返回值作用域-递归函数-匿名函数-生成器-学习笔记
  • 从数据洞察到设计创新:UI前端如何利用数字孪生提升用户体验?
  • 【算法笔记】4.LeetCode-Hot100-数组专项
  • 操作系统---I/O核心子系统与磁盘
  • Linux操作系统之文件(四):文件系统(上)
  • pyspark大规模数据加解密优化实践
  • NVMe高速传输之摆脱XDMA设计13:PCIe初始化状态机设计
  • 2025 Centos 安装PostgreSQL