当前位置: 首页 > news >正文

从 0 到 1Flink DataStream API 入门与上手实战

一、为什么选择 DataStream API?

DataStream API 是 Flink 处理连续数据流的核心编程接口,具备:

  • 低延迟高吞吐:天然面向流处理的执行引擎。
  • 强大的序列化与类型系统:原生支持基本类型、Tuples、POJOs;其他类型回退 Kryo;也可用 Avro 等外部序列化器。
  • 灵活的数据接入与输出:从文件到 Kafka/Kinesis,再到数据库、对象存储、消息系统。

一句话:只要能被序列化,就能被 DataStream 流式处理。

二、可流式处理的数据类型与序列化

1)基础与复合类型

  • 基本类型:String / Long / Integer / Boolean / Array
  • 复合类型:TuplesPOJOs
  • 其他类型默认回退 Kryo;也可替换为 Avro 等(推荐在跨语言/严格 schema 场景下使用)。

2)Java Tuples

Flink 在 Java 中定义了 Tuple0 ~ Tuple25

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// 注意:下标从 0 开始
String name = person.f0;
Integer age  = person.f1;

适用场景:原型/临时任务、字段不复杂时;大型项目建议用 POJO/Avro 以提升可读性与演进性。

3)POJOs(强烈推荐)

Flink 将满足以下条件的类识别为 POJO,并支持按字段名访问与schema 演进

  • 类是 public独立(不是非静态内部类)
  • 具有 public 无参构造方法
  • 所有非静态非 transient字段要么 publicfinal,要么有符合 JavaBeans 规范的 getter/setter
    示例:
public class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}
}
Person person = new Person("Fred Flintstone", 35);

最佳实践:POJO 字段改动、顺序调整等常见 schema 演进,Flink 原生序列化器可平滑处理。

三、首个 DataStream 示例:过滤成年人

下面的最小示例会在本地构建一个人员流,只保留年龄 ≥ 18 的元素,并打印输出。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromData(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) {return person.age >= 18;}});// 输出到 TaskManager 日志(IDE 控制台可见)adults.print();// 别忘了执行!env.execute("Adults Filter");}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}@Overridepublic String toString() {return name + ": age " + age;}}
}

输出示例:

1> Fred: age 35
2> Wilma: age 35

前缀 1> / 2> 表示由哪个并行子任务打印(线程/子任务编号)。

四、执行环境与作业提交流程(你在代码里做了什么)

  • 每个 Flink 应用都需要一个 StreamExecutionEnvironment(流式程序必须使用此环境)。
  • 你在代码里调用的 DataStream API(source → transform → sink)会构建作业图并绑定在 env 上。
  • 调用 env.execute() 后,作业图被打包交给 JobManager,由其并行化并分发到多个 TaskManager 上运行,每个task slot承担作业的一段。

重要提醒:不调用 execute(),作业不会运行。

五、常用输入 Source(从原型到生产)

1)快速原型

  • fromData:快速构造小数据流(如上例)
  • Socket:配合 nc -lk 9999 快速进数据
DataStream<String> lines = env.socketTextStream("localhost", 9999);

2)文件/目录

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///path")
).build();DataStream<String> lines = env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file-input"
);

3)生产级数据源(推荐)

  • Kafka / Kinesis / 各类分布式文件系统:低延迟、高吞吐、支持回溯与重放,是高性能与容错的基础。
  • REST / 数据库:常用于维表/画像流式富化(可配合 Async I/O 或 CDC)。

六、常用输出 Sink(打印只是开始)

  • 打印print() 便于开发期查看。
  • 生产常见FileSink、各类数据库 sink(JDBC/NoSQL)、消息中间件(Kafka 等)。
  • 注意语义保证(At-least-once / Exactly-once)与端到端一致性的配置与成本。

七、开发/调试:先在本地跑顺畅

  • 本地 IDE 调试:设置断点、查看本地变量、逐句执行。必要时直接 Step Into Flink 源码,理解内部机制。
  • 远程/容器集群排障:借助 Web UI、JobManager/TaskManager 日志定位异常。
  • 依赖与可序列化:作业分布式运行要求对象可序列化依赖在各节点可用(例如通过 fat JAR/shade 打包)。

八、最佳实践与易错点

  1. 别忘了 env.execute():常见“空跑”问题。
  2. 类型与序列化:POJO 要满足 Flink 规则;自定义类型确认可序列化;必要时切换 Avro 并管理 schema。
  3. 并行度与输出日志:打印输出前缀编号就是并行子任务 ID;输出乱序是正常的。
  4. Source/Sink 语义:生产中关注端到端一致性(尤其是 Kafka → Flink → 外部存储)。
  5. 事件时间与水位线:原型阶段可用处理时间,真实乱序场景需事件时间 + watermark(本文未展开,后续进阶)。

九、动手练习(Hands-on 建议)

  • 从官方 training repo 开始,按 README 拉起环境。
  • 第一练:Filtering a Stream(Ride Cleansing)
    重点体会:类型建模、算子组合、IDE 调试、Web UI 观察拓扑与并行度。

练习时请尝试:把 fromData 替换为 socketTextStream,或加上 map → keyBy → reduce 的小链路,感受 DataStream 的“搭积木”思路。

十、参考路线与延伸阅读

  • 选择序列化器性能调优(优先 POJO/Avro,能定 schema 就别回退到通用 Kryo)
  • 一个 Flink 程序的解剖:从 env 到 job graph、并行度、checkpoint
  • Data Sources / Sinks / Connectors:根据你的基础设施选型
  • 状态、时间与一致性(下一阶段重点):Keyed State、Timer、Watermark、Checkpoint、Exactly-once
http://www.dtcms.com/a/406833.html

相关文章:

  • 做网站设计电脑买什么高端本好营销企业有哪些
  • 系统架构设计师备考第34天——软件架构风格
  • postman使用总结
  • 做网站 怎么连到数据库怎么做存储网站
  • Java 后端面试技术文档(参考)
  • 分享智能跳绳解决方案
  • 毕业设计的网站app开发公司介绍
  • WebSocket实时通信不卡顿:cpolar内网穿透实验室第503个成功挑战
  • PyTorch 数据处理工具箱
  • C++项目:仿muduo库高并发服务器-------时间轮定时器
  • 边玩边学,13个Python小游戏(含源码)
  • 有了域名怎样做淘客网站中国铁建统一企业门户
  • 大连网站排名网络推广公司一个很好的个人网站开发
  • Windows文件快速检索工具:基于PyQt5的高效实现
  • C++Primerplus 编程练习 第十三章
  • Custom SRP 11 - Post Processing
  • 【Linux】进程替换
  • wordpress调用目录网址seo查询
  • 【C++】模版专题
  • K8s实践中的重点知识
  • 云栖2025 | 人工智能平台 PAI 年度发布
  • 【文献管理工具】学术研究的智能助手—Zotero 文献管理工具详细图文安装教程
  • H5平台网站建设wordpress 会话已过期
  • 建论坛网站印度人通过什么网站做国际贸易
  • UniApp ConnectSocket连接websocket
  • 正点原子【第四期】Linux之驱动开发学习笔记-5.1 设备树下的LED驱动实验
  • uniapp中全局封装一个跨组件的复制粘贴方法
  • 新奇特:神经网络烘焙坊(上),权重矩阵的甜蜜配方之谜
  • 分布式调度问题:定时任务
  • labelimg(目标检测标注工具)的安装、使用教程和问题解决