大数据离线数仓之业务域设计
一、概要说明
业务域的数据来自业务系统的数据库
表模型非常多且关系复杂
一般来说,这些表可以分为如下类别
- 维度表
对一个事物(实体)进行属性描述的表
比如,商品信息表
produce_id,name,price,stock_num,cat3_id,...会员信息表
member_id,account,nick_name,birthday,star_zuo,gender,age,integration,level_id运营位信息
id,name,image_url,ad_campain,...优惠券信息表
coupon_id,coupon_name,amount,valid_start,valid_end,member_level_id
- 事实表
对一件发生过的事情(事实)进行描述的表
比如,订单表
order_id,member_id,timestamp,amount,discount,coupon_dikou,hongbao_dikou,receive_addr,receive_phone.订单商品详情表
order_id,product_id,price,cnt,coupon_id,amount,...购物车表
member_id,product_id,number,price,create_time,update_time,submit_time,cancel_time,.......优惠券领取、使用记录表
member_id,create_time,coupon_id,number,timeout_dt,use_time,left_number,..........
- 字典表
一个“码”,对应一个名称
id,color_name
1,菁蓝
2,土黄
3,金黄
4,天蓝
二、Sqoop数据抽取
1. Sqoop工具介绍
sqoop 是 apache 旗下一款Hadoop中的各种存储系统(HDFS、HIVE、HBASE) 和关系数据库(mysql、oracle、sqlserver等)服务器之间传送数据”的工具。
核心的功能有两个:
导入(迁入)、导出(迁出)
导入数据:MySQL,Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统
导出数据:从 Hadoop 的文件系统中导出数据到关系数据库 mysql 等 Sqoop 的本质还是一个命令行工具,和 HDFS,Hive 相比,并没有什么高深的理论。
底层工作机制
将导入或导出命令翻译成 MapReduce 程序来实现
在翻译出的 MapReduce 中主要是对InputFormat 和 OutputFormat 进行定制
2. 数据抽取策略
维度表
维度表小表(品类信息表,活动信息表,优惠券信息表等),每天抽取过来一份全量(或者一周、一月);
维度表大表(商品信息表),每天抽取过来一份增量数据事实表
订单相关表
优惠券领取使用记录表
秒杀订阅记录表事实表每天都会抽取一份增量数据
三、ODS层
本层,表模型结构与业务库中的表模型结构保持一致
只是,本层表的数据,主要是通过sqoop增量导入后的每日增量数据;
所以,本层表是分区增量表;
主要表模型
- 商品信息(主要信息、详情信息、类目信息、属性信息、商品相册信息)
- 用户信息(主要信息、附加信息、会员等级信息)
- 订单信息及购物车相关(主要信息、详情信息、物流信息、评论信息)
- 内容管理(话题,文章,评论)
- 营销管理(优惠券、代金券、活动规则、主题推荐)
订单表增量抽取实战
- mysql建表
create table t_order(id int primary key auto_increment,amt decimal(10,2),`status` int default 0,user_id int,create_time timestamp DEFAULT CURRENT_TIMESTAMP,modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)-- 如果status没有值,则使用默认值0
`status` int default 0,
-- CURRENT_TIMESTAMP作为关键字可以获取系统时间
create_time timestamp DEFAULT CURRENT_TIMESTAMP
-- 当前行数据修改,则这一列自动修改为系统时间
modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
- 添加测试数据
insert into t_order values(null,100,0,1001,null,null);
insert into t_order values(null,99,0,1002,null,null);
a) 会覆盖原有的表数据
sqoop import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--table t_order \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "," \
--delete-target-dir \
--hive-overwrite \
--hive-database ods \
--hive-table t_order
b) 带有分区增量数据导入
1.添加测试数据
insert into t_order values(null,100,0,1001,null,null);
insert into t_order values(null,99,0,1002,null,null);mysql查询结果
5 100 0 1001 2022-12-01 16:24:10 2022-12-01 16:24:10
6 99 0 1002 2022-12-01 16:24:10 2022-12-01 16:24:102.执行sqoop导入脚本
sqoop import \
-Dmapred.job.queue.name=default \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--target-dir /user/root/t_order \
--delete-target-dir \
--hive-import \
--hive-database ods \
--hive-table t_order \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--hive-overwrite \
--split-by id \
--query 'select id,amt,status,user_id,create_time,modify_time from t_order where modify_time >= "1970-01-01 00:00:00" AND $CONDITIONS' \
--hive-partition-key dt \
--hive-partition-value '2022-12-01' \
-m 13.添加和修改数据 (并且假设现在已经是12-02了)
insert into t_order values(null,220,0,1001,'2022-12-02 10:10:10','2022-12-02 10:10:10');
update t_order set `status` = 2 , modify_time = '2022-12-02 11:00:00' where id = 5mysql查询结果如下
5 100 2 1001 2022-12-01 16:24:10 2022-12-02 11:00:00
6 99 0 1002 2022-12-01 16:24:10 2022-12-01 16:24:10
7 220 0 1001 2022-12-02 10:10:10 2022-12-02 10:10:104.执行sqoop导入脚本
-- 期望结果:将添加和修改的数据以增量的方式导入到hive的表的新分区下
sqoop import \
-Dmapred.job.queue.name=default \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--target-dir /user/root/t_order \
--delete-target-dir \
--hive-import \
--hive-database ods \
--hive-table t_order \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--hive-overwrite \
--split-by id \
--query 'select id,amt,status,user_id,create_time,modify_time from t_order where modify_time >= "2022-12-02 00:00:00" AND $CONDITIONS' \
--hive-partition-key dt \
--hive-partition-value '2022-12-02' \
-m 1
–split-by id 、$CONDITIONS、-m 1
-m 指定maptask的个数 ,如果指定-m为2,底层两个maptask则会处理不同的数据
如果表中有5条数据,哪些数据给第一个maptask,哪些数据给第二个maptask呢?
–split-by id 让底层根据id这一列进行拆分 , 例如拆分结果 1<= id <= 2 2 < id <= 5
CONDITIONS上述拆分数据形成的条件会替换CONDITIONS 上述拆分数据形成的条件会替换CONDITIONS上述拆分数据形成的条件会替换CONDITIONS
四、DWD层设计开发
本层主要表类型:
-
存储各业务表的
全量快照
-
存储各业务表的
拉链表
在我们项目中,本层存储的业务表[拉链表]
主要就是ods中的各种增量导入数据进行全量合并后生成的全量快照表或者拉链表;
快照表和拉链表,都是分区全量表!
只不过,快照表需要保存每一天的分区,才能查询到每一天的该表的数据状态;
而拉链表,则只要保留最后一天的分区即可;
问题:在ODS是增量导入的情况的下,如何在DWD层获取完整数据?
增量合并成全量快照
为了便于后续的统计分析方便,用增量抽取策略抽取过来的增量数据,都要每天进行滚动合并
a). 在DWD层创建订单全量快照表
create table dwd.t_order_full(id int ,amt double ,status int ,user_id int ,create_time string ,modify_time string
)partitioned by(dt string)b).添加数据
注意我们的逻辑:
1.先添加ods.t_order(dt=2022-12-02)的数据
2.然后添加dwd.t_order_full(dt='2022-12-02')的数据
3.然后添加ods.t_order(dt=2022-12-03)的数据
4.然后添加dwd.t_order_full(dt='2022-12-03')的数据添加dwd.t_order_full(dt='2022-12-03')的数据是根据ods.t_order(dt=2022-12-03)和dwd.t_order_full(dt='2022-12-02')进行full join而来insert overwrite table dwd.t_order_full
partition(dt='2022-12-02')
select if(t1.id is not null,t1.id,t2.id) id ,if(t1.id is not null,t1.amt,t2.amt) amt,if(t1.id is not null,t1.`status`,t2.`status`) `status`,if(t1.id is not null,t1.user_id,t2.user_id) user_id,if(t1.id is not null,t1.create_time,t2.create_time) create_time,if(t1.id is not null,t1.modify_time,t2.modify_time) modify_time
from
(select * from ods.t_order where dt = '2022-12-02'
)t1 full join
(select * from dwd.t_order_full where dt = '2022-12-01'
)t2 on t1.id = t2.id
拉链表概念及实现逻辑
https://blog.csdn.net/weixin_42913992/article/details/122845034
a) 创建一张订单拉链表
create table dwd.t_order_zip(id int ,amt double ,status int ,user_id int ,create_time string ,modify_time string ,start_dt string ,end_dt string
)partitioned by(dt string)b) 向拉链表添加数据insert overwrite table dwd.t_order_zip
partition(dt='2022-12-02')
select t1.id,t1.amt,t1.`status`,t1.user_id,t1.create_time,t1.modify_time,t1.start_dt,case when t1.end_dt = '9999-12-31' and t2.id is not null then '2022-12-01'else t1.end_dt end end_dt
from (select * from dwd.t_order_zip where dt = '2022-12-01'
)t1 left join (select * from ods.t_order where dt = '2022-12-02'
)t2 on t1.id = t2.id
union all
select id,amt,`status`,user_id,create_time,modify_time, '2022-12-02' start_dt,'9999-12-31' end_dt
from ods.t_order where dt = '2022-12-02'insert overwrite table dwd.t_order_zip
partition(dt='2022-12-03')
select t1.id,t1.amt,t1.`status`,t1.user_id,t1.create_time,t1.modify_time,t1.start_dt,case when t1.end_dt = '9999-12-31' and t2.id is not null then '2022-12-02'else t1.end_dt end end_dt
from (select * from dwd.t_order_zip where dt = '2022-12-02'
)t1 left join (select * from ods.t_order where dt = '2022-12-03'
)t2 on t1.id = t2.id
union all
select id,amt,`status`,user_id,create_time,modify_time, '2022-12-03' start_dt,'9999-12-31' end_dt
from ods.t_order where dt = '2022-12-03'
五、DWS层设计开发
本层主要处理:
根据业务特点,提炼出若干“主题”(电商系统可以划分:活动主题、订单主题、用户主题、商品主题)
然后,区分每一个主题中的事实表[由用户的某个行动不断产生数据的表]、维度表[是对事实的一种描述]
张三是一个河南省郑州人,在京东平台上购买了一部
华为手机 [维度建模]
按照维度建模的思想,按各主题,将核心事实表关联需要的维度表,得到宽表
需求:用户消费订单统计画像标签表
给用户打上一些消费相关(下单、退货、金额、客单价)的统计数据标签
- 建表
create table dws.user_profile_consumer_tag(member_id bigint ,--用户first_order_time string ,--首单日期last_order_time string ,--末单日期first_order_ago bigint ,--首单距今天数last_order_ago bigint ,--末单距今天数month1_order_cnt bigint ,--近30天下单次数month1_order_amt double ,--近30天购买金额(总金额)month2_order_cnt bigint ,--近60天购买次数month2_order_amt double ,--近60天购买金额month3_order_cnt bigint ,--近90天购买次数month3_order_amt double ,--近90天购买金额max_order_amt double ,--最大订单金额min_order_amt double ,--最小订单金额total_order_cnt bigint ,--累计订单数(不含退拒)total_order_amt double ,--累计消费金额(不含退拒)total_coupon_amt double ,--累计使用代金券金额user_avg_order_amt double ,--平均订单金额(含退拒)month3_user_avg_amt double ,--近90天平均订单金额(含退拒)common_address string ,--常用收货地址common_paytype string ,--常用支付方式month1_cart_goods_cnt_30 bigint ,--最近30天加购商品件数month1_cart_goods_amt_30 bigint ,--最近30天加购商品金额month1_cart_cancel_cnt bigint ,--最近30天取消商品件数month1_cart_cancel_amt bigint ,--最近30天取消商品金额dw_date string --计算日期
) partitioned by
(dt string)
;
- SQL开发
with part1 as (select t1.member_id,min(t1.create_time) first_order_time, -- 首单日期max(t1.create_time) last_order_time, -- 末单日期datediff('2022-12-06',min(t1.create_time)) first_order_ago, -- 首单距今天数datediff('2022-12-06',max(t1.create_time)) last_order_ago, -- 末单距今天数count( if(datediff('2022-12-06',t1.create_time) <= 30 , 1 , null ) ) month1_order_cnt, -- 近30天下单次数sum( if(datediff('2022-12-06',t1.create_time) <= 30 , t1.total_amount , 0 ) ) month1_order_amt, -- 近30天购买金额count( if(datediff('2022-12-06',t1.create_time) <= 60 , 1 , null ) ) month2_order_cnt, -- 近60天下单次数sum( if(datediff('2022-12-06',t1.create_time) <= 60 , t1.total_amount , 0 ) ) month2_order_amt, -- 近60天购买金额count( if(datediff('2022-12-06',t1.create_time) <= 90 , 1 , null ) ) month3_order_cnt, -- 近90天下单次数sum( if(datediff('2022-12-06',t1.create_time) <= 90 , t1.total_amount , 0 ) ) month3_order_amt, -- 近90天购买金额max(t1.total_amount) max_order_amt, -- 最大订单金额min(t1.total_amount) min_order_amt, -- 最小订单金额count(if(t2.order_id is null,1 , null)) total_order_cnt, -- 累计订单数(不含退拒)sum(if(t2.order_id is null,t1.total_amount,0)) total_order_amt, -- 累计消费金额(不含退拒) sum(t1.coupon_amount) total_coupon_amt, --累计使用代金券金额avg(t1.total_amount) user_avg_order_amt, --平均订单金额(含退拒)avg(if(datediff('2022-12-06',create_time)<=90,total_amount,null)) month3_user_avg_amt -- 近90天平均订单金额(含退拒)from(select id,member_id,create_time,total_amount,coupon_amount,receiver_detail_addressfrom dwd.oms_order_zip where dt = '2022-12-06' and start_dt <= '2022-12-06' and end_dt >= '2022-12-06')t1 left join(select order_id,sum(return_amount) return_amountfrom dwd.oms_order_return_apply_zipwhere dt = '2022-12-06' and start_dt <= '2022-12-06' and end_dt >= '2022-12-06' and status = 2group by order_id)t2 on t1.id = t2.order_idgroup by t1.member_id
)
,part2 as (select member_id,receiver_detail_address common_addressfrom(select member_id,receiver_detail_address,count(*) c1,row_number() over(partition by member_id order by count(*) desc) rnfrom dwd.oms_order_zip where dt = '2022-12-06' and start_dt <= '2022-12-06' and end_dt >= '2022-12-06'group by member_id,receiver_detail_address)t1 where rn = 1
),part3 as (select member_id,pay_type common_paytypefrom(select member_id,pay_type,count(*) c1,row_number() over(partition by member_id order by count(*) desc) rnfrom dwd.oms_order_zip where dt = '2022-12-06' and start_dt <= '2022-12-06' and end_dt >= '2022-12-06' and pay_type != 0group by member_id,pay_type)t1 where rn = 1),part4 as (select member_id,sum(if(datediff('2022-12-06',create_date)<=30,quantity,0)) month1_cart_goods_cnt_30,sum(if(datediff('2022-12-06',create_date)<=30,quantity*price,0)) month1_cart_goods_amt_30,sum(if(datediff('2022-12-06',create_date)<=30 and delete_status='1',quantity,0)) month1_cart_cancel_cnt,sum(if(datediff('2022-12-06',create_date)<=30 and delete_status='1',quantity*price,0)) month1_cart_cancel_amt from dwd.oms_cart_item_zipwhere dt = '2022-12-06' and start_dt <= '2022-12-06' and end_dt >= '2022-12-06' group by member_id
)
insert overwrite table dws.user_profile_consumer_tag
partition(dt = '2022-12-06')
select part1.member_id ,part1.first_order_time ,part1.last_order_time ,part1.first_order_ago ,part1.last_order_ago ,part1.month1_order_cnt ,part1.month1_order_amt ,part1.month2_order_cnt ,part1.month2_order_amt ,part1.month3_order_cnt ,part1.month3_order_amt ,part1.max_order_amt ,part1.min_order_amt ,part1.total_order_cnt ,part1.total_order_amt ,part1.total_coupon_amt ,part1.user_avg_order_amt ,part1.month3_user_avg_amt ,part2.common_address ,part3.common_paytype ,part4.month1_cart_goods_cnt_30 ,part4.month1_cart_goods_amt_30 ,part4.month1_cart_cancel_cnt ,part4.month1_cart_cancel_amt ,'2022-12-06' dw_date
from part1
left join part2 on part1.member_id = part2.member_id
left join part3 on part1.member_id = part3.member_id
left join part4 on part1.member_id = part4.member_id