
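Flink Streaming Windows

Window Operations

Window operations are a core concept in stream processing: they make it possible to compute over finite sets of data drawn from an unbounded stream. Flink provides a rich set of window operations to support a wide range of business scenarios.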

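1. Window Concepts

A window is the mechanism that divides an unbounded stream into finite chunks. Each window holds the elements that fall within a given time range or count range, and window operations let us run computations over these finite data sets.

1.1 Window Types

Flink supports several window types:

  1. Time windows: bounded by time
  2. Count windows: bounded by the number of elements
  3. Session windows: bounded by gaps in user activity
  4. Global windows: one window per key, evaluated by a custom trigger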

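1.2 Window Lifecycle

A window goes through three stages:

  1. Creation: the window is created when the first element that belongs to it arrives.
  2. Accumulation: subsequent elements that belong to the window are added to it.
  3. Firing: the window's trigger decides when to compute and emit the result. With the default triggers:
     - an event-time window fires once the watermark passes the window's end time;
     - a processing-time window fires once the system clock does;
     - a count window fires once it holds enough elements.
     A time window's state is removed once time passes its end plus any allowed lateness.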
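To make the three stages concrete, here is a plain-Java simulation that runs without Flink. It is a sketch under simplifying assumptions:

- one key;
- event-time tumbling windows;
- watermarks passed in by hand;
- no allowed lateness.

The class WindowLifecycleSketch is hypothetical. The firing rule (watermark >= window end - 1) follows Flink's convention that a window's last timestamp is its end minus 1 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key simulation of an event-time tumbling window's lifecycle (not Flink's API).
final class WindowLifecycleSketch {
    private final long size;                                         // window size in ms
    private final TreeMap<Long, List<Long>> open = new TreeMap<>();  // window start -> buffered timestamps
    private final List<String> fired = new ArrayList<>();            // one entry per fired window

    WindowLifecycleSketch(long size) {
        this.size = size;
    }

    // Stages 1 and 2: the first element creates the window, later ones are added to it.
    // Late elements are not handled: this sketch has no allowed lateness.
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        open.computeIfAbsent(start, s -> new ArrayList<>()).add(timestamp);
    }

    // Stage 3: a window fires once the watermark reaches its last timestamp (end - 1);
    // its buffered elements are then discarded.
    void onWatermark(long watermark) {
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, List<Long>> w = open.pollFirstEntry();
            long start = w.getKey();
            fired.add("[" + start + ", " + (start + size) + ") count=" + w.getValue().size());
        }
    }

    List<String> fired() {
        return fired;
    }

    int openWindows() {
        return open.size();
    }
}
```

For example, with 5000 ms windows, elements at 1000 and 4000 ms stay buffered until a watermark of 4999 arrives. At that point the window [0, 5000) fires with two elements.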

2. Time Windows

2.1 Tumbling Windows

A tumbling window has a fixed size, and windows do not overlap, so each element belongs to exactly one window.
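Which window an element lands in is simple arithmetic. Flink aligns time windows to the epoch, optionally shifted by an offset as in TumblingEventTimeWindows.of(size, offset). The plain-Java sketch below reproduces that alignment, assuming millisecond timestamps. TumblingWindowMath is a hypothetical helper, not part of Flink.

```java
// Hypothetical helper: maps a timestamp (ms) to its epoch-aligned tumbling window.
final class TumblingWindowMath {
    // Start of the tumbling window that contains `timestamp`, for a window `size`
    // and an `offset` (e.g. to shift daily windows to a time zone).
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the result correct for timestamps before the offset/epoch
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }
}
```

With 5-second windows, an element at 7000 ms falls into [5000, 10000). With a 1000 ms offset, it falls into [6000, 11000).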

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sales stream: (product, amount). Processing-time windows need no watermarks.
        // With a bounded source such as fromElements, the job may finish before a
        // 5-second window fires; use an unbounded source (e.g. a socket or Kafka) to see output.
        DataStream<Tuple2<String, Integer>> salesStream = env.fromElements(
                new Tuple2<>("productA", 100),
                new Tuple2<>("productB", 200),
                new Tuple2<>("productA", 150),
                new Tuple2<>("productC", 80),
                new Tuple2<>("productB", 120));

        // Total sales per product in 5-second tumbling windows
        DataStream<Tuple2<String, Integer>> tumblingWindowResult = salesStream
                .keyBy(tuple -> tuple.f0)                                   // group by product
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5-second tumbling window
                .sum(1);                                                    // sum the amount field

        // The same window with a custom AggregateFunction
        DataStream<String> aggregatedResult = salesStream
                .keyBy(tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new SalesAggregateFunction());

        tumblingWindowResult.print("Tumbling Window Result");
        aggregatedResult.print("Aggregated Result");

        env.execute("Tumbling Window Example");
    }

    // Custom aggregate: the accumulator is (product, running total), the output a summary string
    public static class SalesAggregateFunction
            implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {

        @Override
        public Tuple2<String, Integer> createAccumulator() {
            return new Tuple2<>("", 0);
        }

        @Override
        public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
            return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
        }

        @Override
        public String getResult(Tuple2<String, Integer> accumulator) {
            return "Product: " + accumulator.f0 + ", Total Sales: " + accumulator.f1;
        }

        @Override
        public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }
}

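2.2 Sliding Windows

A sliding window has a fixed size and a slide interval. When the slide is smaller than the size, windows overlap and an element belongs to several windows.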
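Because windows overlap, one element is stored in several of them: size / slide windows when the size is a multiple of the slide. The following plain-Java sketch lists the windows that contain a timestamp. It walks backwards from the latest window start, similar to how Flink's sliding assigners work. SlidingWindowMath is hypothetical; timestamps are in milliseconds and windows are aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the sliding windows [start, start + size) that contain one timestamp.
final class SlidingWindowMath {
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With the 30-second window and 10-second slide used below, an element at 25000 ms belongs to three windows: [20000, 50000), [10000, 40000) and [0, 30000). This is also why a large size-to-slide ratio multiplies window state.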

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stock price stream: (symbol, price)
        DataStream<Tuple2<String, Double>> stockPrices = env.fromElements(
                new Tuple2<>("AAPL", 150.0),
                new Tuple2<>("GOOGL", 2800.0),
                new Tuple2<>("AAPL", 151.5),
                new Tuple2<>("MSFT", 300.0),
                new Tuple2<>("GOOGL", 2805.0),
                new Tuple2<>("AAPL", 149.8));

        // Every 10 seconds, compute the average price over the last 30 seconds
        DataStream<Tuple2<String, Double>> slidingWindowResult = stockPrices
                .keyBy(tuple -> tuple.f0)                                                    // group by symbol
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 30 s window, 10 s slide
                .aggregate(new AveragePriceAggregateFunction());

        slidingWindowResult.print("Sliding Window Average Price");
        env.execute("Sliding Window Example");
    }

    // Average price; the accumulator is Flink's Tuple3 (symbol, sum, count)
    public static class AveragePriceAggregateFunction
            implements AggregateFunction<Tuple2<String, Double>, Tuple3<String, Double, Integer>, Tuple2<String, Double>> {

        @Override
        public Tuple3<String, Double, Integer> createAccumulator() {
            return new Tuple3<>("", 0.0, 0);
        }

        @Override
        public Tuple3<String, Double, Integer> add(Tuple2<String, Double> value, Tuple3<String, Double, Integer> accumulator) {
            return new Tuple3<>(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Integer> accumulator) {
            double average = accumulator.f2 > 0 ? accumulator.f1 / accumulator.f2 : 0.0;
            return new Tuple2<>(accumulator.f0, average);
        }

        @Override
        public Tuple3<String, Double, Integer> merge(Tuple3<String, Double, Integer> a, Tuple3<String, Double, Integer> b) {
            return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
}

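2.3 Session Windows

A session window groups elements into sessions separated by periods of inactivity. A session closes when no element arrives for longer than the configured gap.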
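Session windows have no fixed boundaries. Conceptually, each element opens a window [t, t + gap), and windows that overlap are merged into one session. Below is a plain-Java sketch of that merging for a single key, assuming the timestamps are already sorted. SessionMerge is hypothetical, and windows that merely touch are merged here as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: merges per-element windows [t, t + gap) into sessions for one key.
final class SessionMerge {
    // Input must be sorted ascending. Returns {start, end, count} for each session.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                // The element's window overlaps (or touches) the current session: extend it
                last[1] = Math.max(last[1], t + gap);
                last[2]++;
            } else {
                // Inactivity exceeded the gap: start a new session
                result.add(new long[] {t, t + gap, 1});
            }
        }
        return result;
    }
}
```

Apply it to user1's timestamps from the example below (1000, 1500, 2000, 8000 ms) with a 5000 ms gap. It yields two sessions: [1000, 7000) with three clicks and [8000, 13000) with one.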

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class SessionWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event-time watermark strategy: tolerate up to 10 s of out-of-order events
        // and take each event's timestamp from its third field
        WatermarkStrategy<Tuple3<String, String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, timestamp) -> event.f2);

        // Click stream: (user ID, page, timestamp in ms)
        DataStream<Tuple3<String, String, Long>> clickStream = env.fromElements(
                new Tuple3<>("user1", "home", 1000L),
                new Tuple3<>("user1", "product", 1500L),
                new Tuple3<>("user1", "cart", 2000L),
                new Tuple3<>("user2", "home", 1200L),
                new Tuple3<>("user1", "checkout", 8000L), // 6 s after "cart": the previous session has ended
                new Tuple3<>("user2", "product", 1800L))
                .assignTimestampsAndWatermarks(watermarkStrategy);

        // A user's session ends after 5 s without new activity
        DataStream<String> sessionResult = clickStream
                .keyBy(tuple -> tuple.f0)                                  // group by user
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))  // 5-second session gap
                .aggregate(new SessionAggregateFunction());

        sessionResult.print("Session Window Result");
        env.execute("Session Window Example");
    }

    // Session aggregate: counts page views within each user session
    public static class SessionAggregateFunction
            implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<String, Integer>, String> {

        @Override
        public Tuple2<String, Integer> createAccumulator() {
            return new Tuple2<>("", 0);
        }

        @Override
        public Tuple2<String, Integer> add(Tuple3<String, String, Long> value, Tuple2<String, Integer> accumulator) {
            return new Tuple2<>(value.f0, accumulator.f1 + 1);
        }

        @Override
        public String getResult(Tuple2<String, Integer> accumulator) {
            return "User: " + accumulator.f0 + ", Pages visited in session: " + accumulator.f1;
        }

        @Override
        public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }
}

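3. Count Windows

3.1 Tumbling Count Windows

A tumbling count window divides the stream by the number of elements rather than by time. On a keyed stream, the count is kept per key.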
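A count window fires only once it is full. The plain-Java sketch below mimics a keyed countWindow(size) that computes averages; CountWindowSketch is hypothetical. With the sensor data in the example that follows, it produces three results and leaves one sensor1 reading buffered. A count window has no time-based trigger that could flush a partial window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a keyed tumbling count window that averages its elements.
final class CountWindowSketch {
    private final int size;
    private final Map<String, List<Double>> buffers = new HashMap<>(); // per-key window contents
    final List<String> results = new ArrayList<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    void onElement(String key, double value) {
        List<Double> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buf.add(value);
        if (buf.size() == size) {
            double avg = buf.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
            results.add(key + "=" + avg);
            buf.clear(); // tumbling: the next window for this key starts empty
        }
    }

    // Elements still waiting for a full window; they are never emitted unless more arrive.
    int pending(String key) {
        return buffers.getOrDefault(key, List.of()).size();
    }
}
```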

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TumblingCountWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sensor readings: (sensor ID, temperature)
        DataStream<Tuple2<String, Double>> sensorData = env.fromElements(
                new Tuple2<>("sensor1", 23.5),
                new Tuple2<>("sensor2", 25.2),
                new Tuple2<>("sensor1", 24.1),
                new Tuple2<>("sensor3", 22.8),
                new Tuple2<>("sensor2", 26.0),
                new Tuple2<>("sensor1", 23.9),
                new Tuple2<>("sensor3", 24.5),
                new Tuple2<>("sensor2", 25.8),
                new Tuple2<>("sensor1", 24.2),
                new Tuple2<>("sensor3", 23.7));

        // Average temperature over every 3 readings of the same sensor.
        // sensor1's fourth reading waits for two more and is never emitted with this bounded input.
        DataStream<Tuple2<String, Double>> countWindowResult = sensorData
                .keyBy(tuple -> tuple.f0)  // group by sensor
                .countWindow(3)            // one window per 3 elements of the same key
                .aggregate(new AverageTemperatureAggregateFunction());

        countWindowResult.print("Count Window Average Temperature");
        env.execute("Tumbling Count Window Example");
    }

    // Average temperature; the accumulator is Flink's Tuple3 (sensor ID, sum, count)
    public static class AverageTemperatureAggregateFunction
            implements AggregateFunction<Tuple2<String, Double>, Tuple3<String, Double, Integer>, Tuple2<String, Double>> {

        @Override
        public Tuple3<String, Double, Integer> createAccumulator() {
            return new Tuple3<>("", 0.0, 0);
        }

        @Override
        public Tuple3<String, Double, Integer> add(Tuple2<String, Double> value, Tuple3<String, Double, Integer> accumulator) {
            return new Tuple3<>(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Integer> accumulator) {
            double average = accumulator.f2 > 0 ? accumulator.f1 / accumulator.f2 : 0.0;
            return new Tuple2<>(accumulator.f0, average);
        }

        @Override
        public Tuple3<String, Double, Integer> merge(Tuple3<String, Double, Integer> a, Tuple3<String, Double, Integer> b) {
            return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
}

3.2 Sliding Count Windows

A sliding count window is defined by a window size and a slide, both measured in elements.
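In Flink 1.x, KeyedStream.countWindow(size, slide) is built from three parts:

- a global window;
- a count trigger that fires every `slide` elements;
- a count evictor that keeps the last `size` elements.

As a consequence, the first firings see fewer than `size` elements. The plain-Java sketch below reproduces that behavior for sums, for a single key only. SlidingCountSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-key simulation of countWindow(size, slide) followed by a sum.
final class SlidingCountSketch {
    static List<Double> sums(double[] values, int size, int slide) {
        Deque<Double> window = new ArrayDeque<>();
        List<Double> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (double v : values) {
            window.addLast(v);
            if (++sinceLastFire == slide) {            // count trigger: fire every `slide` elements
                sinceLastFire = 0;
                while (window.size() > size) {         // count evictor: keep only the last `size`
                    window.removeFirst();
                }
                double sum = 0;
                for (double x : window) sum += x;
                sums.add(sum);
            }
        }
        return sums;
    }
}
```

For account1 in the example below (100, 150, 90, 110), with size 4 and slide 2, it returns [250.0, 450.0]. The first result covers only two transactions.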

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlidingCountWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Transactions: (account, amount)
        DataStream<Tuple2<String, Double>> transactions = env.fromElements(
                new Tuple2<>("account1", 100.0),
                new Tuple2<>("account2", 200.0),
                new Tuple2<>("account1", 150.0),
                new Tuple2<>("account3", 80.0),
                new Tuple2<>("account2", 120.0),
                new Tuple2<>("account1", 90.0),
                new Tuple2<>("account3", 200.0),
                new Tuple2<>("account2", 180.0),
                new Tuple2<>("account1", 110.0),
                new Tuple2<>("account3", 150.0));

        // Per account: every 2 elements, sum the most recent (up to) 4 transactions
        DataStream<Tuple2<String, Double>> slidingCountWindowResult = transactions
                .keyBy(tuple -> tuple.f0)  // group by account
                .countWindow(4, 2)         // window size 4, slide 2
                .sum(1);                   // total transaction amount

        slidingCountWindowResult.print("Sliding Count Window Result");
        env.execute("Sliding Count Window Example");
    }
}

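4. Global Windows

A global window assigns all elements with the same key to a single window that never ends. Its default trigger never fires, so a custom trigger is needed to decide when the window result is computed.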
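The trigger's behavior can be checked without running Flink. The plain-Java sketch below keeps one running sum per key, emits it when it reaches the threshold, and then resets it. This mirrors what FIRE_AND_PURGE does to a global window's contents. ThresholdTriggerSketch is hypothetical.

On the sample orders of the following example, a threshold of 500 never fires, because the per-customer totals are 340, 320 and 80. A threshold of 300 fires for customer2 and then customer1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a per-key global window with a "fire and purge at threshold" trigger.
final class ThresholdTriggerSketch {
    static List<String> run(String[] keys, double[] amounts, double threshold) {
        Map<String, Double> running = new HashMap<>(); // per-key window contents, kept as a sum
        List<String> fired = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            double sum = running.merge(keys[i], amounts[i], Double::sum);
            if (sum >= threshold) {
                fired.add(keys[i] + "=" + sum); // FIRE: emit the aggregate
                running.remove(keys[i]);        // PURGE: the next round for this key starts empty
            }
        }
        return fired;
    }
}
```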

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class GlobalWindowExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Orders: (customer, amount)
        DataStream<Tuple2<String, Double>> orders = env.fromElements(
                new Tuple2<>("customer1", 100.0),
                new Tuple2<>("customer2", 200.0),
                new Tuple2<>("customer1", 150.0),
                new Tuple2<>("customer3", 80.0),
                new Tuple2<>("customer2", 120.0),
                new Tuple2<>("customer1", 90.0));

        // One global window per customer, evaluated when the running order total reaches the threshold
        DataStream<String> globalWindowResult = orders
                .keyBy(tuple -> tuple.f0)              // group by customer
                .window(GlobalWindows.create())        // global window
                .trigger(new OrderAmountTrigger())     // custom trigger
                .aggregate(new OrderAggregateFunction(), new OrderProcessWindowFunction());

        globalWindowResult.print("Global Window Result");
        env.execute("Global Window Example");
    }

    // Custom trigger: FIRE_AND_PURGE once a customer's running order total reaches THRESHOLD.
    // With the sample data, customer1 (340) and customer2 (320) reach 300; customer3 (80) does not.
    public static class OrderAmountTrigger extends Trigger<Tuple2<String, Double>, GlobalWindow> {

        private static final double THRESHOLD = 300.0;

        // Running total, scoped to the current key and window
        private final ValueStateDescriptor<Double> sumDescriptor =
                new ValueStateDescriptor<>("sum", Double.class);

        @Override
        public TriggerResult onElement(Tuple2<String, Double> element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) throws Exception {
            ValueState<Double> sumState = ctx.getPartitionedState(sumDescriptor);
            Double stored = sumState.value();  // null before the first element
            double currentSum = (stored == null ? 0.0 : stored) + element.f1;

            if (currentSum >= THRESHOLD) {
                sumState.clear();                     // start the next round from zero
                return TriggerResult.FIRE_AND_PURGE;  // emit the result and discard the window contents
            }
            sumState.update(currentSum);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(sumDescriptor).clear();
        }
    }

    // Order aggregate: the accumulator is (customer, running total)
    public static class OrderAggregateFunction
            implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

        @Override
        public Tuple2<String, Double> createAccumulator() {
            return new Tuple2<>("", 0.0);
        }

        @Override
        public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {
            String customer = accumulator.f0.isEmpty() ? value.f0 : accumulator.f0;
            return new Tuple2<>(customer, accumulator.f1 + value.f1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
            return accumulator;
        }

        @Override
        public Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }

    // Formats the pre-aggregated result; the Iterable holds exactly one element
    public static class OrderProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Double>, String, String, GlobalWindow> {

        @Override
        public void process(String key, Context context,
                            Iterable<Tuple2<String, Double>> elements,
                            Collector<String> out) throws Exception {
            Tuple2<String, Double> result = elements.iterator().next();
            out.collect("Customer " + result.f0 + " reached threshold with amount: " + result.f1);
        }
    }
}

5. Window Functions

5.1 ReduceFunction

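A ReduceFunction combines two elements of the same type into one. It aggregates a window's elements incrementally as they arrive.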
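Incremental here means the window never buffers its elements. Instead, it keeps one value of the element type and folds each new element into it. Below is a plain-Java sketch of that idea. IncrementalReduce is hypothetical, and it uses java.util.function.BinaryOperator rather than Flink's ReduceFunction.

```java
import java.util.function.BinaryOperator;

// Hypothetical single-window sketch of incremental reduction (not Flink's API):
// only one running value is stored, however many elements arrive.
final class IncrementalReduce<T> {
    private final BinaryOperator<T> reducer;
    private T state; // the window's only stored value; null until the first element

    IncrementalReduce(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Same contract as a ReduceFunction: two values of type T in, one value of type T out.
    void add(T value) {
        state = (state == null) ? value : reducer.apply(state, value);
    }

    T result() {
        return state;
    }
}
```

Feed it store1's sales from the example below (100, 150, 90) with Integer::max, and 150 is the only stored value left.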

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReduceFunctionExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sales stream: (store, amount)
        DataStream<Tuple2<String, Integer>> sales = env.fromElements(
                new Tuple2<>("store1", 100),
                new Tuple2<>("store2", 200),
                new Tuple2<>("store1", 150),
                new Tuple2<>("store3", 80),
                new Tuple2<>("store2", 120),
                new Tuple2<>("store1", 90));

        // Highest single sale per store, using a ReduceFunction
        DataStream<Tuple2<String, Integer>> maxSales = sales
                .keyBy(tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MaxSalesReduceFunction());

        maxSales.print("Max Sales per Store");
        env.execute("Reduce Function Example");
    }

    // ReduceFunction that keeps the record with the higher amount
    public static class MaxSalesReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
            return value1.f1 > value2.f1 ? value1 : value2;
        }
    }
}

5.2 AggregateFunction

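An AggregateFunction is a generalized version of ReduceFunction. Its input, accumulator and output types can all differ, which supports more complex aggregation logic.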
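A typical case that ReduceFunction cannot express directly is an average. Input and output are single numbers, but the accumulator must carry both a sum and a count. The sketch below uses a hypothetical Agg interface that mirrors AggregateFunction's four methods (createAccumulator, add, getResult, merge), so it runs without Flink. In Flink, merge is used when windows are merged, for example with session windows.

```java
// Hypothetical stand-in for the AggregateFunction contract (not Flink's interface).
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Average: the accumulator is {sum, count}, while input and output are plain doubles.
final class AverageAgg implements Agg<Double, double[], Double> {
    public double[] createAccumulator() {
        return new double[] {0.0, 0.0};
    }

    public double[] add(Double value, double[] acc) {
        return new double[] {acc[0] + value, acc[1] + 1};
    }

    public Double getResult(double[] acc) {
        return acc[1] == 0 ? 0.0 : acc[0] / acc[1]; // avoid dividing by zero for an empty window
    }

    public double[] merge(double[] a, double[] b) {
        return new double[] {a[0] + b[0], a[1] + b[1]};
    }
}
```

Merging the accumulators for {4, 2} and {9} gives an average of 5.0. This is the same result as aggregating all three values in one accumulator.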

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AggregateFunctionExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // User actions: (user ID, action type, timestamp)
        DataStream<Tuple3<String, String, Long>> userActions = env.fromElements(
                new Tuple3<>("user1", "click", System.currentTimeMillis()),
                new Tuple3<>("user2", "view", System.currentTimeMillis()),
                new Tuple3<>("user1", "purchase", System.currentTimeMillis()),
                new Tuple3<>("user3", "click", System.currentTimeMillis()),
                new Tuple3<>("user2", "click", System.currentTimeMillis()),
                new Tuple3<>("user1", "view", System.currentTimeMillis()));

        // Activity per user (number of actions), using an AggregateFunction
        DataStream<Tuple2<String, Integer>> userActivity = userActions
                .keyBy(tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .aggregate(new UserActivityAggregateFunction());

        userActivity.print("User Activity");
        env.execute("Aggregate Function Example");
    }

    // Counts actions per user; the accumulator keeps (user ID, count) so the ID is not lost
    public static class UserActivityAggregateFunction
            implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> createAccumulator() {
            return new Tuple2<>("", 0);
        }

        @Override
        public Tuple2<String, Integer> add(Tuple3<String, String, Long> value, Tuple2<String, Integer> accumulator) {
            return new Tuple2<>(value.f0, accumulator.f1 + 1);
        }

        @Override
        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
            return accumulator;
        }

        @Override
        public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return new Tuple2<>(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
        }
    }
}

5.3 ProcessWindowFunction

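A ProcessWindowFunction receives an Iterable over all elements of the window, together with a Context that exposes window metadata such as its start and end, and it can emit any number of elements. Because it buffers every element until the window fires, it is less efficient than incremental aggregation. The two can be combined by passing both functions to aggregate(...) or reduce(...), as the global window example above does.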
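The example below formats window bounds with SimpleDateFormat. That class is mutable and not thread-safe, so an instance must never be shared. A possible alternative is java.time's DateTimeFormatter, which is immutable and thread-safe. The sketch below turns a window's start and end, given in epoch milliseconds, into a readable range. WindowBounds is hypothetical, and UTC is an assumed time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: formats window bounds given in epoch milliseconds.
final class WindowBounds {
    // Immutable and thread-safe; the UTC zone is an assumption for this sketch.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    static String describe(long startMillis, long endMillis) {
        return FORMAT.format(Instant.ofEpochMilli(startMillis))
                + " - " + FORMAT.format(Instant.ofEpochMilli(endMillis));
    }
}
```

Inside process(), it could be called as WindowBounds.describe(context.window().getStart(), context.window().getEnd()).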

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessWindowFunctionExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Log lines of the form "LEVEL: message"
        DataStream<String> logs = env.fromElements(
                "INFO: User login",
                "ERROR: Database connection failed",
                "INFO: User logout",
                "WARN: High memory usage",
                "ERROR: Invalid user input",
                "INFO: Data processed");

        // Count log lines per level with a ProcessWindowFunction
        DataStream<String> logAnalysis = logs
                .map(log -> new Tuple2<>(log.split(":")[0], 1))
                // The lambda's generic return type is erased, so declare Tuple2<String, Integer> explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new LogAnalysisProcessWindowFunction());

        logAnalysis.print("Log Analysis");
        env.execute("Process Window Function Example");
    }

    // Counts lines per log level and reports the window's time range
    public static class LogAnalysisProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {

        private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void process(String key, Context context,
                            Iterable<Tuple2<String, Integer>> elements,
                            Collector<String> out) throws Exception {
            int count = 0;
            for (Tuple2<String, Integer> element : elements) {
                count += element.f1;
            }
            // Window metadata is available through the context
            TimeWindow window = context.window();
            String startTime = dateFormat.format(new Date(window.getStart()));
            String endTime = dateFormat.format(new Date(window.getEnd()));
            out.collect(String.format("Level: %s, Count: %d, Window: %s - %s", key, count, startTime, endTime));
        }
    }
}

6. Best Practices for Window Operations

6.1 Choosing the Window Size and Slide Interval

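// Choose a window size that fits the use case.

// For low-latency results, use small tumbling windows
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

// To smooth the data, use a sliding window: here a 1-minute window evaluated every 10 seconds,
// which keeps each element in 6 windows (1 min / 10 s)
.window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))

// For user-behavior analysis, use session windows: here a session ends after 30 minutes of inactivity
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))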

6.2 Optimizing State Management

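// Prefer incremental aggregation (reduce/aggregate): each window then stores a single accumulator
// instead of buffering every element. EfficientAggregateFunction is a placeholder name.
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.aggregate(new EfficientAggregateFunction())

// Release expired state promptly: keep the out-of-orderness bound no larger than necessary,
// because an event-time window only fires, and frees its state, once the watermark passes its end
WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofMinutes(1))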
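Flink's built-in forBoundedOutOfOrderness strategy emits a watermark equal to the largest timestamp seen so far, minus the bound, minus 1 ms. The bound therefore directly sets how long event-time window state is held. Below is a plain-Java sketch of that arithmetic; OutOfOrdernessMath is hypothetical, and all times are in ms.

```java
// Hypothetical helper for the bounded-out-of-orderness watermark arithmetic (times in ms).
final class OutOfOrdernessMath {
    // Watermark after seeing a maximum event timestamp `maxTs` with out-of-orderness bound `delay`.
    static long watermark(long maxTs, long delay) {
        return maxTs - delay - 1;
    }

    // Smallest maximum event timestamp that makes the window [start, end) fire.
    // The watermark must reach end - 1: maxTs - delay - 1 >= end - 1, i.e. maxTs >= end + delay.
    static long firingTimestamp(long windowEnd, long delay) {
        return windowEnd + delay;
    }
}
```

Take a one-minute bound and a window ending at 300000 ms. The window fires only after an event with a timestamp of at least 360000 ms has been seen, and its state is kept until then.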

6.3 Handling Late Data

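// Accept late elements for up to 10 minutes after the window's end
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(10))

// Route data that arrives even later to a side output.
// The anonymous subclass ({}) preserves the generic type information.
final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};

// `source` is assumed to be a DataStream<Tuple2<String, Integer>> with timestamps and watermarks assigned.
// getSideOutput is defined on SingleOutputStreamOperator, not on DataStream.
SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = source
        .keyBy(tuple -> tuple.f0)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .allowedLateness(Time.minutes(10))
        .sideOutputLateData(lateOutputTag)
        .sum(1);

// Retrieve the late data
DataStream<Tuple2<String, Integer>> lateStream = mainStream.getSideOutput(lateOutputTag);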
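The watermark, the window end and the allowed lateness together decide what happens to an element. The plain-Java sketch below follows the rules described in the Flink documentation:

- before the watermark reaches the window's last timestamp, the element is on time;
- within the allowed lateness, the element is still added, and the default event-time trigger fires the window again with an updated result;
- after that, the window's state is gone, and with sideOutputLateData the element goes to the side output.

LatenessSketch is hypothetical, and times are in ms.

```java
// Hypothetical helper: how an event-time window treats an element that belongs to it,
// given the window end, the allowed lateness and the current watermark (all in ms).
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_REFIRE, DROPPED_TO_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;  // last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;                    // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_REFIRE;                // fired already, but state is still kept
        }
        return Outcome.DROPPED_TO_SIDE_OUTPUT;         // window state has been cleaned up
    }
}
```

Consider the one-hour window [0, 3600000) with 10 minutes of allowed lateness. A watermark of 3600000 gives LATE_REFIRE, while a watermark of 4199999 or later gives DROPPED_TO_SIDE_OUTPUT.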

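This article covered Flink's window operations, including time windows, count windows, global windows and the various window functions. Windows are a core concept in stream processing, and mastering them is essential for building complex streaming applications.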

