当前位置: 首页 > news >正文

数据入仓和数据ETL(七)

在这里插## 标题入图片描述

> 						大家好,我是程序员小羊!

✨博客主页: https://blog.csdn.net/m0_63815035?type=blog

💗《博客内容》:大数据、Java、测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识
📢博客专栏: https://blog.csdn.net/m0_63815035/category_11954877.html
📢欢迎点赞 👍 收藏 ⭐留言 📝
📢本文为学习笔记资料,如有侵权,请联系我删除,疏漏之处还请指正🙉
📢大厦之成,非一木之材也;大海之阔,非一流之归也✨

在这里插入图片描述

前言&课程重点

大家好,我是程序员小羊!接下来一周,咱们将用 “实战拆解 + 技术落地” 的方式,带大家吃透一个完整的大数据电商项目 ——不管你是想靠项目经验敲开大厂就业门,还是要做毕业设计、提升技术深度,这门课都能帮你 “从懂概念到能落地”。

毕竟大数据领域不缺 “会背理论” 的人,缺的是 “能把项目跑通、能跟业务结合” 的实战型选手。咱们这一周的内容,不搞虚的,全程围绕 “电商业务痛点→数据解决方案→技术栈落地” 展开,每天聚焦 1 个核心模块,最后还能输出可放进简历的项目成果。

进入正题:

学习重点:

本项目是一门面向有 Java 基础 SQL 基础、初入大数据生态的实战课程,以“跨境电商销售与产品域”为主线,构建从数据采集、分层建模到指标产出的端到端体系。学习将围绕三条主线展开:

其一,夯实平台能力——搭建并优化 Hadoop 高可用集群,理解 HDFS 存储与 YARN 调度,确保大规模数据处理的稳定性与可扩展性;

其二,掌握数仓方法——在 Hive 上完成 ODS→DWD→DWS→ADS 的分层设计,形成销售域与产品域的维度模型,并以 Spark SQL 完成核心指标的高效计算;

其三,解决跨境场景难题——处理多币种结算、汇率归一、时区差异、订单生命周期(下单/支付/发货/签收/退款/拒付)以及税费/运费口径等问题,保证各指标在不同市场和站点间的可比性。

在工程落地上,你将通过 Flume 采集 Web/应用日志并入仓,结合批式 ETL 处理交易与商品主数据;通过 Shell 与 crontab 将 Spark 作业定时编排,生成面向经营的 ADS 指标宽表;最终以 FineBI 等工具完成销售/产品看板展示与答辩汇报。课程强调“指标=模型+口径+验证”,所有结果需给出口径声明与抽样复核方法,贴近企业真实的报表生产流程。

课程介绍:

课程以“跨境电商产品销售建模计算”为主题,围绕销售域(Sales Domain)与产品域(Product Domain)展开。

首先完成基础设施与数据层的统一:Hadoop 高可用环境的部署与调优,HDFS 的冗余与吞吐优化,YARN 资源隔离策略;其后,在 Hive 上建立分层数据仓库,明确事实表与维度表的职责边界,例如订单事实(含支付/退款明细)、商品维度(SKU/SPU/类目树)、市场维度(站点/国家/币种/时区)与客户维度等。

在模型稳定后,进入 Spark SQL 指标计算阶段。你将以“净销售额”“订单笔数”“折扣率”“月度增长率”等为牵引,系统演练跨币种归一(引入日汇率表并固定“结算基准日”)、跨时区对齐(统一至站点本地日或集团 UTC 口径)、订单生命周期过滤(排除未支付、合并退款/拒付的净额口径)与税运费处理(按口径纳入/剔除)。

课程最后以自动化数据管道与可视化收尾:Spark 作业实现日/周/月产出,Shell+crontab 定时调度,ADS 层形成可直接支撑经营分析的指标宽表,FineBI 构建“销售总览”“热销商品”“类目结构”主题看板。

https://www.quanxiaoha.com/article/jetbrains-active-codes.html
Jetbrains 其他产品对应激活码

课程计划:

大数据跨境电商产品销售建模计算

在这里插入图片描述

建议在这里再插入有关对数据仓库的层级进行讲解,防止大家忘记不知道数仓中的每一层在做什么。

本期重点

数据入库

下一步就是在 Hive 的 ods 库里建一张 sales 表,直接能读你现在拉取的 HDFS上ods/sales/ 里的原始数据。

如果你的目录出现了SalesDa 和 default 运行下面的命令删除他们

hdfs dfs -rm -r hdfs://hdfs-yjx/yjxshop/ods/sales/SalesDa

hdfs dfs -rm -r hdfs://hdfs-yjx/yjxshop/ods/sales/default

如果你查询的时候出现了:[08S01][1] Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Permission denied: user=anonymous, access=EXECUTE, inode=“/tmp/hadoop-yarn”:root:supergroup:drwx------
错误,运行下面代码:

hdfs dfs -chmod 777 /tmp

hdfs dfs -chmod -R 777 /tmp/hadoop-yarn

注意,本数据集计算量过大,你需要调整虚拟机至少为 5 4 3 才可以启动,并且Spark应该提高内存

sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 900M \
--num-executors 1 --executor-cores 1 --executor-memory 1G

启动Scala之后你需要现在DataGrip运行下面命令确定你的资源可以运行下面的数据

-- 在进入datagrip连接Spark之后先不要急着算指标,先运行下面代码-- —— Adaptive Query Execution:自动并行度与倾斜修复 ——
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.localShuffleReader.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=64m;   -- 目标分区大小(写/聚合更稳)
-- SET spark.sql.adaptive.coalescePartitions.minPartitionSize=32m;  -- 需要更细再开-- —— Shuffle 并行度(小而稳) ——
SET spark.sql.shuffle.partitions=16;     -- 12~24 之间调;内存更紧可降到 12-- —— 文件读取切分(单 Task 峰值更低) ——
SET spark.sql.files.maxPartitionBytes=64m;
SET spark.sql.files.openCostInBytes=16m;-- —— Join 行为(谨慎广播,优先 SMJ 稳定) ——
SET spark.sql.autoBroadcastJoinThreshold=16m;  -- 小表才广播;遇到不稳可改为 -1 全禁用
SET spark.sql.broadcastTimeout=120;
SET spark.sql.join.preferSortMergeJoin=true;-- —— 存储/压缩(I/O 省内存) ——
SET spark.sql.parquet.compression.codec=zstd;
SET spark.sql.inMemoryColumnarStorage.compressed=true;-- —— 动态分区写入(只覆盖当月/当前分区) ——
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 可选:避免全表覆盖
SET spark.sql.sources.partitionOverwriteMode=dynamic;# 或者使用下面命令启动Spark
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 900M \
--num-executors 1 --executor-cores 1 --executor-memory 1G \
\
--conf spark.executor.memoryOverhead=512m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=64m \
\
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.sql.adaptive.localShuffleReader.enabled=true \
\
--conf spark.sql.shuffle.partitions=16 \
--conf spark.default.parallelism=16 \
--conf spark.sql.files.maxPartitionBytes=64m \
--conf spark.sql.files.openCostInBytes=16m \
\
--conf spark.sql.autoBroadcastJoinThreshold=16m \
--conf spark.sql.broadcastTimeout=120 \
\
--conf spark.memory.fraction=0.5 \
--conf spark.memory.storageFraction=0.2 \
\
--conf spark.sql.parquet.compression.codec=zstd \
--conf spark.sql.inMemoryColumnarStorage.compressed=true \
\
--conf spark.driver.maxResultSize=256m
ods 数据入库

为避免Saprk没有权限计算 你需要 运行 hdfs dfs -chmod -R 777 /yjxshop 放置权限问题报错

  1. 为 ods_sales 创建一个表,并管理上分区规则保证数据正常的能被查询。
-- 进入ODS数据库
USE ods;-- 创建销售表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_sales (SalesID BIGINT,SalesPersonID BIGINT,CustomerID BIGINT,ProductID BIGINT,Quantity INT,Discount DOUBLE,TotalPrice DOUBLE,SalesDate STRING,TransactionNumber STRING
)
PARTITIONED BY (ym STRING)  -- 按年月分区
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/sales/';MSCK REPAIR TABLE ods_sales;
show tables;
select * from ods_sales limit 100;
select count(*) from ods_sales;
-- 预估数据量:669w - 700w
  1. 为 ods_products创建表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_products (ProductID BIGINT,ProductName STRING,CategoryID BIGINT,UnitPrice DOUBLE
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE 
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/products/';select count(*) from ods_products;
select * from ods_products limit 100;
-- 产品数量预估 452
  1. 为 ods_employees 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_employees (EmployeeID BIGINT,FirstName STRING,LastName STRING,BirthDate STRING,Gender STRING,HireDate STRING,CityID BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/employees/';select * from ods_employees limit 100;
select count(*) from ods_employees;
-- 员工数量 23
  1. 为 ods_customers 建一个表
DROP TABLE IF EXISTS ods_customers;CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_customers (CustomerID BIGINT,FirstName STRING,MiddleInitial STRING,LastName STRING,CityID BIGINT,Address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/customers/';-- 98759
select count(*) from ods.ods_customers;
select * from ods_customers limit 10;
dim 数据入库

dim里面有我们之前临时采集放在ods的例如 customers、employees、products 我们要拉出来做拉链表放在dim中,同时还有不变的,例如 cities,categories,countries

但是这里注意,我们之前没放在ods的数据没办法只能给dim再找另外的hdfs位置存储,这边我们换一个文件夹

  1. 为 dim_cities 创建一个表,并且倒数据到数据干净
use dim;CREATE EXTERNAL TABLE IF NOT EXISTS dim_cities (CityID BIGINT,CityName STRING,Zipcode STRING,CountryID BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/cities/';select * from dim_cities limit 100;
select count(*) from dim_cities;
-- 参考值 96
  1. 为 dim_countries 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS dim_countries (CountryID BIGINT,CountryName STRING,CountryCode STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/countries/';select * from dim_countries limit 100;
select count(*) from dim_countries;
-- 参考值 206
  1. 为 dim_categories 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS dim_categories (CategoryID BIGINT,CategoryName STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/categories/';select * from dim_categories limit 100;
select count(*) from dim_categories;
-- 参考值 11
表名类型预计数据量(行数)备注说明
ods_sales事实表(ODS)669万 - 700万按年月(ym)分区管理
ods_products维度表(ODS)453产品信息
ods_employees维度表(ODS)24员工信息
ods_customers维度表(ODS)98760客户信息
dim_cities维度表(DIM)97城市列表
dim_countries维度表(DIM)207国家列表
dim_categories维度表(DIM)12商品类别列表
dim 拉链表入仓抽取

我们之前在ods中存储了几个表,包括 ods_customers,ods_employees,ods_products 我们要给他们在dim中创建拉链表并且插入进去。

下面的代码请在 hive 中运行

  1. dim_products_l 表 之前我们先创建一个hdfs目录确保数据可以以外部表的方式创建

hdfs dfs -mkdir /yjxshop/dim/products_l/

hdfs dfs -chmod 751 /yjxshop/dim/products_l/

USE dim;
DROP TABLE IF EXISTS dim_products_l;-- 2. 重新用 ORC 正确创建
CREATE EXTERNAL TABLE IF NOT EXISTS dim_products_l (ProductID BIGINT,ProductName STRING,CategoryID BIGINT,UnitPrice DOUBLE,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/products_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");-- 3. 再执行插入
INSERT INTO TABLE dim_products_l
SELECTProductID,ProductName,CategoryID,UnitPrice,'2025-04-26' AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_products
WHERE ProductID IS NOT NULL;select * from dim_products_l limit 10;
  1. dim_customers_l 我们还是先创建一个hdfs文件夹,保证后面的数据可以正确的进去。

hdfs dfs -mkdir /yjxshop/dim/customers_l/

hdfs dfs -chmod 751 /yjxshop/dim/customers_l/

USE dim;DROP TABLE IF EXISTS dim_customers_l;CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_customers_l (CustomerID BIGINT,FirstName STRING,Middleinitial STRING,LastName STRING,CityID BIGINT,address STRING,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/customers_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");INSERT INTO TABLE dim.dim_customers_l
SELECTCustomerID,FirstName,Middleinitial,LastName,CityID,address,date_format(now(),'yyyy-MM-dd') AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_customers
WHERE CustomerID IS NOT NULL;
  1. dim_employees_l 我们还是先创建一个hdfs文件夹,保证后面的数据可以正确的进去。

hdfs dfs -mkdir /yjxshop/dim/employees_l/

hdfs dfs -chmod 751 /yjxshop/dim/employees_l/

USE dim;DROP TABLE IF EXISTS dim_employees_l;CREATE EXTERNAL TABLE IF NOT EXISTS dim_employees_l (EmployeeID BIGINT,FirstName STRING,LastName STRING,BirthDate STRING,Gender STRING,HireDate STRING,CityID BIGINT,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/employees_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");INSERT INTO TABLE dim_employees_l
SELECTEmployeeID,FirstName,LastName,BirthDate,Gender,HireDate,CityID,date_format(now(),'yyyy-MM-dd') AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_employees
WHERE EmployeeID IS NOT NULL;

数据清洗

这边我们看数据,其实基本上我们只需要关注事实表数据是否清洁即可,我们来分析一下里面的每一个字段,看看那些东西需要确定质量完备。

检查项说明必须
salesid 不为空主键,不能为空必须
salespersonid 不为空销售人员ID,不能为空(否则记录是谁的订单都不知道)为空改为 -1建议
customerid 不为空客户ID,不能为空 为空改为 -1建议
productid 不为空产品ID,不能为空 为空改为 -1建议
quantity 大于0销售数量不能为负,也不能为0 否则为 -1建议
discount 介于0~1之间折扣比例必须合理,不能小于0或者大于1建议
salesdate 有值且格式正常必须有销售时间,没有剔除必须
transactionnumber 不强制校验交易编号,异常也不影响主业务

确定好清洗和过滤方案后,我们就开始把数据经过过滤SQL放置到dwd层。

为了在dwd层中创建一个新的外部表我们应该在hdfs中对应位置创建文件夹:

hdfs dfs -mkdir /yjxshop/dwd/sales/

hdfs dfs -chmod 751 /yjxshop/dwd/sales/

USE dwd;CREATE EXTERNAL TABLE IF NOT EXISTS dwd_sales (SalesID BIGINT,SalesPersonID BIGINT,CustomerID BIGINT,ProductID BIGINT,Quantity INT,Discount DOUBLE,TotalPrice DOUBLE,SalesDate STRING,TransactionNumber STRING
)
PARTITIONED BY (ym STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dwd/sales/';-- 开启动态分区
set hive.exec.dynamic.partition.mode=nonstrict;-- 开始插入
INSERT INTO TABLE dwd_sales
PARTITION (ym)
SELECTSalesID,IF(SalesPersonID IS NULL, -1, SalesPersonID),IF(CustomerID IS NULL, -1, CustomerID),IF(ProductID IS NULL, -1, ProductID),CASE WHEN Quantity > 0 THEN Quantity ELSE -1 END,Discount,TotalPrice,SalesDate,TransactionNumber,substr(SalesDate, 1, 7) AS ym
FROM ods.ods_sales
WHERESalesID IS NOT NULLAND cast(SalesID as string) RLIKE '^[0-9]+$'AND SalesDate IS NOT NULLAND length(SalesDate) >= 7AND Discount >= 0AND Discount <= 1;

DWS宽表补维

在运行这个代码之前先在HDFS中创建 hdfs://hdfs-yjx/yjxshop/dws/sales_wide/文件后再执行下面代码

USE dws;CREATE EXTERNAL TABLE dws_sales_wide (SalesID BIGINT,SalesDate STRING,ym STRING,TotalPrice DOUBLE,Quantity INT,Discount DOUBLE,CustomerID BIGINT,CityName STRING,CountryCode STRING,CountryName STRING,ProductID BIGINT,ProductName STRING,CategoryID BIGINT,CategoryName STRING,EmployeeID BIGINT,EmployeeName STRING
)
STORED AS PARQUET
TBLPROPERTIES ('orc.compress' = 'SNAPPY')
LOCATION 'hdfs://hdfs-yjx/yjxshop/dws/sales_wide/';
INSERT OVERWRITE TABLE dws.dws_sales_wide
SELECTs.SalesID,s.SalesDate,substr(s.SalesDate, 1, 7) AS ym,s.TotalPrice,s.Quantity,s.Discount,c.CustomerID,ci.CityName,ci.CountryID,co.CountryName,p.ProductID,p.ProductName,p.CategoryID,cat.CategoryName,e.EmployeeID,concat_ws(' ', e.FirstName, e.LastName) AS EmployeeNameFROM dwd.dwd_sales s
LEFT JOIN dim.dim_customers_l c ON s.CustomerID = c.CustomerID
LEFT JOIN dim.dim_cities ci ON c.CityID = ci.CityID
LEFT JOIN dim.dim_countries co ON ci.CountryID = co.CountryID
LEFT JOIN dim.dim_products_l p ON s.ProductID = p.ProductID
LEFT JOIN dim.dim_categories cat ON p.CategoryID = cat.CategoryID
LEFT JOIN dim.dim_employees_l e ON s.SalesPersonID = e.EmployeeID
WHERE s.SalesID IS NOT NULL;select * from dws.dws_sales_wide limit 100;

理论上来说这样就可以了,但是这并不完美还差一些,我们可以检查一下,
例如: TotalPrice 是本订单的价值 全是 是0,检查ods发现他本来就是 0,检查发现有销售量但是没有销售额。思考怎么解决?
检查国家,发现国家都是一个国家只是有不同的城市,那好吧就当国家维度废了就行没有什么大碍。

于是我们在原来 dws.dws_sales_wide 表的基础上做一个 dws.dws_sales_wide_2

CREATE EXTERNAL TABLE IF NOT EXISTS dws_sales_wide_2 (SalesID        BIGINT,SalesDate      STRING,ym             STRING,TotalPrice     DOUBLE,Quantity       INT,Discount       DOUBLE,CustomerID     BIGINT,CityName       STRING,CountryCode    STRING,CountryName    STRING,ProductID      BIGINT,ProductName    STRING,CategoryID     BIGINT,CategoryName   STRING,EmployeeID     BIGINT,EmployeeName   STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dws/dws_sales_wide_2/';INSERT OVERWRITE TABLE dws.dws_sales_wide_2
selectSalesID,SalesDate,ym,(rand()*100*Quantity)-Discount as TotalPrice,Quantity,Discount,CustomerID,CityName,CountryCode,CountryName,ProductID,ProductName,CategoryID,CategoryName,EmployeeID,EmployeeNamefrom dws_sales_wide;
今天这篇文章就到这里了,大厦之成,非一木之材也;大海之阔,非一流之归也。感谢大家观看本文

在这里插入图片描述

http://www.dtcms.com/a/594042.html

相关文章:

  • 怎么做网站评估遵义网站
  • Makefile常见错误与快速修复指南
  • 嵌入式Linux学习——文件目录
  • 中科院网站建设WordPress做头部的插件
  • python做网站有什么弊端台州seo网站排名优化
  • PostgreSQL基操
  • 光纤传输20公里的音频、USB光纤传输一体机深度解析
  • DIC多相机协同方案在复杂结构360°全景形貌与变形场检测中的应用研究
  • 发布建设网站一个优秀的个人网站
  • 做网站是干什么用的广州竞价托管公司
  • 梧州网站建设服务商电子商务网站建设
  • 做婚恋网站挣钱吗工商营业执照官网
  • 【ESP32接入最新国产豆包大模型教程】
  • 股指期货和融资融券:对冲交易的两大工具详解
  • 【javaEE】多线程--认识线程、多线程
  • 网站做淘宝客排名会掉吗重庆新闻频道直播在线观看
  • 专业建站流程佛山百度网站快速排名
  • 万能视频解析接口网站怎么做有没有专门做根雕的网站
  • 做网站定金一般多少个人网站seo
  • 青岛营销型网站设计公司开网站做外贸
  • 中国购物网站大全排名Wordpress右侧返回顶部按钮
  • 花都网站建设公司公众号平台网页版登录入口
  • 基于动态规划的潜能觉醒数学模型
  • 中文网站建设和英文网站建设的区别微信公众平台绑定网站
  • 百度站长网站文件验证公司基本介绍模版
  • iis 网站打不开如何做好一个外贸进网站的编辑
  • next.js学习——react入门
  • Java【缓存设计】定时任务+分布式锁实战:Redis vs Redisson实现状态自动扭转以及全量刷新预热机制
  • 缓存更新策略
  • 网站海外推广方案品牌策划公司的市场