Flink 多流转换
Flink 的多流转换可以分为“分流”和“合流”两大类。目前分流的操作一般是通过侧输出流来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行合并操作。
一、分流
所谓“分流”就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream 得到完全平等的多个子 DataStream,如下图所使。一般会定义一些筛选条件,将符合条件的数据挑选出来,放到对应的流里。

1.1 使用 filter() 简单实现
根据条件筛选数据非常容易实现,只要针对同一条流多次独立调用 filter() 方法进行筛选,就可以得到拆分之后的流。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 筛选 Mary 的浏览行为放入 MaryStream 流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 筛选 Bob 的购买行为放入 BobStream 流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 筛选其他人的浏览行为放入 elseStream 流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob");}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}}这样实现非常简单,但是代码显得有些冗余,我们的处理逻辑对拆分出的三条流是一样的,却重复写了三次。而且这样实现,是将原始数据流 stream 复制了三分,然后对每一份分别做筛选。
1.2 使用侧输出流实现
process function 本身可以认为是一个转换算子,它的输出类型是单一的,处理之后得到的仍然是一个 DataStream。而侧输出流则不受限制,可以任意自定义输出数据,它就像从“主流”上分叉出的“支流”。尽管看起来和主流有所区别,但实际上都是某种类型的 DataStream,所以本质上是平等的。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SplitStreamByOutputTag {// 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = newOutputTag<Tuple3<String, String, Long>>("Mary-pv") {};private static OutputTag<Tuple3<String, String, Long>> BobTag = newOutputTag<Tuple3<String, String, Long>>("Bob-pv") {};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {if (value.user.equals("Mary")) {ctx.output(MaryTag, new Tuple3<>(value.user, value.url,value.timestamp));} else if (value.user.equals("Bob")) {ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}二、基本合流操作
2.1 联合(Union)
最简单的合流操作,就是直接将多条流合并在一起,叫作流的“联合”(union),如下图所示。union 操作要求流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,只要基于 DataStream 直接调用 union() 方法,传入其他 DataStream 作为参数,就可以实现流的联合了,得到的依然是一个 DataStream。
stream1.union(stream2, stream3, ...)这里需要考虑一个问题:在事件时间语义下,水位线是时间的进度标志,不同流中可能水位线的进展快慢不同,如果合并在一起,水位线要以最小的那个为准。
2.2 连接(Connect)
流的联合虽然简单,但是受限于数据类型不能改变,灵活性大打折扣。除了 union,Flink 还提供了另一种方便的合流操作——连接(connect)。顾名思义,这种操作就是直接把两条流像线一样对接起来。
2.2.1 连接流(ConnectedStreams)
为了处理更加灵活,连接操作允许流的数据类型不同。但是一个 DataStream 中的数据只能有唯一的类型,所以连接得到的并不是 DataStream,而是一个 ConnectedStream。connectedStream 可以看成是两条流形式上的“统一”,而被放在了同一个流中。实际上内部仍各自保持各自的数据类型不变,彼此之间相互独立。想要得到新的 DataStream 还需要进一步定义一个 “co-process” 转换操作,用来说明对于不同来源、不同类型的数据,分别怎样进行转换处理、得到统一的输出类型。

在代码实现上,需要分为两步:
- 基于一条 DataStream 调用 connect() 方法,传入另外一条 DataStream 作为参数,将两条流连接起来,输出一个 ConnectedStream。
- ConnectedStream 调用 map() 、flatMap()、process() 方法做 co-process 处理。
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class CoMapExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<Long> stream2 = env.fromElements(1L, 2L, 3L);ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {@Overridepublic String map1(Integer value) {return "Integer: " + value;}@Overridepublic String map2(Long value) {return "Long: " + value;}});result.print();env.execute();}
}上面的代码中,ConnectedStream 有两个类型的参数,分别表示内部包含的两条流各自的数据类型,由于需要分别对两条流进行 map() 操作,因此 map() 方法中传入的不再是一个简单的 MapFunction,而是一个 CoMapFunction。这个接口有三个类型参数,依次表示第一条流、第二条流、合并后的流中数据类型。
值得一提的是,ConnectedStream 也可以直接调用 keyBy() 方法纪念性按键分区的操作,得到的还是一个 ConnectedStream:
connectedStreams.keyBy(keySelector1, keySelector2);这里传入的两个参数分别是两条流中各自的键选择器,也可以直接传入键的位置值,或者键的字段名。
2.2.2 CoProcessFunction
对于连接流 ConnectedStream 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,在两条流中的数据分别到来时分别调用。这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如果是调用 flatMap() 就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()、flatMap2() 两个方法;而调用 process() 时,传入的则是一个 CoProcessFunction。
下面是 CoProcessFunction 的一个具体实例:实现一个实时对账的需求,也就是 app 的支付操作和第三方的支付操作的一个双流 join。app 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不到对应的支付事件,那么就会输出报警信息。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;// 实时对账
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自 app 的支付日志SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream =env.fromElements(Tuple3.of("order-1", "app", 1000L),Tuple3.of("order-2", "app", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperator<Tuple4<String, String, String, Long>>thirdpartStream = env.fromElements(Tuple4.of("order-1", "third-party", "success", 3000L),Tuple4.of("order-3", "third-party", "success", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String,String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配,不匹配就报警appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现 CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{// 定义状态变量,用来保存已经到达的事件private ValueState<Tuple3<String, String, Long>> appEventState;private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;@Overridepublic void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String,Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String,Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING,Types.STRING,Types.LONG)));}@Overridepublic void processElement1(Tuple3<String, String, Long> value, Context ctx,Collector<String> out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() != null){out.collect(" 对 账 成 功 : " + value + " " +thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个 5 秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);}}@Overridepublic void processElement2(Tuple4<String, String, String, Long> value,Context ctx, Collector<String> out) throws Exception {if (appEventState.value() != null){out.collect("对账成功:" + appEventState.value() + " " + value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个 5 秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String>out) throws Exception {// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来if (appEventState.value() != null) {out.collect("对账失败:" + appEventState.value() + " " + "第三方支付平台信息未到");}if (thirdPartyEventState.value() != null) {out.collect("对账失败:" + thirdPartyEventState.value() + " " + "app信息未到");}appEventState.clear();thirdPartyEventState.clear();}}}输出结果是:
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到
对账失败:(order-3,third-party,success,4000) app 信息未到在程序中,我们声明了两个状态变量分别用来保存 App 的支付信息和第三方的支付信息。App 的支付信息到达以后,会检查对应的第三方支付信息是否已经先到达(先到达会保存在对应的状态变量中),如果已经到达了,那么对账成功,直接输出对账成功的信息,并将保存第三方支付消息的状态变量清空。如果 App 对应的第三方支付信息没有到来,那么我们会注册一个 5 秒钟之后的定时器,也就是说等待第三方支付事件 5 秒钟。当定时器触发时,检查保存 app 支付信息的状态变量是否还在,如果还在,说明对应的第三方支付信息没有到来,所以输出报警信息。
2.2.3 广播连接流(BroadcastConnectedStream)
关于两条流的连接,还有一种比较特殊的用法:DataStream 调用 connect() 方法时,传入的参数也可以不是一个 DataStream,而是一个广播流(BroadcastStream),这时合并两条流得到的就变成了一个广播连接流(BroadcatConnectedStream)。
这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)。
广播状态底层是用一个 map 结构来保存的。在代码实现上,可以直接调用 DataStream 的 broadcast() 方法,传入一个 “映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到规则数据的广播流。
MapStateDescriptor<String, Rule> ruleStateDescriptor = new
MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);接下来就可以将要处理的数据流,与这条广播流进行 connect,得到的就是广播连接流。基于 BroadcastConnectedStream 调用 process() 方法,就可以同时获取规则和数据,进行动态处理了。这里既然调用了 process() 方法,当然传入的参数也应该是处理函数大家族中一员——如果对数据流调用过 keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction;如果没有按键分区,就传入 BroadcastProcessFunction。
DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );三、基于时间的合流——双流联结(join)
尽管 connect 能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了两种内置的 join 算子,以及 coGroup 算子。
3.1 窗口联结(Window Join)
基于时间的操作,最基本的就是时间窗口了。
3.1.1 Window Join 的调用
Window Join 在代码中的实现:
- 首先需要调用 DataStream 的 join() 方法来合并两条流,得到一个 JoinedStream;
- 再通过调用 where() 方法和 equalTo() 方法指定两条流中联结的 key;
- 然后通过 window() 方法开窗;
- 最后调用 apply() 方法传入一个 JoinFunction 处理计算。
stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)这里的 WindowAssigner 参数可以是滚动窗口、滑动窗口、会话窗口。
在 window() 和 apply() 之间也可以调用可选 API 去做一些自定义,比如用 trigger() 定义触发器,用 allowedLateness() 定义允许延迟时间,等等。
3.1.2 Window Join 的处理流程
- 两条流的数据到来后,首先会按照 key 分组,进入对应的窗口中存储;
- 当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积;
- 然后进行遍历,把每一对匹配的数据作为参数传入 JoinFunction 的 join() 方法进行计算处理;
- 窗口中每有一堆数据成功联结匹配,JoinFunction 的 join() 方法就会被调用一次,并且输出一个结果;

除了 JoinFunction,在.apply()方法中还可以传入 FlatJoinFunction,用法非常类似,只是内部需要实现的.join()方法没有返回值。结果的输出是通过收集器(Collector)来实现的,所以对于一对匹配数据可以输出任意条结果。
3.2 间隔联结(Interval Join)
在某些场景下,我们要处理的时间间隔可能不是固定的。比如在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出的数额相等,也就是实时对账。两次事件的时间戳应该相差不大,可以考虑只统计一段时间内是否有出账入账的数据匹配。这时如果用滚动窗口或者滑动窗口来处理,可能会出现两个数据刚好卡在窗口边缘两侧的情况,在窗口内就没有匹配。
这时就可以使用 Interval Join,顾名思义,Interval Join 的思路是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看着期间是否有来自另一条流的数据匹配。
3.2.1 间隔联结的原理
间隔联结具体的定义方式是,给定两个时间点,分别叫作间隔的上界和下界。于是对于一条流中的任意一个元素 a 都可以开辟一段时间间隔,即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的窗口范围。对于另一条流中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。这里需要注意,做间隔联结的两条流也必须基于相同的 key。

3.2.2 Interval Join 的调用
Interval Join 在代码中是基于 KeyedStream 的 Join 操作。DataStream 在 keyBy 得到 KeyedStream 之后,可以调用 intervalJoin() 来合并两条流,传入的参数同样是一个 KeyedStream,两者的 key 类型应该一致;然后通过 between() 方法指定间隔的上下界,再调用 process() 方法,定义对匹配数据对的处理操作。
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {out.collect(left + "," + right);}});3.2.3 Interval Join 实例
举一个例子,有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
// 基于间隔的 join
public class IntervalJoinExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream =env.fromElements(Tuple3.of("Mary", "order-1", 5000L),Tuple3.of("Alice", "order-2", 5000L),Tuple3.of("Bob", "order-3", 20000L),Tuple3.of("Alice", "order-4", 20000L),Tuple3.of("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));SingleOutputStreamOperator<Event> clickStream = env.fromElements(new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Alice", "./prod?id=200", 3500L),new Event("Bob", "./prod?id=2", 2500L),new Event("Alice", "./prod?id=300", 36000L),new Event("Bob", "./home", 30000L),new Event("Bob", "./prod?id=1", 23000L),new Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>(){@Overridepublic long extractTimestamp(Event element, long recordTimestamp){return element.timestamp;}}));orderStream.keyBy(data -> data.f0).intervalJoin(clickStream.keyBy(data -> data.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction<Tuple3<String, String, Long>,Event, String>() {@Overridepublic void processElement(Tuple3<String, String, Long> left,Event right, Context ctx, Collector<String> out) throws Exception {out.collect(right + " => " + left);}}).print();env.execute();}}输出结果:
Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,order-2,5000)
Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5} => (Alice,order-2,5000)
Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:30.0} => (Bob,order-3,20000)
Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:23.0} => (Bob,order-3,20000)3.3 窗口同组联结(Window CoGroup)
它的用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将 join() 换为 coGroup() 就可以了。
stream1.coGroup(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(<CoGroupFunction>)它与 window join 的区别在于,调用 apply() 方法定义具体操作时,传入的是一个 coGroupFunction。内部的 coGroup() 方法,有些类似于 FlatJoinFunction 中 join() 的形式,同样有三个参数,分别代表两条流中的数据以及用于输出的收集器。不同的是,这里的前两个参数不再是单独的每一组配对数据了,而是传入了可遍历的数据集合。也就是说,现在不会再去计算窗口中两条流数据集的笛卡尔积,而是直接把收集到的所有数据一次性传入,至于要怎样配对完全是自定义的。这样 coGroup() 方法只会被调用一次,而且即使一条流的数据没有任何另一条流的数据匹配,也可以出现在集合中、当然也可以定义输出结果了。所以 coGroup 操作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的内连接,也可以实现左外连接、右外连接和全外连接。事实上,窗口 join 的底层,也是通过 coGroup 来实现的。
下面是一段 coGroup 的示例代码:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;// 基于窗口的 join
public class CoGroupExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Tuple2<String, Long>> stream1 = env.fromElements(Tuple2.of("a", 1000L),Tuple2.of("b", 1000L),Tuple2.of("a", 2000L),Tuple2.of("b", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String,Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStream<Tuple2<String, Long>> stream2 = env.fromElements(Tuple2.of("a", 3000L),Tuple2.of("b", 3000L),Tuple2.of("a", 4000L),Tuple2.of("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String,Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.coGroup(stream2).where(r -> r.f0).equalTo(r -> r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String,Long>, String>() {@Overridepublic void coGroup(Iterable<Tuple2<String, Long>> iter1,Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throwsException {collector.collect(iter1 + "=>" + iter2);}}).print();env.execute();}
}输出结果是:
[(a,1000), (a,2000)]=>[(a,3000), (a,4000)]
[(b,1000), (b,2000)]=>[(b,3000), (b,4000)]