day07 spark sql
# SparkSQL设计及入门
## 知识点01:课程回顾
1. Spark程序运行的流程?(Day06_SparkCore高级特性)
2. 什么是宽窄依赖?
3. Spark的Shuffle设计?
4. 开发:RDD算子;理论:Spark设计、面试
## 知识点02:课程目标
### 1. SparkSQL介绍(目标:掌握SparkSQL代码开发规则)
#### 问题1:为什么要学SparkSQL,SparkCore与SparkSQL有什么区别?
- 原因:对于结构化数据分析来讲,SQL是最合适的一门语言,SparkCore不支持SQL分析。
- 区别:
- SparkCore:RDD(分布式列表),数据为行,驱动对象为SparkContext。
- SparkSQL:DataFrame(分布式表),数据+Schema(行、列),驱动对象为SparkSession。
#### 问题2:SparkSQL特点以及应用场景?
- 特点(统一化):
- 开发接口:SQL、DSL(Python、Scala、Java)。
- 分析场景:离线、实时数据分析。
- 功能层面:数据分析、机器学习。
- 场景:
- SparkCore:数据清洗ETL,将非结构化或半结构化数据转换为结构化数据。
- SparkSQL:数据分析OLAP,对结构化数据文件、数据表进行统计分析,代替项目Presto角色。
#### 问题3:SparkSQL编程规则?
- todo:1-构建驱动对象:SparkSession
```
spark = ( SparkSession
.builder 申请spark进入权限
.appName() 应用程序名称
.master() 指定是本机模式还是集群模式
.config() 执行器内存和分区
.config()
.getOrCreate()
)
```
- todo:2-实现数据处理:DataFrame
1. 读取数据:df = spark.read.csv/jdbc/orc/text/json() 这几种格式都可以
2. 处理数据:
- SQL:①给DF取一个表名;②调用SQL进行处理
- df.createOrReplaceTmpView("tbname") 在df中创建数据表
- rs_sql = spark.sql("select …… from tbname") 使用sql语句进行增删改查操作
- DSL:直接调用DSL函数处理
- rs_dsl = df.select().where().groupBy().agg().orderBy().limit().join() 调用 dsl函数进行便 捷增删改查处理,当 然,作用和sql语句几乎是重复的
3. 保存结果:
- 打印:rs_sql.show() 显示前几行
- 保存:rs_dsl.write.csv/jdbc/orc/text/json() 保存为什么格式可以自由选择
- todo:3-关闭驱动对象:spark.stop()
### 2. DataFrame设计以及转换(目标:理解DataFrame设计以及与RDD的区别)
#### 问题1:RDD、DataFrame、DataSet有什么区别?
- RDD:分布式列表,只有数据没有Schema,支持泛型。
- DataFrame:分布式表,有数据也有Schema,不支持泛型。
- DataSet:分布式表,有数据也有Schema,支持泛型。
🧩 第二部分:DataFrame 设计与转换
🎯 问题1:RDD、DataFrame、DataSet 有什么区别?
我们用一个“快递分拣中心”的比喻来理解三者。
| 概念 | 类比 | 特点 |
|---|---|---|
| RDD | 一车没贴标签的包裹 | 只知道有东西,不知道里面是什么 |
| DataFrame | 贴了标准标签的包裹流水线 | 知道每个包裹的“结构”(如:收件人、电话、地址) |
| DataSet | 带类型检查的智能包裹流水线 | 不仅知道结构,还知道“这是手机”还是“是书” |
🔹 1. RDD(Resilient Distributed Dataset)
- 本质:分布式列表,像一个
List<T>分布在多台机器上。 - 特点:
- 没有 Schema(结构):Spark 不知道每条数据长什么样。
- 支持 泛型:你可以放任何类型,比如
RDD[String]、RDD[Person]。 - 运行时出错:比如你写
.map(x => x.name),但某个元素没有name字段,程序运行时才报错。 - 性能较低:因为 Spark 不知道数据结构,无法优化。
📌 就像一堆没分类的包裹,你要一个个拆开看才知道内容。
🔹 2. DataFrame
- 本质:分布式表,像一张 Excel 表,有行有列。
- 特点:
- 有 Schema:每列都有名字和类型(如
name: String,age: Int)。 - 不支持泛型:在 Scala/Java 中,它本质上是
Dataset[Row],所有数据都封装在Row对象里。 - SQL 友好:可以直接用
spark.sql("SELECT ...")查询。 - 性能高:Spark 可以利用 Schema 进行优化(如谓词下推、列剪裁)。
- 在 Python/SQL 中最常用。
- 有 Schema:每列都有名字和类型(如
📌 就像所有包裹都贴了标准化的快递单,机器可以自动读取“收件人”、“电话”等字段进行分拣。
🔹 3. DataSet
- 本质:带类型的 DataFrame(主要在 Scala 中使用)。
- 特点:
- 有 Schema ✅
- 支持 泛型 ✅(如
Dataset[Person]) - 编译时检查:如果你写
.map(person => person.height),但Person类没有height,代码写完就报错,不用等到运行。 - 性能最好,但只在 Scala/Java 中可用,Python/R 不支持。
📌 就像智能分拣系统不仅读快递单,还知道“这是易碎品”或“是生鲜”,自动走不同通道。
✅ 三者对比表
| 特性 | RDD | DataFrame | DataSet |
|---|---|---|---|
| 是否有 Schema | ❌ 无 | ✅ 有 | ✅ 有 |
| 是否支持泛型 | ✅ 支持 | ❌ 不支持(Python/SQL) | ✅ 支持(Scala/Java) |
| 编译时检查 | ❌ 运行时报错 | ❌ 运行时报错 | ✅ 编译时报错 |
| 性能 | 较低 | 高 | 最高 |
| SQL 支持 | ❌ | ✅ | ✅ |
| 主要使用语言 | 所有 | Python, SQL, Java, Scala | Scala, Java |
💡 总结:优先用 DataFrame,Scala 用户可考虑 DataSet,RDD 仅用于特殊场景。
🎯 问题2:RDD 和 DataFrame 如何互相转换?
我们用“快递单重贴标签”的比喻来理解。
- 理论:
- 方式一:反射推断,要求RDD中每个元素类型必须为Row类型。
- 方式二:自定义Schema,RDD元素类型必须为元组或者列表,且基于RDD数据结构自己定义一个Schema。
- 实现:
- rdd.toDF(【列名】)
- df.rdd =》 RDD[Row]
🔄 1. RDD → DataFrame
有两种方式,就像你有两种方法给一堆旧包裹贴新标签:
✅ 方式一:反射推断(自动识别)
“你看着办,根据包裹里的内容,自动猜出字段名和类型。”
from pyspark.sql import Row# RDD 中每个元素必须是 Row 类型
rdd = spark.sparkContext.parallelize([Row(name="张三", age=25),Row(name="李四", age=30)
])# 自动推断结构 → 变成 DataFrame
df = rdd.toDF()
📌 优点:简单。
⚠️ 缺点:依赖数据样例,可能猜错。
✅ 方式二:自定义 Schema(手动定义)
“别猜了,我告诉你这张表应该有哪些字段,什么类型。”
from pyspark.sql.types import *# 步骤1:定义 Schema
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])# 步骤2:RDD 元素为元组或列表
rdd = spark.sparkContext.parallelize([("张三", 25),("李四", 30)
])# 步骤3:用 schema 创建 DataFrame
df = spark.createDataFrame(rdd, schema)
📌 优点:精确控制,适合结构复杂或数据不规范的情况。
💡 这是生产环境推荐方式。
🔄 2. DataFrame → RDD
“把标准化的快递单拆了,还原成原始包裹。”
df = spark.sql("SELECT name, age FROM users")
rdd = df.rdd # 得到 RDD[Row]
df.rdd返回的是一个RDD[Row],每个元素是一个Row对象。- 你可以继续用 RDD 的 API 处理它,比如
.map(),.filter()。
rdd.map(row => f"姓名:{row.name}, 年龄:{row.age}").collect()
📌 适用场景:需要使用低级 API 或复杂逻辑时。
✅ 总结口诀
RDD → DataFrame:要不你猜(反射),要不我说(Schema)
DataFrame → RDD:一拆就还原,变成 Row 列表
💡 实际开发中:尽量用 DataFrame,只有在必须用 RDD 时才转换。
🎉 最终总结:Spark 编程三步走 + 数据模型演进
| 阶段 | 操作 | 类比 |
|---|---|---|
| 1️⃣ 开始 | spark = SparkSession.builder...getOrCreate() | 领“游乐园通行证” |
| 2️⃣ 处理 | df = spark.read... → 处理 → df.write... | 在“数据工作台”上加工 |
| 3️⃣ 结束 | spark.stop() | 归还通行证,释放资源 |
数据模型进化:
RDD(原始包裹) → DataFrame(标准化快递单) → DataSet(智能识别包裹内容)
掌握这些,你就真正理解了 Spark SQL 的核心设计思想!🔥
### 3. SparkSQL中SQL开发规则以及DSL的API函数(目标:掌握SparkSQL中SQL开发规则)
#### 问题1:SparkSQL的SQL在Python代码中是怎么开发的?
- SparkSQL在开发SQL的时候有两种方式:①SQL代码文件(Hive一样);②Python代码文件。
- Python中开发SQL规则:
1. step1:必须先将DataFrame注册为临时视图:
df.createOrReplaceTmpView(tbname)。就是转化为sql表格
2. step2:再调用SQL对临时视图的数据进行处理:spark.sql(“SQL语句”)。
- 只写SQL,为什么不在DataGrip生成SQL文件,而要写Python文件呢?
- SQL只能对表处理:Spark中的表 <= Hive的表。
- 数据来源不来自Hive,必须先通过SparkSession读取数据变成DataFrame。
✅ Step 2:调用 SQL 对临时视图进行处理
result_df = spark.sql("SELECT city, AVG(age) FROM tbname WHERE age > 20 GROUP BY city")
| 概念 | 解释 |
|---|---|
spark.sql(...) | 这是 PySpark 提供的 API,用来执行一条 SQL 字符串。 |
| 返回值 | 仍然是一个 DataFrame!你可以继续用 DSL 处理它,或保存结果。 |
| 支持的语法 | 支持大部分标准 SQL + Spark SQL 扩展函数(如 explode, array_contains 等)。 |
| 动态性 | SQL 是字符串,可以拼接变量,实现动态查询。 |
🔄 完整示例代码
# 1. 构建 SparkSession(通行证)
from pyspark.sql import SparkSession
spark = SparkSession.builder \.appName("SQL Demo") \.master("local[*]") \.getOrCreate()# 2. 读取数据 → 得到 DataFrame
df = spark.read.csv("sales.csv", header=True, inferSchema=True)# 3. 注册为临时视图(给表起名字)
df.createOrReplaceTempView("sales_table")# 4. 写 SQL 查询它
result = spark.sql("""SELECT product,SUM(amount) AS total_sales,AVG(price) AS avg_priceFROM sales_tableWHERE date >= '2025-01-01'GROUP BY productORDER BY total_sales DESCLIMIT 10
""")# 5. 查看或保存结果
result.show()
result.write.parquet("/output/top_products")# 6. 关闭资源
spark.stop()
❓ 为什么不在 DataGrip 写 SQL 文件,而要写 Python 文件?
这是个非常好的问题!我们来对比一下:
| 场景 | DataGrip / Hive SQL 文件 | PySpark Python 文件 |
|---|---|---|
| 数据来源 | 直接操作 Hive 表、MySQL 表等 已存在的数据库表 | 可以读取 任意来源:CSV、JSON、Kafka、API、HDFS、数据库等 |
| 前置条件 | 表必须已经存在(比如 ETL 已完成) | 不需要现有表,PySpark 自己把原始数据变成表 |
| 灵活性 | 只能写 SQL | 可以混合使用:Python 逻辑 + 读文件 + 写 SQL + 再用 Python 处理 |
| 数据预处理 | 难以处理非结构化或半结构化数据 | 可以先用 Python/DSL 清洗、转换数据,再注册成表 |
| 工作流整合 | 单一 SQL 任务 | 可以构建完整的 ETL 流程(读 → 清洗 → 转换 → 分析 → 保存) |
🎯 举个实际例子说明:
假设你要分析一个 JSON 日志文件:
{"user": "A", "action": "click", "timestamp": "2025-01-01 10:00"}
{"user": "B", "action": "buy", "timestamp": "2025-01-01 10:05"}
❌ 用 DataGrip/Hive SQL 做不到:
- Hive 不方便直接读这种原始 JSON 日志。
- 你需要先用别的工具(比如 Spark)把它转成 Hive 表,才能查。
✅ 用 PySpark Python 文件轻松搞定:
# 1. 直接读 JSON
logs_df = spark.read.json("app_logs.json")# 2. 清洗数据(DSL)
cleaned_df = logs_df.filter(logs_df.action.isNotNull())# 3. 注册为临时表
cleaned_df.createOrReplaceTempView("logs")# 4. 写 SQL 分析
result = spark.sql("SELECT action, COUNT(*) FROM logs GROUP BY action")
result.show()
👉 PySpark 让你在一个脚本里完成“从原始数据到分析结果”的全流程,而不仅仅是“查表”。
✅ 总结:SparkSQL 在 Python 中的开发规则
| 规则 | 说明 |
|---|---|
| 1. 先有 DataFrame | 必须先用 spark.read 把数据加载成 DataFrame。 |
| 2. 再注册视图 | 调用 df.createOrReplaceTempView("表名"),让 DataFrame 可被 SQL 引用。 |
| 3. 最后写 SQL | 使用 spark.sql("SELECT ... FROM 表名") 执行查询,返回新的 DataFrame。 |
| 4. 结果可继续处理 | SQL 查询结果仍是 DataFrame,可 .show()、.write 或继续 DSL 操作。 |
🧠 一句话记住
SparkSQL 的 SQL 不是“操作数据库”,而是“操作内存中的 DataFrame”,
它让你可以用熟悉的 SQL 语法,分析任何来源的大数据,
而 Python 文件就是你的“全能数据分析脚本”。
这才是为什么我们选择在 Python 中写 SparkSQL,而不是只用 .sql 文件。
,数据分析简洁,灵活性差,功能局限性大,适合简单数据统计分析任务。
- Python/Scala/Ja## 【模块一:SparkSQL的介绍】
### 知识点03:【理解】SparkCore的缺点(目标:理解SparkCore的缺点)
#### 实施
- 场景:在大数据业务场景中,常处理结构化数据,来源多为结构化数据、结构化文件,结构化数据处理最方便的开发接口是SQL接口。
- 问题:SparkCore类似MapReduce的编程方式,需写代码、提交运行、构建对象实例RDD并调用函数处理,处理结构化数据不便,不能很好基于列处理,每次都需分割。
- 按行筛选、按列筛选:SparkCore:RDD:filter、map。
- 举个栗子:WordCount实现
- 数据:
hadoop 1
hive 1
spark 1
hadoop 1
hive 1
spark 1
hadoop 1
hive 1
hadoop 1
spark 2
hive 3
hadoop 4
- 需求:统计每个单词出现的次数,并按照单词个数升序排序。
- SparkCore实现:
1. 读取数据:将文件数据放入RDD中:RDD中每个元素就是文件的一行,input_rdd: RDD[str] = sc.textFile()
2. 处理数据:
rs_rdd = input_rdd
# 先分割,得到列表
.map(lambda line: re.split("\\s+",line))
# 将列表转换成元组
.map(lambda line: (line[0],int(line[1])))
# 按照Key分组聚合
.reduceByKey(lambda tmp,item: tmp+item)
# 按照个数排序
.sortBy(lambda tuple: tuple[1])
- SQL实现:
-- 将数据变成了一张表:word , numb
-- 统计每个单词出现的次数
select
word,
sum(numb) as cnt
from table
group by word
order by cnt desc
- 对比:
- SQL:简洁直观va:更灵活,功能可自定义实现,处理结构化数据的数据分析相对复杂。
- 思考:怎么能让Spark中的数据变成表呢?
- SQL:SQL是对表做处理。
- Spark支持SQL:但Spark必须要将数据变成表(数据 + Schema[表结构:字段信息])。
- SparkCore:SparkContext:RDD:将所有外部数据读取放入RDD:分布式列表:数据。
- SparkSQL:SparkSession:DataFrame:将所有外部数据读取放入DataFrame:分布式表:数据 + Schema。
- 小结:理解SparkCore的缺点
### 知识点04:【理解】SparkSQL的介绍(目标:理解SparkSQL的介绍)
#### 实施
- 问题:SparkCore只能通过普通Python/Scala/Java代码编程的方式来使用,对于统计分析的需求不是特别的友好怎么办?
- Hadoop:基于Java开发MapReduce实现分布式数据处理,不支持SQL,无法直接让数据分析师实现业务分析。
- FaceBook:会开发MapReduce的是Java工程师,会做业务分析的是数据分析师;Java工程师(MapReduce)不会做业务分析,数据分析师(SQL)不会Java,无法做数据分析。
- 解决:让Java工程师基于Java开发一个程序,能将数据分析师写的SQL转换为MapReduce程序。
- Hive:一群大数据开发者,将Hive源码独立出来,将底层解析成MapReduce的代码替换成解析变成SparkCore。
- Shark:基于Hive和Spark源代码集成设计的工具,高度类似Hive,计算引擎为Spark。
- 发展:
- Spark1.0:DB团队将Shark的部分源码集成到Spark软件中,重构设计,更名为SparkSQL;SparkCore:RDD;SparkSQL:SchemaRDD。
- Spark1.3:参考Python的Pandas设计,引入DataFrame;SparkSQL:DataFrame。
- Spark1.6:参考Flink的设计,引入DataSet的设计;SparkSQL:DataSet。
- Spark2.0:将DF作为DS的一种特殊形式,只保留DataSet数据结构。
- 学习的PySpark:上层看不到DataSet,全部是DataFrame,底层都是DataSet;设计:为让会Python的开发者快速上手PySpark,PySpark里只能看到DataFrame;注意:看到的是DataFrame,底层实际是DataSet。
- 定义:Spark SQL is Apache Spark's module for working with structured data.(SparkSQL是Spark中专门为结构化数据计算的模块,基于SparkCore之上)。
- 区别:SparkSQL处理结构化数据,SparkCore可处理任意类型数据。
- 结构化数据(数据分析场景):SparkSQL。
- 非结构化/半结构化数据(数据清洗场景):SparkCore。
- 联系:SparkSQL底层还是SparkCore,原理一样。
- 功能:提供SQL和DSL(Python、Scala、Java)开发接口,将SQL或DSL语句转换为SparkCore程序,实现结构化数据处理。
- SQL:写SQL对DataFrame做处理,需将DataFrame注册成表,处理简单需求。
- DSL:调用函数对DataFrame做处理,高度类似RDD调用算子处理,处理复杂需求。
- 特点(统一化):
- Integrated:集成大多数开发接口(SQL、DSL-Python、Scala、Java),既能写SQL做分析,也能写Python、Scala、Java代码做分析。
- Uniform Data Access:统一化数据访问(SparkSQL内部集成大多数常见结构化数据源的API,直接调方法即可读写)。
- 读数据:df1 = spark.read.csv/orc/jdbc/json/table()
- 写数据:df1.write.csv/orc/jdbc/json/table()
- Hive Integration:Hive的高度集成(SparkSQL的语法与Hive的语法95%以上一样),95%兼容Hive语法,部分特殊地方有区别(函数更多、语法准备度更高、限制更低)。
- Standard Connectivity:标准的数据连接接口(支持所有常用数据接口的方式执行SparkSQL的程序)。
- 理解SparkSQL:当做Hive Plus升级版。
- Hive:
- 测试:DataGrip连接HiveServer2写SQL做测试。
- 生产:SQL文件(hive -f / -e SQL文件/SQL语句);数据接口(Python:pyhive[10000] / Java:JDBC)。
- SparkSQL:
- 测试:SQL(DataGrip连接Spark ThriftServer写SQL做测试);DSL(Pycharm中直接运行测试)。
- 生产:SQL文件(spark-sql -f / -e SQL文件/SQL语句);DSL开发(spark-submit);数据接口(Python:pyhive[10001] / Java:JDBC)。
- 应用:整个Spark最主要且广泛应用的模块是SparkSQL,官方目前发展推荐SparkSQL;Spark社区目前发展是想用SparkSQL统一整个Spark接口;SparkSQL支持SQL也支持DSL(既可以写SQL也可以写代码(Python、Java、Scala));SparkSQL支持离线也支持实时(离线和实时代码开发规则基本一致);SparkSQL支持ML lib模块的接口开发(可直接用SparkSQL模式调用机器学习模块)。
- 小结:理解SparkSQL的介绍
### 知识点05:【理解】SQL与Core对比(目标:理解Spark中SparkSQL与SparkCore对比)
#### 实施
SparkSQL开发方式与SparkCore开发方式类似,但有区别,像但是不一样。
SparkCore:RDD、SparkContext;SparkSQL:Frame Data、SparkSession;领域:数据仓库、数据分析。
|对比维度|SparkCore|SparkSQL|
|----|----|----|
|处理场景|结构化、非结构化|结构化|
|处理类型|离线|离线、实时|
|数据来源|文件、数据库等|结构化数据(结构化文件、数据库)|
|开发语言|Python、Java、Scala|SQL、DSL-Python、Java、Scala|
|驱动对象|sc = SparkContext|spark = SparkSession|
|数据抽象|RDD|DataFrame/DataSet|
|抽象设计|分布式列表|分布式表|
|设计差异|仅数据|数据 + Schema(表结构)|
|处理方式|按行|按行、按列|
- 小结:理解Spark中SparkSQL与SparkCore对比
### 知识点06:【掌握】SparkSQL的编程规则(目标:掌握SparkSQL的编程规则)
#### 实施
- 开发方式:Python代码中实现SQL和DSL的开发。
- 方式一:基于DataGrip开发SparkSQL的SQL文件,类似Hive的用法。
- 方式二:基于Pycharm开发SparkSQL的Python文件,Spark特有的方式。
- 开发流程:
1. step1:在Python代码中构建驱动对象:SparkSession。
2. step2:读取数据变成DataFrame,使用SQL或者DSL进行处理,保存处理好的结果。
3. step3:关闭驱动对象:stop。
- 驱动对象:
- 任何一个Spark程序都需包含一个SparkContext对象,用于构建Driver进程负责的功能。
- SparkCore:
```
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master).set(Key, Value)
sc = SparkContext(conf=conf)
```
- SparkSQL:SparkSession(内部包含一个SparkContext)
```
SparkSession:用于代替SparkContext,Spark改成基于表(数据 + Schema)来做处理
注意:SparkSQL的本质是在SparkCore基础之上新增了一些东西
规则:任何一个Spark程序一定会包含一个SparkContext
设计:SparkSession必须包含原来SparkContext负责的内容,还会多一些内容,每个SparkSession内部都会包含一个SparkContext
```
- 如果按这种设计构建SparkSession:
```
conf = SparkConf().setMaster(运行模式).setAppName(程序名字).set(配置, 值)
sc = SparkContext(conf=conf) # 构建一个SparkContext
spark = SparkSession(context=sc) # 构建一个SparkSession
```
- 建造者模式:只将用户需要关心的交给用户做,其他由建造者自动实现;理解:原生方式像自己装修(自己设计、买材料、施工),建造者模式像找装修公司(告知装修需求,剩余由装修公司完成)。
- 基于建造者模式构建SparkSession对象:
```
spark = ( SparkSession
# 获取一个建造器
.builder
# 配置
.appName("APP Name") # 设置程序名字
.master("local[2]") # 设置程序运行模式
.config("spark.sql.shuffle.partitions", "2")
# 设置一些属性的值
# 构建
.getOrCreate()
)
```
- SQL规则:
- 流程:
1. step1:先将DataFrame注册成一张临时的视图(给它取个表名)。
2. step2:使用SQL对临时视图进行处理,得到新的DataFrame。
- 使用:
- 创建视图:将这个DataFrame构建一个表名,方便SQL做处理,df.createOrReplaceTempView("people")
- 调用SQL:
```
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
```
- DSL规则:
- 类似RDD的编程方式:调用算子函数来实现处理。
- 流程:直接调用DataFrame的DSL函数进行处理。
- 原生DSL函数(将SQL语法变成了函数):select、where、groupBy、orderBy、limit、count、agg;比写SQL更灵活,不用考虑SQL执行顺序。
- 部分RDD算子函数:repartition、coalesce、persist、unpersist、checkpoint。
- 使用:
```
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
df.select("name").show()
# | name|
# |Michael|
# | Andy|
# | Justin|
# +-------+
df.select(df['name'], df['age'] + 1).show() => select name, age + 1 from df
# | name|(age + 1)|
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
df.where(df['age'] > 21).show() => select * from df where age > 21
# |age|name|
# | 30|Andy|
# +---+----+
df.groupBy("age").count().show() => select age, count(1) from df group by age
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
df.orderBy("age", ascending=False) # 或者
df.sort(df.age.desc())
df.limit(5)
total = df.count() print(f"总共有 {total} 条数据")
select 选列,
where 筛选符合条件的数据
groupBy 分组,
agg 分组后计算特定数值
orderBy 排序
limit 查看前几行数据
count 统计数量
SparkSQL 编程规则与核心知识点详解
【模块一:SparkSQL 编程基础】
知识点06:【掌握】SparkSQL 在 Python 中的编程规则
编程规则测试代码(不能运行,仅用于规则讲解)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession"""
Description : SparkSQL 在 Python 代码中的编程规则
SourceFile : 01.pyspark_sql_programing_rule
Author : Frank
"""if __name__ == '__main__':# todo:0 - 设置系统环境变量os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'# todo:1 - 构建驱动对象: SparkContext => SparkSession"""方式一:传统方式(不推荐)conf = SparkConf().setAppName("SparkSQLAPP").setMaster("local[2]").set("配置名称", "配置的值")sc = SparkContext(conf=conf)spark = SparkSession(sparkContext=sc)"""# 推荐方式:使用建造者模式构建 SparkSessionspark = (SparkSession.builder.appName("SparkSQLApp").master("local[2]").config("配置名称", "配置的值").getOrCreate())# 从 SparkSession 获取 SparkContextsc = spark.sparkContext# todo:2 - 实现数据处理: RDD => DataFrame# step1: 读取数据并转换为 DataFramejson_df = spark.read.json(path="../datas/resources/people.json")jdbc_df = spark.read.jdbc(url="jdbc:mysql://node1:3306",table="dbname.tbname",properties={'user': 'root', 'password': '123456'})# step2: 处理数据 —— 使用 SQL 或 DSL""" 方式一:SQL(简洁直观,灵活性差) """json_df.createOrReplaceTempView("tmp_view_name")sql_rs = spark.sql("SELECT ... FROM tmp_view_name")""" 方式二:DSL(分析复杂,灵活性高) """import pyspark.sql.functions as Fdsl_rs = (jdbc_df.select(jdbc_df['col1']) # 使用 DataFrame 引用列.select(F.col("col1"), F.col("col2"), F.substring(F.col("col3"), 1, 3)) # 使用 SQL 函数引用列.where("id > 2").groupBy("分组字段").agg().orderBy("排序字段").limit(10).groupBy().count().orderBy())# step3: 保存结果或输出sql_rs.show() # 默认打印前 20 行jdbc_df.write.csv(path="../datas/output/output1", sep='\t')# todo:3 - 关闭驱动对象spark.stop()
小结
✅ 掌握 SparkSQL 的基本编程流程:
- 设置环境变量;
- 构建
SparkSession; - 读取数据 → 转换(SQL/DSL)→ 输出;
- 关闭
spark.stop()。
知识点07:【掌握】SparkSQL 实现 WordCount
实施步骤
Step1: 构建 SparkSession
spark = SparkSession \.builder \.appName("SparkSQLAppName") \.master("local[2]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()spark.sparkContext.setLogLevel("WARN")
Step2: 读取数据、转换、输出
读取数据
input_df = spark.read.text("../datas/wordcount")# 打印 Schema
input_df.printSchema()
# 输出:
# root
# |-- value: string (nullable = true)# 打印数据
input_df.show(truncate=False)
# 输出示例:
# |value|
# |hadoop spark|
# |hive hadoop spark spark|
# ...
方式一:使用 SQL 实现 WordCount
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession"""
Description : SparkSQL 基于 SQL 实现 WordCount
SourceFile : 02.pyspark_sql_wordcount_sql
Author : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount with SQL").master("local[2]").config("spark.default.parallelism", 2).getOrCreate())# 读取文本数据input_df = spark.read.text("../datas/wordcount/word.txt")# 注册为临时视图input_df.createOrReplaceTempView("tmp_view_words")# 使用 SQL 进行词频统计sql_rs = spark.sql("""WITH t1 AS (SELECT explode(split(value, ' ')) AS wordFROM tmp_view_words)SELECTword,count(1) AS cntFROM t1GROUP BY wordORDER BY cnt DESC""")# 显示结果sql_rs.show()# 关闭spark.stop()
方式二:使用 DSL 实现 WordCount
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F"""
Description : SparkSQL 基于 DSL 实现 WordCount
SourceFile : 03.pyspark_sql_wordcount_dsl.py
Author : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount DSL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取数据input_df = spark.read.text("../datas/wordcount/word.txt")# 使用 DSL 处理dsl_rs = (input_df.select(F.explode(F.split(F.col("value"), " ")).alias("word")).groupBy(F.col("word")).agg(F.count(F.col("word")).alias("cnt")).orderBy(F.col("cnt").desc()))# 输出结果dsl_rs.show()dsl_rs.write.json(path="../datas/output/output1")# 暂停(调试用)import timetime.sleep(10000000)spark.stop()
输出结果
+------+---+
| word |cnt|
+------+---+
| spark| 10|
| hue | 9|
|hadoop| 7|
|hbase | 6|
| hive | 3|
+------+---+
Step3: 关闭 SparkSession
spark.stop()
小结
✅ 掌握 SparkSQL 实现 WordCount 的两种方式:
- SQL:适合熟悉 SQL 的用户,简洁直观;
- DSL:代码更灵活,便于复杂逻辑处理。
【模块二:DataFrame 的转换】
知识点08:【理解】自动推断类型转换 DataFrame
问题0: RDD 与 DataFrame 的区别?
| 类型 | 数据结构 | 是否有 Schema | 是否支持泛型 | 示例 |
|---|---|---|---|---|
| RDD | 分布式列表 | ❌ 无 | ✅ 支持 | RDD[str], RDD[int] |
| DataFrame | 分布式表 | ✅ 有 | ❌ 不支持 | 每行是 Row 类型 |
| DataSet | 分布式表 | ✅ 有 | ✅ 支持(Scala) | DataSet[Row] = DataFrame |
🔔 注:PySpark 中 DataFrame 底层是
DataSet[Row]。
问题1: 半结构化数据能否直接用 SparkSQL 处理?
❌ 不能直接处理。
✅ 需先用 SparkCore(RDD)将非结构化/半结构化数据转为结构化,再转为 DataFrame。
问题2: 如何将 RDD 转换为 DataFrame?
方式一:反射推断 Schema(自动推断)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import Row"""
Description : 使用反射自动推断 Schema 将 RDD 转为 DataFrame
SourceFile : 004.pyspark_sql_rdd2df_reflect.py
Author : Frank
Date : 2023/5/23
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.master("local[2]").appName("RDD to DF via Reflection").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取原始数据movie_str_rdd = spark.sparkContext.textFile("../datas/movie/u.data")# 转换:字符串 → 列表 → 过滤 → Rowmovie_row_rdd = (movie_str_rdd.map(lambda line: re.split(r"\s+", line)).filter(lambda item: len(item) == 4).map(lambda item: Row(userid=item[0],movieid=item[1],rate=int(item[2]),ts=int(item[3]))))# 创建 DataFrame(自动推断 Schema)df = spark.createDataFrame(movie_row_rdd)df.printSchema()df.show()spark.stop()
输出 Schema
root|-- userid: string (nullable = true)|-- movieid: string (nullable = true)|-- rate: integer (nullable = false)|-- ts: long (nullable = false)
输出数据
|userid|movieid|rate| ts|
|-----|-------|----|---------|
| 196| 242| 3|881250949|
| 186| 302| 3|891717742|
| 22| 377| 1|878887116|
| 244| 51| 2|880606923|
小结
✅ 理解自动推断类型转换 DataFrame:
- 将 RDD 元素转为
Row类型; - Spark 自动分析字段名和类型;
- 适用于结构清晰、字段明确的数据。
知识点09:【理解】自定义 Schema 转换 DataFrame
步骤说明
- 将 RDD 转为
tuple或list; - 定义
StructType和StructField构建 Schema; - 使用
spark.createDataFrame(rdd, schema)。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType"""
Description : 手动定义 Schema 将 RDD 转为 DataFrame
SourceFile : 04.pyspark_sql_rdd2df.py
Author : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取数据movie_rdd = spark.sparkContext.textFile("../datas/movie/u.data")# 转为元组 RDDtuple_rdd = (movie_rdd.map(lambda line: re.split(r'\s+', line)).map(lambda item: (item[0], item[1], float(item[2]), int(item[3]))))# 自定义 Schemauser_schema = StructType([StructField("userid", StringType(), True),StructField("movieid", StringType(), True),StructField("rate", DoubleType(), False),StructField("ts", LongType(), True)])# 创建 DataFramedf2 = spark.createDataFrame(tuple_rdd, user_schema)df2.printSchema()df2.show()spark.stop()
小结
✅ 理解自定义 Schema 转换:
- 更精确控制字段类型和是否可空;
- 适用于类型不一致或需要强类型约束的场景。
知识点10:【掌握】指定列名称转换 DataFrame
使用 toDF() 方法
data_frame = rdd.toDF(["col1", "col2", "col3", "col4"])
完整示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
from pyspark.sql import SparkSession"""
Description : 使用 toDF 指定列名转换 RDD 为 DataFrame
SourceFile : 05.pyspark_sql_rdd_with_df.py
Author : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取并转换为元组 RDDtuple_rdd = (spark.sparkContext.textFile("../datas/movie/u.data").map(lambda line: re.split(r'\s+', line)).map(lambda item: (item[0], item[1], float(item[2]), int(item[3]))))# 使用 toDF 指定列名df = tuple_rdd.toDF(['userId', 'movieId', 'rate', 'ts'])df.printSchema()df.show()# 将 DataFrame 转回 RDDdf_rdd = df.rddprint("前5行:", df_rdd.take(5))print("第一行:", df_rdd.first())spark.stop()
小结
✅ 掌握 toDF() 方法:
- 简洁高效;
- 无需手动定义 Schema;
- 适用于元组或列表类型 RDD。
【模块三:SQL 和 DSL 的使用】
知识点11:【掌握】SQL 开发的使用
规则
DataFrame.createOrReplaceTempView("视图名"):注册临时视图;spark.sql("SQL语句"):执行 SQL 查询。
测试数据
emp.tsv:员工数据(id, name, sal, jiangjin, deptno)dept.csv:部门数据(deptno, dname, loc)
需求
查询每个部门薪资最高的前两名员工信息及其部门名称。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession"""
Description : 使用 SQL 实现复杂查询
SourceFile : 0506.pyspark_sql_rule_sql.py
Author : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取并转换 emp 数据emp_df = (spark.sparkContext.textFile("../datas/emp/emp.tsv").map(lambda line: line.split("\t")).map(lambda item: (item[0], item[1], float(item[2]), float(item[3]), item[4])).toDF(["empid", "ename", "sal", "jiangjin", "deptno"]))# 读取并转换 dept 数据dept_df = (spark.sparkContext.textFile("../datas/emp/dept.csv").map(lambda line: line.split(",")).map(lambda item: (item[0], item[1], item[2])).toDF(["deptno", "dname", "loc"]))# 注册临时视图emp_df.createOrReplaceTempView("tmp_view_emp")dept_df.createOrReplaceTempView("tmp_view_dept")# 执行 SQL 查询rs_df = spark.sql("""WITH tmp1 AS (SELECT *,row_number() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rnFROM tmp_view_emp),tmp2 AS (SELECT * FROM tmp1 WHERE rn < 3)SELECT tmp2.*, b.dnameFROM tmp2JOIN tmp_view_dept b ON tmp2.deptno = b.deptno""")rs_df.show()spark.stop()
输出结果
|empid|ename| sal|jiangjin|deptno|rn| dname|
|-----|-----|-----|--------|------|--|---------|
| 7788|SCOTT|3000.0| 0.0| 20| 1| RESEARCH|
| 7902| FORD|3000.0| 0.0| 20| 2| RESEARCH|
| 7839| KING|5000.0| 0.0| 10| 1|ACCOUNTING|
| 7782|CLARK|2450.0| 0.0| 10| 2|ACCOUNTING|
| 7698|BLAKE|2850.0| 0.0| 30| 1| SALES|
| 7499|ALLEN|1600.0| 300.0| 30| 2| SALES|
小结
✅ 掌握 SQL 在 SparkSQL 中的使用:
- 语法与 Hive 高度兼容;
- 适合复杂分析和窗口函数。
知识点12:【理解】DSL 开发中 API 函数的使用
DSL 函数分类
| 类别 | 函数示例 |
|---|---|
| 基础函数 | count, collect, take, first, head, tail |
| 基本算子 | foreach, distinct, union, coalesce, repartition |
| 持久化算子 | cache, persist, unpersist |
| 其他工具 | columns, schema, rdd, printSchema |
- 基础函数:count/collect/take/first、head/tail。
- 基本算子:foreach/foreachPartition、distinct/union/unionAll、coalesce/repartition。
- 持久化算子:cache/persist、unpersist。
- 其他算子:columns、schema/rdd/printSchema。
- 各函数说明:
- count:统计行数。
- collect:将DataFrame转换成一个数组。
- take:取DataFrame中前N行的数据。
- first:取DataFrame中第一行的数据。
- head:默认取DataFrame中第一行的数据,可指定返回前N行的数据。
- tail:可指定返回后N行的数据。
- foreach:对DataFrame中每条数据进行处理,没有返回值。
- foreachPartition:对DataFrame中每个分区数据进行处理,没有返回值。
- distinct:对DataFrame中每条数据进行去重处理。
- union/unionAll:实现两个DataFrame的合并。
- coalesce/repartition:调整DataFrame的分区数。
- cache/persist:对DataFrame进行缓存。
- unpersist:取消DataFrame的缓存。
- columns:返回DataFrame中的所有列名。
- schema:返回DataFrame中Schema的信息。
- rdd:返回DataFrame中的数据放入RDD中。
- printSchema:打印DataFrame的Schema信息。
完整示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark import StorageLevel
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = SparkSession \.builder \.appName("MovieApp") \.master("local[2]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()people_df = spark.read.json("../datas/resources/people.json")# 基本操作print(f"总行数: {people_df.count()}")print(f"全部数据: {people_df.collect()}")print(f"前两行: {people_df.take(2)}")print(f"第一行: {people_df.first()}")print(f"最后一行: {people_df.tail(1)}")people_df.foreach(lambda row: print(row))people_df.foreachPartition(lambda part: print(*part))# 合并与去重people_df_other = spark.read.json("../datas/resources/people.json")people_df.union(people_df_other).show()people_df.unionAll(people_df_other).show()people_df.unionAll(people_df_other).distinct().show()# 分区调整print(f"原分区数: {people_df.rdd.getNumPartitions()}")print(f"合并后: {people_df.coalesce(1).rdd.getNumPartitions()}")print(f"重分区后: {people_df.repartition(4).rdd.getNumPartitions()}")# 持久化people_df.cache()people_df.persist(StorageLevel.MEMORY_AND_DISK_2)people_df.unpersist(blocking=True)# 元信息print(f"列名: {people_df.columns}")print(f"Schema: {people_df.schema}")people_df.printSchema()spark.stop()
小结
✅ 理解 DSL 中常用 API 函数:
- 结合了 RDD 算子与 SQL 语法优点;
- 更适合复杂数据处理流程。
总结
| 知识点 | 目标 | 关键技能 |
|---|---|---|
| 06 | 掌握编程规则 | 环境变量、SparkSession 构建、SQL/DSL 流程 |
| 07 | 掌握 WordCount | SQL 与 DSL 实现词频统计 |
| 08 | 理解自动推断 | Row + createDataFrame 自动推断 Schema |
| 09 | 理解自定义 Schema | StructType + StructField 手动定义结构 |
| 10 | 掌握 toDF | rdd.toDF(["col1",...]) 快速转换 |
| 11 | 掌握 SQL 使用 | createOrReplaceTempView + spark.sql() |
| 12 | 理解 DSL API | 常用函数如 count, union, cache 等 |
✅ 建议学习路径:
- 先掌握
SparkSession构建; - 学会读取数据并查看
printSchema(); - 练习 SQL 和 DSL 实现 WordCount;
- 掌握 RDD → DataFrame 三种方式;
- 熟练使用
toDF和createOrReplaceTempView。
✅ 提示:所有代码均基于 PySpark,实际运行需配置好 Spark 环境。
