如何正确理解flink 消费kafka时的watermark
案例1
在source 层面用全量数据watermark 对后面的窗口计算是否有影响?
KafkaSource<Event> source = KafkaSource.<Event>builder().setWatermarkStrategy(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(e -> e.eventTime)) // 全量WM:order+click污染!.build();tream.filter(e -> e.type.equals("order")) .window(...).sum(); tream.filter(e -> e.type.equals("click")) .window(...).sum();
分析
10s(order) → 12s(click) → 15s(order) → 18s(click) → 20s(order)
全是数据生成的watermark是20-5=15
order 数据生成的watermark是 20-5=15
click 数据生成的watermark是 18-5=13
假设窗口大小是10s,现在又来了一条数据25s(order)
这里会导致窗口生成的watermark是25-5=20,刚好可以触发窗口计算了,可实际上可click这个数据流还有部分数据未到达,比如19(click) 就会漏算。所以这样生成watermark会有问题
案例2
上面的问题可以通过filter 精确分流,然后再生成watermark
public class WatermarkMasterTemplate {public static void main(String[] args) {// 1. Source:无WM(零污染)KafkaSource<Event> source = KafkaSource.builder().build();// 2. 业务分流 + 独立WM(最精确)stream.filter(e -> e.type.equals("order")).assignTimestampsAndWatermarks(preciseWM("order")).window(...).print("ORDER");stream.filter(e -> e.type.equals("click")) .assignTimestampsAndWatermarks(preciseWM("click")).window(...).print("CLICK");}static WatermarkStrategy<Event> preciseWM(String type) {return WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(e -> e.eventTime);}
}
分析
10s(order) → 12s(click) → 15s(order) → 18s(click) → 20s(order)
order 数据生成的watermark是 20-5=15
click 数据生成的watermark是 18-5=13
这样不同流生成的watemark 生成的流是精确的
案例3
如果先加一个rebance 操作,是否会影响per-partition watermark 语义
public class WatermarkMasterTemplate {public static void main(String[] args) {// 1. Source:无WM(零污染)KafkaSource<Event> source = KafkaSource.builder().build();source.stream().rebalance().assignTimestampsAndWatermarks(preciseWM("order")).window(...).sum().print("ORDER");}static WatermarkStrategy<Event> preciseWM(String type) {return WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(e -> e.eventTime);}
}
分析
分区
partition a 10s(order) → 11s(click) → 14s(order) → 16s(click) → 18s(order)
partition b 12s(order) → 13s(click) → 15s(order) → 17s(click) → 19s(order)
但是如果是这样rebalance的话,会打乱单分区的watermark 的递增性,导致watermark 生成不精确