当前位置: 首页 > news >正文

大数据电商流量分析项目实战:Spark SQL 基础(四)

在这里插## 标题入图片描述

> 						大家好,我是程序员小羊!

✨博客主页: https://blog.csdn.net/m0_63815035?type=blog

💗《博客内容》:大数据、Java、测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识
📢博客专栏: https://blog.csdn.net/m0_63815035/category_11954877.html
📢欢迎点赞 👍 收藏 ⭐留言 📝
📢本文为学习笔记资料,如有侵权,请联系我删除,疏漏之处还请指正🙉
📢大厦之成,非一木之材也;大海之阔,非一流之归也✨

在这里插入图片描述

前言&课程重点

大家好,我是程序员小羊!接下来一周,咱们将用 “实战拆解 + 技术落地” 的方式,带大家吃透一个完整的大数据电商项目 ——不管你是想靠项目经验敲开大厂就业门,还是要做毕业设计、提升技术深度,这门课都能帮你 “从懂概念到能落地”。

毕竟大数据领域不缺 “会背理论” 的人,缺的是 “能把项目跑通、能跟业务结合” 的实战型选手。咱们这一周的内容,不搞虚的,全程围绕 “电商业务痛点→数据解决方案→技术栈落地” 展开,每天聚焦 1 个核心模块,最后还能输出可放进简历的项目成果。

进入正题:

本项目是一门实战导向的大数据课程,专为具备Java基础但对大数据生态系统不熟悉的同学量身打造。你将从零开始,逐步掌握大数据的基本概念、架构原理以及在电商流量分析中的实际应用,迅速融入当下热门的离线数据处理技术。

在这门课程中,你将学会如何搭建和优化Hadoop高可用环境,了解HDFS存储、YARN资源调度的核心原理,为数据处理打下坚实的基础。同时,你将掌握Hive数据仓库的构建和数仓建模方法,了解如何将海量原始数据经过层次化处理,转化为高质量的数据资产。

课程还将引领你深入Spark SQL的世界,通过实际案例学习如何利用Spark高效计算PV、UV以及各类衍生指标,提升数据分析效率。此外,你还将学习Flume的安装与配置,实现Web日志的实时采集和ETL入仓,确保数据传输的稳定与高效。

为了贴近企业实际运作,本项目还包括定时任务的设置和自动化数据管道构建,教你如何编写Shell脚本并利用crontab定时调度Spark作业,让数据处理过程实现自动化与智能化。最后,通过可视化展示模块,你将学会用FineBI等工具将数据分析结果直观呈现

总之,这是一门集大数据基础、系统搭建、数据处理与智能分析于一体的全链路实战课程。无论你是初入大数据领域的新手,还是希望提升数据处理能力的开发者,都将在这里收获满满,掌握最前沿的大数据技术。

课程计划:

天数主题主要内容
Day 1大数据基础+项目分组 (ZK补充)大数据概念、数仓建模、组件介绍、分组;简单介绍项目。
Day 2Hadoop初认识+ HA环境搭建初认识Hadoop,了解HDFS 基本操作,YARN 资源调度,数据存储测试等,并且完成Hadoop高可用的环境搭建。
Day 3Hive 数据仓库Hive SQL 基础、表设计、加载数据,搭建Hive环境并融入Hadoop实现高可用
Day 4Spark SQL 基础讲解Spark基础,DataFrame & SQL 查询,Hive 集成和环境的搭建
Day 5Flume 数据采集及ETL入仓安装Flume高可用,学习基础的Flume知识并且使用Flume 采集 Web 日志,存入 HDFS;数据格式解析,数据传输优化
Day 6数据入仓 & 指标计算解析 PV、UV 计算逻辑,Hive 数据清洗、分层存储(ODS → DWD)
Day 7Spark 计算 & 指标优化使用 Spark SQL 计算 PV、UV 及衍生指标(如跳出率、人均访问时长等)
Day 8定时任务 & 数据管道编写 Shell 脚本,使用 crontab 实现定时任务,调度 Spark SQL
Day 9可视化 & 数据分析搭建一个简单的项目使用 FineBI 进行数据展示,分析趋势。
Day 10项目答辩小组演示分析结果,可以后台联系程序员小羊点评

今日学习重点【Spark on Hive】:

Apache Spark 是专门为大规模数据处理而设计的快速通用的计算引擎。是一种类似 Hadoop MapReduce 的通用并行计算框架,它拥有 Hadoop MapReduce 所具有的优点,但不同于的是 Spark Job 的中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少磁盘数据交互,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法。 Spark 也被称为基于内存的分布式计算框架。其底层使用的是 Scala 语言编写对于使用要求有一定 Scala 水平。

声明 要是想要学懂Spark需要你有一定的Java水平并且带有一定的Scala语言的水平,本文章只是快速入门

安装Spark on Hive

安装主要分为两个部分,首先他是基于Scala为基础的所以必须要安装Scala语言的运行环境,其次是他的安装需要和Hive建立连接。

首先去下载这两组件的安装包 scala:scala-2.12.17.rpm Spark:spark-3.3.2-bin-hadoop3.tgz 下载地址:https://cloud.189.cn/web/share?code=NzuA3aiiYNFf(访问码:fad9)

1.进入软件的预安装目录,上传这两个软件的安装包

[root@node01 ~]# cd /opt/yjx/
【上传文件scala-2.12.17.rpm 注意这个文件三个机器都要上传】# 三个机器都运行安装scala的代码,这是运行spark的必备环境
[root@node01 yjx]# rpm -ivh scala-2.12.17.rpm
[root@node02 yjx]# rpm -ivh scala-2.12.17.rpm
[root@node03 yjx]# rpm -ivh scala-2.12.17.rpm
  1. 通过 whereis scala 命令查询 Scala 的安装目录,配置环境变量 vim /etc/profile 修改完成后*source /etc/profile* 重新加载环境变量。
[root@node01 yjx]# whereis scala
例如返回数据:/usr/share/scala[root@node01 yjx]# vim /etc/profile
【添加环境变量】
export SCALA_HOME=/usr/share/scala
export PATH=$SCALA_HOME/bin:$PATH[root@node01 yjx]# source /etc/profile
  1. 将环境变量发送给剩下的两个节点
[root@node01 yjx]# cd /etc
[root@node01 etc]# scp -r profile node02:$PWD
[root@node01 etc]# scp -r profile node03:$PWD
【别忘了在其他节点上运行 source /etc/profile 重新加载环境变量】
  1. 检查环境变量 java -version scala -version

在这里插入图片描述

  1. 以YARN模式安装Spark,首先将 Spark 安装包上传至服务器,解压后重命名为 spark-3.3.2
[root@node01 ~]# tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /opt/yjx/
[root@node01 ~]# mv /opt/yjx/spark-3.3.2-bin-hadoop3/ /opt/yjx/spark-3.3.2解压目录说明:
bin      可执行脚本
conf     配置文件
data     示例程序使用数据
examples 示例程序
jars     依赖 jar 包
kubernetes k8s 容器相关文件
python   Python API
R R API
sbin     集群管理命令
yarn     整合 yarn 需要的文件
  1. 修改 Hadoop 的 yarn-site.xml 文件(三台机器都需要修改)添加以下内容(搭建 Hadoop YARN 环境时已经配置):
[root@node01 ~]# vim /opt/yjx/hadoop-3.3.4/etc/hadoop/yarn-site.xml# 在config中添加下面的内容
<!-- 设置是否对容器强制执行物理内存限制 -->
<!-- 是否启动一个线程检查每个任务正在使用的物理内存量,如果任务超出分配值,则将其直接杀掉,默认为 true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!-- 设置是否对容器强制执行虚拟内存限制 -->
<!-- 是否启动一个线程检查每个任务正在使用的虚拟内存量,如果任务超出分配值,则将其直接杀掉,默认为 true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
  1. 修改 spark-env.sh 环境配置脚本文件。在文件末尾添加以下内容:
[root@node01 ~]# cd /opt/yjx/spark-3.3.2/conf/
[root@node01 conf]# cp spark-env.sh.template spark-env.sh
[root@node01 conf]# vim spark-env.sh在文件末尾添加以下内容:export JAVA_HOME=/usr/java/jdk1.8.0_351-amd64
export HADOOP_HOME=/opt/yjx/hadoop-3.3.4
export HADOOP_USER_NAME=root
HADOOP_CONF_DIR=/opt/yjx/hadoop-3.3.4/etc/hadoop
YARN_CONF_DIR=/opt/yjx/hadoop-3.3.4/etc/hadoop
SPARK_DIST_CLASSPATH=$(/opt/yjx/hadoop-3.3.4/bin/hadoop classpath)
  1. 修改 spark-defaults.conf 配置文件。在文件后面添加数据
[root@node01 conf]# cp spark-defaults.conf.template spark-defaults.conf
[root@node01 conf]# vi spark-defaults.conf在文件末尾添加以下内容:# 开启记录事件日志的功能
spark.eventLog.enabled=true
# 设置事件日志存储的目录。注意:配置的 HDFS 路径要存在
spark.eventLog.dir=hdfs://hdfs-yjx/logs/spark-jobs
spark.history.fs.logDirectory=hdfs://hdfs-yjx/logs/spark-jobs
# 日志优化选项,压缩日志
spark.eventLog.compress=true
# 历史服务器与 YARN 关联
spark.yarn.historyServer.address=node01:16066
# 历史服务器 WEBUI 访问端口
spark.history.ui.port=16066
  1. 修改 spark-env.sh 环境配置脚本文件。在文件末尾添加以下内容:
[root@node01 conf]# vim spark-env.sh在文件末尾添加以下内容:
SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=16066
-Dspark.history.fs.logDirectory=hdfs://hdfs-yjx/logs/spark-jobs
-Dspark.history.retainedApplications=30"
  1. 批量连接三个节点 将 Hive 的 hive-site.xml 拷贝到 Spark 的 conf 目录;将 MySQL 的 驱动包 拷贝到 Spark 的 jars 目录;将 core-site.xml 和 hdfs-site.xml 拷贝到 Spark 的 conf 目录(为了连接 HDFS)
cp /opt/yjx/apache-hive-3.1.3-bin/conf/hive-site.xml /opt/yjx/spark-3.3.2/conf
cp /opt/yjx/apache-hive-3.1.3-bin/lib/mysql-connector-java-8.0.18.jar /opt/yjx/spark-3.3.2/jars
cp /opt/yjx/hadoop-3.3.4/etc/hadoop/core-site.xml /opt/yjx/spark-3.3.2/conf
cp /opt/yjx/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /opt/yjx/spark-3.3.2/conf
  1. 将 node01 配置好的 Spark 拷贝至 node02 和 node03。
[root@node02 ~]# scp -r root@node01:/opt/yjx/spark-3.3.2/ /opt/yjx/
[root@node03 ~]# scp -r root@node01:/opt/yjx/spark-3.3.2/ /opt/yjx/
  1. 启动Spark
# 启动 ZooKeeper(三台机器都需要执行)。 三个节点全部依次运行
zkServer.sh start
zkServer.sh status# 启动 Hadoop 集群并在 HDFS 中创建 /logs/spark-jobs 目录。
[root@node01 ~]# start-all.sh
[root@node01-02-03 ~]# start-yarn.sh
[root@node01 ~]# hdfs dfs -mkdir -p /logs/spark-jobs# 启动 YARN 历史服务。
[root@node01 ~]# mapred --daemon start historyserver# 启动 Spark 历史服务。
[root@node01 ~]# /opt/yjx/spark-3.3.2/sbin/start-history-server.sh# 启动Hive
nohup hive --service metastore >> /dev/hive-runlog 2>&1 &
nohup hiveserver2 >> /dev/hive2-runlog 2>&1 &cd /opt/yjx/spark-3.3.2/
sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10015 \
--master yarn --deploy-mode client \
--queue default \
--driver-cores 1 --driver-memory 1024M \
--num-executors 1 --executor-cores 1 --executor-memory 1G

手动连接:/opt/yjx/spark-3.3.2/bin/beeline -u jdbc:hive2://node01:10015 -n root

  1. 测试运行是否成功 接下来我们可以在 Node01:8088 上看到 YARN 的后台节点上的任务,点击 History 即可查看通过 Spark 历史服务查看日志

访问:http://node01:4040/ 出现下面的页面就算成功

在这里插入图片描述

在DataGrip中添加Spark的连接,注意没有密码,用户root。服务端口号 10015

Spark简介

MapReduce 主要基于映射(Map)和归约(Reduce)操作,而 Spark 提供了更多的数据处理功能。Spark 能够更快速高效地完成任务,因为它可以在内存中缓存中间结果无需频繁读写磁盘简化了原来的 shuffle 过程;此外, Spark 具有 DAG(有向无环图)调度器以优化执行计划

相比 MapReduce 仅适用于简单批处理任务,Spark 可以应对包括流处理、交互查询在内的多种计算需求,并且提供了更灵活的数据源支持,这让Spark 是一个高效、多功能并依赖于内存的大数据处理引擎,适合各种复杂任务。

Spark特点

速度快:Spark 在内存中的运算速度快于 100 倍,在硬盘上的运算快 10 倍以上。它通过基于内存的数据流高效处理实现了这一点。

易用性:Spark 支持多种编程语言(如 Scala、Java、Python、R 和 SQL),内置超过 80 种高级算子,提供交互式 Shell 工具,简化应用开发与验证过程。

通用性:Spark 是一个单一框架解决方案,可用于批处理、查询、流处理、机器学习和图计算等多个领域,并在这些场景中无缝集成使用。

兼容性:Spark 能够很好地融入现有的 Hadoop 生态系统,可直接利用 HDFS 等存储系统,并通过 YARN 或 Apache Mesos 进行资源管理和调度。这意味着无需迁移数据即可获取 Spark 的高性能优势。

Spark的运行模式

Local :本地模式。不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于调试,演示等。本地模式就是一个独立的进程,通过其内部的多个线程来模拟整个 Spark 运行时环境。

Standalone :独立模式。Standalone 是 Spark 自带的一个资源调度框架,支持单节点和集群两种模式。集群模式需要构建一个由 Master + Worker 构成的 Spark 集群,Spark 运行在集群中。Spark 中的各个角色以独立进程的形式存在,并组成 Spark 集群环境。

**YARN :**Spark 也可以基于 YARN 来计算(将 Spark 应用提交到 YARN 上运行)。Spark 客户端直接连接 YARN,不需要额外构建 Spark 集群,国内使用较多。Spark 中的各个角色运行在 YARN 的容器内部,并组成 Spark 集群环境。

离线和实时的计算

Spark通常用于离线计算,同时他又有 SprkSteamming 可以实现实时计算,Spark 并不是纯的实时计算,他的实时计算是基于流批一体的,融合了流式计算和批处理的能力,能够同时支持实时和离线数据处理。将进来的文件也当做小批量的批处理任务处理,只不过比较快可以做到实时。

离线计算是指将大量的数据收集起来,批量处理这些数据。通常是在固定的时间段内(如每天、每小时)收集数据,然后进行统一处理。处理的结果不会立即反馈结果而是在所有数据处理完成后才产生结果。

处理过程的时间一般比较长,适合处理大量历史数据,如日志分析、报表生成等,适用于不急切计算和业务对实时计算要求不高的场景。如:数据仓库加载、定时数据分析、日志处理等

实时计算是指对持续不断生成的数据流进行即时处理。数据在生成后几乎立即被处理,处理结果可以实时或准实时地反馈给用户。

数据在生成后立即处理,反馈时间非常短,适合低延迟项目,适合处理需要即时响应的数据,如实时监控、在线推荐等。

在这里插入图片描述

Spark的核心组件

在 Spark on YARN 环境中,主要的核心组件有:

  1. YARN ResourceManager
    – 负责整个集群资源的统一管理和调度,根据各个应用(Application Master)的请求分配资源。

  2. YARN NodeManager
    – 运行在每个集群节点上,负责监控和管理该节点的资源使用情况,以及在本节点上启动和管理容器。

  3. Spark ApplicationMaster
    – 当你以集群模式运行 Spark 应用时,Spark Driver 会作为 ApplicationMaster 运行在一个 Yarn 容器内。它负责协调整个应用、申请资源和监控任务执行。

  4. Spark Driver
    – 无论是在客户端模式还是集群模式,Driver 负责整个 Spark 应用的调度、任务分发和结果收集。在集群模式下,它嵌入在 ApplicationMaster 中;在客户端模式下,则运行在提交应用的客户端机器上。

  5. Spark Executors
    – 在 YARN 节点上启动的容器内运行,用于实际执行任务、处理数据并将结果返回给 Driver。

此外,通常还会有辅助组件,比如 Spark History Server 用于收集和展示作业的运行历史

执行架构和流程

在这里插入图片描述

  1. 提交应用程序 客户端(Client)通过 spark-submit 命令向集群提交一个应用程序(Application)。可以指定执行器的数量(num-executors)、每个执行器的核心数(executor-cores)、每个执行器的内存大小(executor-memory)等参数。

  2. 初始化 SparkContext : Driver 程序(通常是用户提交的应用程序中的主程序)初始化一个 SparkContext。SparkContext 是 Spark 应用的入口,它负责与集群交互,分配资源。

  3. 资源申请 Driver 向集群管理器请求资源:SparkContext 向集群管理器(例如 YARN 的 ResourceManager)发送请求,申请运行执行器(Executor)所需的资源。

  4. 资源分配 集群管理器根据当前的资源情况决定是否满足资源请求。如果资源充足,则分配所需的资源,否则继续等待。

  5. 创建执行器 集群管理器向工作节点(Worker Nodes)发送指令,启动执行器进程。执行器启动后会向 Driver 注册。

  6. 任务调度和执行 转化算子(懒加载):当用户在应用程序中定义的 RDD(弹性分布式数据集)操作是惰性的,即不会立即执行,只是记录操作的定义。

  7. 遇到行动算子时,SparkContext 会触发实际的计算。催生 Job行动算子会催生一个 Job一个 Job 包含多个阶段(Stages),每个阶段包含多个任务(Tasks)

  8. 任务在执行器中执行: Driver 将任务分配到不同的执行器中。每个任务对应一个 RDD 分区(Partition)。

  9. 任务完成:执行器接收到任务后,在本地执行,并将结果返回给 Driver。

粗粒度 Spark 会在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 Task 执行完成后,才会释放这部分资源。

优点:在 Application 执行之前,所有的资源都申请完毕,每一个 Task 直接使用资源就可以了,不需要 Task 在执行前自己去申请资源, Task 启动就快了, Task 执行快了, Stage 执行就快了, Job 也快了,同理 Application 执行也就快了。但是这样也出现了资源利用率较低的问题

RDD和算子

RDD(Resilient Distributed Dataset)即 弹性分布式数据类型。是 Spark 中的核心抽象,它表示一个 不可变的可并行操作的数据集合。RDD 具有以下几个关键特性:
弹性:RDD 可以自动从节点故障中恢复。
分布式:RDD 的数据分布在集群的多个节点上。
不可变:一旦创建,RDD 的数据是不可变的,不能直接修改。

RDD 的来源主要有两个方面: ①从外部数据集创建,例如本地文件或者 HDFS 中。 ②通过现有 RDD 进行转换。

RDD 主要由 两种类型的操作(算子) 进行操作:转换(Transformation):创建新的 RDD(惰性计算)。行动(Action):触发计算,返回结果或写入外部存储。

算子是 RDD 的操作,分为 转换算子(Transformation)行动算子(Action)

**① 转换算子(Transformation)**作用:对 RDD 进行转换,生成新的 RDD,不会立即执行(惰性计算)

常见的转换算子和行动算子

算子类型算子名称说明
转换算子map对RDD中每个元素应用函数,生成新RDD
转换算子flatMap对每个元素应用函数并展平(产生0个或多个元素),生成新RDD
转换算子filter根据给定条件过滤RDD中元素,返回符合条件的RDD
转换算子union合并两个RDD,返回包含所有元素的新RDD
转换算子groupByKey对键值对RDD按照键进行分组,返回(key, Iterable)
转换算子reduceByKey对键值对RDD中相同键的值进行归约计算,返回(key, result)
转换算子distinct去重,返回RDD中不重复的元素
转换算子sortBy根据指定函数对RDD中的元素进行排序,返回排序后的RDD
行动算子collect将RDD中所有元素收集到Driver端,适用于数据量较小情况
行动算子count返回RDD中元素的个数
行动算子reduce对RDD所有元素进行聚合操作,返回单个值
行动算子take(n)返回RDD前n个元素
行动算子saveAsTextFile将RDD的数据保存到外部存储系统,如HDFS
行动算子foreach对RDD中每个元素执行指定函数,通常用于输出或写入外部存储
// 下面是一个使用 Spark Scala API 实现英文文章词频统计的示例代码。假设你的英文文章存放在 HDFS 上的文件 hdfs://your_hdfs_path/english_article.txt 中。import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {// 初始化 Spark 配置与上下文val conf = new SparkConf().setAppName("WordCountExample").setMaster("local")val sc = new SparkContext(conf)// 读取文本文件(可以是 HDFS 路径或本地路径)val textFile = sc.textFile("hdfs://your_hdfs_path/english_article.txt")// 分词、转换为小写,并为每个单词赋值1val words = textFile.flatMap(line => line.split("\\s+")).map(word => (word.toLowerCase, 1))// 按照单词进行归约求和,得到每个单词的词频val wordCounts = words.reduceByKey(_ + _)// 对词频进行降序排序(可选)val sortedWordCounts = wordCounts.sortBy(_._2, ascending = false)// 将结果收集到Driver端并打印输出sortedWordCounts.collect.foreach{ case (word, count) =>println(s"$word: $count")}// 停止 SparkContextsc.stop()}
}

使用 flatMap 对每行文本按照空格进行分词(实际场景中可能需要更复杂的分词逻辑)。用 map 将单词转换为小写,并映射为 (word, 1) 形式。使用 reduceByKey 对相同单词进行求和,得到每个单词的总次数。可选步骤中利用 sortBy 按词频降序排序,方便查看高频词汇。最后通过 collect 将结果收集到 Driver 并打印输出。

宽依赖和窄依赖

RDD 的依赖关系主要分为两种:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。这两种依赖关系影响了数据在集群中的传输方式和计算性能。

在这里插入图片描述

窄依赖指的是父 RDD 的每个分区最多只被子 RDD 的一个分区所使用。这意味着数据可以在单个分区内独立处理,不需要跨分区的数据传输。常见的窄依赖操作包括 mapfilterunion 等。

窄依赖的一个典型例子是 map 操作。例如,对于一个 RDD 执行 map 操作时,每个元素都会应用一个函数生成新的元素,这个过程是并行且独立的,不需要依赖其他分区的数据。

宽依赖指的是父 RDD 的每个分区可能被子 RDD 的多个分区所使用。这通常会导致数据在各个分区之间进行大量的重新分配和传输(即 shuffle 操作)。常见的宽依赖操作包括 reduceByKeygroupByKeyjoin 等。宽依赖也是分隔 stage 的重要分割点。

宽依赖的一个典型例子是 reduceByKey 操作。在进行这个操作时,相同键的数据可能分布在不同的分区中,需要通过 shuffle 操作将相同键的数据聚集到同一个分区进行计算。

窄依赖与宽依赖的区别

数据传输:窄依赖不需要跨分区的数据传输是一对一的,宽依赖需要 shuffle 操作等待数据都算到,在各个分区之间进行大量的数据传输。

计算性能:窄依赖的计算性能较高,因为每个分区的计算是独立进行的,不需要额外的通信开销。而宽依赖的计算性能较低,因为 shuffle 操作会带来较大的网络开销和计算延迟。

容错恢复:窄依赖的恢复较为简单,只需要重算失效分区即可。宽依赖的恢复较为复杂,因为需要重新进行 shuffle 操作,将数据重新分配到正确的分区。

通常来说,窄依赖算子:如 mapfilterflatMap 等都是高效的,因为他们不用再不用的分区中传输数据,也就是没有shuffle,因为只在局部数据内计算,所以执行效率高,调度延迟低,不需要进行网络通信或磁盘I/O的shuffle过程。。

不高效算子通常是宽依赖算子:如 groupByKeyreduceByKey(如果不做局部聚合)、join(非广播 join)等。

这些算子会触发 shuffle 操作,导致网络传输、磁盘I/O和数据倾斜问题,从而显著增加计算开销和延迟。建议:在可以优化的情况下,尽量使用 reduceByKeyaggregateByKey 或采用广播变量来替代直接的 join 等操作,从而降低shuffle的开销。

Spark中的数据倾斜

当数据中某些 key 的分布极不均匀时,会出现热点 key,即某个(或少数)key对应的数据量远大于其他 key。例如在 reduceByKeygroupByKey 操作时,如果某个 key 聚集了大量数据,处理该 key 的任务(task)就会成为瓶颈,导致整个作业执行时间拉长,这就是数据倾斜。

与 MapReduce 的区别

MapReduce:数据倾斜也会导致少数 reduce 任务处理数据量远超其他任务,从而延长整个作业的运行时间。但 MapReduce 的 shuffle 是磁盘为主,倾斜问题可能相对隐蔽,且 MapReduce 的调度和容错机制与 Spark 不同。

Spark:数据倾斜更容易在内存中暴露,因为 Spark 采用 DAG 调度且依赖内存中的数据处理。如果某个 stage 中某个任务处理的数据远多于其他任务,不仅会导致长时间等待,还可能引发内存溢出等问题。Spark 的数据倾斜问题在某些场景下可能更严重,因为整体作业的执行速度往往取决于最慢的任务。

缓解数据倾斜的方法

Salting(加盐法): 在产生倾斜的 key 前添加一个随机前缀(或后缀),使得原本集中的数据散开到多个分区上;在聚合完毕后再去掉前缀重新合并结果。

调整分区数: 使用 repartitioncoalesce 操作来增加或减少分区数,从而更均衡地分布数据。

使用 CombineByKey/AggregateByKey 利用这些算子先在各分区内局部聚合,再进行全局聚合,减少跨分区数据量。

广播 Join:如果一张表很小,可以使用广播变量将其广播到所有节点,避免大规模 shuffle。

血统和依赖

血统指的是一个 RDD 从原始数据(或外部数据源)经过一系列转换操作(Transformation)所形成的有向无环图(DAG)。这个 DAG 记录了 RDD 之间的转换关系,说明了一个 RDD 是如何由上游 RDD 生成的。

依赖描述的是一个 RDD 的各个分区与其父 RDD 分区之间的关系。根据依赖关系的不同。

数据倾斜与依赖的关系

  • 数据倾斜
    当宽依赖操作中某些 key 的数据分布极不均衡时,会导致部分任务处理的数据量远大于其他任务,从而成为整个作业的瓶颈。这就是数据倾斜问题。

  • 依赖的作用
    数据倾斜往往发生在宽依赖场景中,因为宽依赖需要对数据进行重新分区和 Shuffle。通过理解依赖关系,开发者可以采用诸如“加盐”、“自定义分区器”、“局部聚合”等方法来缓解数据倾斜。

DAG有向无环图

Job 是 Spark 作业的基本执行单位。当你在 Spark 中调用一个行动算子,Spark 会触发一个 Job。这个 Job 会执行从原始数据源到最终结果的一系列计算。一个 Job 包含多个 Task,Task 是实际在集群节点上运行的计算单元。

DAG(有向无环图) 是 Spark 内部用来表示 RDD 之间依赖关系的图结构。这个图中的每个节点表示一个 RDD 转换操作,有点像是列出血缘关系的图。数据的流向是单向的最优路径所以是不会循环的。

在这里插入图片描述

DAG(有向无环图)的生成与任务调度的过程:首先 Client 提交任务到 SparkContext,Driver 接收到任务后,会将 RDD 的转换操作按依赖关系组织成一个 DAG。这部分的 DAG 生成是基于 RDD 之间的依赖关系(窄依赖和宽依赖)

转换算子表示 RDD 的转换操作只有在遇到行动算子时,才会触发计算,并生成相应的 DAG。图右侧展示了 DAG 划分为多个 Stage 的过程。Stage 是由 DAG 中的宽依赖分割出来的,宽依赖需要进行 Shuffle 操作,而窄依赖的操作则可以在同一个 Stage 中顺序执行。

整体运行流程当一个行动算子算完会返回 Driver 再准备下一个 job,直到算到最后一个所有结果都算完了

SparkSQL

spark随着后面的发展也逐渐产生了很多的api也接口,算子依然是底层,并且比较难学开发者就把常见的使用和情景封装成了SQL可以通过SQL的方式计算,他主要会经过下面的流程

在这里插入图片描述

在这里插入图片描述

如上图,它详细说明了从 SQL、DataFrame 或 Dataset 的查询到最终执行的 RDD 操作的过程。Catalyst 解析器的执行流程负责将高层次的查询转换为高效的物理执行计划。

首先,用户会提交一个 SQL 查询、DataFrame 操作或者 Dataset 操作。无论是哪种形式的输入,Spark 都会将其转化为一个统一的逻辑计划。
解析阶段(Parser):这里主要做的是语法解析,它将查询字符串解析为一个逻辑操作的树结构。检查使用语法是否有问题,是否存在对应的表或者字段。生成 Unresolved Logical Plan(未绑定的逻辑执行计划)
分析阶段(Analyzer):使用 Analysis Rules,配合元数据。将未解析的逻辑计划转化为一个解析后的逻辑计划(Logical Plan)。在这个阶段,所有的表名、列名等都会被具体化,并且会进行类型检查,以确保查询是合法的。
优化阶段(Optimizer):解析后的逻辑计划会经过一系列的优化规则(RBO)处理,生成一个优化后的逻辑计划优化器会应用各种优化策略,例如常量折叠、谓词下推、列裁剪等,以减少查询的计算量,并提高查询的执行效率。
物理计划生成阶段(Planner):优化后的逻辑计划会被传递给物理计划生成器(Planner),在这里它会被转换为一个或多个物理计划(Physical Plan)。物理计划是具体执行的方案,它定义了实际执行任务时的步骤。
代价模型评估(Cost Model):在物理计划生成之后,Catalyst 使用代价模型(Cost Model)来评估各个物理计划的执行代价。代价模型会考虑数据量、网络开销、计算复杂度等因素,选择出一个最优的物理计划,这个过程称为基于代价的优化(CBO)
运行:经过代价模型评估后,Catalyst 会选择最优的物理计划作为最终的执行计划(Selected Physical Plan)。这个执行计划会被转换为底层的 RDD 操作,并提交给 Spark 执行引擎。整个流程从用户提交查询到实际执行,Catalyst 起到了核心作用。它将高层次的查询逐步转换为底层的物理执行计划,并通过解析、分析、优化、生成和选择物理计划,确保查询能够高效地执行。

结尾:

本课程是一门以电商流量数据分析为核心的大数据实战课程,旨在帮助你全面掌握大数据技术栈的核心组件及其在实际项目中的应用。从零开始,你将深入了解并实践Hadoop、Hive、Spark和Flume等主流技术,为企业级电商流量项目构建一个高可用、稳定高效的数据处理系统。

在课程中,你将学习如何搭建并优化Hadoop高可用环境,熟悉HDFS分布式存储和YARN资源调度机制,为大规模数据存储与计算奠定坚实基础。随后,通过Hive数据仓库的构建与数仓建模,你将掌握如何将原始日志数据进行分层处理,实现数据清洗与结构化存储,从而为后续数据分析做好准备。

借助Spark SQL的强大功能,你将通过实战案例学会快速计算和分析关键指标,如页面浏览量(PV)、独立访客数(UV),以及通过数据比较获得的环比、等比等衍生指标。这些指标将帮助企业准确洞察用户行为和流量趋势,为优化营销策略提供科学依据。

同时,本课程还包含Flume数据采集与ETL入仓的实战模块,教你如何采集实时Web日志数据,并利用ETL流程将数据自动导入HDFS和Hive,确保数据传输和处理的高效稳定。

总体来说,这门课程面向希望提升大数据应用能力的技术人员和企业项目团队,紧密围绕公司电商流量项目的实际需求展开。通过系统的理论讲解与动手实践,你不仅能够构建从数据采集、存储、处理到可视化展示的完整数据管道,还能利用PV、UV、环比、等比等关键指标,全面掌握电商流量数据分析的核心技能。

今天这篇文章就到这里了,大厦之成,非一木之材也;大海之阔,非一流之归也。感谢大家观看本文

在这里插入图片描述


文章转载自:

http://lLmrdqkw.zztmk.cn
http://2DBqcPWo.zztmk.cn
http://4ZGVD2Uj.zztmk.cn
http://KeCCRtBV.zztmk.cn
http://tbp4dNX9.zztmk.cn
http://gQslxKxK.zztmk.cn
http://Dqlsd9A7.zztmk.cn
http://eRhp9FCi.zztmk.cn
http://znpCauoh.zztmk.cn
http://JtkLt6K7.zztmk.cn
http://f09RnjWt.zztmk.cn
http://Pbj7rVt2.zztmk.cn
http://dqPA87bk.zztmk.cn
http://UZsBsTGJ.zztmk.cn
http://6av4BsrS.zztmk.cn
http://WADQ7BPK.zztmk.cn
http://xmwHKbVm.zztmk.cn
http://wQiJ4w94.zztmk.cn
http://jNCLNNX4.zztmk.cn
http://twE18PcB.zztmk.cn
http://PjKeNK3j.zztmk.cn
http://IAMBwSeU.zztmk.cn
http://oqRaV7Gj.zztmk.cn
http://7SbGNvF7.zztmk.cn
http://0OzhLdSK.zztmk.cn
http://OYX8XqB2.zztmk.cn
http://M4dgKP9e.zztmk.cn
http://Q98NKTrK.zztmk.cn
http://qwEYrCk8.zztmk.cn
http://pcPN5tK2.zztmk.cn
http://www.dtcms.com/a/380267.html

相关文章:

  • vmware ubuntu18设置共享文件夹的几个重要点
  • 每日一题(5)
  • Lumerical licence center 无法连接的问题
  • Java网络编程(2):(socket API编程:UDP协议的 socket API -- 回显程序)
  • Java 类加载机制双亲委派与自定义类加载器
  • OpenLayers数据源集成 -- 章节九:必应地图集成详解
  • 前端调试工具有哪些?常用前端调试工具推荐、前端调试工具对比与最佳实践
  • 【C++练习】16.C++将一个十进制转换为二进制
  • 公司本地服务器上搭建部署的办公系统web项目网站,怎么让外网访问?有无公网IP下的2种通用方法教程
  • 【C++】string类 模拟实现
  • 【系列文章】Linux中的并发与竞争[02]-原子操作
  • 微信小程序 -开发邮箱注册验证功能
  • 使用ollama启动文心开源大模型0.3b版本
  • 【langchain】构建检索问答链
  • QT M/V架构开发实战:QSqlQueryModel/ QSqlTableModel/ QSqlRelationalTableModel介绍
  • 网络编程入门:构建你的第一个客户端-服务器应用
  • 极简灰度发布实现新老风控系统切流
  • 基于跳跃表的zset实现解析(lua版)
  • 【学习K230-例程18】GT6700-HTTP-Server
  • Redis列表(List):实现队列/栈的利器,底层原理与实战
  • 超级流水线和标量流水线的原理
  • 漫谈《数字图像处理》之边缘检测与边界预处理的辨析
  • (二)文件管理-文件查看-less命令的使用
  • 深入理解节流(Throttle):原理、实现与应用场景
  • 汽车电子电气架构中的电源架构(下)
  • GISBox与GeoServer使用体验全对比:轻量化工具如何重新定义GIS价值?
  • 02.【Linux系统编程】Linux权限(root超级用户和普通用户、创建普通用户、sudo短暂提权、权限概念、权限修改、粘滞位)
  • JavaEE 初阶第二十二期:网络原理,底层框架的“通关密码”(二)
  • Netty 实战应用:从 RPC 到即时通讯,再到 WebSocket
  • 南京方言数据集|300小时高质量自然对话音频|专业录音棚采集|方言语音识别模型训练|情感计算研究|方言保护文化遗产数字化|语音情感识别|方言对话系统开发