当前位置: 首页 > news >正文

day07 spark sql

# SparkSQL设计及入门
## 知识点01:课程回顾
1. Spark程序运行的流程?(Day06_SparkCore高级特性)
2. 什么是宽窄依赖?
3. Spark的Shuffle设计?
4. 开发:RDD算子;理论:Spark设计、面试

## 知识点02:课程目标
### 1. SparkSQL介绍(目标:掌握SparkSQL代码开发规则)
#### 问题1:为什么要学SparkSQL,SparkCore与SparkSQL有什么区别?
- 原因:对于结构化数据分析来讲,SQL是最合适的一门语言,SparkCore不支持SQL分析。
- 区别:
  - SparkCore:RDD(分布式列表),数据为行,驱动对象为SparkContext。
- SparkSQL:DataFrame(分布式表),数据+Schema(行、列),驱动对象为SparkSession。

#### 问题2:SparkSQL特点以及应用场景?
- 特点(统一化):
- 开发接口:SQL、DSL(Python、Scala、Java)。
- 分析场景:离线、实时数据分析。
- 功能层面:数据分析、机器学习。
- 场景:
- SparkCore:数据清洗ETL,将非结构化或半结构化数据转换为结构化数据。
- SparkSQL:数据分析OLAP,对结构化数据文件、数据表进行统计分析,代替项目Presto角色。

#### 问题3:SparkSQL编程规则?
- todo:1-构建驱动对象:SparkSession
```
spark = ( SparkSession
.builder          申请spark进入权限
.appName()   应用程序名称
.master()        指定是本机模式还是集群模式
.config()         执行器内存和分区
.config()
.getOrCreate()
)

```

- todo:2-实现数据处理:DataFrame
1. 读取数据:df = spark.read.csv/jdbc/orc/text/json()   这几种格式都可以
2. 处理数据:
- SQL:①给DF取一个表名;②调用SQL进行处理
- df.createOrReplaceTmpView("tbname")                在df中创建数据表
- rs_sql = spark.sql("select …… from tbname")       使用sql语句进行增删改查操作
 - DSL:直接调用DSL函数处理 
- rs_dsl = df.select().where().groupBy().agg().orderBy().limit().join()        调用 dsl函数进行便                                                                                                                             捷增删改查处理,当                                                                                                        然,作用和sql语句几乎是重复的
3. 保存结果:
- 打印:rs_sql.show() 显示前几行
- 保存:rs_dsl.write.csv/jdbc/orc/text/json()  保存为什么格式可以自由选择

- todo:3-关闭驱动对象:spark.stop()

### 2. DataFrame设计以及转换(目标:理解DataFrame设计以及与RDD的区别)
#### 问题1:RDD、DataFrame、DataSet有什么区别?
- RDD:分布式列表,只有数据没有Schema,支持泛型。
- DataFrame:分布式表,有数据也有Schema,不支持泛型。
- DataSet:分布式表,有数据也有Schema,支持泛型。


🧩 第二部分:DataFrame 设计与转换

🎯 问题1:RDD、DataFrame、DataSet 有什么区别?

我们用一个“快递分拣中心”的比喻来理解三者。

概念类比特点
RDD一车没贴标签的包裹只知道有东西,不知道里面是什么
DataFrame贴了标准标签的包裹流水线知道每个包裹的“结构”(如:收件人、电话、地址)
DataSet带类型检查的智能包裹流水线不仅知道结构,还知道“这是手机”还是“是书”

🔹 1. RDD(Resilient Distributed Dataset)

  • 本质:分布式列表,像一个 List<T> 分布在多台机器上。
  • 特点
    • 没有 Schema(结构):Spark 不知道每条数据长什么样。
    • 支持 泛型:你可以放任何类型,比如 RDD[String]RDD[Person]
    • 运行时出错:比如你写 .map(x => x.name),但某个元素没有 name 字段,程序运行时才报错。
    • 性能较低:因为 Spark 不知道数据结构,无法优化。

📌 就像一堆没分类的包裹,你要一个个拆开看才知道内容。


🔹 2. DataFrame

  • 本质:分布式表,像一张 Excel 表,有行有列。
  • 特点
    • Schema:每列都有名字和类型(如 name: String, age: Int)。
    • 不支持泛型:在 Scala/Java 中,它本质上是 Dataset[Row],所有数据都封装在 Row 对象里。
    • SQL 友好:可以直接用 spark.sql("SELECT ...") 查询。
    • 性能高:Spark 可以利用 Schema 进行优化(如谓词下推、列剪裁)。
    • 在 Python/SQL 中最常用。

📌 就像所有包裹都贴了标准化的快递单,机器可以自动读取“收件人”、“电话”等字段进行分拣。


🔹 3. DataSet

  • 本质:带类型的 DataFrame(主要在 Scala 中使用)。
  • 特点
    • Schema
    • 支持 泛型 ✅(如 Dataset[Person]
    • 编译时检查:如果你写 .map(person => person.height),但 Person 类没有 height代码写完就报错,不用等到运行。
    • 性能最好,但只在 Scala/Java 中可用,Python/R 不支持。

📌 就像智能分拣系统不仅读快递单,还知道“这是易碎品”或“是生鲜”,自动走不同通道。


✅ 三者对比表

特性RDDDataFrameDataSet
是否有 Schema❌ 无✅ 有✅ 有
是否支持泛型✅ 支持❌ 不支持(Python/SQL)✅ 支持(Scala/Java)
编译时检查❌ 运行时报错❌ 运行时报错✅ 编译时报错
性能较低最高
SQL 支持
主要使用语言所有Python, SQL, Java, ScalaScala, Java

💡 总结:优先用 DataFrame,Scala 用户可考虑 DataSet,RDD 仅用于特殊场景


🎯 问题2:RDD 和 DataFrame 如何互相转换?

我们用“快递单重贴标签”的比喻来理解。
- 理论:
- 方式一:反射推断,要求RDD中每个元素类型必须为Row类型。
- 方式二:自定义Schema,RDD元素类型必须为元组或者列表,且基于RDD数据结构自己定义一个Schema。
- 实现:
- rdd.toDF(【列名】)
- df.rdd =》 RDD[Row]


🔄 1. RDD → DataFrame

有两种方式,就像你有两种方法给一堆旧包裹贴新标签:

✅ 方式一:反射推断(自动识别)

“你看着办,根据包裹里的内容,自动猜出字段名和类型。”

from pyspark.sql import Row# RDD 中每个元素必须是 Row 类型
rdd = spark.sparkContext.parallelize([Row(name="张三", age=25),Row(name="李四", age=30)
])# 自动推断结构 → 变成 DataFrame
df = rdd.toDF()

📌 优点:简单。
⚠️ 缺点:依赖数据样例,可能猜错。


✅ 方式二:自定义 Schema(手动定义)

“别猜了,我告诉你这张表应该有哪些字段,什么类型。”

from pyspark.sql.types import *# 步骤1:定义 Schema
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])# 步骤2:RDD 元素为元组或列表
rdd = spark.sparkContext.parallelize([("张三", 25),("李四", 30)
])# 步骤3:用 schema 创建 DataFrame
df = spark.createDataFrame(rdd, schema)

📌 优点:精确控制,适合结构复杂或数据不规范的情况。
💡 这是生产环境推荐方式。


🔄 2. DataFrame → RDD

“把标准化的快递单拆了,还原成原始包裹。”

df = spark.sql("SELECT name, age FROM users")
rdd = df.rdd  # 得到 RDD[Row]
  • df.rdd 返回的是一个 RDD[Row],每个元素是一个 Row 对象。
  • 你可以继续用 RDD 的 API 处理它,比如 .map(), .filter()
rdd.map(row => f"姓名:{row.name}, 年龄:{row.age}").collect()

📌 适用场景:需要使用低级 API 或复杂逻辑时。


✅ 总结口诀

RDD → DataFrame:要不你猜(反射),要不我说(Schema)
DataFrame → RDD:一拆就还原,变成 Row 列表

💡 实际开发中:尽量用 DataFrame,只有在必须用 RDD 时才转换。


🎉 最终总结:Spark 编程三步走 + 数据模型演进

阶段操作类比
1️⃣ 开始spark = SparkSession.builder...getOrCreate()领“游乐园通行证”
2️⃣ 处理df = spark.read... → 处理 → df.write...在“数据工作台”上加工
3️⃣ 结束spark.stop()归还通行证,释放资源

数据模型进化

RDD(原始包裹) → DataFrame(标准化快递单) → DataSet(智能识别包裹内容)

掌握这些,你就真正理解了 Spark SQL 的核心设计思想!🔥

### 3. SparkSQL中SQL开发规则以及DSL的API函数(目标:掌握SparkSQL中SQL开发规则)
#### 问题1:SparkSQL的SQL在Python代码中是怎么开发的?
- SparkSQL在开发SQL的时候有两种方式:①SQL代码文件(Hive一样);②Python代码文件。
- Python中开发SQL规则:
1. step1:必须先将DataFrame注册为临时视图:

   df.createOrReplaceTmpView(tbname)。就是转化为sql表格
2. step2:再调用SQL对临时视图的数据进行处理:spark.sql(“SQL语句”)。
- 只写SQL,为什么不在DataGrip生成SQL文件,而要写Python文件呢?
- SQL只能对表处理:Spark中的表 <= Hive的表。
- 数据来源不来自Hive,必须先通过SparkSession读取数据变成DataFrame。


✅ Step 2:调用 SQL 对临时视图进行处理

result_df = spark.sql("SELECT city, AVG(age) FROM tbname WHERE age > 20 GROUP BY city")
概念解释
spark.sql(...)这是 PySpark 提供的 API,用来执行一条 SQL 字符串。
返回值仍然是一个 DataFrame!你可以继续用 DSL 处理它,或保存结果。
支持的语法支持大部分标准 SQL + Spark SQL 扩展函数(如 explode, array_contains 等)。
动态性SQL 是字符串,可以拼接变量,实现动态查询。

🔄 完整示例代码

# 1. 构建 SparkSession(通行证)
from pyspark.sql import SparkSession
spark = SparkSession.builder \.appName("SQL Demo") \.master("local[*]") \.getOrCreate()# 2. 读取数据 → 得到 DataFrame
df = spark.read.csv("sales.csv", header=True, inferSchema=True)# 3. 注册为临时视图(给表起名字)
df.createOrReplaceTempView("sales_table")# 4. 写 SQL 查询它
result = spark.sql("""SELECT product,SUM(amount) AS total_sales,AVG(price) AS avg_priceFROM sales_tableWHERE date >= '2025-01-01'GROUP BY productORDER BY total_sales DESCLIMIT 10
""")# 5. 查看或保存结果
result.show()
result.write.parquet("/output/top_products")# 6. 关闭资源
spark.stop()

❓ 为什么不在 DataGrip 写 SQL 文件,而要写 Python 文件?

这是个非常好的问题!我们来对比一下:

场景DataGrip / Hive SQL 文件PySpark Python 文件
数据来源直接操作 Hive 表、MySQL 表等 已存在的数据库表可以读取 任意来源:CSV、JSON、Kafka、API、HDFS、数据库等
前置条件表必须已经存在(比如 ETL 已完成)不需要现有表,PySpark 自己把原始数据变成表
灵活性只能写 SQL可以混合使用:Python 逻辑 + 读文件 + 写 SQL + 再用 Python 处理
数据预处理难以处理非结构化或半结构化数据可以先用 Python/DSL 清洗、转换数据,再注册成表
工作流整合单一 SQL 任务可以构建完整的 ETL 流程(读 → 清洗 → 转换 → 分析 → 保存)

🎯 举个实际例子说明:

假设你要分析一个 JSON 日志文件:

{"user": "A", "action": "click", "timestamp": "2025-01-01 10:00"}
{"user": "B", "action": "buy",  "timestamp": "2025-01-01 10:05"}
❌ 用 DataGrip/Hive SQL 做不到:
  • Hive 不方便直接读这种原始 JSON 日志。
  • 你需要先用别的工具(比如 Spark)把它转成 Hive 表,才能查。
✅ 用 PySpark Python 文件轻松搞定:
# 1. 直接读 JSON
logs_df = spark.read.json("app_logs.json")# 2. 清洗数据(DSL)
cleaned_df = logs_df.filter(logs_df.action.isNotNull())# 3. 注册为临时表
cleaned_df.createOrReplaceTempView("logs")# 4. 写 SQL 分析
result = spark.sql("SELECT action, COUNT(*) FROM logs GROUP BY action")
result.show()

👉 PySpark 让你在一个脚本里完成“从原始数据到分析结果”的全流程,而不仅仅是“查表”。


✅ 总结:SparkSQL 在 Python 中的开发规则

规则说明
1. 先有 DataFrame必须先用 spark.read 把数据加载成 DataFrame。
2. 再注册视图调用 df.createOrReplaceTempView("表名"),让 DataFrame 可被 SQL 引用。
3. 最后写 SQL使用 spark.sql("SELECT ... FROM 表名") 执行查询,返回新的 DataFrame。
4. 结果可继续处理SQL 查询结果仍是 DataFrame,可 .show().write 或继续 DSL 操作。

🧠 一句话记住

SparkSQL 的 SQL 不是“操作数据库”,而是“操作内存中的 DataFrame”
它让你可以用熟悉的 SQL 语法,分析任何来源的大数据,
而 Python 文件就是你的“全能数据分析脚本”。

这才是为什么我们选择在 Python 中写 SparkSQL,而不是只用 .sql 文件。

,数据分析简洁,灵活性差,功能局限性大,适合简单数据统计分析任务。
- Python/Scala/Ja## 【模块一:SparkSQL的介绍】
### 知识点03:【理解】SparkCore的缺点(目标:理解SparkCore的缺点)
#### 实施
- 场景:在大数据业务场景中,常处理结构化数据,来源多为结构化数据、结构化文件,结构化数据处理最方便的开发接口是SQL接口。
- 问题:SparkCore类似MapReduce的编程方式,需写代码、提交运行、构建对象实例RDD并调用函数处理,处理结构化数据不便,不能很好基于列处理,每次都需分割。
- 按行筛选、按列筛选:SparkCore:RDD:filter、map。
- 举个栗子:WordCount实现
- 数据:
hadoop 1
hive 1
spark 1
hadoop 1
hive 1
spark 1
hadoop 1
hive 1
hadoop 1
spark 2
hive 3
hadoop 4
- 需求:统计每个单词出现的次数,并按照单词个数升序排序。
- SparkCore实现:
1. 读取数据:将文件数据放入RDD中:RDD中每个元素就是文件的一行,input_rdd: RDD[str] = sc.textFile()
2. 处理数据:
rs_rdd = input_rdd
# 先分割,得到列表
.map(lambda line: re.split("\\s+",line))
# 将列表转换成元组
.map(lambda line: (line[0],int(line[1])))
# 按照Key分组聚合
.reduceByKey(lambda tmp,item: tmp+item)
# 按照个数排序
.sortBy(lambda tuple: tuple[1])
- SQL实现:
-- 将数据变成了一张表:word , numb
-- 统计每个单词出现的次数
select
word,
sum(numb) as cnt
from table
group by word
order by cnt desc
- 对比:
- SQL:简洁直观va:更灵活,功能可自定义实现,处理结构化数据的数据分析相对复杂。
- 思考:怎么能让Spark中的数据变成表呢?
- SQL:SQL是对表做处理。
- Spark支持SQL:但Spark必须要将数据变成表(数据 + Schema[表结构:字段信息])。
- SparkCore:SparkContext:RDD:将所有外部数据读取放入RDD:分布式列表:数据。
- SparkSQL:SparkSession:DataFrame:将所有外部数据读取放入DataFrame:分布式表:数据 + Schema。
- 小结:理解SparkCore的缺点

### 知识点04:【理解】SparkSQL的介绍(目标:理解SparkSQL的介绍)
#### 实施
- 问题:SparkCore只能通过普通Python/Scala/Java代码编程的方式来使用,对于统计分析的需求不是特别的友好怎么办?
- Hadoop:基于Java开发MapReduce实现分布式数据处理,不支持SQL,无法直接让数据分析师实现业务分析。
- FaceBook:会开发MapReduce的是Java工程师,会做业务分析的是数据分析师;Java工程师(MapReduce)不会做业务分析,数据分析师(SQL)不会Java,无法做数据分析。
- 解决:让Java工程师基于Java开发一个程序,能将数据分析师写的SQL转换为MapReduce程序。
- Hive:一群大数据开发者,将Hive源码独立出来,将底层解析成MapReduce的代码替换成解析变成SparkCore。
- Shark:基于Hive和Spark源代码集成设计的工具,高度类似Hive,计算引擎为Spark。
- 发展:
- Spark1.0:DB团队将Shark的部分源码集成到Spark软件中,重构设计,更名为SparkSQL;SparkCore:RDD;SparkSQL:SchemaRDD。
- Spark1.3:参考Python的Pandas设计,引入DataFrame;SparkSQL:DataFrame。
- Spark1.6:参考Flink的设计,引入DataSet的设计;SparkSQL:DataSet。
- Spark2.0:将DF作为DS的一种特殊形式,只保留DataSet数据结构。
- 学习的PySpark:上层看不到DataSet,全部是DataFrame,底层都是DataSet;设计:为让会Python的开发者快速上手PySpark,PySpark里只能看到DataFrame;注意:看到的是DataFrame,底层实际是DataSet。
- 定义:Spark SQL is Apache Spark's module for working with structured data.(SparkSQL是Spark中专门为结构化数据计算的模块,基于SparkCore之上)。
- 区别:SparkSQL处理结构化数据,SparkCore可处理任意类型数据。
- 结构化数据(数据分析场景):SparkSQL。
- 非结构化/半结构化数据(数据清洗场景):SparkCore。
- 联系:SparkSQL底层还是SparkCore,原理一样。
- 功能:提供SQL和DSL(Python、Scala、Java)开发接口,将SQL或DSL语句转换为SparkCore程序,实现结构化数据处理。
- SQL:写SQL对DataFrame做处理,需将DataFrame注册成表,处理简单需求。
- DSL:调用函数对DataFrame做处理,高度类似RDD调用算子处理,处理复杂需求。
- 特点(统一化):
- Integrated:集成大多数开发接口(SQL、DSL-Python、Scala、Java),既能写SQL做分析,也能写Python、Scala、Java代码做分析。
- Uniform Data Access:统一化数据访问(SparkSQL内部集成大多数常见结构化数据源的API,直接调方法即可读写)。
- 读数据:df1 = spark.read.csv/orc/jdbc/json/table()
- 写数据:df1.write.csv/orc/jdbc/json/table()
- Hive Integration:Hive的高度集成(SparkSQL的语法与Hive的语法95%以上一样),95%兼容Hive语法,部分特殊地方有区别(函数更多、语法准备度更高、限制更低)。
- Standard Connectivity:标准的数据连接接口(支持所有常用数据接口的方式执行SparkSQL的程序)。
- 理解SparkSQL:当做Hive Plus升级版。
- Hive:
- 测试:DataGrip连接HiveServer2写SQL做测试。
- 生产:SQL文件(hive -f / -e SQL文件/SQL语句);数据接口(Python:pyhive[10000] / Java:JDBC)。
- SparkSQL:
- 测试:SQL(DataGrip连接Spark ThriftServer写SQL做测试);DSL(Pycharm中直接运行测试)。
- 生产:SQL文件(spark-sql -f / -e SQL文件/SQL语句);DSL开发(spark-submit);数据接口(Python:pyhive[10001] / Java:JDBC)。
- 应用:整个Spark最主要且广泛应用的模块是SparkSQL,官方目前发展推荐SparkSQL;Spark社区目前发展是想用SparkSQL统一整个Spark接口;SparkSQL支持SQL也支持DSL(既可以写SQL也可以写代码(Python、Java、Scala));SparkSQL支持离线也支持实时(离线和实时代码开发规则基本一致);SparkSQL支持ML lib模块的接口开发(可直接用SparkSQL模式调用机器学习模块)。
- 小结:理解SparkSQL的介绍

### 知识点05:【理解】SQL与Core对比(目标:理解Spark中SparkSQL与SparkCore对比)
#### 实施
SparkSQL开发方式与SparkCore开发方式类似,但有区别,像但是不一样。
SparkCore:RDD、SparkContext;SparkSQL:Frame Data、SparkSession;领域:数据仓库、数据分析。

|对比维度|SparkCore|SparkSQL|
|----|----|----|
|处理场景|结构化、非结构化|结构化|
|处理类型|离线|离线、实时|
|数据来源|文件、数据库等|结构化数据(结构化文件、数据库)|
|开发语言|Python、Java、Scala|SQL、DSL-Python、Java、Scala|
|驱动对象|sc = SparkContext|spark = SparkSession|
|数据抽象|RDD|DataFrame/DataSet|
|抽象设计|分布式列表|分布式表|
|设计差异|仅数据|数据 + Schema(表结构)|
|处理方式|按行|按行、按列|

- 小结:理解Spark中SparkSQL与SparkCore对比

### 知识点06:【掌握】SparkSQL的编程规则(目标:掌握SparkSQL的编程规则)
#### 实施
- 开发方式:Python代码中实现SQL和DSL的开发。
- 方式一:基于DataGrip开发SparkSQL的SQL文件,类似Hive的用法。
- 方式二:基于Pycharm开发SparkSQL的Python文件,Spark特有的方式。
- 开发流程:
1. step1:在Python代码中构建驱动对象:SparkSession。
2. step2:读取数据变成DataFrame,使用SQL或者DSL进行处理,保存处理好的结果。
3. step3:关闭驱动对象:stop。
- 驱动对象:
- 任何一个Spark程序都需包含一个SparkContext对象,用于构建Driver进程负责的功能。
- SparkCore:
```
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master).set(Key, Value)
sc = SparkContext(conf=conf)
```
- SparkSQL:SparkSession(内部包含一个SparkContext)
```
SparkSession:用于代替SparkContext,Spark改成基于表(数据 + Schema)来做处理
注意:SparkSQL的本质是在SparkCore基础之上新增了一些东西
规则:任何一个Spark程序一定会包含一个SparkContext
设计:SparkSession必须包含原来SparkContext负责的内容,还会多一些内容,每个SparkSession内部都会包含一个SparkContext
```
- 如果按这种设计构建SparkSession:
```
conf = SparkConf().setMaster(运行模式).setAppName(程序名字).set(配置, 值)
sc = SparkContext(conf=conf) # 构建一个SparkContext
spark = SparkSession(context=sc) # 构建一个SparkSession
```
- 建造者模式:只将用户需要关心的交给用户做,其他由建造者自动实现;理解:原生方式像自己装修(自己设计、买材料、施工),建造者模式像找装修公司(告知装修需求,剩余由装修公司完成)。
- 基于建造者模式构建SparkSession对象:
```
spark = ( SparkSession
# 获取一个建造器
.builder
# 配置
.appName("APP Name") # 设置程序名字
.master("local[2]") # 设置程序运行模式
.config("spark.sql.shuffle.partitions", "2")
# 设置一些属性的值
# 构建
.getOrCreate()
)

    ```
- SQL规则:
- 流程:
1. step1:先将DataFrame注册成一张临时的视图(给它取个表名)。
2. step2:使用SQL对临时视图进行处理,得到新的DataFrame。
- 使用:
- 创建视图:将这个DataFrame构建一个表名,方便SQL做处理,df.createOrReplaceTempView("people")
- 调用SQL:
```
  sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|

# +----+-------+

```
- DSL规则:
- 类似RDD的编程方式:调用算子函数来实现处理。
- 流程:直接调用DataFrame的DSL函数进行处理。
- 原生DSL函数(将SQL语法变成了函数):select、where、groupBy、orderBy、limit、count、agg;比写SQL更灵活,不用考虑SQL执行顺序。
- 部分RDD算子函数:repartition、coalesce、persist、unpersist、checkpoint。
- 使用:
```
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

    df.select("name").show()
# | name|
# |Michael|
# | Andy|
# | Justin|
# +-------+

    df.select(df['name'], df['age'] + 1).show() => select name, age + 1 from df
# | name|(age + 1)|
# |Michael| null|
# | Andy| 31|
# | Justin| 20|

    df.where(df['age'] > 21).show() => select * from df where age > 21
# |age|name|
# | 30|Andy|
# +---+----+

    df.groupBy("age").count().show() => select age, count(1) from df group by age
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+

   df.orderBy("age", ascending=False) # 或者

   df.sort(df.age.desc())

   df.limit(5)

   total = df.count() print(f"总共有 {total} 条数据")

   select 选列,

   where 筛选符合条件的数据

   groupBy 分组,

   agg 分组后计算特定数值

   orderBy 排序

   limit 查看前几行数据

   count 统计数量


SparkSQL 编程规则与核心知识点详解


【模块一:SparkSQL 编程基础】

知识点06:【掌握】SparkSQL 在 Python 中的编程规则

编程规则测试代码(不能运行,仅用于规则讲解)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession"""
Description : SparkSQL 在 Python 代码中的编程规则
SourceFile  : 01.pyspark_sql_programing_rule
Author      : Frank
"""if __name__ == '__main__':# todo:0 - 设置系统环境变量os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'# todo:1 - 构建驱动对象: SparkContext => SparkSession"""方式一:传统方式(不推荐)conf = SparkConf().setAppName("SparkSQLAPP").setMaster("local[2]").set("配置名称", "配置的值")sc = SparkContext(conf=conf)spark = SparkSession(sparkContext=sc)"""# 推荐方式:使用建造者模式构建 SparkSessionspark = (SparkSession.builder.appName("SparkSQLApp").master("local[2]").config("配置名称", "配置的值").getOrCreate())# 从 SparkSession 获取 SparkContextsc = spark.sparkContext# todo:2 - 实现数据处理: RDD => DataFrame# step1: 读取数据并转换为 DataFramejson_df = spark.read.json(path="../datas/resources/people.json")jdbc_df = spark.read.jdbc(url="jdbc:mysql://node1:3306",table="dbname.tbname",properties={'user': 'root', 'password': '123456'})# step2: 处理数据 —— 使用 SQL 或 DSL""" 方式一:SQL(简洁直观,灵活性差) """json_df.createOrReplaceTempView("tmp_view_name")sql_rs = spark.sql("SELECT ... FROM tmp_view_name")""" 方式二:DSL(分析复杂,灵活性高) """import pyspark.sql.functions as Fdsl_rs = (jdbc_df.select(jdbc_df['col1'])  # 使用 DataFrame 引用列.select(F.col("col1"), F.col("col2"), F.substring(F.col("col3"), 1, 3))  # 使用 SQL 函数引用列.where("id > 2").groupBy("分组字段").agg().orderBy("排序字段").limit(10).groupBy().count().orderBy())# step3: 保存结果或输出sql_rs.show()  # 默认打印前 20 行jdbc_df.write.csv(path="../datas/output/output1", sep='\t')# todo:3 - 关闭驱动对象spark.stop()
小结

✅ 掌握 SparkSQL 的基本编程流程:

  1. 设置环境变量;
  2. 构建 SparkSession
  3. 读取数据 → 转换(SQL/DSL)→ 输出;
  4. 关闭 spark.stop()

知识点07:【掌握】SparkSQL 实现 WordCount

实施步骤
Step1: 构建 SparkSession
spark = SparkSession \.builder \.appName("SparkSQLAppName") \.master("local[2]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()spark.sparkContext.setLogLevel("WARN")
Step2: 读取数据、转换、输出
读取数据
input_df = spark.read.text("../datas/wordcount")# 打印 Schema
input_df.printSchema()
# 输出:
# root
#  |-- value: string (nullable = true)# 打印数据
input_df.show(truncate=False)
# 输出示例:
# |value|
# |hadoop spark|
# |hive hadoop spark spark|
# ...
方式一:使用 SQL 实现 WordCount
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession"""
Description : SparkSQL 基于 SQL 实现 WordCount
SourceFile  : 02.pyspark_sql_wordcount_sql
Author      : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount with SQL").master("local[2]").config("spark.default.parallelism", 2).getOrCreate())# 读取文本数据input_df = spark.read.text("../datas/wordcount/word.txt")# 注册为临时视图input_df.createOrReplaceTempView("tmp_view_words")# 使用 SQL 进行词频统计sql_rs = spark.sql("""WITH t1 AS (SELECT explode(split(value, ' ')) AS wordFROM tmp_view_words)SELECTword,count(1) AS cntFROM t1GROUP BY wordORDER BY cnt DESC""")# 显示结果sql_rs.show()# 关闭spark.stop()
方式二:使用 DSL 实现 WordCount
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F"""
Description : SparkSQL 基于 DSL 实现 WordCount
SourceFile  : 03.pyspark_sql_wordcount_dsl.py
Author      : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount DSL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取数据input_df = spark.read.text("../datas/wordcount/word.txt")# 使用 DSL 处理dsl_rs = (input_df.select(F.explode(F.split(F.col("value"), " ")).alias("word")).groupBy(F.col("word")).agg(F.count(F.col("word")).alias("cnt")).orderBy(F.col("cnt").desc()))# 输出结果dsl_rs.show()dsl_rs.write.json(path="../datas/output/output1")# 暂停(调试用)import timetime.sleep(10000000)spark.stop()
输出结果
+------+---+
| word |cnt|
+------+---+
| spark| 10|
|  hue |  9|
|hadoop|  7|
|hbase |  6|
| hive |  3|
+------+---+
Step3: 关闭 SparkSession
spark.stop()
小结

✅ 掌握 SparkSQL 实现 WordCount 的两种方式:

  • SQL:适合熟悉 SQL 的用户,简洁直观;
  • DSL:代码更灵活,便于复杂逻辑处理。

【模块二:DataFrame 的转换】

知识点08:【理解】自动推断类型转换 DataFrame

问题0: RDD 与 DataFrame 的区别?
类型数据结构是否有 Schema是否支持泛型示例
RDD分布式列表❌ 无✅ 支持RDD[str], RDD[int]
DataFrame分布式表✅ 有❌ 不支持每行是 Row 类型
DataSet分布式表✅ 有✅ 支持(Scala)DataSet[Row] = DataFrame

🔔 注:PySpark 中 DataFrame 底层是 DataSet[Row]

问题1: 半结构化数据能否直接用 SparkSQL 处理?

不能直接处理
✅ 需先用 SparkCore(RDD)将非结构化/半结构化数据转为结构化,再转为 DataFrame。

问题2: 如何将 RDD 转换为 DataFrame?

方式一:反射推断 Schema(自动推断)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import Row"""
Description : 使用反射自动推断 Schema 将 RDD 转为 DataFrame
SourceFile  : 004.pyspark_sql_rdd2df_reflect.py
Author      : Frank
Date        : 2023/5/23
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.master("local[2]").appName("RDD to DF via Reflection").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取原始数据movie_str_rdd = spark.sparkContext.textFile("../datas/movie/u.data")# 转换:字符串 → 列表 → 过滤 → Rowmovie_row_rdd = (movie_str_rdd.map(lambda line: re.split(r"\s+", line)).filter(lambda item: len(item) == 4).map(lambda item: Row(userid=item[0],movieid=item[1],rate=int(item[2]),ts=int(item[3]))))# 创建 DataFrame(自动推断 Schema)df = spark.createDataFrame(movie_row_rdd)df.printSchema()df.show()spark.stop()
输出 Schema
root|-- userid: string (nullable = true)|-- movieid: string (nullable = true)|-- rate: integer (nullable = false)|-- ts: long (nullable = false)
输出数据
|userid|movieid|rate|       ts|
|-----|-------|----|---------|
|  196|    242|   3|881250949|
|  186|    302|   3|891717742|
|   22|    377|   1|878887116|
|  244|     51|   2|880606923|
小结

✅ 理解自动推断类型转换 DataFrame:

  • 将 RDD 元素转为 Row 类型;
  • Spark 自动分析字段名和类型;
  • 适用于结构清晰、字段明确的数据。

知识点09:【理解】自定义 Schema 转换 DataFrame

步骤说明
  1. 将 RDD 转为 tuplelist
  2. 定义 StructTypeStructField 构建 Schema;
  3. 使用 spark.createDataFrame(rdd, schema)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType"""
Description : 手动定义 Schema 将 RDD 转为 DataFrame
SourceFile  : 04.pyspark_sql_rdd2df.py
Author      : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取数据movie_rdd = spark.sparkContext.textFile("../datas/movie/u.data")# 转为元组 RDDtuple_rdd = (movie_rdd.map(lambda line: re.split(r'\s+', line)).map(lambda item: (item[0], item[1], float(item[2]), int(item[3]))))# 自定义 Schemauser_schema = StructType([StructField("userid", StringType(), True),StructField("movieid", StringType(), True),StructField("rate", DoubleType(), False),StructField("ts", LongType(), True)])# 创建 DataFramedf2 = spark.createDataFrame(tuple_rdd, user_schema)df2.printSchema()df2.show()spark.stop()
小结

✅ 理解自定义 Schema 转换:

  • 更精确控制字段类型和是否可空;
  • 适用于类型不一致或需要强类型约束的场景。

知识点10:【掌握】指定列名称转换 DataFrame

使用 toDF() 方法
data_frame = rdd.toDF(["col1", "col2", "col3", "col4"])
完整示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
from pyspark.sql import SparkSession"""
Description : 使用 toDF 指定列名转换 RDD 为 DataFrame
SourceFile  : 05.pyspark_sql_rdd_with_df.py
Author      : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取并转换为元组 RDDtuple_rdd = (spark.sparkContext.textFile("../datas/movie/u.data").map(lambda line: re.split(r'\s+', line)).map(lambda item: (item[0], item[1], float(item[2]), int(item[3]))))# 使用 toDF 指定列名df = tuple_rdd.toDF(['userId', 'movieId', 'rate', 'ts'])df.printSchema()df.show()# 将 DataFrame 转回 RDDdf_rdd = df.rddprint("前5行:", df_rdd.take(5))print("第一行:", df_rdd.first())spark.stop()
小结

✅ 掌握 toDF() 方法:

  • 简洁高效;
  • 无需手动定义 Schema;
  • 适用于元组或列表类型 RDD。

【模块三:SQL 和 DSL 的使用】

知识点11:【掌握】SQL 开发的使用

规则
  1. DataFrame.createOrReplaceTempView("视图名"):注册临时视图;
  2. spark.sql("SQL语句"):执行 SQL 查询。
测试数据
  • emp.tsv:员工数据(id, name, sal, jiangjin, deptno)
  • dept.csv:部门数据(deptno, dname, loc)
需求

查询每个部门薪资最高的前两名员工信息及其部门名称。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession"""
Description : 使用 SQL 实现复杂查询
SourceFile  : 0506.pyspark_sql_rule_sql.py
Author      : Frank
"""if __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = (SparkSession.builder.appName("WordCount SQL").master("local[2]").config("spark.sql.shuffle.partitions", 2).getOrCreate())# 读取并转换 emp 数据emp_df = (spark.sparkContext.textFile("../datas/emp/emp.tsv").map(lambda line: line.split("\t")).map(lambda item: (item[0], item[1], float(item[2]), float(item[3]), item[4])).toDF(["empid", "ename", "sal", "jiangjin", "deptno"]))# 读取并转换 dept 数据dept_df = (spark.sparkContext.textFile("../datas/emp/dept.csv").map(lambda line: line.split(",")).map(lambda item: (item[0], item[1], item[2])).toDF(["deptno", "dname", "loc"]))# 注册临时视图emp_df.createOrReplaceTempView("tmp_view_emp")dept_df.createOrReplaceTempView("tmp_view_dept")# 执行 SQL 查询rs_df = spark.sql("""WITH tmp1 AS (SELECT *,row_number() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rnFROM tmp_view_emp),tmp2 AS (SELECT * FROM tmp1 WHERE rn < 3)SELECT tmp2.*, b.dnameFROM tmp2JOIN tmp_view_dept b ON tmp2.deptno = b.deptno""")rs_df.show()spark.stop()
输出结果
|empid|ename|  sal|jiangjin|deptno|rn|    dname|
|-----|-----|-----|--------|------|--|---------|
| 7788|SCOTT|3000.0|     0.0|    20| 1| RESEARCH|
| 7902| FORD|3000.0|     0.0|    20| 2| RESEARCH|
| 7839| KING|5000.0|     0.0|    10| 1|ACCOUNTING|
| 7782|CLARK|2450.0|     0.0|    10| 2|ACCOUNTING|
| 7698|BLAKE|2850.0|     0.0|    30| 1|    SALES|
| 7499|ALLEN|1600.0|   300.0|    30| 2|    SALES|
小结

✅ 掌握 SQL 在 SparkSQL 中的使用:

  • 语法与 Hive 高度兼容;
  • 适合复杂分析和窗口函数。

知识点12:【理解】DSL 开发中 API 函数的使用

DSL 函数分类
类别函数示例
基础函数count, collect, take, first, head, tail
基本算子foreach, distinct, union, coalesce, repartition
持久化算子cache, persist, unpersist
其他工具columns, schema, rdd, printSchema

- 基础函数:count/collect/take/first、head/tail。
- 基本算子:foreach/foreachPartition、distinct/union/unionAll、coalesce/repartition。
- 持久化算子:cache/persist、unpersist。
- 其他算子:columns、schema/rdd/printSchema。
- 各函数说明:
- count:统计行数。
- collect:将DataFrame转换成一个数组。
- take:取DataFrame中前N行的数据。
- first:取DataFrame中第一行的数据。
- head:默认取DataFrame中第一行的数据,可指定返回前N行的数据。
- tail:可指定返回后N行的数据。
- foreach:对DataFrame中每条数据进行处理,没有返回值。
- foreachPartition:对DataFrame中每个分区数据进行处理,没有返回值。
- distinct:对DataFrame中每条数据进行去重处理。
- union/unionAll:实现两个DataFrame的合并。
- coalesce/repartition:调整DataFrame的分区数。
- cache/persist:对DataFrame进行缓存。
- unpersist:取消DataFrame的缓存。
- columns:返回DataFrame中的所有列名。
- schema:返回DataFrame中Schema的信息。
- rdd:返回DataFrame中的数据放入RDD中。
- printSchema:打印DataFrame的Schema信息。

完整示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark import StorageLevel
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = '/export/server/jdk'os.environ['HADOOP_HOME'] = '/export/server/hadoop'os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'spark = SparkSession \.builder \.appName("MovieApp") \.master("local[2]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()people_df = spark.read.json("../datas/resources/people.json")# 基本操作print(f"总行数: {people_df.count()}")print(f"全部数据: {people_df.collect()}")print(f"前两行: {people_df.take(2)}")print(f"第一行: {people_df.first()}")print(f"最后一行: {people_df.tail(1)}")people_df.foreach(lambda row: print(row))people_df.foreachPartition(lambda part: print(*part))# 合并与去重people_df_other = spark.read.json("../datas/resources/people.json")people_df.union(people_df_other).show()people_df.unionAll(people_df_other).show()people_df.unionAll(people_df_other).distinct().show()# 分区调整print(f"原分区数: {people_df.rdd.getNumPartitions()}")print(f"合并后: {people_df.coalesce(1).rdd.getNumPartitions()}")print(f"重分区后: {people_df.repartition(4).rdd.getNumPartitions()}")# 持久化people_df.cache()people_df.persist(StorageLevel.MEMORY_AND_DISK_2)people_df.unpersist(blocking=True)# 元信息print(f"列名: {people_df.columns}")print(f"Schema: {people_df.schema}")people_df.printSchema()spark.stop()
小结

✅ 理解 DSL 中常用 API 函数:

  • 结合了 RDD 算子与 SQL 语法优点;
  • 更适合复杂数据处理流程。

总结

知识点目标关键技能
06掌握编程规则环境变量、SparkSession 构建、SQL/DSL 流程
07掌握 WordCountSQL 与 DSL 实现词频统计
08理解自动推断Row + createDataFrame 自动推断 Schema
09理解自定义 SchemaStructType + StructField 手动定义结构
10掌握 toDFrdd.toDF(["col1",...]) 快速转换
11掌握 SQL 使用createOrReplaceTempView + spark.sql()
12理解 DSL API常用函数如 count, union, cache

建议学习路径

  1. 先掌握 SparkSession 构建;
  2. 学会读取数据并查看 printSchema()
  3. 练习 SQL 和 DSL 实现 WordCount;
  4. 掌握 RDD → DataFrame 三种方式;
  5. 熟练使用 toDFcreateOrReplaceTempView

提示:所有代码均基于 PySpark,实际运行需配置好 Spark 环境。

 

http://www.dtcms.com/a/536594.html

相关文章:

  • 如何做网站维护做个什么样的网站比较好
  • 借用与引用实战
  • 涉密资质 网站建设整站seo策略实施
  • 【数据结构】链表补充——静态链表、循环链表、双向链表与双向循环链表
  • Python测试题1
  • 解锁仓颉语言:探索全场景智能编程新范式
  • 大模型-模型压缩:量化、剪枝、蒸馏、二值化 (3)
  • C++进阶:(二)多态的深度解析
  • 天汇大厦网站建设公司佳木斯做网站公司
  • Java 大视界 -- 基于 Java 的大数据可视化在城市交通拥堵溯源与治理策略展示中的应用
  • 从零实现一个完整的vector类:深入理解C++动态数组
  • JVM从操作系统层面的总体启动流程
  • C++list类的模拟实现
  • 深圳三站合一网站建设网站建设推广怎样找客户
  • 【多所高校主办】第七届机器人、智能控制与人工智能国际学术会议(RICAI 2025)
  • 做网站有虚拟服务器什么是网络营销产生的基础
  • 高配款浮标五参数—可以及时掌握水体的生态状况
  • 《Java 实用技巧:均匀取元素算法(支持不足补齐)》
  • 【Linux】nohup命令
  • 泰州网站建设案例昆明网站seo外包
  • 【成长纪实】星光不负 码向未来|我的 HarmonyOS 学习之路与社区成长故事
  • 网站服务器租用4t多少钱一年啊提供网站建设公司有哪些
  • 如何处理系统环境变量的字符长度超过了 Windows 对话框的限制(2047 个字符)
  • 快速上手大模型:深度学习1(初识、神经网络基础)
  • Java---StringBuffer类
  • 【从零开始构建性能测试体系-10】人工智能与性能测试:如何借助AI提升测试效率
  • 网站建设人员要与客户谈什么一篇关于大学网站建设与管理的论文
  • 子洲网站建设制作网站上做网页怎么改图片
  • kafka使用-Producer
  • CUDA实现的点云MLS滤波