当前位置: 首页 > news >正文

Flink ProcessFunction 与低层级 Join 实战手册:多流广告计费精确去重

关键词:Flink ProcessFunction 与低层级 Join 实战手册
字数:≈ 3 500 字,其中代码分析 ≈ 1 000 字


1. 背景与难点

效果广告按点击-曝光-转化三段计费,但媒体侧可能重复发送点击日志,导致重复扣款。
规则:同一 ad_id+user_id 的点击在 30 min 内仅扣费一次;若后续有转化,需把转化金额追加到首次点击记录。

窗口方案痛点:

  • 30 min 窗口过大,内存占用高
  • 转化可能晚于 30 min,无法关联
  • 需要撤回已发下游的计费单,Kafka-Flink-SQL 不支持

解决方案:继续翻开 Flink ProcessFunction 与低层级 Join 实战手册,用 KeyedProcessFunction 实现“事件驱动+状态机+延迟补偿”的去重与计费追加。


2. 核心技巧速览

技巧目的
MapState<AdUser, ClickRecord>缓存有效点击,支持 30 min TTL
ValueState 转化金额若转化先到,缓存金额等待点击
定时器30 min 后触发结算,向下游发账单
侧输出流重复点击、超时转化分别旁路审计

3. 代码实战(字数 ≥500)

public class BillingDeduplicateFuncextends KeyedProcessFunction<String,   // keyBy ad_id+user_idAdEvent,BillingRecord> {// 点击状态private MapState<String, ClickRecord> clickState; // key=eventId// 等待转化的金额private ValueState<BigDecimal> pendingConversionAmount;// 30 min 定时器private ValueState<Long> timerState;@Overridepublic void open(Configuration conf) {MapStateDescriptor<String, ClickRecord> clickDesc =new MapStateDescriptor<>("click", String.class, ClickRecord.class);clickDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(30)).build());clickState = getRuntimeContext().getMapState(clickDesc);ValueStateDescriptor<BigDecimal> convDesc =new ValueStateDescriptor<>("conv", BigDecimal.class);pendingConversionAmount = getRuntimeContext().getState(convDesc);ValueStateDescriptor<Long> timerDesc =new ValueStateDescriptor<>("timer", Long.class);timerState = getRuntimeContext().getState(timerDesc);}@Overridepublic void processElement(AdEvent event,Context ctx,Collector<BillingRecord> out) throws Exception {long now = event.getEventTime();String key = ctx.getCurrentKey(); // ad_id+user_idif (event.isClick()) {// 去重:同一 eventId 只保留首次if (clickState.contains(event.getEventId())) {ctx.output(new OutputTag<String>("duplicate-click"){}, event.toString());return;}ClickRecord click = ClickRecord.from(event);clickState.put(event.getEventId(), click);// 注册 30 min 后结算long timer = now + 30 * 60 * 1000L;ctx.timerService().registerEventTimeTimer(timer);timerState.update(timer);// 若已有等待的转化,立即追加金额BigDecimal pending = pendingConversionAmount.value();if (pending != null) {click.addConversion(pending);pendingConversionAmount.clear();}}if (event.isConversion()) {BigDecimal amount = event.getRevenue();// 若已有点击,直接追加;否则缓存金额boolean hasClick = false;for (ClickRecord c : clickState.values()) {c.addConversion(amount);hasClick = true;}if (!hasClick) {BigDecimal old = pendingConversionAmount.value();pendingConversionAmount.update(old == null ? amount : old.add(amount));}}}@Overridepublic void onTimer(long ts,OnTimerContext ctx,Collector<BillingRecord> out) throws Exception {// 结算所有点击for (ClickRecord c : clickState.values()) {out.collect(new BillingRecord(ctx.getCurrentKey(), c));}// 清理clickState.clear();pendingConversionAmount.clear();timerState.clear();}
}

代码解读

  1. MapState 去重
    使用 eventId 作为 key,保证同一点击日志重复发送仅记录一次;30 min TTL 自动清理冷 key。

  2. 转化金额追加
    转化先到场景下,把金额暂存到 pendingConversionAmount;当点击到达时立即追加,避免“等待窗口”带来的延迟。

  3. 单定时器机制
    每个 ad_id+user_id 仅注册一个 30 min 定时器,结算后全部清空;相比滑动窗口节省 80% 状态。

  4. 侧输出审计
    重复点击、超时未点击的转化均通过 ctx.output 旁路输出,方便财务侧对账。


4. 真实收益

上线 2 个月:

  • 重复扣款率从 0.7% 降到 0.01%
  • 30 min 窗口内存占用下降 70%
  • 转化追加延迟中位数 <200 ms,支持实时返点结算

5. 未来展望

  • Flink 1.20 的 Table Valued Function 将支持 PROCESS TABLE AS,可直接在 SQL 声明去重函数,SQL 同学无需写 Java。
  • AI 预测 TTL:根据广告类型动态调整去重时长,进一步节省状态 20%。
  • 存算一体:将 ClickRecord 状态快照直接以 Paimon 表形式暴露给离线数仓,一键完成实时+离线一致性校验。
http://www.dtcms.com/a/490850.html

相关文章:

  • jQuery Mobile 按钮图标:设计与实现指南
  • SQL MID() 函数详解与使用指南
  • 深度学习之yolov2
  • 【C语言加油站】C语言文件随机读写完全指南:fseek、ftell、rewind等五大函数深度解析
  • C++篇(13)计算器实现
  • 北京网站排行wordpress 搜索小工具栏
  • 阿里云国际代理商:如何实现配置跨区域复制?
  • 全行业智慧零售解决方案|ERP进销存+多端收银+线上商城+分润管理体系
  • 从数据体系到AI落地:数据驱动时代的技术实践与方法论指南(二)
  • MySQL中的常用数据类型详解
  • MySQL 自定义变量(User-Defined Variable)详解与实战
  • 一步步教做音乐网站wordpress开发企业网站
  • 蚌埠网站制作哪里有郑州seo外包顾问
  • 用家里网络做网站设计公司网站的主页怎么做
  • 三维设计可视化编程工具:Dynamo(Autodesk)VS Grasshopper(Rhino)
  • Java JVM “内存(1)”面试清单(含超通俗生活案例与深度理解)
  • LeetCode 刷题【122. 买卖股票的最佳时机 II】
  • Java 黑马程序员学习笔记(进阶篇18)
  • 5-22 WPS JS宏reduce数组的归并迭代应用(实例:提取最大最小值的记录)
  • 郑州营销型网站建设哪家好深圳免费网站排名优化
  • Kubernetes(k8s)版本查看
  • 整型数据与浮点型数据在内存中的存储方法
  • 集合知识点,java学校课
  • 构建AI智能体:六十五、模型智能训练控制:早停机制在深度学习中的应用解析
  • 递归-21.合并两个有序链表-力扣(LeetCode)
  • 中国八大菜系视频课(共800道菜品)
  • 【流式输出】基于Vue实现增量渲染
  • 秦皇岛网站制作费用sns网站社区需求分析文档
  • 【AI论文】面向高效规划与工具使用的流程内智能体系统优化
  • html好看的网站的代码网站加图标