数据入仓和数据ETL(七)


✨博客主页: https://blog.csdn.net/m0_63815035?type=blog
💗《博客内容》:大数据、Java、测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识
📢博客专栏: https://blog.csdn.net/m0_63815035/category_11954877.html
📢欢迎点赞 👍 收藏 ⭐留言 📝
📢本文为学习笔记资料,如有侵权,请联系我删除,疏漏之处还请指正🙉
📢大厦之成,非一木之材也;大海之阔,非一流之归也✨

前言&课程重点
大家好,我是程序员小羊!接下来一周,咱们将用 “实战拆解 + 技术落地” 的方式,带大家吃透一个完整的大数据电商项目 ——不管你是想靠项目经验敲开大厂就业门,还是要做毕业设计、提升技术深度,这门课都能帮你 “从懂概念到能落地”。
毕竟大数据领域不缺 “会背理论” 的人,缺的是 “能把项目跑通、能跟业务结合” 的实战型选手。咱们这一周的内容,不搞虚的,全程围绕 “电商业务痛点→数据解决方案→技术栈落地” 展开,每天聚焦 1 个核心模块,最后还能输出可放进简历的项目成果。
进入正题:
学习重点:
本项目是一门面向有 Java 基础 SQL 基础、初入大数据生态的实战课程,以“跨境电商销售与产品域”为主线,构建从数据采集、分层建模到指标产出的端到端体系。学习将围绕三条主线展开:
其一,夯实平台能力——搭建并优化 Hadoop 高可用集群,理解 HDFS 存储与 YARN 调度,确保大规模数据处理的稳定性与可扩展性;
其二,掌握数仓方法——在 Hive 上完成 ODS→DWD→DWS→ADS 的分层设计,形成销售域与产品域的维度模型,并以 Spark SQL 完成核心指标的高效计算;
其三,解决跨境场景难题——处理多币种结算、汇率归一、时区差异、订单生命周期(下单/支付/发货/签收/退款/拒付)以及税费/运费口径等问题,保证各指标在不同市场和站点间的可比性。
在工程落地上,你将通过 Flume 采集 Web/应用日志并入仓,结合批式 ETL 处理交易与商品主数据;通过 Shell 与 crontab 将 Spark 作业定时编排,生成面向经营的 ADS 指标宽表;最终以 FineBI 等工具完成销售/产品看板展示与答辩汇报。课程强调“指标=模型+口径+验证”,所有结果需给出口径声明与抽样复核方法,贴近企业真实的报表生产流程。
课程介绍:
课程以“跨境电商产品销售建模计算”为主题,围绕销售域(Sales Domain)与产品域(Product Domain)展开。
首先完成基础设施与数据层的统一:Hadoop 高可用环境的部署与调优,HDFS 的冗余与吞吐优化,YARN 资源隔离策略;其后,在 Hive 上建立分层数据仓库,明确事实表与维度表的职责边界,例如订单事实(含支付/退款明细)、商品维度(SKU/SPU/类目树)、市场维度(站点/国家/币种/时区)与客户维度等。
在模型稳定后,进入 Spark SQL 指标计算阶段。你将以“净销售额”“订单笔数”“折扣率”“月度增长率”等为牵引,系统演练跨币种归一(引入日汇率表并固定“结算基准日”)、跨时区对齐(统一至站点本地日或集团 UTC 口径)、订单生命周期过滤(排除未支付、合并退款/拒付的净额口径)与税运费处理(按口径纳入/剔除)。
课程最后以自动化数据管道与可视化收尾:Spark 作业实现日/周/月产出,Shell+crontab 定时调度,ADS 层形成可直接支撑经营分析的指标宽表,FineBI 构建“销售总览”“热销商品”“类目结构”主题看板。
https://www.quanxiaoha.com/article/jetbrains-active-codes.html
Jetbrains 其他产品对应激活码
课程计划:
大数据跨境电商产品销售建模计算

建议在这里再插入有关对数据仓库的层级进行讲解,防止大家忘记不知道数仓中的每一层在做什么。
本期重点
数据入库
下一步就是在 Hive 的 ods 库里建一张 sales 表,直接能读你现在拉取的 HDFS上ods/sales/ 里的原始数据。
如果你的目录出现了SalesDa 和 default 运行下面的命令删除他们
hdfs dfs -rm -r hdfs://hdfs-yjx/yjxshop/ods/sales/SalesDa
hdfs dfs -rm -r hdfs://hdfs-yjx/yjxshop/ods/sales/default
如果你查询的时候出现了:[08S01][1] Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Permission denied: user=anonymous, access=EXECUTE, inode=“/tmp/hadoop-yarn”:root:supergroup:drwx------
错误,运行下面代码:
hdfs dfs -chmod 777 /tmp
hdfs dfs -chmod -R 777 /tmp/hadoop-yarn
注意,本数据集计算量过大,你需要调整虚拟机至少为 5 4 3 才可以启动,并且Spark应该提高内存
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 900M \
--num-executors 1 --executor-cores 1 --executor-memory 1G
启动Scala之后你需要现在DataGrip运行下面命令确定你的资源可以运行下面的数据
-- 在进入datagrip连接Spark之后先不要急着算指标,先运行下面代码-- —— Adaptive Query Execution:自动并行度与倾斜修复 ——
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.localShuffleReader.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=64m; -- 目标分区大小(写/聚合更稳)
-- SET spark.sql.adaptive.coalescePartitions.minPartitionSize=32m; -- 需要更细再开-- —— Shuffle 并行度(小而稳) ——
SET spark.sql.shuffle.partitions=16; -- 12~24 之间调;内存更紧可降到 12-- —— 文件读取切分(单 Task 峰值更低) ——
SET spark.sql.files.maxPartitionBytes=64m;
SET spark.sql.files.openCostInBytes=16m;-- —— Join 行为(谨慎广播,优先 SMJ 稳定) ——
SET spark.sql.autoBroadcastJoinThreshold=16m; -- 小表才广播;遇到不稳可改为 -1 全禁用
SET spark.sql.broadcastTimeout=120;
SET spark.sql.join.preferSortMergeJoin=true;-- —— 存储/压缩(I/O 省内存) ——
SET spark.sql.parquet.compression.codec=zstd;
SET spark.sql.inMemoryColumnarStorage.compressed=true;-- —— 动态分区写入(只覆盖当月/当前分区) ——
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 可选:避免全表覆盖
SET spark.sql.sources.partitionOverwriteMode=dynamic;# 或者使用下面命令启动Spark
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 900M \
--num-executors 1 --executor-cores 1 --executor-memory 1G \
\
--conf spark.executor.memoryOverhead=512m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=64m \
\
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.sql.adaptive.localShuffleReader.enabled=true \
\
--conf spark.sql.shuffle.partitions=16 \
--conf spark.default.parallelism=16 \
--conf spark.sql.files.maxPartitionBytes=64m \
--conf spark.sql.files.openCostInBytes=16m \
\
--conf spark.sql.autoBroadcastJoinThreshold=16m \
--conf spark.sql.broadcastTimeout=120 \
\
--conf spark.memory.fraction=0.5 \
--conf spark.memory.storageFraction=0.2 \
\
--conf spark.sql.parquet.compression.codec=zstd \
--conf spark.sql.inMemoryColumnarStorage.compressed=true \
\
--conf spark.driver.maxResultSize=256m
ods 数据入库
为避免Saprk没有权限计算 你需要 运行
hdfs dfs -chmod -R 777 /yjxshop放置权限问题报错
- 为 ods_sales 创建一个表,并管理上分区规则保证数据正常的能被查询。
-- 进入ODS数据库
USE ods;-- 创建销售表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_sales (SalesID BIGINT,SalesPersonID BIGINT,CustomerID BIGINT,ProductID BIGINT,Quantity INT,Discount DOUBLE,TotalPrice DOUBLE,SalesDate STRING,TransactionNumber STRING
)
PARTITIONED BY (ym STRING) -- 按年月分区
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/sales/';MSCK REPAIR TABLE ods_sales;
show tables;
select * from ods_sales limit 100;
select count(*) from ods_sales;
-- 预估数据量:669w - 700w
- 为 ods_products创建表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_products (ProductID BIGINT,ProductName STRING,CategoryID BIGINT,UnitPrice DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/products/';select count(*) from ods_products;
select * from ods_products limit 100;
-- 产品数量预估 452
- 为 ods_employees 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS ods_employees (EmployeeID BIGINT,FirstName STRING,LastName STRING,BirthDate STRING,Gender STRING,HireDate STRING,CityID BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/employees/';select * from ods_employees limit 100;
select count(*) from ods_employees;
-- 员工数量 23
- 为 ods_customers 建一个表
DROP TABLE IF EXISTS ods_customers;CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_customers (CustomerID BIGINT,FirstName STRING,MiddleInitial STRING,LastName STRING,CityID BIGINT,Address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/ods/customers/';-- 98759
select count(*) from ods.ods_customers;
select * from ods_customers limit 10;
dim 数据入库
dim里面有我们之前临时采集放在ods的例如 customers、employees、products 我们要拉出来做拉链表放在dim中,同时还有不变的,例如 cities,categories,countries
但是这里注意,我们之前没放在ods的数据没办法只能给dim再找另外的hdfs位置存储,这边我们换一个文件夹
- 为 dim_cities 创建一个表,并且倒数据到数据干净
use dim;CREATE EXTERNAL TABLE IF NOT EXISTS dim_cities (CityID BIGINT,CityName STRING,Zipcode STRING,CountryID BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/cities/';select * from dim_cities limit 100;
select count(*) from dim_cities;
-- 参考值 96
- 为 dim_countries 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS dim_countries (CountryID BIGINT,CountryName STRING,CountryCode STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/countries/';select * from dim_countries limit 100;
select count(*) from dim_countries;
-- 参考值 206
- 为 dim_categories 创建一个表
CREATE EXTERNAL TABLE IF NOT EXISTS dim_categories (CategoryID BIGINT,CategoryName STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/categories/';select * from dim_categories limit 100;
select count(*) from dim_categories;
-- 参考值 11
| 表名 | 类型 | 预计数据量(行数) | 备注说明 |
|---|---|---|---|
ods_sales | 事实表(ODS) | 669万 - 700万 | 按年月(ym)分区管理 |
ods_products | 维度表(ODS) | 453 | 产品信息 |
ods_employees | 维度表(ODS) | 24 | 员工信息 |
ods_customers | 维度表(ODS) | 98760 | 客户信息 |
dim_cities | 维度表(DIM) | 97 | 城市列表 |
dim_countries | 维度表(DIM) | 207 | 国家列表 |
dim_categories | 维度表(DIM) | 12 | 商品类别列表 |
dim 拉链表入仓抽取
我们之前在ods中存储了几个表,包括 ods_customers,ods_employees,ods_products 我们要给他们在dim中创建拉链表并且插入进去。
下面的代码请在 hive 中运行
- dim_products_l 表 之前我们先创建一个hdfs目录确保数据可以以外部表的方式创建
hdfs dfs -mkdir /yjxshop/dim/products_l/
hdfs dfs -chmod 751 /yjxshop/dim/products_l/
USE dim;
DROP TABLE IF EXISTS dim_products_l;-- 2. 重新用 ORC 正确创建
CREATE EXTERNAL TABLE IF NOT EXISTS dim_products_l (ProductID BIGINT,ProductName STRING,CategoryID BIGINT,UnitPrice DOUBLE,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/products_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");-- 3. 再执行插入
INSERT INTO TABLE dim_products_l
SELECTProductID,ProductName,CategoryID,UnitPrice,'2025-04-26' AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_products
WHERE ProductID IS NOT NULL;select * from dim_products_l limit 10;
- dim_customers_l 我们还是先创建一个hdfs文件夹,保证后面的数据可以正确的进去。
hdfs dfs -mkdir /yjxshop/dim/customers_l/
hdfs dfs -chmod 751 /yjxshop/dim/customers_l/
USE dim;DROP TABLE IF EXISTS dim_customers_l;CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_customers_l (CustomerID BIGINT,FirstName STRING,Middleinitial STRING,LastName STRING,CityID BIGINT,address STRING,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/customers_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");INSERT INTO TABLE dim.dim_customers_l
SELECTCustomerID,FirstName,Middleinitial,LastName,CityID,address,date_format(now(),'yyyy-MM-dd') AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_customers
WHERE CustomerID IS NOT NULL;
- dim_employees_l 我们还是先创建一个hdfs文件夹,保证后面的数据可以正确的进去。
hdfs dfs -mkdir /yjxshop/dim/employees_l/
hdfs dfs -chmod 751 /yjxshop/dim/employees_l/
USE dim;DROP TABLE IF EXISTS dim_employees_l;CREATE EXTERNAL TABLE IF NOT EXISTS dim_employees_l (EmployeeID BIGINT,FirstName STRING,LastName STRING,BirthDate STRING,Gender STRING,HireDate STRING,CityID BIGINT,start_dt STRING,end_dt STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dim/employees_l/'
TBLPROPERTIES ("orc.compress"="SNAPPY");INSERT INTO TABLE dim_employees_l
SELECTEmployeeID,FirstName,LastName,BirthDate,Gender,HireDate,CityID,date_format(now(),'yyyy-MM-dd') AS start_dt,'9999-12-31' AS end_dt
FROM ods.ods_employees
WHERE EmployeeID IS NOT NULL;
数据清洗
这边我们看数据,其实基本上我们只需要关注事实表数据是否清洁即可,我们来分析一下里面的每一个字段,看看那些东西需要确定质量完备。
| 检查项 | 说明 | 必须 |
|---|---|---|
salesid 不为空 | 主键,不能为空 | 必须 |
salespersonid 不为空 | 销售人员ID,不能为空(否则记录是谁的订单都不知道)为空改为 -1 | 建议 |
customerid 不为空 | 客户ID,不能为空 为空改为 -1 | 建议 |
productid 不为空 | 产品ID,不能为空 为空改为 -1 | 建议 |
quantity 大于0 | 销售数量不能为负,也不能为0 否则为 -1 | 建议 |
discount 介于0~1之间 | 折扣比例必须合理,不能小于0或者大于1 | 建议 |
salesdate 有值且格式正常 | 必须有销售时间,没有剔除 | 必须 |
transactionnumber 不强制校验 | 交易编号,异常也不影响主业务 |
确定好清洗和过滤方案后,我们就开始把数据经过过滤SQL放置到dwd层。
为了在dwd层中创建一个新的外部表我们应该在hdfs中对应位置创建文件夹:
hdfs dfs -mkdir /yjxshop/dwd/sales/
hdfs dfs -chmod 751 /yjxshop/dwd/sales/
USE dwd;CREATE EXTERNAL TABLE IF NOT EXISTS dwd_sales (SalesID BIGINT,SalesPersonID BIGINT,CustomerID BIGINT,ProductID BIGINT,Quantity INT,Discount DOUBLE,TotalPrice DOUBLE,SalesDate STRING,TransactionNumber STRING
)
PARTITIONED BY (ym STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dwd/sales/';-- 开启动态分区
set hive.exec.dynamic.partition.mode=nonstrict;-- 开始插入
INSERT INTO TABLE dwd_sales
PARTITION (ym)
SELECTSalesID,IF(SalesPersonID IS NULL, -1, SalesPersonID),IF(CustomerID IS NULL, -1, CustomerID),IF(ProductID IS NULL, -1, ProductID),CASE WHEN Quantity > 0 THEN Quantity ELSE -1 END,Discount,TotalPrice,SalesDate,TransactionNumber,substr(SalesDate, 1, 7) AS ym
FROM ods.ods_sales
WHERESalesID IS NOT NULLAND cast(SalesID as string) RLIKE '^[0-9]+$'AND SalesDate IS NOT NULLAND length(SalesDate) >= 7AND Discount >= 0AND Discount <= 1;
DWS宽表补维
在运行这个代码之前先在HDFS中创建
hdfs://hdfs-yjx/yjxshop/dws/sales_wide/文件后再执行下面代码
USE dws;CREATE EXTERNAL TABLE dws_sales_wide (SalesID BIGINT,SalesDate STRING,ym STRING,TotalPrice DOUBLE,Quantity INT,Discount DOUBLE,CustomerID BIGINT,CityName STRING,CountryCode STRING,CountryName STRING,ProductID BIGINT,ProductName STRING,CategoryID BIGINT,CategoryName STRING,EmployeeID BIGINT,EmployeeName STRING
)
STORED AS PARQUET
TBLPROPERTIES ('orc.compress' = 'SNAPPY')
LOCATION 'hdfs://hdfs-yjx/yjxshop/dws/sales_wide/';
INSERT OVERWRITE TABLE dws.dws_sales_wide
SELECTs.SalesID,s.SalesDate,substr(s.SalesDate, 1, 7) AS ym,s.TotalPrice,s.Quantity,s.Discount,c.CustomerID,ci.CityName,ci.CountryID,co.CountryName,p.ProductID,p.ProductName,p.CategoryID,cat.CategoryName,e.EmployeeID,concat_ws(' ', e.FirstName, e.LastName) AS EmployeeNameFROM dwd.dwd_sales s
LEFT JOIN dim.dim_customers_l c ON s.CustomerID = c.CustomerID
LEFT JOIN dim.dim_cities ci ON c.CityID = ci.CityID
LEFT JOIN dim.dim_countries co ON ci.CountryID = co.CountryID
LEFT JOIN dim.dim_products_l p ON s.ProductID = p.ProductID
LEFT JOIN dim.dim_categories cat ON p.CategoryID = cat.CategoryID
LEFT JOIN dim.dim_employees_l e ON s.SalesPersonID = e.EmployeeID
WHERE s.SalesID IS NOT NULL;select * from dws.dws_sales_wide limit 100;
理论上来说这样就可以了,但是这并不完美还差一些,我们可以检查一下,
例如: TotalPrice 是本订单的价值 全是 是0,检查ods发现他本来就是 0,检查发现有销售量但是没有销售额。思考怎么解决?
检查国家,发现国家都是一个国家只是有不同的城市,那好吧就当国家维度废了就行没有什么大碍。
于是我们在原来 dws.dws_sales_wide 表的基础上做一个 dws.dws_sales_wide_2
CREATE EXTERNAL TABLE IF NOT EXISTS dws_sales_wide_2 (SalesID BIGINT,SalesDate STRING,ym STRING,TotalPrice DOUBLE,Quantity INT,Discount DOUBLE,CustomerID BIGINT,CityName STRING,CountryCode STRING,CountryName STRING,ProductID BIGINT,ProductName STRING,CategoryID BIGINT,CategoryName STRING,EmployeeID BIGINT,EmployeeName STRING
)
STORED AS PARQUET
LOCATION 'hdfs://hdfs-yjx/yjxshop/dws/dws_sales_wide_2/';INSERT OVERWRITE TABLE dws.dws_sales_wide_2
selectSalesID,SalesDate,ym,(rand()*100*Quantity)-Discount as TotalPrice,Quantity,Discount,CustomerID,CityName,CountryCode,CountryName,ProductID,ProductName,CategoryID,CategoryName,EmployeeID,EmployeeNamefrom dws_sales_wide;
今天这篇文章就到这里了,大厦之成,非一木之材也;大海之阔,非一流之归也。感谢大家观看本文

