当前位置: 首页 > wzjs >正文

中山市区做网站公司网站点击软件排名

中山市区做网站公司,网站点击软件排名,玉环县企业网站建设,如何在网站插入地图应用场景: 双十一大屏统计 - - 订单超时汇总 项目指标概况: 应用背景:晚点超时指标,例如:出库超6小时未揽收订单量 难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标…

应用场景: 双十一大屏统计 - - 订单超时汇总

项目指标概况:

应用背景:晚点超时指标,例如:出库超6小时未揽收订单量

难点:flink消息触发式计算,没有消息到达则无法计算,而这类指标恰好是要求在指定的超时时刻计算出有多少“未到达”的消息,可以预警出订单积压等异常现象

方案1:flink往db里面高TPS写,产品前端高RPS查询OLAP数据库明细,大促数据洪峰场景因查询暴增会使得数据库压力打满,明细查询的方式势必不能支持日后大促暴涨的单量

方案2:metaQ定时消息,订单消息写入metaQ,利用metaQ的定时消息功能,根据用户写入的消息和时间,在指定时刻下发,flink接收两个数据源(kafka订单流,kafka出库流,metaQ延时消息流)判断订单是否超时揽收,这种方式除了需要维护flink程序,同时还要保障额外的消息中间层维护

方案3:flinkcep,使用起来确实比较简便,但是实际在统计上和真实结果有一定出入,原因是出库时间会被回传多次,开始回传的是9点,后面发现回传错了,又改成了8点,而cep的watermark是全局向前走的,对于这种场景,无法很好的适配

方案4:flink的processfunction,是一个low-level流处理操作,通过改写其中的Processelement方法,可以告诉flinkstate里面存什么,以及如何更新state。通过改写ontimer方法,可以告诉state何时下发超时消息

具体操作:

1.首先,根据业务主键物流订单code将消息做keyby处理,不同主键值的消息分流到不同的partition里面,生成keyedstream,因为在后续processfuntion中操作的state是valuestate类型的,即每一个key值对应一个state,更新是以key粒度(一个物流订单)进行的

2.每一条消息在processfuntion中处理时,为每个key的消息计算出timeoutmemont,并将该时刻注册到timeservice的定时器中,同时存储该消息至state,当同一个key值有多条消息到来时,可根据消息状态对state进行更新

3.当机器时间来到timeoutmemont时,timeserivcr中的定时器会自动回调ontimer函数,我们事先已经在ontimer函数中定义好操作:获取state,并判断标志位进行下发

如此一来,便做到了:制造出超时消息,并将其暂存在flink state中

该方案优势:

1.部署,运维成本比较低,不需要引入额外的消息中间件;

2.性能优良:

source rps:avg 3k/s,max:4.5k/s

sink rps:avg 1.5k/s,max:2.4k/s

延迟:avg:2.5/s,max:3.7s

3.通用化,复用性高,可以复用到各类业务场景,只需要修改下配置(超时时间)

1. 超时检测逻辑(改写ProcessFunction)

public class TimeoutDetector extends KeyedProcessFunction<String, Row, Tuple2<String, Long>> {private ValueState<Long> createTimeState;private ValueState<Boolean> scanFlagState;@Overridepublic void open(Configuration parameters) {// 初始化出库时间状态createTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("createTime", Long.class));// 初始化揽收标记状态scanFlagState = getRuntimeContext().getState(new ValueStateDescriptor<>("scanFlag", Boolean.class));}@Overridepublic void processElement(Row row, Context ctx, Collector<Tuple2<String, Long>> out) {String eventType = row.getFieldAs("event_type");Long eventTs = ((Timestamp)row.getFieldAs("ts")).getTime();if ("create".equals(eventType)) {// 记录出库时间并注册定时器createTimeState.update(eventTs);ctx.timerService().registerEventTimeTimer(eventTs + 6 * 3600 * 1000);} else if ("scan".equals(eventType)) {// 标记已揽收scanFlagState.update(true);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) {if (scanFlagState.value() == null || !scanFlagState.value()) {// 输出超时订单ID与出库时间out.collect(Tuple2.of(ctx.getCurrentKey(), createTimeState.value()));}// 清理状态createTimeState.clear();scanFlagState.clear();}
}

在flinksql中调用udf

CREATE TEMPORARY FUNCTION CheckTimeout AS 'com.example.TimeoutDetector';SELECT order_id,ship_time,CheckTimeout(order_id, ship_time) AS is_timeout
FROM (SELECT order_id,event_time AS ship_timeFROM order_eventsWHERE event_type = 'SHIP'
) 
WHERE is_timeout = true;

doris sink 

INSERT INTO doris_sink_table
SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start,COUNT(order_id) AS timeout_count
FROM timeout_orders
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

方案3

flinkcep处理:

-- 步骤1:定义数据源表(Kafka输入)
CREATE TABLE order_events (order_id         STRING,event_type       STRING,   -- 'outbound'=出库, 'collection'=揽收event_time       TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 允许5秒乱序
) WITH ('connector' = 'kafka','topic'     = 'order_events','scan.startup.mode' = 'latest-offset','format'    = 'json'
);-- 步骤2:使用 MATCH_RECOGNIZE 进行超时模式匹配
CREATE TABLE timeout_orders (order_id         STRING,outbound_time    TIMESTAMP(3),timeout_reason   STRING
) WITH ('connector' = 'kafka','topic'     = 'timeout_alerts','format'    = 'json'
);INSERT INTO timeout_orders
SELECT order_id, outbound_time, '未在6小时内揽收' AS timeout_reason
FROM order_events
MATCH_RECOGNIZE (PARTITION BY order_idORDER BY event_timeMEASURESA.event_time AS outbound_time,LAST(B.event_time) AS collection_timeONE ROW PER MATCHAFTER MATCH SKIP TO LAST BPATTERN (A NOT? B*) WITHIN INTERVAL '6' HOUR  -- 超时窗口定义DEFINEA AS event_type = 'outbound',B AS event_type = 'collection' AND B.event_time <= A.event_time + INTERVAL '6' HOUR
)
WHERE collection_time IS NULL;  -- 未匹配到揽收事件

关键设计解析

  1. 时间语义处理

    • WATERMARK 定义处理5秒内的乱序事件,与物流场景常见的网络延迟匹配 
    • WITHIN INTERVAL '6' HOUR 精确控制超时窗口,符合出库后6小时未揽收的业务规则 
  2. 模式匹配逻辑

    • PATTERN (A NOT? B*) 表示匹配出库事件后未出现揽收事件(NOT操作符)
    • DEFINE B 中增加时间约束,确保揽收事件在出库后6小时内发生
  3. 结果处理优化

    • ONE ROW PER MATCH 减少重复告警,每个订单仅触发一次超时事件
    • AFTER MATCH SKIP TO LAST B 跳过已处理事件,提升处理效率 

优化方式:

1.rocksdb statebackend RocksDB 将状态存储在磁盘而非内存,适合 TB 级状态;增量检查点仅保存差异数据,减少 IO 压力。

2.大字段/无关字段去除

3.statettl=13h(12h+1h缓冲)自动清理已完成窗口的过期状态,避免状态无限膨胀

4.检查点间隔与异步快照,增大checkpoint时间间隔

SET 'execution.checkpointing.interval' = '10min';  -- 增大间隔降低IO压力
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.unaligned' = 'true';  -- 启用非对齐检查点

三、技术优化策略

  1. 状态管理优化

    • 启用RocksDB状态后端:state.backend: rocksdb
    • 设置TTL自动清理过期订单状态:table.exec.state.ttl = 2h
  2. 性能调优

    • 调整并行度:SET 'parallelism.default' = 8;
    • 启用MiniBatch聚合:table.exec.mini-batch.enabled = true
  3. 容错机制


 

http://www.dtcms.com/wzjs/310726.html

相关文章:

  • 怎么做自助购物网站专业网站seo推广
  • 建设网站都需要准备什么苏州百度推广服务中心
  • 烟台公司网站开发国外网站建设
  • 网站开发相关参考文献资料优化seo是什么
  • 可以做视频网站的源码关键词百度云
  • wordpress get term嘉兴seo外包公司费用
  • 金安合肥网站建设专业郑州怎么优化网站排名靠前
  • 毕业论文可不可以写网页设计的seo技术博客
  • 网络空间租用价格seo营销名词解释
  • 云南建设厅网站设计促销方案
  • 如何做企业网站php关键词seo资源
  • 网站程序seo自学教程seo免费教程
  • 酒店网站建设功能个人网页生成器
  • 建筑行业网站有哪些百度官网下载安装免费
  • 深圳网站建设怎样容易seo的全称是什么
  • 网站编程设计方向百度快照在哪里找
  • 网站可以微信支付是怎么做的网络营销型网站
  • 重庆网站推广系统网络优化软件
  • 品牌画册设计公司网址东莞网站优化
  • 国外网站空间放置成人内容站长工具seo综合查询权重
  • 个人优秀网站无锡网站制作推广
  • 有哪些做的好的营销型网站网络推广培训去哪里好
  • 合肥行业网站建设国内手机搜索引擎十大排行
  • WordPress微博客主题奶糖 seo 博客
  • 网站建设规划书的空间seo软文代写
  • 购物网站 app企业文化的重要性和意义
  • 专业的网站开发建设关键词在线挖掘网站
  • 推荐黄石网站建设上海排名优化seobwyseo
  • 书画院网站建设方案奶茶的营销推广软文
  • 金山区网站制作百度竞价登陆