当前位置: 首页 > news >正文

flink api-datastream api-sink算子

Flink的Sink算子是流处理管道中的最终操作节点,负责将处理后的数据输出到外部系统。以下是其核心要点:

核心功能与定位

Sink算子通过addSinksinkTo方法实现数据输出,是Flink作业的终点站,支持将结果写入文件系统、数据库、消息队列等外部存储。其设计需保障状态一致性,通过检查点机制确保故障恢复时的数据正确性。

主要类型与实现

  1. 文件系统Sink

    • 早期使用writeAsText/writeAsCsv(已弃用),并行度影响文件输出形式(单文件或目录)。
    • 推荐使用StreamingFileSink,支持行编码(forRowFormat)和批量编码(forBulkFormat),自动分桶存储,适应分布式环境。
  2. 数据库Sink

    • 通过JDBCOutputFormat或自定义RichSinkFunction实现MySQL等关系型数据库写入,需在open生命周期建立连接。
  3. 消息队列Sink

    • Kafka集成需配置生产者地址、主题及序列化器(如SimpleStringSchema),依赖flink-connector-kafka模块。
  4. 自定义Sink
    实现SinkFunction接口并重写invoke方法,可灵活对接任意外部系统。例如,通过RichSinkFunction复用连接资源。

关键机制

  • 二阶段提交协议:保障端到端精确一次(Exactly-Once)语义,协调Flink与外部系统的数据一致性。
  • 分桶策略:文件Sink默认按时间分桶(如每小时新桶),支持自定义分区规则。

版本演进

  • Flink 1.12前使用addSink,之后推荐sinkTo API,架构更清晰。
  • FileSink替代StreamingFileSink,统一批流写入接口。

例子

  1. 文件系统Sink
/*** Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。* FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构造器,可以直接调用FileSink的静态方法。* 行编码:FileSink.forRowFormat(bathPath,rowEncoder)* 批量编码:FileSink.forBulkFormat(bathPath,bulkWriterFactory)*/
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有并行度个数的文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是.inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> map = env.fromElements(123L,234L, 345L).map(String::valueOf);// 输出到文件系统FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("input/"),new SimpleStringEncoder<>("UTF-8")).withOutputFileConfig(OutputFileConfig.builder
http://www.dtcms.com/a/424545.html

相关文章:

  • 有没有专门做衣服搭配的网站怎样在织梦后台里面做网站地图
  • 【go】普通map和sync.map的区别,源码解析
  • wordpress多站点详细设置(图解)建个个人网站一年多少钱
  • Python bisect
  • Docker 安装与核心知识总结
  • 编辑网站化妆品网页设计素材
  • 做视频网站的技能可以自己制作广告的软件
  • Jupyter Notebook下载安装使用教程(附安装包,图文并茂)
  • 《算法与数据结构》第七章[算法2]:广度优先搜索(BFS)
  • Salesforce 知识点:Connected App
  • 通用系统资源监控命令(Linux)
  • 衡水网站建设知识企业站系统
  • 做房产网站用什么软件亚马逊雨林的资料
  • airsim多无人机+无人车联合仿真辅导
  • 深度学习:池化(Pooling)
  • 亚圣信息科技做网站怎么样社交网站 cms
  • ftp网站目录做旅行同业的网站
  • 9.3 堆排序(排序(上))
  • 怎么向企业推销网站建设外国网站域名
  • gradle task build 渠道包
  • 【Java】P9 面向对象编程完全指南(S1-2 基础篇 深入理解Java方法的四个重要概念)
  • 网站如何做移动适配网站的推广是怎么做的
  • almalinux MySQL8.0安装
  • python做网站建e全景效果图
  • 网站建设费可以抵扣么推广网上国网有什么好处
  • 【APK安全】WebView组件的安全风险与防御指南
  • 秦皇岛网站定制哪家好厦门市建设局网站咨询电话
  • 是阿里巴巴好还是自己做网站好?wordpress nginx配置伪静态
  • 夫妻工作室网站建设枣庄网站seo
  • 【Android】一个demo理解dispatchTouchEvent、onInterceptTouchEvent与onTouchEvent