Flink Data Sink 理论 、架构、语义保证、两阶段提交与可插拔拓扑
1. 总览:Sink 在作业中的位置
- Source 负责读取(FLIP-27),Sink 负责写出(FLIP-191/372)。
- Sink 是工厂式接口,生产在 TaskManager 上运行的 SinkWriter;高级场景可再拼装 CommittingSinkWriter + Committer 和自定义拓扑。
2. 核心 API 与职责边界
Sink(可序列化的工厂)
createWriter()
:创建 SinkWriter;- 可选:实现
SupportsWriterState
(恢复)与SupportsCommitter
(两阶段提交); - 专家模式:实现
SupportsPreWriteTopology
/SupportsPreCommitTopology
/SupportsPostCommitTopology
自定义算子拓扑。
SinkWriter(数据面)
write(IN element, Context ctx)
:写一条;flush(boolean endOfInput)
:在 checkpoint 或 输入结束时被调用(At-Least-Once 的关键边界);writeWatermark(Watermark wm)
:可选,常用于滚动分桶等策略。
CommittingSinkWriter / Committer(控制面)
- Writer 在 checkpoint 上产出 Committable(预提交产物:临时文件、事务 ID…);
- Committer 在 对齐成功后原子 commit;失败恢复时依赖 幂等提交。
3. 快速上手:把 Sink 接到 DataStream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> stream = env.fromSource(new MySource(...),WatermarkStrategy.noWatermarks(),"MySource");Sink<Integer> mySink = new MySink(...); stream.sinkTo(mySink);env.execute("Sink API Quickstart");
4. SinkWriter:写入、Flush 与水位线
- 在
write()
内只做“快速、可中断”的写入(写入内存缓冲/批次),避免长阻塞。 flush(endOfInput)
用于在 checkpoint 或收尾时刷出所有挂起数据;writeWatermark()
常见用法是“事件时间驱动的滚动”(例如滚动小文件/桶)。
5. Writer 状态:失败恢复与再平衡
实现 SupportsWriterState
→ Writer 变成 StatefulSinkWriter
:
- 在 snapshot 返回 WriterState(例如:当前临时文件名、缓冲计数、分桶信息);
- 在 restore 从 WriterState 恢复,确保幂等续写;
- 扩缩容时通过 ListState 合并/重分配状态。
6. Exactly-Once:两阶段提交(CommittingSinkWriter + Committer)
时序
- 正常写入 →
checkpoint barrier
到达; - CommittingSinkWriter.prepareCommit(flush=true):把临时产物落稳并产出 Committable;
- Committer.commit():对齐成功后原子提交(rename/事务 commit);
- 恢复时:重复的 Committable 可幂等(已存在就跳过)。
关键要点
- 事务/临时对象命名建议包含 checkpointId;
- Commit 操作必须 可重试+幂等;
- Writer 的 flush 只保证 At-Least-Once,Exactly-Once 由 Committer 收尾。
7. 自定义 Sink 拓扑(配图)
7.1 SupportsPreWriteTopology(Writer 之前)
用途:写前 分区重分发 / 乱序缓冲 / 批量整形。
典型场景:把同一 topic/partition 的数据路由到同一个 SinkWriter
。
7.2 SupportsPreCommitTopology(Writer 和 Committer 之间)
用途:将各并行 Writer 的 Committable 汇聚到少量/单一子任务做批量提交、去重与重试,显著降低后端交互压力。
7.3 SupportsPostCommitTopology(Committer 之后)
用途:提交后进行 小文件合并、构建索引、审计/告警 等异步优化,常见于对象存储/HDFS。
8. 实战最佳实践与反模式
最佳实践
- 幂等优先:即便有两阶段提交,目标端的最终写入也应尽量幂等;
- 分层批量:
write()
层缓冲 +prepareCommit()
批量提交; - 度量:
pendingCommittables
、flushLatency
、commitRetry/Fail
、inflightBatches
; - 背压友好:
write()
快速返回,I/O 放在内部缓冲/线程池; - 收尾:关注
flush(endOfInput=true)
,批作业不处理会卡尾。
反模式
- 在
write()
里做长时间阻塞或不可中断的外部调用; - Writer/Committer 里保存不可序列化的大对象;
- Commit 操作不可重试/不可幂等;
- 忽视状态版本(升级后 savepoint 失效)。
9. 测试清单
- 单元:WriterState/Committable 序列化、幂等行为;
- 组件:Flush 触发(条数/定时/checkpoint)、写后可见性;
- 容错:随机 kill TM/JM 验证 Exactly-Once;
- 扩缩容:state 重分配后的续写/续事务;
- 长稳:句柄/连接池泄漏、内存曲线、对象回收。
10. 可复用的最小代码骨架
真实业务替换“外部系统写入/提交”即可。以下代码均带中文关键注释。
10.1 最小可用 Sink(At-Least-Once)
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.common.eventtime.Watermark;
import java.io.IOException;/** 一个最小的 At-Least-Once Sink:仅实现 Writer,并在 checkpoint/end 时 flush。 */
public class ConsoleLikeSink implements Sink<String> {@Overridepublic SinkWriter<String> createWriter(InitContext context) {return new ConsoleWriter();}static class ConsoleWriter implements SinkWriter<String> {@Overridepublic void write(String element, Context context) throws IOException {// 写入逻辑(尽量非阻塞/可中断)System.out.println("WRITE: " + element);}@Overridepublic void flush(boolean endOfInput) throws IOException {// 在 checkpoint 或输入结束时调用:刷新缓冲区/刷盘System.out.flush();}@Overridepublic void writeWatermark(Watermark watermark) {// 可选:基于水位线做滚动/分桶策略}@Overridepublic void close() throws Exception { /* 释放资源 */ }}
}
10.2 带 Writer 状态的 Sink(SupportsWriterState
)
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
import java.util.List;/** 保存“当前批次缓冲”的状态,用于失败恢复后继续写入。 */
public class StatefulBatchingSink implements StatefulSink<String, StatefulBatchingSink.WriterState> {/** WriterState 示例:仅持久化缓冲条数。生产中可携带临时文件名、分桶等更多信息。 */public static class WriterState { public final int buffered; public WriterState(int b){this.buffered=b;} }@Overridepublic StatefulSinkWriter<String, WriterState> createWriter(InitContext ctx) throws IOException {return new BatchingWriter(0);}@Overridepublic StatefulSinkWriter<String, WriterState> restoreWriter(InitContext ctx, Collection<WriterState> recovered) throws IOException {int restored = recovered.stream().mapToInt(s -> s.buffered).sum();return new BatchingWriter(restored);}@Overridepublic SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {return new SimpleVersionedSerializer<WriterState>() {@Override public int getVersion() { return 1; }@Override public byte[] serialize(WriterState s) { return new byte[]{(byte)s.buffered}; }@Override public WriterState deserialize(int v, byte[] b) { return new WriterState(b[0]); }};}/** 实际 Writer:示例按条数批量 flush,状态记录缓冲条数。 */static class BatchingWriter implements StatefulSinkWriter<String, WriterState> {private int buffered;BatchingWriter(int initial){ this.buffered = initial; }@Overridepublic void write(String element, Context ctx) throws IOException {// 真实实现:写入内存缓冲/临时文件buffered++;if (buffered >= 1000) flush(false);}@Overridepublic void flush(boolean endOfInput) throws IOException {if (buffered > 0) {// 把缓冲批次一次性写出至目标(操作需幂等或可重放)// doFlush(buffered);buffered = 0;}}@Overridepublic List<WriterState> snapshotState(long checkpointId) {// 把“尚未刷新的缓冲量”记录到状态,恢复后无缝续写return List.of(new WriterState(buffered));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}
}
10.3 Exactly-Once Sink(SupportsCommitter
:两阶段提交)
import org.apache.flink.api.connector.sink2.CommittingSink;
import org.apache.flink.api.connector.sink2.CommittingSink.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import java.io.IOException;
import java.util.Collection;
import java.util.List;/** 通过“临时对象 + 原子提交(如 rename/txn.commit)”实现 Exactly-Once。 */
public class ExactlyOnceTxnSink implements CommittingSink<String, ExactlyOnceTxnSink.Committable> {/** 预提交产物:例如临时文件路径或事务ID。 */public static class Committable { public final String temp; public Committable(String t){this.temp=t;} }@Overridepublic CommittingSinkWriter<String, Committable> createWriter(InitContext context) throws IOException {return new TxnWriter();}@Overridepublic Committer<Committable> createCommitter(CommitterInitContext context) throws IOException {return new TxnCommitter();}/** Writer:写到临时对象;在 checkpoint 上返回 Committable。 */static class TxnWriter implements CommittingSinkWriter<String, Committable> {private final String temp = "tmp-" + System.nanoTime();@Overridepublic void write(String element, Context ctx) throws IOException {// 实际实现:把数据写到 temp(事务缓冲/临时文件)}@Overridepublic Collection<Committable> prepareCommit(boolean flush) throws IOException {if (flush) {// 将临时对象“落稳”(例如 fsync/close)}return List.of(new Committable(temp));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}/** Committer:对齐成功后原子提交;需保证幂等(已提交则跳过)。 */static class TxnCommitter implements Committer<Committable> {@Overridepublic void commit(Collection<CommitRequest<Committable>> requests) throws IOException, InterruptedException {for (var r : requests) {String temp = r.getCommittable().temp;// 例如:rename(temp, finalPath) 或 txn.commit(transactionId)// 若目标已存在则安全返回(幂等)}}@Override public void close() throws Exception {}}
}
结语
Data Sink API 把“写入(Writer)— 状态(State)— 提交(Committer)— 拓扑(Pre/PreCommit/PostCommit)”分层清晰:
- 仅实现 Writer 即可获得 At-Least-Once;
- 加 WriterState 即可失败恢复/扩缩容无缝续写;
- 引入 CommittingSinkWriter + Committer 实现 Exactly-Once;
- 借助三类 拓扑扩展点,可以优雅地完成路由/集中提交/小文件合并等高级需求。