flink api-datastream api-transformation算子
Flink Transformation算子是DataStream API中用于数据转换的核心操作,主要分为以下几类:
基本转换算子
Map:一对一转换,对每个元素应用函数并输出新元素。例如将字符串转为大写或提取日志字段。
FlatMap:一对多转换,将单个元素拆分为零个或多个元素,常用于文本分词或嵌套结构展开。
Filter:根据条件过滤元素,仅保留满足布尔表达式的数据。
分组与聚合算子
KeyBy:按指定Key哈希分区,为聚合操作提供数据局部性支持。
Reduce:滚动聚合,基于前一次结果和当前元素计算新值,如累加或极值统计。
Window:基于时间或数量的窗口操作,支持滚动、滑动等窗口类型,需配合聚合函数使用。
多流操作算子
Union:合并多个同类型DataStream,不消除重复数据。
Connect:连接不同类型DataStream,生成ConnectedStreams,后续可通过CoMap/CoFlatMap处理。
Split/Select:将流按条件拆分为多个子流,再通过Select选择特定子流。
物理分区算子
Shuffle:随机均匀重分布数据,避免倾斜。
Rebalance:轮询分配数据到下游任务,实现负载均衡。
Broadcast:将数据广播到所有并行任务。
特殊转换算子
Project(仅限Tuple类型):选择字段子集,类似SQL的SELECT操作。
Iterate:迭代反馈流,用于实现循环逻辑。
例子
数据对象定义
package com.atguigu.wc.pojo;import java.util.Objects;public class WaterSensor {// 水位传感器类型public String id;// 传感器记录时间戳public Long ts;// 水位记录public Integer vc;public WaterSensor(){}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +"}";}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}}
基本转换算子
Map:
// 方式一:传入匿名类,实现MapFunction
waterSensorDataStreamSource.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.id;}
}).print();// 方式二:传入MapFunction实现类
waterSensorDataStreamSource.map(new UserMap()).print();
FlatMap:
public class TransFlatMap {public static void main(String[] ar