当前位置: 首页 > wzjs >正文

江苏企业建设网站公司门户网站建设的企业

江苏企业建设网站公司,门户网站建设的企业,江苏建设人才网网站,宁海企业网站建设 数据流程介绍：1.创建源表kafka接入消息队列数据,定义字段映射规则; 2.创建目标表es_sink配置Elasticsearch输出; 3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算；…

数据流程介绍

1. 创建源表 kafkaTable，接入 Kafka 消息队列中的 binlog 数据，并定义字段映射规则；
2. 创建目标表 es_sink，配置 Elasticsearch 输出；
3. 通过多级视图（tmp → tmp_dedup → tmp1/tmp2 → tmp3 → tmp_groupby）实现数据清洗、去重和状态计算；
4. 使用 ROLLUP 进行多维聚合统计；
5. 将最终计算结果写入 Elasticsearch，其中包含取消率、成功率、失败率等衍生指标。
（此处原为一张图片，“在这里插入图片描述”是其默认图片说明；转载时图片未保留。）

Flink SQL 逻辑

-- ---------------------------------------------------------------------------
-- Job-level tuning for the streaming aggregation pipeline below.
-- ---------------------------------------------------------------------------

-- State retention: idle state expires after 2592000 s (= 30 days).
-- Flink's default for this option is 0 ms, i.e. state never expires.
SET table.exec.state.ttl = 2592000s;

-- Mini-batch aggregation: buffer incoming records and flush the buffer after
-- at most 1 s of latency or 10000 buffered records, whichever comes first.
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;

-- Local-global (two-phase) aggregation; it builds on the mini-batch
-- settings above.
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- Alternative state TTL kept for reference (unit: ms, i.e. 10 days); the
-- active setting is the table.exec.state.ttl SET at the top of the script.
-- SET table.exec.state.ttl = 864000000;

-- ---------------------------------------------------------------------------
-- Source: binlog change records consumed from Kafka (JDQ).
-- Every business column reads the key from the `cur` map and falls back to
-- the `src` map when `cur` has no value for it.
-- NOTE(review): `cur` / `src` presumably hold the after- / before-image of the
-- binlog row -- confirm against the binlog format.
-- ---------------------------------------------------------------------------
CREATE TABLE kafkaTable (
    mid          BIGINT,
    db           STRING,
    sch          STRING,
    tab          STRING,
    opt          STRING,                -- row operation; compared to 'INSERT' / 'UPDATE' downstream
    ts           BIGINT,
    ddl          STRING,
    err          STRING,
    src          MAP<STRING, STRING>,
    cur          MAP<STRING, STRING>,
    cus          MAP<STRING, STRING>,
    -- COALESCE(cur[k], src[k]) == IF(cur[k] IS NOT NULL, cur[k], src[k])
    id           AS COALESCE(cur['id'], src['id']),
    task_id      AS COALESCE(cur['task_id'], src['task_id']),
    account_id   AS COALESCE(cur['account_id'], src['account_id']),
    publish_time AS COALESCE(cur['publish_time'], src['publish_time']),
    msg_status   AS COALESCE(cur['msg_status'], src['msg_status']),
    send_type    AS COALESCE(cur['send_type'], src['send_type']),
    retry_status AS COALESCE(cur['retry_status'], src['retry_status']),
    update_time  AS COALESCE(cur['update_time'], src['update_time']),
    -- Event time parsed from the update_time string.
    -- Type choice noted by the author: TIMESTAMP(3) or TIMESTAMP_LTZ(3).
    event_time   AS CAST(COALESCE(cur['update_time'], src['update_time']) AS TIMESTAMP(3)),
    proctime     AS PROCTIME()
    -- Event-time processing is disabled; to enable it append:
    -- , WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxx',
    'jdq.client.id' = 'xxx',
    'jdq.password' = 'xxx',
    'jdq.domain' = 'xxx',
    -- default: group-offsets; other options: latest-offset, earliest-offset
    'scan.startup.mode' = 'group-offsets',
    -- Default false: when false, offsets are committed when a checkpoint is taken.
    -- 'properties.enable.auto.commit' = 'true',
    'format' = 'binlog'
);

-- ---------------------------------------------------------------------------
-- Sink: one Elasticsearch document per ROLLUP output row. Because a primary
-- key is declared, the connector runs in upsert mode and builds the document
-- id from the key columns.
-- ---------------------------------------------------------------------------
CREATE TABLE es_sink (
    send_type    STRING,
    task_id      STRING,
    month_dim    STRING,
    day_dim      STRING,
    grouping_id  INTEGER,   -- aggregation level: 1 = grand total ... 5 = per task
    init         INTEGER,
    cancel       INTEGER,
    succ         INTEGER,
    fail         INTEGER,
    cancel_rate  FLOAT,     -- percentage, rounded to 2 decimals
    succ_rate    FLOAT,     -- percentage, rounded to 2 decimals
    fail_rate    FLOAT,     -- percentage, rounded to 2 decimals
    update_date  STRING,
    PRIMARY KEY (grouping_id, send_type, month_dim, day_dim, task_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'index01',
    'document-type' = 'type01',
    'hosts' = 'xx',
    'format' = 'json',
    -- NOTE(review): not an upstream Flink ES connector option; presumably
    -- platform-specific -- confirm it is recognised by the runtime.
    'filter.null-value' = 'true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);
-- ---------------------------------------------------------------------------
-- View `tmp`: keep only the binlog rows relevant to the statistics and turn
-- each one into 0/1 counter flags.
-- Dimensions:
--   - send_type: send type
--   - month_dim: month dimension
--   - day_dim:   day dimension
--   - task_id:   task ID
-- Task IDs containing '_R' (apparently retry tasks derived from an original
-- task) only pass through as successful UPDATEs (msg_status = '1').
-- ---------------------------------------------------------------------------
CREATE VIEW tmp AS
SELECT
    send_type,
    task_id,
    publish_time,
    msg_status,
    -- Counter flags. msg_status '3' passes the filter below but sets no flag.
    CASE WHEN UPPER(opt) = 'INSERT' AND msg_status = '0' THEN 1 ELSE 0 END AS init,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '4' THEN 1 ELSE 0 END AS cancel,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '1' THEN 1 ELSE 0 END AS succ,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '2' THEN 1 ELSE 0 END AS fail,
    update_time,
    opt,
    ts,
    id,
    proctime,
    -- Assumes publish_time is 'yyyy-MM-dd HH:mm:ss' (see the filter below),
    -- giving 'yyyy-MM' and 'yyyy-MM-dd' respectively.
    SUBSTRING(publish_time, 1, 7)  AS month_dim,
    SUBSTRING(publish_time, 1, 10) AS day_dim
FROM kafkaTable
WHERE TRIM(retry_status) = '0'
  -- Lexicographic string comparison; correct only for zero-padded timestamps.
  AND publish_time >= '2025-01-01 00:00:00'
  AND (
        -- new message of an original task
        (UPPER(opt) = 'INSERT' AND msg_status = '0' AND POSITION('_R' IN task_id) = 0)
        -- status change of an original task
     OR (UPPER(opt) = 'UPDATE' AND msg_status IN ('1', '2', '3', '4') AND POSITION('_R' IN task_id) = 0)
        -- successful '_R' (retry) task
     OR (UPPER(opt) = 'UPDATE' AND msg_status = '1' AND POSITION('_R' IN task_id) > 0)
  );

-- Deduplication: among rows that repeat over a set of columns, keep only the
-- first or the last one. When the result is aggregated with SUM or COUNT,
-- Flink's retraction stream retracts the superseded row.
-- ---------------------------------------------------------------------------
-- View `tmp_dedup`: one row per (id, msg_status). ORDER BY proctime DESC keeps
-- the most recently processed row; a later duplicate retracts the earlier one
-- downstream. The rank column is not projected (it is always 1).
-- ---------------------------------------------------------------------------
CREATE VIEW tmp_dedup AS
SELECT
    send_type,
    task_id,
    publish_time,
    msg_status,
    init,
    cancel,
    succ,
    fail,
    update_time,
    opt,
    ts,
    id,
    proctime,
    month_dim,
    day_dim
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY id, msg_status ORDER BY proctime DESC) AS rn
    FROM tmp
) t
WHERE rn = 1;

-- ---------------------------------------------------------------------------
-- View `tmp1`: original (non-'_R') tasks. A cancellation (cancel = 1) counts
-- as a cancel only when update_time <= publish_time; a cancellation after
-- publish_time is counted as a failure instead.
-- (update_time / publish_time are compared as strings.)
-- ---------------------------------------------------------------------------
CREATE VIEW tmp1 AS
SELECT
    send_type,
    task_id,
    month_dim,
    day_dim,
    init,
    CASE WHEN cancel = 1 AND update_time <= publish_time THEN 1 ELSE 0 END AS cancel,
    succ,
    CASE WHEN cancel = 1 AND update_time > publish_time THEN 1 ELSE fail END AS fail,
    update_time
FROM tmp_dedup
WHERE POSITION('_R' IN task_id) = 0;

-- ---------------------------------------------------------------------------
-- View `tmp2`: successful '_R' tasks. The '_R' suffix is stripped so the row
-- is attributed to the original task id, and fail = -1 presumably offsets the
-- failure that this retry recovered.
-- ---------------------------------------------------------------------------
CREATE VIEW tmp2 AS
SELECT
    send_type,
    SPLIT_INDEX(task_id, '_R', 0) AS task_id,
    month_dim,
    day_dim,
    init,
    cancel,
    succ,
    -1 AS fail,
    update_time
FROM tmp_dedup
WHERE POSITION('_R' IN task_id) > 0
  AND succ = 1;

-- View `tmp3`: original-task and retry counters in one stream.
CREATE VIEW tmp3 AS
SELECT send_type, task_id, month_dim, day_dim, init, cancel, succ, fail
FROM tmp1
UNION ALL
SELECT send_type, task_id, month_dim, day_dim, init, cancel, succ, fail
FROM tmp2;

-- ---------------------------------------------------------------------------
-- View `tmp_groupby`: multi-level totals.
--   ROLLUP (send_type, month_dim, day_dim, task_id) is equivalent to
--   GROUPING SETS ((send_type, month_dim, day_dim, task_id),
--                  (send_type, month_dim, day_dim),
--                  (send_type, month_dim),
--                  (send_type),
--                  ()).
-- Rolled-up dimensions are reported as 'N'.
-- grouping_id: 1 = grand total, 2 = send_type, 3 = + month_dim,
--              4 = + day_dim,   5 = + task_id.
-- The level comes from GROUPING_ID() instead of NULL checks on the dimension
-- values: send_type is never filtered for NULL, and a genuine NULL would
-- otherwise yield a NULL grouping_id (rejected by es_sink's NOT NULL primary
-- key) or a key that overwrites the grand-total document.
-- Optional (Flink 1.18+): state TTL hint placed directly after SELECT:
--   /*+ STATE_TTL('tmp3' = '10d') */
-- ---------------------------------------------------------------------------
CREATE VIEW tmp_groupby AS
SELECT
    COALESCE(send_type, 'N') AS send_type,
    COALESCE(month_dim, 'N') AS month_dim,
    COALESCE(day_dim, 'N')   AS day_dim,
    COALESCE(task_id, 'N')   AS task_id,
    -- GROUPING_ID sets bit 8/4/2/1 for each argument that is rolled up
    -- (first argument = most significant bit).
    CASE GROUPING_ID(send_type, month_dim, day_dim, task_id)
        WHEN 15 THEN 1
        WHEN 7  THEN 2
        WHEN 3  THEN 3
        WHEN 1  THEN 4
        WHEN 0  THEN 5
    END AS grouping_id,
    SUM(init)   AS init,
    SUM(cancel) AS cancel,
    SUM(succ)   AS succ,
    SUM(fail)   AS fail
FROM tmp3
GROUP BY ROLLUP (send_type, month_dim, day_dim, task_id);

-- ---------------------------------------------------------------------------
-- Write to Elasticsearch. The SELECT list follows the es_sink column order.
-- Rates are percentages rounded to 2 decimals: cancel_rate relative to init,
-- succ_rate / fail_rate relative to the non-cancelled count (init - cancel).
-- The WHERE clause keeps both denominators positive.
-- ---------------------------------------------------------------------------
INSERT INTO es_sink
SELECT
    -- map raw send_type codes to display labels
    CASE
        WHEN TRIM(send_type) = '1' THEN '发送类型1'
        WHEN TRIM(send_type) = '2' THEN '发送类型2'
        ELSE send_type
    END AS send_type,
    task_id,
    month_dim,
    day_dim,
    grouping_id,
    init,
    cancel,
    succ,
    fail,
    ROUND(cancel * 100.0 / init, 2)          AS cancel_rate,
    ROUND(succ * 100.0 / (init - cancel), 2) AS succ_rate,
    ROUND(fail * 100.0 / (init - cancel), 2) AS fail_rate,
    CAST(LOCALTIMESTAMP AS STRING)           AS update_date
FROM tmp_groupby
WHERE init > 0
  AND (init - cancel) > 0;

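Elasticsearch 索引映射（mapping）。month_dim / day_dim 的 date 子字段设置了 ignore_malformed，用于忽略格式不合法的值（例如汇总行中的 'N'），避免整条文档写入失败。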

#POST index01/type01/_mapping
{
  "type01": {
    "properties": {
      "grouping_id": {"type": "byte"},
      "send_type": {"type": "keyword", "ignore_above": 256},
      "month_dim": {
        "type": "keyword",
        "fields": {
          "text": {"type": "keyword"},
          "date": {"type": "date", "format": "yyyy-MM", "ignore_malformed": true}
        }
      },
      "day_dim": {
        "type": "keyword",
        "fields": {
          "text": {"type": "keyword"},
          "date": {"type": "date", "format": "yyyy-MM-dd", "ignore_malformed": true}
        }
      },
      "task_id": {"type": "keyword"},
      "init": {"type": "integer"},
      "cancel": {"type": "integer"},
      "succ": {"type": "integer"},
      "fail": {"type": "integer"},
      "cancel_rate": {"type": "float"},
      "succ_rate": {"type": "float"},
      "fail_rate": {"type": "float"},
      "update_date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    }
  }
}

文章转载自:

http://VZM5TBmy.dfffm.cn
http://u1Ux5D7s.dfffm.cn
http://F6AXCt1n.dfffm.cn
http://qUCPxWdR.dfffm.cn
http://prEA3KFA.dfffm.cn
http://NeDoecKs.dfffm.cn
http://mXTUL1Sg.dfffm.cn
http://xiA0zLvN.dfffm.cn
http://ZJ0CpVXW.dfffm.cn
http://1cuFnbDb.dfffm.cn
http://o4qKJyyc.dfffm.cn
http://7ETvvSs7.dfffm.cn
http://OrxRXOSC.dfffm.cn
http://EV2Pyg6W.dfffm.cn
http://ljDEftEt.dfffm.cn
http://uGfLoxQs.dfffm.cn
http://XQYPrkyU.dfffm.cn
http://jrARtt9x.dfffm.cn
http://nrGIU4oJ.dfffm.cn
http://dmEdhWpo.dfffm.cn
http://lQ4sWzIp.dfffm.cn
http://CBjBVFpR.dfffm.cn
http://eC3Dxk5o.dfffm.cn
http://HVUPKpgf.dfffm.cn
http://7aEwlZwb.dfffm.cn
http://mFmm8nXz.dfffm.cn
http://sqt6BBKs.dfffm.cn
http://twR2wEvU.dfffm.cn
http://OaN8P72l.dfffm.cn
http://g1aGsyTn.dfffm.cn
http://www.dtcms.com/wzjs/756240.html

相关文章:

  • PS网站设计那些网站是做生鲜的
  • 泉州网站设计公司常州网站建设哪家好
  • weui做购物网站的案例企业建站多少钱一个月
  • 运城云起时网站建设医院网站那里填评价
  • 建站开发工具安徽住房建设厅网站
  • 建设厅注册中心网站首页semi
  • 企业建站系统还有没有前景可言合肥建站企业
  • 北京专业网站制作价格枣阳市建设局网站
  • 建设银行网站怎么看交易记录企业网站建设的公司价格
  • 建设银行长清网站绍兴seo外包
  • 建行网站用户名是什么新站整站优化
  • 网站地图创建国外网站的建设
  • 无锡免费网站制作企业网站怎么备案
  • 福州网站微信公众号湖南网页设计培训去哪里
  • 电子图书网站建设如何自己建一个公司网站
  • 着力规范网站集约化建设个人做免费的网站
  • 金华手机网站建设网站做下载word
  • 做动漫主题的网站做淘宝需要知道什么网站吗
  • 家用宽带怎样做网站服务器短视频软件开发
  • 东莞网站建设公司口碑排名郑州福千欣隆网站建设有限公司
  • 四川广安网站建设琼海做网站
  • 佛山网站建设价格app推广地推接单网
  • 有空间站的国家什么做电子书下载网站
  • 比较好设计网站重庆网站空间键词排名
  • 网站模块是指什么地方黑龙seo网站优化
  • 做网站需要什么书买电脑wordpress
  • 阿里云智能logo设计网站注册新公司网上核名网站
  • 深圳网站建设外贸公司dede小说网站模板
  • iphone手机网站建设济南市建设招标中心网站
  • 网站推广方案整理青岛网站建设公司 中小企业补贴