
Flink DataStream API: The Core Interface for Stream Processing

Core Operations of the DataStream API

The DataStream API is Flink's core interface for stream processing, offering a rich set of operations on unbounded data streams. This chapter walks through those core operations in detail.

1. Data Source Operations

1.1 Collection-Based Sources

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

public class CollectionSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a stream from individual elements
        DataStream<String> fromElements = env.fromElements("Hello", "World", "Flink");

        // Create a stream from a collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        DataStream<Integer> fromCollection = env.fromCollection(numbers);

        // Generate a stream from a number sequence
        DataStream<Long> fromSequence = env.fromSequence(1, 10);

        fromElements.print("Elements");
        fromCollection.print("Collection");
        fromSequence.print("Sequence");

        env.execute("Collection Source Example");
    }
}

1.2 Socket-Based Sources

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a stream from a socket (e.g. started with: nc -lk 9999)
        DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

        socketStream.print("Socket Data");

        env.execute("Socket Source Example");
    }
}

1.3 Custom Sources

Note: in newer Flink releases `SourceFunction` is deprecated in favor of the unified `Source` API, but it remains the simplest way to illustrate a custom source.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

public class CustomSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the custom source
        DataStream<String> customSource = env.addSource(new RandomWordSource());

        customSource.print("Custom Source");

        env.execute("Custom Source Example");
    }

    public static class RandomWordSource implements SourceFunction<String> {
        private volatile boolean isRunning = true;
        private final String[] words = {"Apache", "Flink", "Stream", "Processing", "Java", "BigData"};
        private final Random random = new Random();

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
                // Pick a random word
                String word = words[random.nextInt(words.length)];
                ctx.collect(word);
                // Emit one word per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

2. Basic Transformations

2.1 Map

Map applies a function to each element of the stream, transforming it into exactly one output element.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements("hello", "world", "flink", "streaming");

        // Anonymous inner class
        DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });

        // Method reference
        DataStream<Integer> lengths = text.map(String::length);

        // Custom MapFunction class
        DataStream<String> prefixed = text.map(new AddPrefixMapFunction());

        upperCase.print("Upper Case");
        lengths.print("Lengths");
        prefixed.print("Prefixed");

        env.execute("Map Example");
    }

    public static class AddPrefixMapFunction implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            return "Prefix_" + value;
        }
    }
}

2.2 FlatMap

FlatMap applies a function to each element that may produce zero, one, or more output elements.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> sentences = env.fromElements(
                "Hello World Flink",
                "Stream Processing",
                "Apache Flink is powerful");

        // Anonymous inner class: split sentences into words
        DataStream<String> words = sentences.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String sentence, Collector<String> out) throws Exception {
                for (String word : sentence.split(" ")) {
                    if (!word.isEmpty()) {
                        out.collect(word.toLowerCase());
                    }
                }
            }
        });

        // Lambda expression; returns() is required because the Collector's
        // generic type is erased at compile time
        DataStream<String> filteredWords = sentences
                .flatMap((String sentence, Collector<String> out) -> {
                    for (String word : sentence.split(" ")) {
                        if (!word.isEmpty() && word.length() > 3) {
                            out.collect(word);
                        }
                    }
                })
                .returns(String.class);

        words.print("All Words");
        filteredWords.print("Filtered Words");

        env.execute("FlatMap Example");
    }
}

2.3 Filter

Filter keeps only the elements that satisfy a given predicate.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Keep even numbers
        DataStream<Integer> evenNumbers = numbers.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });

        // Keep numbers greater than 5
        DataStream<Integer> greaterThanFive = numbers.filter(value -> value > 5);

        // A more complex predicate in its own class
        DataStream<Integer> complexFilter = numbers.filter(new NumberFilter());

        evenNumbers.print("Even Numbers");
        greaterThanFive.print("Greater Than Five");
        complexFilter.print("Complex Filter");

        env.execute("Filter Example");
    }

    public static class NumberFilter implements FilterFunction<Integer> {
        @Override
        public boolean filter(Integer value) throws Exception {
            // Keep odd numbers greater than 3
            return value % 2 != 0 && value > 3;
        }
    }
}

3. Aggregation Operations

3.1 KeyBy

KeyBy partitions the stream by a specified key so that all elements with the same key are routed to the same operator instance.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A stream of (userId, actionValue) records
        DataStream<Tuple2<String, Integer>> userActions = env.fromElements(
                new Tuple2<>("user1", 10),
                new Tuple2<>("user2", 20),
                new Tuple2<>("user1", 15),
                new Tuple2<>("user3", 30),
                new Tuple2<>("user2", 25));

        // Partition the stream by user ID
        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                userActions.keyBy(tuple -> tuple.f0);

        // Rolling sum of action values per user
        DataStream<Tuple2<String, Integer>> userSums = keyedStream.sum(1);

        userActions.print("Original");
        userSums.print("Sum by User");

        env.execute("KeyBy Example");
    }
}
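KeyBy's guarantee, that equal keys always reach the same parallel instance, follows from the target subtask being a pure function of the key. The plain-Java sketch below is a simplified model of that routing, not Flink's actual implementation (Flink hashes keys into key groups with murmur hash), but the principle is the same.

```java
import java.util.Objects;

public class KeyRoutingSketch {
    // Simplified model of keyBy routing: the subtask index is computed
    // deterministically from the key alone, so the same key can never
    // end up on two different subtasks.
    static int subtaskFor(String key, int parallelism) {
        // Math.floorMod avoids negative indices for negative hash codes
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // The same key is always routed to the same subtask, which is
        // what makes per-key state (sums, counters) consistent.
        System.out.println(
                subtaskFor("user1", parallelism) == subtaskFor("user1", parallelism));
    }
}
```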

3.2 Reduce

Reduce performs a rolling aggregation on a keyed stream: each incoming element is combined with the accumulated result for its key, and the updated result is emitted.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A stream of (store, product, amount) sales records
        DataStream<Tuple3<String, String, Integer>> sales = env.fromElements(
                new Tuple3<>("store1", "productA", 100),
                new Tuple3<>("store2", "productB", 200),
                new Tuple3<>("store1", "productA", 150),
                new Tuple3<>("store1", "productB", 80),
                new Tuple3<>("store2", "productA", 120));

        // Group by store and product, then accumulate the total amount.
        // A composite string key sidesteps the generic-type inference
        // problems that a lambda key selector returning a Tuple2 runs into.
        DataStream<Tuple3<String, String, Integer>> totalSales = sales
                .keyBy(tuple -> tuple.f0 + "_" + tuple.f1)
                .reduce((tuple1, tuple2) -> new Tuple3<>(
                        tuple1.f0,               // store
                        tuple1.f1,               // product
                        tuple1.f2 + tuple2.f2)); // accumulated amount

        sales.print("Original Sales");
        totalSales.print("Total Sales");

        env.execute("Reduce Example");
    }
}
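One behavior worth noting: a keyed reduce (like sum) is a rolling aggregation, so it emits an updated result for every incoming element, not one final total per key. The plain-Java sketch below (no Flink involved) mimics that per-key accumulate-and-emit behavior for two store1/productA records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {
    // Mimics Flink's keyed reduce: keep one accumulator per key and
    // emit the updated value after EVERY element, not only at the end.
    static List<Integer> rollingSums(List<String[]> records) {
        Map<String, Integer> state = new HashMap<>(); // per-key accumulator
        List<Integer> emitted = new ArrayList<>();
        for (String[] r : records) {
            String key = r[0] + "_" + r[1];           // store + product
            int amount = Integer.parseInt(r[2]);
            int updated = state.merge(key, amount, Integer::sum);
            emitted.add(updated);                      // rolling emission
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> sales = List.of(
                new String[]{"store1", "productA", "100"},
                new String[]{"store1", "productA", "150"});
        // Emits 100 first, then the updated total 250
        System.out.println(rollingSums(sales)); // [100, 250]
    }
}
```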

3.3 Built-in Aggregations

Flink ships with several predefined aggregations on keyed streams (sum, min, max, minBy, maxBy).

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AggregationsExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A stream of (sensorId, temperature) readings
        DataStream<Tuple2<String, Double>> temperatures = env.fromElements(
                new Tuple2<>("sensor1", 23.5),
                new Tuple2<>("sensor2", 25.2),
                new Tuple2<>("sensor1", 24.1),
                new Tuple2<>("sensor3", 22.8),
                new Tuple2<>("sensor2", 26.0),
                new Tuple2<>("sensor1", 23.9));

        // Partition by sensor ID (the aggregations below require a KeyedStream)
        KeyedStream<Tuple2<String, Double>, String> keyedTemperatures =
                temperatures.keyBy(tuple -> tuple.f0);

        // Rolling maximum temperature per sensor
        DataStream<Tuple2<String, Double>> maxTemperatures = keyedTemperatures.max(1);

        // Rolling minimum temperature per sensor
        DataStream<Tuple2<String, Double>> minTemperatures = keyedTemperatures.min(1);

        // Rolling temperature sum per sensor
        DataStream<Tuple2<String, Double>> sumTemperatures = keyedTemperatures.sum(1);

        // minBy/maxBy return the whole element that holds the extreme
        // value, not just the updated field
        DataStream<Tuple2<String, Double>> minByTemperatures = keyedTemperatures.minBy(1);
        DataStream<Tuple2<String, Double>> maxByTemperatures = keyedTemperatures.maxBy(1);

        temperatures.print("Original Temperatures");
        maxTemperatures.print("Max Temperatures");
        minTemperatures.print("Min Temperatures");
        sumTemperatures.print("Sum Temperatures");
        minByTemperatures.print("Min By Temperatures");
        maxByTemperatures.print("Max By Temperatures");

        env.execute("Aggregations Example");
    }
}
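The difference between min/max and minBy/maxBy is easy to miss: min(field) updates only the aggregated field and keeps the other fields from the first element seen for that key, while minBy(field) emits the entire element that holds the minimum. The plain-Java sketch below (a simulation, not Flink code) makes the two accumulator behaviors visible by adding a reading ID as a third field.

```java
public class MinVsMinBySketch {
    // Each event: {sensorId, temperature, readingId}. The readingId
    // shows which element's non-aggregated fields survive.
    static Object[][] run() {
        Object[][] events = {
                {"sensor1", 23.5, "r1"},
                {"sensor1", 24.1, "r2"},
                {"sensor1", 21.0, "r3"}};

        Object[] minAcc = events[0].clone(); // min-style accumulator
        Object[] minByAcc = events[0];       // minBy-style accumulator
        for (int i = 1; i < events.length; i++) {
            double temp = (double) events[i][1];
            if (temp < (double) minAcc[1]) {
                minAcc[1] = temp;            // min: only the field updates
            }
            if (temp < (double) minByAcc[1]) {
                minByAcc = events[i];        // minBy: whole element replaced
            }
        }
        return new Object[][]{minAcc, minByAcc};
    }

    public static void main(String[] args) {
        Object[][] result = run();
        System.out.println(result[0][2]); // min kept the FIRST element's other fields
        System.out.println(result[1][2]); // minBy returned the MINIMUM element
    }
}
```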

4. Stream Combination Operations

4.1 Connect

Connect joins two streams that may have different element types.

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two streams of different types
        DataStream<String> controlStream = env.fromElements("START", "STOP", "RESET");
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Connect the two streams
        ConnectedStreams<String, Integer> connectedStreams = controlStream.connect(dataStream);

        // Process the connected streams with a CoMapFunction
        DataStream<String> result = connectedStreams.map(new ControlFunction());

        result.print("Connected Stream Result");

        env.execute("Connect Example");
    }

    // Note: these fields live per parallel instance; in a parallel job,
    // control flags like this are usually shared via broadcast state
    public static class ControlFunction implements CoMapFunction<String, Integer, String> {
        private boolean enabled = false;
        private int processedCount = 0;

        @Override
        public String map1(String controlCommand) throws Exception {
            switch (controlCommand) {
                case "START":
                    enabled = true;
                    return "Control: START command received";
                case "STOP":
                    enabled = false;
                    return "Control: STOP command received";
                case "RESET":
                    processedCount = 0;
                    return "Control: RESET command received";
                default:
                    return "Control: Unknown command " + controlCommand;
            }
        }

        @Override
        public String map2(Integer data) throws Exception {
            if (enabled) {
                processedCount++;
                return "Data: " + data + " (processed count: " + processedCount + ")";
            } else {
                return "Data: " + data + " (ignored - processing disabled)";
            }
        }
    }
}

4.2 Union

Union merges two or more streams of the same element type into one.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Several streams of the same type
        DataStream<String> stream1 = env.fromElements("A", "B", "C");
        DataStream<String> stream2 = env.fromElements("D", "E", "F");
        DataStream<String> stream3 = env.fromElements("G", "H", "I");

        // Merge them into one stream
        DataStream<String> unionStream = stream1.union(stream2, stream3);

        unionStream.print("Union Stream");

        env.execute("Union Example");
    }
}

5. Partitioning Operations

5.1 Shuffle

Shuffle redistributes the elements of a stream to downstream tasks uniformly at random.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Randomly repartition the stream
        DataStream<Integer> shuffled = numbers.shuffle();

        numbers.print("Original");
        shuffled.print("Shuffled");

        env.execute("Shuffle Example");
    }
}

5.2 Rebalance

Rebalance redistributes elements round-robin across all downstream parallel instances, evening out load after a change in parallelism or when the input is skewed.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Redistribute elements round-robin
        DataStream<Integer> rebalanced = numbers.rebalance();

        numbers.print("Original");
        rebalanced.print("Rebalanced");

        env.execute("Rebalance Example");
    }
}

6. Stream Splitting Operations

6.1 Split and Select (Deprecated)

Note: Split and Select were deprecated and have since been removed from the DataStream API; Side Output is the recommended replacement.

6.2 Side Output

Side outputs let an operator emit additional streams alongside its main output.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Side-output tags; the anonymous subclass ({}) preserves the
        // generic type information
        final OutputTag<Integer> evenNumbersTag = new OutputTag<Integer>("even-numbers"){};
        final OutputTag<Integer> oddNumbersTag = new OutputTag<Integer>("odd-numbers"){};

        // Split the stream with a ProcessFunction
        SingleOutputStreamOperator<Integer> mainStream = numbers
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        if (value % 2 == 0) {
                            // Route even numbers to the even side output
                            ctx.output(evenNumbersTag, value);
                        } else {
                            // Route odd numbers to the odd side output
                            ctx.output(oddNumbersTag, value);
                        }
                        // The main stream emits nothing
                    }
                });

        // Retrieve the side outputs
        DataStream<Integer> evenNumbers = mainStream.getSideOutput(evenNumbersTag);
        DataStream<Integer> oddNumbers = mainStream.getSideOutput(oddNumbersTag);

        evenNumbers.print("Even Numbers");
        oddNumbers.print("Odd Numbers");

        env.execute("Side Output Example");
    }
}

This chapter covered the core operations of the DataStream API: data sources, basic transformations, aggregations, stream combination, partitioning, and stream splitting. These operations form the foundation of Flink stream processing, and mastering them is essential for building Flink applications.
