Flink-SQL通过过滤-解析-去重-聚合计算写入到MySQL表
数据源来自于Kafka的Json结构数据,数据结构为源头不断更新的小时报表,Flink的任务是处理计算并将结果输出到MySQL中。代码如下:
-- Kafka source table: account-level hourly report (raw JSON payload in `data`;
-- business fields are exposed as computed columns via JSON_VALUE).
CREATE TEMPORARY TABLE kafka_account_hour_report (
    `data`       STRING,
    -- Computed columns extracted from the JSON payload
    `log_date`   AS JSON_VALUE(`data`, '$.log_date'),
    `hour_id`    AS JSON_VALUE(`data`, '$.hour_id'),
    `biz_code`   AS JSON_VALUE(`data`, '$.bizCode'),
    `ad_pv`      AS JSON_VALUE(`data`, '$.ad_pv'),
    `click`      AS JSON_VALUE(`data`, '$.click'),
    `charge`     AS JSON_VALUE(`data`, '$.charge'),
    `car_num`    AS JSON_VALUE(`data`, '$.car_num'),
    -- Physical columns mapped directly from the JSON message
    `date`       VARCHAR(20),
    `hour`       VARCHAR(20),
    `brandId`    VARCHAR(64),
    `accountId`  VARCHAR(64),
    `isBatchEnd` INT,
    -- Kafka metadata columns (offset / partition / message timestamp)
    `offset`     INT NOT NULL METADATA VIRTUAL,
    `my_part`    BIGINT NOT NULL METADATA FROM 'partition',
    `my_time`    TIMESTAMP(3) METADATA FROM 'timestamp',
    `my_date`    AS CAST(`my_time` AS DATE)
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka-sever1:9092,kafka-server2:9092,kafka-server3:9092',
    'properties.group.id' = 'flink_group',
    'topic' = 'account_hour_report',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Result table: brand-level hourly report
-- MySQL sink table for the brand-level hourly report.
-- Upsert key: (brand_id, date_hour, platform_id).
CREATE TEMPORARY TABLE mysql_brand_report_hour (
    `brand_id`    VARCHAR(32)    COMMENT '品牌ID',
    `date_hour`   INT            COMMENT '日期时间(YYYYMMDDHH)',
    `platform_id` VARCHAR(32)    COMMENT '平台ID',
    `cost`        DECIMAL(20,4)  COMMENT '花费',
    `show_num`    BIGINT         COMMENT '曝光量',
    `click_num`   BIGINT         COMMENT '点击量',
    PRIMARY KEY (`brand_id`, `date_hour`, `platform_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = 'host_name',
    'port' = '3306',
    'username' = 'mysql_user',
    'password' = 'password',
    'database-name' = 'db_name',
    'table-name' = 'ads_brand_report_hour'
);

-- Parse account-level data
-- Parse and filter the raw account-level records.
-- Fix: original read `SELECTTO_DATE(...)` — missing whitespace after SELECT is a parse error.
CREATE TEMPORARY VIEW view_account_report_ori AS
SELECT
    -- log_date is epoch millis; /1000 converts to epoch seconds for FROM_UNIXTIME
    TO_DATE(FROM_UNIXTIME(CAST(`log_date` AS BIGINT) / 1000, 'yyyy-MM-dd')) AS stat_date,
    LPAD(`hour_id`, 2, '0') AS stat_hour,
    `brandId` AS brand_id,
    `accountId` AS account_id,
    `biz_code` AS biz_code,
    `isBatchEnd` AS batch_end,
    CAST(`charge` AS DECIMAL(20,5)) AS cost,
    CAST(`ad_pv` AS INT) AS show_num,
    CAST(`click` AS INT) AS click_num,
    -- assumes `date` is 'yyyy-MM-dd...' — collapses it to 'yyyyMMdd'; TODO confirm upstream format
    CONCAT(SUBSTR(`date`, 1, 4), SUBSTR(`date`, 6, 2), SUBSTR(`date`, 9, 2)) AS batch_date,
    `hour` AS batch_hour,
    my_time
FROM kafka_account_hour_report
-- Keep rows whose stat date is no earlier than one hour ago, and skip batch-end markers
WHERE FROM_UNIXTIME(CAST(`log_date` AS BIGINT) / 1000, 'yyyy-MM-dd')
      >= DATE_FORMAT(TIMESTAMPADD(HOUR, -1, LOCALTIMESTAMP), 'yyyy-MM-dd')
  AND `isBatchEnd` = 0;

-- Deduplicate and aggregate into the hourly-report staging view
-- Deduplicate (latest record per key, by Kafka message time) and sum up to brand level.
-- Fixes: original read `SELECTstat_date` and `AS rnFROM` — missing whitespace, parse errors.
CREATE TEMPORARY VIEW view_brand_report_stg AS
SELECT
    stat_date,
    brand_id,
    batch_date,
    batch_hour,
    IFNULL(SUM(show_num), 0) AS show_num,
    IFNULL(SUM(click_num), 0) AS click_num,
    IFNULL(SUM(cost), 0) AS cost
FROM (
    -- Top-1 per (stat_date, brand, account, biz_code, batch): keep the most recent row
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY stat_date, brand_id, account_id, biz_code, batch_date, batch_hour
            ORDER BY my_time DESC
        ) AS rn
    FROM view_account_report_ori t
) t
WHERE rn = 1
GROUP BY stat_date, brand_id, batch_date, batch_hour;

-- Hourly report result
-- Final result shape for the sink: composite date_hour key, fixed platform id, rounded cost.
-- Fix: original read `SELECTbrand_id` — missing whitespace after SELECT is a parse error.
CREATE TEMPORARY VIEW view_brand_report_res AS
SELECT
    brand_id,
    CAST(CONCAT(batch_date, batch_hour) AS INT) AS date_hour,  -- YYYYMMDDHH
    '1003' AS platform_id,  -- constant platform id for this pipeline
    ROUND(cost, 4) AS cost,
    show_num,
    click_num
FROM view_brand_report_stg;

-- Sink begins
-- Group all sink inserts into one job graph.
-- Fix: original read `SELECTbrand_id` — missing whitespace after SELECT is a parse error.
BEGIN STATEMENT SET;

-- Insert the hourly report --
INSERT INTO mysql_brand_report_hour
SELECT
    brand_id,
    date_hour,
    platform_id,
    cost,
    show_num,
    click_num
FROM view_brand_report_res;

END;
-- Sink结束
以上程序实现了从Kafka源表(主题/Topic为account_hour_report)消费数据,然后进行过滤、解析、去重、聚合等计算,最后将结果写入到MySQL结果表ads_brand_report_hour中。