flink超时未揽收单量统计
应用场景: 双十一大屏统计 - - 订单超时汇总
项目指标概况:
应用背景:晚点超时指标,例如:出库超6小时未揽收订单量
难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标恰好是要求在指定的超时时刻计算出有多少“未到达”的消息,可以预警出订单积压等异常现象
方案1:flink往db里面高TPS写,产品前端高RPS查询OLAP数据库明细,大促数据洪峰场景因查询暴增会使得数据库压力打满,明细查询的方式势必不能支持日后大促暴涨的单量
方案2:metaQ定时消息,订单消息写入metaQ,利用metaQ的定时消息功能,根据用户写入的消息和时间,在指定时刻下发,flink接收两个数据源(kafka订单流,kafka出库流,metaQ延时消息流)判断订单是否超时揽收,这种方式除了需要维护flink程序,同时还要保障额外的消息中间层维护
方案3:flinkcep,使用起来确实比较简便,但是实际在统计上和真实结果有一定出入,原因是出库时间会被回传多次,开始回传的是9点,后面发现回传错了,又改成了8点,而cep的watermark是全局向前走的,对于这种场景,无法很好的适配
方案4:flink的processfunction,是一个low-level流处理操作,通过改写其中的Processelement方法,可以告诉flinkstate里面存什么,以及如何更新state。通过改写ontimer方法,可以告诉state何时下发超时消息
具体操作:
1.首先,根据业务主键物流订单code将消息做keyby处理,不同主键值的消息分流到不同的partition里面,生成keyedstream,因为在后续processfuntion中操作的state是valuestate类型的,即每一个key值对应一个state,更新是以key粒度(一个物流订单)进行的
2.每一条消息在processfuntion中处理时,为每个key的消息计算出timeoutmemont,并将该时刻注册到timeservice的定时器中,同时存储该消息至state,当同一个key值有多条消息到来时,可根据消息状态对state进行更新
3.当机器时间来到timeoutmemont时,timeserivcr中的定时器会自动回调ontimer函数,我们事先已经在ontimer函数中定义好操作:获取state,并判断标志位进行下发
如此一来,便做到了:制造出超时消息,并将其暂存在flink state中
该方案优势:
1.部署,运维成本比较低,不需要引入额外的消息中间件;
2.性能优良:
source rps:avg 3k/s,max:4.5k/s
sink rps:avg 1.5k/s,max:2.4k/s
延迟:avg:2.5/s,max:3.7s
3.通用化,复用性高,可以复用到各类业务场景,只需要修改下配置(超时时间)
1. 超时检测逻辑(改写ProcessFunction)
public class TimeoutDetector extends KeyedProcessFunction<String, Row, Tuple2<String, Long>> {private ValueState<Long> createTimeState;private ValueState<Boolean> scanFlagState;@Overridepublic void open(Configuration parameters) {// 初始化出库时间状态createTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("createTime", Long.class));// 初始化揽收标记状态scanFlagState = getRuntimeContext().getState(new ValueStateDescriptor<>("scanFlag", Boolean.class));}@Overridepublic void processElement(Row row, Context ctx, Collector<Tuple2<String, Long>> out) {String eventType = row.getFieldAs("event_type");Long eventTs = ((Timestamp)row.getFieldAs("ts")).getTime();if ("create".equals(eventType)) {// 记录出库时间并注册定时器createTimeState.update(eventTs);ctx.timerService().registerEventTimeTimer(eventTs + 6 * 3600 * 1000);} else if ("scan".equals(eventType)) {// 标记已揽收scanFlagState.update(true);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) {if (scanFlagState.value() == null || !scanFlagState.value()) {// 输出超时订单ID与出库时间out.collect(Tuple2.of(ctx.getCurrentKey(), createTimeState.value()));}// 清理状态createTimeState.clear();scanFlagState.clear();}
}
在flinksql中调用udf
CREATE TEMPORARY FUNCTION CheckTimeout AS 'com.example.TimeoutDetector';SELECT order_id,ship_time,CheckTimeout(order_id, ship_time) AS is_timeout
FROM (SELECT order_id,event_time AS ship_timeFROM order_eventsWHERE event_type = 'SHIP'
)
WHERE is_timeout = true;
doris sink
INSERT INTO doris_sink_table
SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,COUNT(order_id) AS timeout_count
FROM timeout_orders
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
方案3
flinkcep处理:
-- 步骤1:定义数据源表(Kafka输入)
CREATE TABLE order_events (order_id STRING,event_type STRING, -- 'outbound'=出库, 'collection'=揽收event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 允许5秒乱序
) WITH ('connector' = 'kafka','topic' = 'order_events','scan.startup.mode' = 'latest-offset','format' = 'json'
);-- 步骤2:使用 MATCH_RECOGNIZE 进行超时模式匹配
CREATE TABLE timeout_orders (order_id STRING,outbound_time TIMESTAMP(3),timeout_reason STRING
) WITH ('connector' = 'kafka','topic' = 'timeout_alerts','format' = 'json'
);INSERT INTO timeout_orders
SELECT order_id, outbound_time, '未在6小时内揽收' AS timeout_reason
FROM order_events
MATCH_RECOGNIZE (PARTITION BY order_idORDER BY event_timeMEASURESA.event_time AS outbound_time,LAST(B.event_time) AS collection_timeONE ROW PER MATCHAFTER MATCH SKIP TO LAST BPATTERN (A NOT? B*) WITHIN INTERVAL '6' HOUR -- 超时窗口定义DEFINEA AS event_type = 'outbound',B AS event_type = 'collection' AND B.event_time <= A.event_time + INTERVAL '6' HOUR
)
WHERE collection_time IS NULL; -- 未匹配到揽收事件
关键设计解析
-
时间语义处理
WATERMARK
定义处理5秒内的乱序事件,与物流场景常见的网络延迟匹配WITHIN INTERVAL '6' HOUR
精确控制超时窗口,符合出库后6小时未揽收的业务规则
-
模式匹配逻辑
PATTERN (A NOT? B*)
表示匹配出库事件后未出现揽收事件(NOT
操作符)DEFINE B
中增加时间约束,确保揽收事件在出库后6小时内发生
-
结果处理优化
ONE ROW PER MATCH
减少重复告警,每个订单仅触发一次超时事件AFTER MATCH SKIP TO LAST B
跳过已处理事件,提升处理效率
优化方式:
1.rocksdb statebackend RocksDB 将状态存储在磁盘而非内存,适合 TB 级状态;增量检查点仅保存差异数据,减少 IO 压力。
2.大字段/无关字段去除
3.statettl=13h(12h+1h缓冲)自动清理已完成窗口的过期状态,避免状态无限膨胀
4.检查点间隔与异步快照,增大checkpoint时间间隔
SET 'execution.checkpointing.interval' = '10min'; -- 增大间隔降低IO压力
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.unaligned' = 'true'; -- 启用非对齐检查点
三、技术优化策略
-
状态管理优化
- 启用RocksDB状态后端:
state.backend: rocksdb
- 设置TTL自动清理过期订单状态:
table.exec.state.ttl = 2h
- 启用RocksDB状态后端:
-
性能调优
- 调整并行度:
SET 'parallelism.default' = 8;
- 启用MiniBatch聚合:
table.exec.mini-batch.enabled = true
- 调整并行度:
-
容错机制