flink api-datastream api-sink算子
Flink的Sink算子是流处理管道中的最终操作节点,负责将处理后的数据输出到外部系统。以下是其核心要点:
核心功能与定位
Sink算子通过addSink
或sinkTo
方法实现数据输出,是Flink作业的终点站,支持将结果写入文件系统、数据库、消息队列等外部存储。其设计需保障状态一致性,通过检查点机制确保故障恢复时的数据正确性。
主要类型与实现
-
文件系统Sink
- 早期使用
writeAsText
/writeAsCsv
(已弃用),并行度影响文件输出形式(单文件或目录)。 - 推荐使用
StreamingFileSink
,支持行编码(forRowFormat
)和批量编码(forBulkFormat
),自动分桶存储,适应分布式环境。
- 早期使用
-
数据库Sink
- 通过JDBCOutputFormat或自定义
RichSinkFunction
实现MySQL等关系型数据库写入,需在open
生命周期建立连接。
- 通过JDBCOutputFormat或自定义
-
消息队列Sink
- Kafka集成需配置生产者地址、主题及序列化器(如
SimpleStringSchema
),依赖flink-connector-kafka
模块。
- Kafka集成需配置生产者地址、主题及序列化器(如
-
自定义Sink
实现SinkFunction
接口并重写invoke
方法,可灵活对接任意外部系统。例如,通过RichSinkFunction
复用连接资源。
关键机制
- 二阶段提交协议:保障端到端精确一次(Exactly-Once)语义,协调Flink与外部系统的数据一致性。
- 分桶策略:文件Sink默认按时间分桶(如每小时新桶),支持自定义分区规则。
版本演进
- Flink 1.12前使用
addSink
,之后推荐sinkTo
API,架构更清晰。 FileSink
替代StreamingFileSink
,统一批流写入接口。
例子
- 文件系统Sink
/*** Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。* FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构造器,可以直接调用FileSink的静态方法。* 行编码:FileSink.forRowFormat(bathPath,rowEncoder)* 批量编码:FileSink.forBulkFormat(bathPath,bulkWriterFactory)*/
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有并行度个数的文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是.inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> map = env.fromElements(123L,234L, 345L).map(String::valueOf);// 输出到文件系统FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("input/"),new SimpleStringEncoder<>("UTF-8")).withOutputFileConfig(OutputFileConfig.builder