Flink之DataStream
Apache Flink 的 DataStream API 是用于 处理无限(流)或有限(批)数据流的核心编程模型,适用于事件驱动、实时分析、ETL 等场景。相比 Flink Table API,DataStream API 提供了更强的灵活性和底层控制能力。
一、基本概念
1.1 DataStream
DataStream
是 Flink 中的核心抽象,用于表示一个元素流(event stream),可以是:
-
无限流(unbounded):例如传感器数据、Kafka 日志等。
-
有限流(bounded):例如读取的文件或已结束的 Kafka topic。
1.2 类型
-
DataStream<T>
:表示非键控的数据流。 -
KeyedStream<K, T>
:对DataStream
使用.keyBy(...)
进行分区后得到的键控流。 -
SingleOutputStreamOperator<T>
:表示有后续操作(如 map/filter)后的流。
二、核心组件和操作
2.1 数据源(Sources)
通过 StreamExecutionEnvironment
创建流数据来源:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> stream = env.fromElements("a", "b", "c");
DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(...));
2.2 转换操作(Transformations)
常用操作包括:
操作 | 说明 |
---|---|
map | 一对一转换 |
flatMap | 一对多转换 |
filter | 过滤数据 |
keyBy | 按 key 分区 |
reduce | 增量聚合 |
window | 定义窗口 |
process | 更底层的流处理接口 |
示例:
DataStream<String> words = stream.flatMap((String line, Collector<String> out) -> {for (String word : line.split(" ")) out.collect(word);
}).returns(Types.STRING);
2.3 窗口操作(Windowing)
Flink 的窗口机制可用于将无限流“划分”为有限数据组:
stream.keyBy(value -> value.key).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((v1, v2) -> ...);
-
支持类型:
-
滚动窗口(Tumbling)
-
滑动窗口(Sliding)
-
会话窗口(Session)
-
2.4 时间语义
支持 3 种时间语义:
-
处理时间(Processing Time)
-
事件时间(Event Time)
-
摄取时间(Ingestion Time)
配合 Watermark 使用事件时间:
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(...));
三、状态管理(State Management)
DataStream API 支持保存状态用于:
-
聚合
-
去重
-
CEP 等复杂场景
使用 KeyedProcessFunction
或 RichFunction
可以访问状态 API:
ValueState<Integer> state;@Override
public void open(Configuration parameters) {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Integer.class));
}
四、容错与一致性
Flink 提供:
-
精确一次(Exactly-once)或至少一次(At-least-once)语义
-
基于 Checkpointing 实现
env.enableCheckpointing(10000); // 每 10 秒做一次 checkpoint
五、连接操作(Stream Joins)
支持不同类型流之间的连接:
-
connect
: 将两个不同类型流合并处理 -
union
: 合并同类型流 -
interval join
: 基于时间范围连接两个流 -
CoProcessFunction
: 对 connect 的结果使用不同逻辑处理两个流
六、输出(Sinks)
支持输出到:
-
Kafka
-
Redis
-
HDFS
-
MySQL / JDBC
-
ElasticSearch 等
示例:
stream.addSink(new FlinkKafkaProducer<>(...));
七、DataStream 和 Table API 的对比
特性 | DataStream API | Table API / SQL |
---|---|---|
灵活性 | 高(更底层) | 中(更偏向声明式) |
使用场景 | 自定义复杂逻辑、状态处理 | 结构化数据处理、简洁分析 |
容错一致性 | 支持 | 支持 |
状态控制 | 细粒度控制 | 抽象封装 |
八、示例:从 Kafka 读取并统计词频
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> input = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));input.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {for (String word : value.split(" ")) {out.collect(new Tuple2<>(word, 1));}}
})
.keyBy(t -> t.f0)
.sum(1)
.print();env.execute();