Flink SQL: Filtering, Parsing, Deduplicating, and Aggregating Kafka Data, Then Writing It to a MySQL Table

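The data source is JSON-structured data from Kafka: an hourly report that the upstream system keeps updating. The Flink job processes this data and writes the computed result to MySQL. The code is as follows: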

-- Kafka source table: account-level report
CREATE TEMPORARY TABLE kafka_account_hour_report (
    `data` STRING,
    -- Fields extracted from the JSON string carried in `data`
    `log_date` AS JSON_VALUE(`data`, '$.log_date'),
    `hour_id` AS JSON_VALUE(`data`, '$.hour_id'),
    `biz_code` AS JSON_VALUE(`data`, '$.bizCode'),
    `ad_pv` AS JSON_VALUE(`data`, '$.ad_pv'),
    `click` AS JSON_VALUE(`data`, '$.click'),
    `charge` AS JSON_VALUE(`data`, '$.charge'),
    `car_num` AS JSON_VALUE(`data`, '$.car_num'),
    `date` VARCHAR(20),
    `hour` VARCHAR(20),
    `brandId` VARCHAR(64),
    `accountId` VARCHAR(64),
    `isBatchEnd` INT,
    -- Kafka metadata columns; offsets are 64-bit, so BIGINT is used here instead of INT
    `offset` BIGINT NOT NULL METADATA VIRTUAL,
    `my_part` BIGINT NOT NULL METADATA FROM 'partition',
    `my_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    `my_date` AS CAST(`my_time` AS DATE)
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka-server1:9092,kafka-server2:9092,kafka-server3:9092',
    'properties.group.id' = 'flink_group',
    'topic' = 'account_hour_report',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Result table: brand-level hourly report
CREATE TEMPORARY TABLE mysql_brand_report_hour (
    `brand_id` VARCHAR(32) COMMENT 'Brand ID',
    `date_hour` INT COMMENT 'Date and hour (YYYYMMDDHH)',
    `platform_id` VARCHAR(32) COMMENT 'Platform ID',
    `cost` DECIMAL(20,4) COMMENT 'Cost',
    `show_num` BIGINT COMMENT 'Impressions',
    `click_num` BIGINT COMMENT 'Clicks',
    PRIMARY KEY (`brand_id`, `date_hour`, `platform_id`) NOT ENFORCED
) WITH (
    -- The 'mysql' connector is not part of open-source Flink; it is assumed to be
    -- provided by the runtime in use (for example, Alibaba Cloud Realtime Compute for Flink)
    'connector' = 'mysql',
    'hostname' = 'host_name',
    'port' = '3306',
    'username' = 'mysql_user',
    'password' = 'password',
    'database-name' = 'db_name',
    'table-name' = 'ads_brand_report_hour'
);

-- Parse the account data
CREATE TEMPORARY VIEW view_account_report_ori AS
SELECT
    -- `log_date` is an epoch timestamp in milliseconds
    TO_DATE(FROM_UNIXTIME(CAST(`log_date` AS BIGINT) / 1000, 'yyyy-MM-dd')) AS stat_date,
    LPAD(`hour_id`, 2, '0') AS stat_hour,
    `brandId` AS brand_id,
    `accountId` AS account_id,
    `biz_code` AS biz_code,
    `isBatchEnd` AS batch_end,
    CAST(`charge` AS DECIMAL(20,5)) AS cost,
    CAST(`ad_pv` AS INT) AS show_num,
    CAST(`click` AS INT) AS click_num,
    -- Assumes a 10-character `date` with separators at positions 5 and 8 (e.g. yyyy-MM-dd)
    CONCAT(SUBSTR(`date`, 1, 4), SUBSTR(`date`, 6, 2), SUBSTR(`date`, 9, 2)) AS batch_date,
    `hour` AS batch_hour,
    my_time
FROM kafka_account_hour_report
-- Keep rows whose log date is on or after the date of one hour ago, with isBatchEnd = 0
WHERE FROM_UNIXTIME(CAST(`log_date` AS BIGINT) / 1000, 'yyyy-MM-dd') >= DATE_FORMAT(TIMESTAMPADD(HOUR, -1, LOCALTIMESTAMP), 'yyyy-MM-dd')
  AND `isBatchEnd` = 0;

-- Deduplicate and aggregate into the hourly-report staging view
CREATE TEMPORARY VIEW view_brand_report_stg AS
SELECT
    stat_date,
    brand_id,
    batch_date,
    batch_hour,
    IFNULL(SUM(show_num), 0) AS show_num,
    IFNULL(SUM(click_num), 0) AS click_num,
    IFNULL(SUM(cost), 0) AS cost
FROM (
    -- Rank the versions of each account-level row by Kafka timestamp, newest first
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY stat_date, brand_id, account_id, biz_code, batch_date, batch_hour
            ORDER BY my_time DESC
        ) AS rn
    FROM view_account_report_ori t
) t
WHERE rn = 1  -- keep only the latest version of each row
GROUP BY stat_date, brand_id, batch_date, batch_hour;

-- Hourly report result
CREATE TEMPORARY VIEW view_brand_report_res AS
SELECT
    brand_id,
    CAST(CONCAT(batch_date, batch_hour) AS INT) AS date_hour,
    '1003' AS platform_id,
    ROUND(cost, 4) AS cost,
    show_num,
    click_num
FROM view_brand_report_stg;

-- Sink start
BEGIN STATEMENT SET;

-- Insert the hourly report
INSERT INTO mysql_brand_report_hour
SELECT
    brand_id,
    date_hour,
    platform_id,
    cost,
    show_num,
    click_num
FROM view_brand_report_res;

END;
-- Sink end
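
The deduplicate-then-aggregate step in view_brand_report_stg can be tried in isolation. The following is a minimal sketch, not part of the original job: the inline rows, their values, and the reduced column set are hypothetical and stand in for the Kafka stream. Two versions of the same account row arrive for the same hour, and only the one with the latest timestamp should contribute to the sum.

```sql
-- Hypothetical sample rows: account a1 is reported twice for hour 10, account a2 once.
SELECT
    brand_id,
    batch_hour,
    SUM(cost) AS cost
FROM (
    SELECT
        *,
        -- Number the versions of each account row, newest first
        ROW_NUMBER() OVER (
            PARTITION BY brand_id, account_id, batch_hour
            ORDER BY my_time DESC
        ) AS rn
    FROM (
        VALUES
            ('b1', 'a1', '10', 1.00, TIMESTAMP '2024-01-01 10:05:00'),
            ('b1', 'a1', '10', 1.50, TIMESTAMP '2024-01-01 10:20:00'),
            ('b1', 'a2', '10', 2.00, TIMESTAMP '2024-01-01 10:10:00')
    ) AS t (brand_id, account_id, batch_hour, cost, my_time)
) AS ranked
WHERE rn = 1  -- the older a1 row (1.00) is discarded here
GROUP BY brand_id, batch_hour;
```

For these rows the total for brand b1 in hour 10 should settle at 3.50 (1.50 + 2.00), because the earlier 1.00 row is superseded. As far as I know, Flink plans this ROW_NUMBER pattern as deduplication only when the ORDER BY column is a time attribute, and as a Top-N query otherwise. The my_time column in the job above has no watermark declared, so the second case likely applies there.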

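The program above consumes data from the Kafka source table (topic account_hour_report), applies filtering, parsing, deduplication, and aggregation, and finally writes the result to the MySQL result table ads_brand_report_hour.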
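
Because the Flink result table declares a primary key, the sink writes in upsert mode, which on the MySQL side typically relies on a matching primary or unique key. The source does not show the physical table, so the following MySQL DDL is only a sketch under the assumption that ads_brand_report_hour mirrors the Flink declaration column for column. The engine and character set are likewise assumptions.

```sql
-- Hypothetical MySQL DDL for the physical sink table; types mirror the Flink result table.
CREATE TABLE IF NOT EXISTS ads_brand_report_hour (
    brand_id    VARCHAR(32)    NOT NULL COMMENT 'Brand ID',
    date_hour   INT            NOT NULL COMMENT 'Date and hour (YYYYMMDDHH)',
    platform_id VARCHAR(32)    NOT NULL COMMENT 'Platform ID',
    cost        DECIMAL(20, 4)          COMMENT 'Cost',
    show_num    BIGINT                  COMMENT 'Impressions',
    click_num   BIGINT                  COMMENT 'Clicks',
    -- Same key as the Flink table, so repeated writes for one brand and hour overwrite each other
    PRIMARY KEY (brand_id, date_hour, platform_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
```

With this key in place, each updated hourly total replaces the previous row for the same brand, hour, and platform instead of accumulating duplicates.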
