当前位置: 首页 > news >正文

flink超时未揽收单量统计

应用场景: 双十一大屏统计 - - 订单超时汇总

项目指标概况:

应用背景:晚点超时指标,例如:出库超6小时未揽收订单量

难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标恰好是要求在指定的超时时刻计算出有多少“未到达”的消息,可以预警出订单积压等异常现象

方案1:flink往db里面高TPS写,产品前端高RPS查询OLAP数据库明细,大促数据洪峰场景因查询暴增会使得数据库压力打满,明细查询的方式势必不能支持日后大促暴涨的单量

方案2:metaQ定时消息,订单消息写入metaQ,利用metaQ的定时消息功能,根据用户写入的消息和时间,在指定时刻下发,flink接收两个数据源(kafka订单流,kafka出库流,metaQ延时消息流)判断订单是否超时揽收,这种方式除了需要维护flink程序,同时还要保障额外的消息中间层维护

方案3:flinkcep,使用起来确实比较简便,但是实际在统计上和真实结果有一定出入,原因是出库时间会被回传多次,开始回传的是9点,后面发现回传错了,又改成了8点,而cep的watermark是全局向前走的,对于这种场景,无法很好的适配

方案4:flink的processfunction,是一个low-level流处理操作,通过改写其中的Processelement方法,可以告诉flinkstate里面存什么,以及如何更新state。通过改写ontimer方法,可以告诉state何时下发超时消息

具体操作:

1.首先,根据业务主键物流订单code将消息做keyby处理,不同主键值的消息分流到不同的partition里面,生成keyedstream,因为在后续processfuntion中操作的state是valuestate类型的,即每一个key值对应一个state,更新是以key粒度(一个物流订单)进行的

2.每一条消息在processfuntion中处理时,为每个key的消息计算出timeoutmemont,并将该时刻注册到timeservice的定时器中,同时存储该消息至state,当同一个key值有多条消息到来时,可根据消息状态对state进行更新

3.当机器时间来到timeoutmemont时,timeserivcr中的定时器会自动回调ontimer函数,我们事先已经在ontimer函数中定义好操作:获取state,并判断标志位进行下发

如此一来,便做到了:制造出超时消息,并将其暂存在flink state中

该方案优势:

1.部署,运维成本比较低,不需要引入额外的消息中间件;

2.性能优良:

source rps:avg 3k/s,max:4.5k/s

sink rps:avg 1.5k/s,max:2.4k/s

延迟:avg:2.5/s,max:3.7s

3.通用化,复用性高,可以复用到各类业务场景,只需要修改下配置(超时时间)

1. 超时检测逻辑(改写ProcessFunction)

public class TimeoutDetector extends KeyedProcessFunction<String, Row, Tuple2<String, Long>> {private ValueState<Long> createTimeState;private ValueState<Boolean> scanFlagState;@Overridepublic void open(Configuration parameters) {// 初始化出库时间状态createTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("createTime", Long.class));// 初始化揽收标记状态scanFlagState = getRuntimeContext().getState(new ValueStateDescriptor<>("scanFlag", Boolean.class));}@Overridepublic void processElement(Row row, Context ctx, Collector<Tuple2<String, Long>> out) {String eventType = row.getFieldAs("event_type");Long eventTs = ((Timestamp)row.getFieldAs("ts")).getTime();if ("create".equals(eventType)) {// 记录出库时间并注册定时器createTimeState.update(eventTs);ctx.timerService().registerEventTimeTimer(eventTs + 6 * 3600 * 1000);} else if ("scan".equals(eventType)) {// 标记已揽收scanFlagState.update(true);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) {if (scanFlagState.value() == null || !scanFlagState.value()) {// 输出超时订单ID与出库时间out.collect(Tuple2.of(ctx.getCurrentKey(), createTimeState.value()));}// 清理状态createTimeState.clear();scanFlagState.clear();}
}

在flinksql中调用udf

CREATE TEMPORARY FUNCTION CheckTimeout AS 'com.example.TimeoutDetector';SELECT order_id,ship_time,CheckTimeout(order_id, ship_time) AS is_timeout
FROM (SELECT order_id,event_time AS ship_timeFROM order_eventsWHERE event_type = 'SHIP'
) 
WHERE is_timeout = true;

doris sink 

INSERT INTO doris_sink_table
SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,COUNT(order_id) AS timeout_count
FROM timeout_orders
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

方案3

flinkcep处理:

-- 步骤1:定义数据源表(Kafka输入)
CREATE TABLE order_events (order_id         STRING,event_type       STRING,   -- 'outbound'=出库, 'collection'=揽收event_time       TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 允许5秒乱序
) WITH ('connector' = 'kafka','topic'     = 'order_events','scan.startup.mode' = 'latest-offset','format'    = 'json'
);-- 步骤2:使用 MATCH_RECOGNIZE 进行超时模式匹配
CREATE TABLE timeout_orders (order_id         STRING,outbound_time    TIMESTAMP(3),timeout_reason   STRING
) WITH ('connector' = 'kafka','topic'     = 'timeout_alerts','format'    = 'json'
);INSERT INTO timeout_orders
SELECT order_id, outbound_time, '未在6小时内揽收' AS timeout_reason
FROM order_events
MATCH_RECOGNIZE (PARTITION BY order_idORDER BY event_timeMEASURESA.event_time AS outbound_time,LAST(B.event_time) AS collection_timeONE ROW PER MATCHAFTER MATCH SKIP TO LAST BPATTERN (A NOT? B*) WITHIN INTERVAL '6' HOUR  -- 超时窗口定义DEFINEA AS event_type = 'outbound',B AS event_type = 'collection' AND B.event_time <= A.event_time + INTERVAL '6' HOUR
)
WHERE collection_time IS NULL;  -- 未匹配到揽收事件

关键设计解析

  1. 时间语义处理

    • WATERMARK 定义处理5秒内的乱序事件,与物流场景常见的网络延迟匹配 
    • WITHIN INTERVAL '6' HOUR 精确控制超时窗口,符合出库后6小时未揽收的业务规则 
  2. 模式匹配逻辑

    • PATTERN (A NOT? B*) 表示匹配出库事件后未出现揽收事件(NOT操作符)
    • DEFINE B 中增加时间约束,确保揽收事件在出库后6小时内发生
  3. 结果处理优化

    • ONE ROW PER MATCH 减少重复告警,每个订单仅触发一次超时事件
    • AFTER MATCH SKIP TO LAST B 跳过已处理事件,提升处理效率 

优化方式:

1.rocksdb statebackend RocksDB 将状态存储在磁盘而非内存,适合 TB 级状态;增量检查点仅保存差异数据,减少 IO 压力。

2.大字段/无关字段去除

3.statettl=13h(12h+1h缓冲)自动清理已完成窗口的过期状态,避免状态无限膨胀

4.检查点间隔与异步快照,增大checkpoint时间间隔

SET 'execution.checkpointing.interval' = '10min';  -- 增大间隔降低IO压力
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.unaligned' = 'true';  -- 启用非对齐检查点

三、技术优化策略

  1. 状态管理优化

    • 启用RocksDB状态后端:state.backend: rocksdb
    • 设置TTL自动清理过期订单状态:table.exec.state.ttl = 2h
  2. 性能调优

    • 调整并行度:SET 'parallelism.default' = 8;
    • 启用MiniBatch聚合:table.exec.mini-batch.enabled = true
  3. 容错机制


 

相关文章:

  • 华为首款鸿蒙电脑正式亮相,开启国产操作系统新篇章
  • 多线程初阶(2)
  • 长难句。。
  • Kafka消息队列之 【消费者分组】 详解
  • maven 安装 本地 jar
  • 紫禁城多语言海外投资理财返利源码带前端uniapp纯工程文件
  • 带你玩转 Flink TumblingWindow:从理论到代码的深度探索
  • DMC-1410/1411/1417USER MANUAL 手侧
  • 视频编解码学习8之视频历史
  • 艾体宝方案丨深度解析生成式 AI 安全风险,Lepide 为数据安全护航
  • 垃圾回收的三色标记算法
  • Petalinux开发Linux
  • 最新CDGP单选题(第四章)补充
  • fastjson2 json.tojsonstring 会自动忽略过滤掉 key: null的数据
  • Linux Shell编程之条件语句
  • SGLang 实战介绍 (张量并行 / Qwen3 30B MoE 架构部署)
  • 红黑树详解初版
  • 公链钱包开发:技术逻辑与产品设计实践
  • Asp.Net Core IIS发布后PUT、DELETE请求错误405
  • 飞云分仓操盘副图指标操作技术图文分解
  • 明明睡够了,怎么还有黑眼圈?可能是身体在求救
  • 美政府被曝下令加强对格陵兰岛间谍活动,丹麦将召见美代办
  • 美联储如期按兵不动,强调“失业率和通胀上升的风险均已上升”(声明全文)
  • 外交部:印巴都表示不希望局势升级,望双方都能保持冷静克制
  • 山大齐鲁医院通报“子宫肌瘤论文现男性患者”:存在学术不端
  • 单阶段遭遇零封偶像奥沙利文,赵心童要让丁俊晖预言成真