Flink DataStream API:流处理的核心接口
DataStream API 核心操作
DataStream API 是 Flink 流处理的核心接口,提供了丰富的操作来处理无界数据流。本章将详细介绍 DataStream API 的核心操作。
1. 数据源操作
1.1 基于集合的数据源
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;public class CollectionSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从元素创建流DataStream<String> fromElements = env.fromElements("Hello", "World", "Flink");// 从集合创建流List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);DataStream<Integer> fromCollection = env.fromCollection(numbers);// 从迭代器创建流DataStream<Long> fromIterator = env.fromSequence(1, 10);fromElements.print("Elements");fromCollection.print("Collection");fromIterator.print("Iterator");env.execute("Collection Source Example");}
}
1.2 基于 Socket 的数据源
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocketSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 Socket 创建流DataStream<String> socketStream = env.socketTextStream("localhost", 9999);socketStream.print("Socket Data");env.execute("Socket Source Example");}
}
1.3 自定义数据源
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;public class CustomSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 自定义数据源DataStream<String> customSource = env.addSource(new RandomWordSource());customSource.print("Custom Source");env.execute("Custom Source Example");}public static class RandomWordSource implements SourceFunction<String> {private volatile boolean isRunning = true;private final String[] words = {"Apache", "Flink", "Stream", "Processing", "Java", "BigData"};private final Random random = new Random();@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {// 随机选择一个单词String word = words[random.nextInt(words.length)];ctx.collect(word);// 每秒发送一个单词Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}}
}
2. 基本转换操作
2.1 Map 操作
Map 操作对流中的每个元素应用一个函数,将其转换为另一个元素。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MapExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.fromElements("hello", "world", "flink", "streaming");// 使用匿名内部类DataStream<String> upperCase = text.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.toUpperCase();}});// 使用 Lambda 表达式DataStream<Integer> lengths = text.map(String::length);// 使用自定义 MapFunctionDataStream<String> prefixed = text.map(new AddPrefixMapFunction());upperCase.print("Upper Case");lengths.print("Lengths");prefixed.print("Prefixed");env.execute("Map Example");}public static class AddPrefixMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {return "Prefix_" + value;}}
}
2.2 FlatMap 操作
FlatMap 操作对流中的每个元素应用一个函数,可以产生零个、一个或多个元素。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> sentences = env.fromElements("Hello World Flink","Stream Processing","Apache Flink is powerful");// 使用匿名内部类分割句子为单词DataStream<String> words = sentences.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String sentence, Collector<String> out) throws Exception {for (String word : sentence.split(" ")) {if (!word.isEmpty()) {out.collect(word.toLowerCase());}}}});// 使用 Lambda 表达式DataStream<String> filteredWords = sentences.flatMap((String sentence, Collector<String> out) -> {for (String word : sentence.split(" ")) {if (!word.isEmpty() && word.length() > 3) {out.collect(word);}}}).returns(String.class);words.print("All Words");filteredWords.print("Filtered Words");env.execute("FlatMap Example");}
}
2.3 Filter 操作
Filter 操作根据指定条件过滤流中的元素。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FilterExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 过滤偶数DataStream<Integer> evenNumbers = numbers.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value % 2 == 0;}});// 过滤大于5的数DataStream<Integer> greaterThanFive = numbers.filter(value -> value > 5);// 复杂过滤条件DataStream<Integer> complexFilter = numbers.filter(new NumberFilter());evenNumbers.print("Even Numbers");greaterThanFive.print("Greater Than Five");complexFilter.print("Complex Filter");env.execute("Filter Example");}public static class NumberFilter implements FilterFunction<Integer> {@Overridepublic boolean filter(Integer value) throws Exception {// 过滤既是奇数又大于3的数return value % 2 != 0 && value > 3;}}
}
3. 聚合操作
3.1 KeyBy 操作
KeyBy 操作根据指定的键对流进行分区,使得具有相同键的元素被发送到同一个算子实例。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建用户行为数据流DataStream<Tuple2<String, Integer>> userActions = env.fromElements(new Tuple2<>("user1", 10),new Tuple2<>("user2", 20),new Tuple2<>("user1", 15),new Tuple2<>("user3", 30),new Tuple2<>("user2", 25));// 根据用户ID进行分组KeyedStream<Tuple2<String, Integer>, String> keyedStream = userActions.keyBy(tuple -> tuple.f0);// 对每个用户的动作值求和DataStream<Tuple2<String, Integer>> userSums = keyedStream.sum(1);userActions.print("Original");userSums.print("Sum by User");env.execute("KeyBy Example");}
}
3.2 Reduce 操作
Reduce 操作对分组后的数据进行聚合计算。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ReduceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建销售数据流DataStream<Tuple3<String, String, Integer>> sales = env.fromElements(new Tuple3<>("store1", "productA", 100),new Tuple3<>("store2", "productB", 200),new Tuple3<>("store1", "productA", 150),new Tuple3<>("store1", "productB", 80),new Tuple3<>("store2", "productA", 120));// 按商店和产品分组,计算总销售额DataStream<Tuple3<String, String, Integer>> totalSales = sales.keyBy(tuple -> Tuple2.of(tuple.f0, tuple.f1)) // 按商店和产品分组.reduce((tuple1, tuple2) -> new Tuple3<>(tuple1.f0, // 商店tuple1.f1, // 产品tuple1.f2 + tuple2.f2 // 销售额累加));sales.print("Original Sales");totalSales.print("Total Sales");env.execute("Reduce Example");}
}
3.3 Aggregations 聚合操作
Flink 提供了多种预定义的聚合操作。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class AggregationsExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建温度数据流DataStream<Tuple2<String, Double>> temperatures = env.fromElements(new Tuple2<>("sensor1", 23.5),new Tuple2<>("sensor2", 25.2),new Tuple2<>("sensor1", 24.1),new Tuple2<>("sensor3", 22.8),new Tuple2<>("sensor2", 26.0),new Tuple2<>("sensor1", 23.9));// 按传感器分组DataStream<Tuple2<String, Double>> keyedTemperatures = temperatures.keyBy(tuple -> tuple.f0);// 计算每个传感器的最高温度DataStream<Tuple2<String, Double>> maxTemperatures = keyedTemperatures.max(1);// 计算每个传感器的最低温度DataStream<Tuple2<String, Double>> minTemperatures = keyedTemperatures.min(1);// 计算每个传感器的温度总和DataStream<Tuple2<String, Double>> sumTemperatures = keyedTemperatures.sum(1);// 计算每个传感器的平均温度(使用 minBy/maxBy)DataStream<Tuple2<String, Double>> minByTemperatures = keyedTemperatures.minBy(1);DataStream<Tuple2<String, Double>> maxByTemperatures = keyedTemperatures.maxBy(1);temperatures.print("Original Temperatures");maxTemperatures.print("Max Temperatures");minTemperatures.print("Min Temperatures");sumTemperatures.print("Sum Temperatures");minByTemperatures.print("Min By Temperatures");maxByTemperatures.print("Max By Temperatures");env.execute("Aggregations Example");}
}
4. 连接操作
4.1 Connect 操作
Connect 操作可以连接两个类型不同的数据流。
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;public class ConnectExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建两个不同类型的数据流DataStream<String> controlStream = env.fromElements("START", "STOP", "RESET");DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 连接两个流ConnectedStreams<String, Integer> connectedStreams = controlStream.connect(dataStream);// 使用 CoMapFunction 处理连接的流DataStream<String> result = connectedStreams.map(new ControlFunction());result.print("Connected Stream Result");env.execute("Connect Example");}public static class ControlFunction implements CoMapFunction<String, Integer, String> {private boolean enabled = false;private int processedCount = 0;@Overridepublic String map1(String controlCommand) throws Exception {switch (controlCommand) {case "START":enabled = true;return "Control: START command received";case "STOP":enabled = false;return "Control: STOP command received";case "RESET":processedCount = 0;return "Control: RESET command received";default:return "Control: Unknown command " + controlCommand;}}@Overridepublic String map2(Integer data) throws Exception {if (enabled) {processedCount++;return "Data: " + data + " (processed count: " + processedCount + ")";} else {return "Data: " + data + " (ignored - processing disabled)";}}}
}
4.2 Union 操作
Union 操作可以合并多个类型相同的数据流。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建多个数据流DataStream<String> stream1 = env.fromElements("A", "B", "C");DataStream<String> stream2 = env.fromElements("D", "E", "F");DataStream<String> stream3 = env.fromElements("G", "H", "I");// 合并多个流DataStream<String> unionStream = stream1.union(stream2, stream3);unionStream.print("Union Stream");env.execute("Union Example");}
}
5. 分区操作
5.1 Shuffle 操作
Shuffle 操作随机重新分配数据流中的元素。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ShuffleExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 随机重新分区DataStream<Integer> shuffled = numbers.shuffle();numbers.print("Original");shuffled.print("Shuffled");env.execute("Shuffle Example");}
}
5.2 Rebalance 操作
Rebalance 操作以轮询(round-robin)方式将数据均匀地重新分配到下游所有并行实例,常用于上下游并行度不同或数据倾斜的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class RebalanceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 重新平衡数据DataStream<Integer> rebalanced = numbers.rebalance();numbers.print("Original");rebalanced.print("Rebalanced");env.execute("Rebalance Example");}
}
6. 数据分流操作
6.1 Split 和 Select 操作(已废弃)
注意:Split 和 Select 操作在早期版本中已被废弃并从 DataStream API 中移除(自 Flink 1.12 起不再提供),推荐使用 Side Output 实现分流。
6.2 Side Output 操作
Side Output 允许在主数据流之外输出额外的数据流。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// 定义侧输出标签final OutputTag<Integer> evenNumbersTag = new OutputTag<Integer>("even-numbers"){};final OutputTag<Integer> oddNumbersTag = new OutputTag<Integer>("odd-numbers"){};// 使用 ProcessFunction 进行分流SingleOutputStreamOperator<Integer> mainStream = numbers.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 2 == 0) {// 输出到偶数侧输出流ctx.output(evenNumbersTag, value);} else {// 输出到奇数侧输出流ctx.output(oddNumbersTag, value);}// 主流不输出任何数据}});// 获取侧输出流DataStream<Integer> evenNumbers = mainStream.getSideOutput(evenNumbersTag);DataStream<Integer> oddNumbers = mainStream.getSideOutput(oddNumbersTag);evenNumbers.print("Even Numbers");oddNumbers.print("Odd Numbers");env.execute("Side Output Example");}
}
本章详细介绍了 DataStream API 的核心操作,包括数据源操作、基本转换操作、聚合操作、连接操作、分区操作和数据分流操作。这些操作构成了 Flink 流处理的基础,掌握它们对于开发 Flink 应用程序至关重要。
