当前位置: 首页 > news >正文

Flink Data Sink 理论 、架构、语义保证、两阶段提交与可插拔拓扑

1. 总览:Sink 在作业中的位置

  • Source 负责读取(FLIP-27),Sink 负责写出(FLIP-191/372)。
  • Sink 是工厂式接口,生产在 TaskManager 上运行的 SinkWriter;高级场景可再拼装 CommittingSinkWriter + Committer 和自定义拓扑。

2. 核心 API 与职责边界

Sink(可序列化的工厂)

  • createWriter():创建 SinkWriter
  • 可选:实现 SupportsWriterState(恢复)与 SupportsCommitter(两阶段提交);
  • 专家模式:实现 SupportsPreWriteTopology / SupportsPreCommitTopology / SupportsPostCommitTopology 自定义算子拓扑。

SinkWriter(数据面)

  • write(IN element, Context ctx):写一条;
  • flush(boolean endOfInput):在 checkpoint输入结束时被调用(At-Least-Once 的关键边界);
  • writeWatermark(Watermark wm):可选,常用于滚动分桶等策略。

CommittingSinkWriter / Committer(控制面)

  • Writer 在 checkpoint 上产出 Committable(预提交产物:临时文件、事务 ID…);
  • Committer 在 对齐成功后原子 commit;失败恢复时依赖 幂等提交

3. 快速上手:把 Sink 接到 DataStream

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> stream = env.fromSource(new MySource(...),WatermarkStrategy.noWatermarks(),"MySource");Sink<Integer> mySink = new MySink(...); stream.sinkTo(mySink);env.execute("Sink API Quickstart");

4. SinkWriter:写入、Flush 与水位线

  • write() 内只做“快速、可中断”的写入(写入内存缓冲/批次),避免长阻塞。
  • flush(endOfInput) 用于在 checkpoint 或收尾时刷出所有挂起数据
  • writeWatermark() 常见用法是“事件时间驱动的滚动”(例如滚动小文件/桶)。

5. Writer 状态:失败恢复与再平衡

实现 SupportsWriterState → Writer 变成 StatefulSinkWriter

  • snapshot 返回 WriterState(例如:当前临时文件名、缓冲计数、分桶信息);
  • restore 从 WriterState 恢复,确保幂等续写
  • 扩缩容时通过 ListState 合并/重分配状态。

6. Exactly-Once:两阶段提交(CommittingSinkWriter + Committer)

时序

  1. 正常写入 → checkpoint barrier 到达;
  2. CommittingSinkWriter.prepareCommit(flush=true):把临时产物落稳并产出 Committable
  3. Committer.commit():对齐成功后原子提交(rename/事务 commit);
  4. 恢复时:重复的 Committable 可幂等(已存在就跳过)。

关键要点

  • 事务/临时对象命名建议包含 checkpointId
  • Commit 操作必须 可重试+幂等
  • Writer 的 flush 只保证 At-Least-Once,Exactly-Once 由 Committer 收尾。

7. 自定义 Sink 拓扑(配图)

7.1 SupportsPreWriteTopology(Writer 之前)

在这里插入图片描述

用途:写前 分区重分发 / 乱序缓冲 / 批量整形。
典型场景:把同一 topic/partition 的数据路由到同一个 SinkWriter

7.2 SupportsPreCommitTopology(Writer 和 Committer 之间)

在这里插入图片描述

用途:将各并行 Writer 的 Committable 汇聚到少量/单一子任务做批量提交去重与重试,显著降低后端交互压力。

7.3 SupportsPostCommitTopology(Committer 之后)

在这里插入图片描述

用途:提交后进行 小文件合并构建索引审计/告警 等异步优化,常见于对象存储/HDFS。

8. 实战最佳实践与反模式

最佳实践

  • 幂等优先:即便有两阶段提交,目标端的最终写入也应尽量幂等;
  • 分层批量write() 层缓冲 + prepareCommit() 批量提交;
  • 度量pendingCommittablesflushLatencycommitRetry/FailinflightBatches
  • 背压友好write() 快速返回,I/O 放在内部缓冲/线程池;
  • 收尾:关注 flush(endOfInput=true),批作业不处理会卡尾

反模式

  • write() 里做长时间阻塞或不可中断的外部调用;
  • Writer/Committer 里保存不可序列化的大对象;
  • Commit 操作不可重试/不可幂等
  • 忽视状态版本(升级后 savepoint 失效)。

9. 测试清单

  • 单元:WriterState/Committable 序列化、幂等行为;
  • 组件:Flush 触发(条数/定时/checkpoint)、写后可见性;
  • 容错:随机 kill TM/JM 验证 Exactly-Once;
  • 扩缩容:state 重分配后的续写/续事务;
  • 长稳:句柄/连接池泄漏、内存曲线、对象回收。

10. 可复用的最小代码骨架

真实业务替换“外部系统写入/提交”即可。以下代码均带中文关键注释

10.1 最小可用 Sink(At-Least-Once)

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.common.eventtime.Watermark;
import java.io.IOException;/** 一个最小的 At-Least-Once Sink:仅实现 Writer,并在 checkpoint/end 时 flush。 */
public class ConsoleLikeSink implements Sink<String> {@Overridepublic SinkWriter<String> createWriter(InitContext context) {return new ConsoleWriter();}static class ConsoleWriter implements SinkWriter<String> {@Overridepublic void write(String element, Context context) throws IOException {// 写入逻辑(尽量非阻塞/可中断)System.out.println("WRITE: " + element);}@Overridepublic void flush(boolean endOfInput) throws IOException {// 在 checkpoint 或输入结束时调用:刷新缓冲区/刷盘System.out.flush();}@Overridepublic void writeWatermark(Watermark watermark) {// 可选:基于水位线做滚动/分桶策略}@Overridepublic void close() throws Exception { /* 释放资源 */ }}
}

10.2 带 Writer 状态的 Sink(SupportsWriterState

import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
import java.util.List;/** 保存“当前批次缓冲”的状态,用于失败恢复后继续写入。 */
public class StatefulBatchingSink implements StatefulSink<String, StatefulBatchingSink.WriterState> {/** WriterState 示例:仅持久化缓冲条数。生产中可携带临时文件名、分桶等更多信息。 */public static class WriterState { public final int buffered; public WriterState(int b){this.buffered=b;} }@Overridepublic StatefulSinkWriter<String, WriterState> createWriter(InitContext ctx) throws IOException {return new BatchingWriter(0);}@Overridepublic StatefulSinkWriter<String, WriterState> restoreWriter(InitContext ctx, Collection<WriterState> recovered) throws IOException {int restored = recovered.stream().mapToInt(s -> s.buffered).sum();return new BatchingWriter(restored);}@Overridepublic SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {return new SimpleVersionedSerializer<WriterState>() {@Override public int getVersion() { return 1; }@Override public byte[] serialize(WriterState s) { return new byte[]{(byte)s.buffered}; }@Override public WriterState deserialize(int v, byte[] b) { return new WriterState(b[0]); }};}/** 实际 Writer:示例按条数批量 flush,状态记录缓冲条数。 */static class BatchingWriter implements StatefulSinkWriter<String, WriterState> {private int buffered;BatchingWriter(int initial){ this.buffered = initial; }@Overridepublic void write(String element, Context ctx) throws IOException {// 真实实现:写入内存缓冲/临时文件buffered++;if (buffered >= 1000) flush(false);}@Overridepublic void flush(boolean endOfInput) throws IOException {if (buffered > 0) {// 把缓冲批次一次性写出至目标(操作需幂等或可重放)// doFlush(buffered);buffered = 0;}}@Overridepublic List<WriterState> snapshotState(long checkpointId) {// 把“尚未刷新的缓冲量”记录到状态,恢复后无缝续写return List.of(new WriterState(buffered));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}
}

10.3 Exactly-Once Sink(SupportsCommitter:两阶段提交)

import org.apache.flink.api.connector.sink2.CommittingSink;
import org.apache.flink.api.connector.sink2.CommittingSink.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import java.io.IOException;
import java.util.Collection;
import java.util.List;/** 通过“临时对象 + 原子提交(如 rename/txn.commit)”实现 Exactly-Once。 */
public class ExactlyOnceTxnSink implements CommittingSink<String, ExactlyOnceTxnSink.Committable> {/** 预提交产物:例如临时文件路径或事务ID。 */public static class Committable { public final String temp; public Committable(String t){this.temp=t;} }@Overridepublic CommittingSinkWriter<String, Committable> createWriter(InitContext context) throws IOException {return new TxnWriter();}@Overridepublic Committer<Committable> createCommitter(CommitterInitContext context) throws IOException {return new TxnCommitter();}/** Writer:写到临时对象;在 checkpoint 上返回 Committable。 */static class TxnWriter implements CommittingSinkWriter<String, Committable> {private final String temp = "tmp-" + System.nanoTime();@Overridepublic void write(String element, Context ctx) throws IOException {// 实际实现:把数据写到 temp(事务缓冲/临时文件)}@Overridepublic Collection<Committable> prepareCommit(boolean flush) throws IOException {if (flush) {// 将临时对象“落稳”(例如 fsync/close)}return List.of(new Committable(temp));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}/** Committer:对齐成功后原子提交;需保证幂等(已提交则跳过)。 */static class TxnCommitter implements Committer<Committable> {@Overridepublic void commit(Collection<CommitRequest<Committable>> requests) throws IOException, InterruptedException {for (var r : requests) {String temp = r.getCommittable().temp;// 例如:rename(temp, finalPath) 或 txn.commit(transactionId)// 若目标已存在则安全返回(幂等)}}@Override public void close() throws Exception {}}
}

结语

Data Sink API 把“写入(Writer)— 状态(State)— 提交(Committer)— 拓扑(Pre/PreCommit/PostCommit)”分层清晰:

  • 仅实现 Writer 即可获得 At-Least-Once
  • WriterState 即可失败恢复/扩缩容无缝续写;
  • 引入 CommittingSinkWriter + Committer 实现 Exactly-Once
  • 借助三类 拓扑扩展点,可以优雅地完成路由/集中提交/小文件合并等高级需求。
http://www.dtcms.com/a/507872.html

相关文章:

  • DeviceNet转Ethernet/IP食品饮料包装线码垛机器人高效通信方案
  • 《基于分布式多模态传感模块的全身尺度机器人皮肤:设计、评估与应用》TR-O 2025论文解析
  • 亿万网站网站开发详细流程
  • 还是网站好买一个app软件要多少钱
  • 无锡万度网站建设WordPress Demo演示
  • 智能外呼是什么意思
  • 【读论文】——基于光谱学的玉米种子品质检测及其成像技术综述
  • 如何自建网站满分作文网
  • 服务器/Pytorch——对于只调用一次的函数初始化,放在for训练外面和里面的差异
  • iOS 混淆与 IPA 加固一页式行动手册(多工具组合实战 源码成品运维闭环)
  • PySide6 使用搜索引擎搜索 多类实现 更新1次
  • 宁波网站优化的关键企业网站后台管理系统模板
  • 网站开发项目需求分析说明书电子商务网站开发与实现
  • 群晖实现证书90天自动更新(无需对外提供80端口)
  • AMCL自适应(KLD - Sampling: Adaptive Particle Filters)一种基于粒子滤波的移动机器人定位算法
  • NOR FLASH
  • 网站代码优化方案网站建设和编程的区别
  • 重庆建设工程造价管理协会网站直播网站开发价格
  • 【Nest】集成测试
  • ELK运维之路(Logstash基础使用-7.17.24)
  • 快速排序(JAVA详细讲解快速排序的四种方式)
  • 数据结构四大简单排序算法详解:直接插入排序、选择排序、基数排序和冒泡排序
  • 官渡网站建设wordpress单页面制作
  • 企业电子商务网站开发数据库设计昆明seo博客
  • 东道 网站建设erp系统哪家做得好
  • 现代 Web 开发中检测用户离开页面的完整方案(附 Vue 实现)
  • [crackme]029-figugegl.1
  • 网站建站分辨率腾讯企点怎么注册
  • 第四章:L2CAP 的“数据语言”——揭秘蓝牙通信的报文格式
  • 【代码随想录算法训练营——Day43(Day42周日休息)】动态规划——300.最长递增子序列、674.最长连续递增序列、718.最长重复子数组