当前位置: 首页 > wzjs >正文

怎样在谷歌上建设网站滕州网站建设优化

怎样在谷歌上建设网站,滕州网站建设优化,湖南网站开发,创新的南昌网站建设数据流程介绍 1.创建源表kafka接入消息队列数据,定义字段映射规则; 2.创建目标表es_sink配置Elasticsearch输出; 3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算&#x…

数据流程介绍

1.创建源表kafka接入消息队列数据,定义字段映射规则;
2.创建目标表es_sink配置Elasticsearch输出;
3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算;
4.使用ROLLUP进行多维聚合统计;
5.最终计算结果写入ES,包含成功率等衍生指标。
在这里插入图片描述

Flink SQL 逻辑

SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- 单位:ms, 10天
--SET table.exec.state.ttl = 864000000CREATE TABLE kafkaTable (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,id AS IF(cur['id'] IS NOT NULL , cur['id'], src ['id']),task_id AS IF(cur['task_id'] IS NOT NULL , cur['task_id'], src ['task_id']),account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type']),retry_status AS IF(cur['retry_status'] IS NOT NULL , cur['retry_status'], src ['retry_status']),update_time as IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']),event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)proctime AS PROCTIME()
--                           WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 'xxx','jdq.client.id' = 'xxx','jdq.password' = 'xxx','jdq.domain' = 'xxx','scan.startup.mode' = 'group-offsets', --  default: group-offsets,other: latest-offset,earliest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'binlog');CREATE TABLE es_sink(send_type      STRING,task_id        STRING,month_dim      STRING,day_dim        STRING,grouping_id    INTEGER,init           INTEGER,cancel         INTEGER,succ           INTEGER,fail           INTEGER,cancel_rate    float,succ_rate      float,fail_rate      float,update_date    STRING,PRIMARY KEY (grouping_id,send_type,month_dim,day_dim,task_id) NOT ENFORCED
)with ('connector' = 'elasticsearch-6','index' = 'index01','document-type' = 'type01','hosts' = 'xx','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb');
-- 维度:
--   - send_type, 发送类型
--   - month_dim,月份维度
--   - day_dim,天维度
--   - task_id,任务IDCREATE view  tmp as
selectsend_type,task_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and msg_status='4' then 1 else 0 end AS cancel,case when UPPER(opt) = 'UPDATE' and msg_status='1' then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and msg_status='2' then 1 else 0 end AS fail,update_time,opt,ts,id,proctime,SUBSTRING(publish_time,1,7) as month_dim,SUBSTRING(publish_time,1,10) as day_dim
FROM kafkaTable
where trim(retry_status) = '0'and publish_time >= '2025-01-01 00:00:00'and(    (UPPER(opt) = 'INSERT' and msg_status='0' and position( '_R' in task_id) = 0)or   (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4') and position( '_R' in task_id) = 0)or   (UPPER(opt) = 'UPDATE' and msg_status='1' and position( '_R' in task_id) > 0));--去重模式,去重是指对在列的集合内重复的行进行删除,只保留第一行或最后一行数据。在聚合sum或count时,Flink回撤流会对数据进行回撤处理
create view tmp_dedup as
select * from(select *,row_number() over(partition by id,msg_status order by proctime desc) as rnfrom tmp) t
where rn=1;CREATE view tmp1 as
selectsend_type,task_id,month_dim,day_dim,init,case when cancel = 1 and update_time <= publish_time then 1 else 0 end AS cancel,succ,case when cancel = 1 and update_time > publish_time then 1 else fail end AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) = 0;CREATE view tmp2 as
selectsend_type,SPLIT_INDEX(task_id,'_R',0) AS task_id,month_dim,day_dim,init,cancel,succ,-1 AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) > 0
and   succ = 1 ;CREATE view tmp3 as
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp1
UNION ALL
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp2;CREATE view  tmp_groupby as
select
--/*+ STATE_TTL('tmp' = '10d') */COALESCE(send_type,'N') AS send_type,COALESCE(month_dim,'N') AS month_dim,COALESCE(day_dim,'N') AS day_dim,COALESCE(task_id,'N') AS task_id,case when send_type is null and month_dim is null and day_dim is null and task_id is null then 1when send_type is not null and month_dim is null and day_dim is null and task_id is null then 2when send_type is not null and month_dim is not null and day_dim is null and task_id is null then 3when send_type is not null and month_dim is not null and day_dim is not null and task_id is null then 4when send_type is not null and month_dim is not null and day_dim is not null and task_id is not null then 5end grouping_id,sum(init) as init,sum(cancel) as cancel,sum(succ) as succ,sum(fail) as fail
from tmp3
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,month_dim,day_dim,task_id); --等同于以上INSERT INTO es_sink
selectcase when trim(send_type) = '1'  then '发送类型1'when trim(send_type) = '2'  then '发送类型2'else send_type end AS send_type,task_id,month_dim,day_dim,grouping_id,init,cancel,succ,fail,ROUND(cancel*100.0/init,2) AS cancel_rate,ROUND(succ*100.0/(init - cancel),2) AS succ_rate,ROUND(fail*100.0/(init - cancel),2) AS fail_rate,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp_groupby
where init > 0
and (init - cancel) > 0;

es mapping

#POST index01/type01/_mapping
{"type01": {"properties": {"grouping_id": {"type": "byte"},"send_type": {"type": "keyword","ignore_above": 256},"month_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM","ignore_malformed":"true" --忽略错误的各式}}},"day_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM-dd","ignore_malformed":"true"}}},"task_id": {"type": "keyword"},"init": {"type": "integer"},"cancel": {"type": "integer"},"succ": {"type": "integer"},"fail": {"type": "integer"},"cancel_rate": {"type": "float"},"succ_rate": {"type": "float"},"fail_rate": {"type": "float"},"update_date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}
http://www.dtcms.com/wzjs/288905.html

相关文章:

  • 贵州网站建设吧seo店铺描述
  • 天津住房城乡建设厅官方网站推广文章的推广渠道
  • 南宁3及分销网站制作腾讯云域名注册官网
  • 网站服务器容器打造龙头建设示范
  • 免费追剧网站大全镇江网站seo
  • 做淘宝代码的网站企业文化内容范本
  • 免费网站自助制作站长统计app软件下载官网
  • 本地最好的网站开发建设公司个人怎么注册自己的网站
  • 珠海网站建设制作哪家专业百度明令禁止搜索的词
  • 上海自助建站上海网站建设seo常见的优化技术
  • 2昌平区网站建设企业排名优化公司
  • 宁波外贸公司500强网站seo谷歌
  • 怎样局域网站建设免费网站建站2773
  • 东莞网网站公司简介徐汇网站建设
  • wordpress版 影视站seo前线
  • 长沙诚信做网站搜索引擎优化要考虑哪些方面?
  • 网站制作前必须做的事情有哪些如何申请域名
  • 怎么用node做动态网站志鸿优化网官网
  • 用哪个软件做网站企业网络营销目标
  • 个人网站建设济南seo外贸网站建设
  • app开发公司哪家比较好百度软件优化排名
  • 如何给公司做网站百度推广是什么
  • 购物网站建设 属于信息系统管理与设计么互联网推广话术
  • 电子购物网站设计日本进口yamawa
  • 南宁网站建设设计西安关键词优化服务
  • 网站建设与维护案列贵州百度seo整站优化
  • 网页网络游戏专业网站优化公司
  • 南通网络公司网站营销推广的方法有哪些
  • 云朵课堂网站开发怎么收费美国站外推广网站
  • 一级a做爰片免费网站瑜伽东莞搜索优化