Spark专题-第二部分:Spark SQL 入门(1)-Spark SQL 简介
通过前两章的理论介绍,让读者能有一个简单的认知,接下来就是sql部分,很简单,你一定能理解并掌握
第二部分:Spark SQL 入门(1)
Spark SQL 简介
1. Spark SQL 是什么?
Spark SQL 是 Apache Spark 中用于处理结构化数据的模块。它提供了一个名为 DataFrame 的编程抽象,并且可以与 Spark 生态系统中的其他组件无缝集成。
核心价值主张:
Spark SQL 让你能够使用 SQL 查询或 DataFrame API 来查询 Spark 程序内的结构化数据。
2. Spark SQL 的演进历程
了解 Spark SQL 的历史有助于理解其设计理念:
-
早期阶段:Shark (Hive on Spark)
- 尝试在 Spark 上运行 Hive
- 兼容 Hive 查询语言、元存储和数据格式
- 性能优于 MapReduce 但受限于 Hive 代码库
-
Spark SQL 诞生:
- 重新设计的查询引擎
- 引入 DataFrame API
- 提供更好的性能和扩展性
-
统一入口点:
- 引入 SparkSession 替代 SQLContext 和 HiveContext
- 提供与所有 Spark 功能交互的统一入口
3. 核心概念:DataFrame 与 Dataset
DataFrame:
- 等同于关系型数据库中的表
- 具有命名列和类型的分布式数据集合
- 在 Python 和 R 中,DataFrame 是主要的编程抽象
- 在 Scala 和 Java 中,DataFrame 是 Dataset[Row] 的类型别名
Dataset:
- 强类型的数据集合
- 结合了 RDD 的优点和 Spark SQL 的优化执行引擎
- 仅在 Scala 和 Java API 中可用
RDD vs DataFrame vs Dataset:
特性 | RDD | DataFrame | Dataset |
---|---|---|---|
类型安全 | 是 | 否 | 是 |
优化 | 无 | Catalyst 优化器 | Catalyst 优化器 |
序列化 | Java 序列化 | Tungsten 二进制格式 | Tungsten 二进制格式 |
语言支持 | 所有 | 所有 | Scala/Java |
API 风格 | 函数式 | 声明式 | 混合式 |
4. SparkSession:统一的入口点
在 Spark 2.0+ 中,SparkSession 取代了旧的 SQLContext 和 HiveContext,提供了与 Spark 所有功能交互的统一入口。
# 创建 SparkSession 的典型方式
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("My Spark SQL App") \.config("spark.some.config.option", "some-value") \.enableHiveSupport() \ # 如果需要访问 Hive.getOrCreate()# SparkSession 提供了访问各种功能的入口
spark.sql("SELECT * FROM table") # 执行 SQL
spark.read.csv("path/to/file") # 读取数据
spark.udf.register("my_udf", my_function) # 注册 UDF
SparkSession 的组件:
5. Spark SQL 的架构与执行流程
Spark SQL 的核心是其优化器 Catalyst,它负责将用户查询转换为高效的执行计划。这一部分不过多展开,具体会在下文做示例解释
Catalyst 优化器的工作阶段:
- 解析:将 SQL 查询或 DataFrame 操作转换为抽象语法树(AST)
- 逻辑计划:将 AST 转换为逻辑计划
- 逻辑优化:应用规则优化逻辑计划(如谓词下推、常量折叠)
- 物理计划:将逻辑计划转换为物理计划
- 代码生成:生成高效的 Java 字节码
6. 为什么使用 Spark SQL?
-
性能优势:
- Catalyst 优化器自动优化查询
- Tungsten 引擎提供内存管理和代码生成
- 通常比直接使用 RDD 更高效
-
易用性:
- 提供熟悉的 SQL 接口
- DataFrame API 直观易用
- 与多种数据源集成
-
统一的数据访问:
- 相同的 API 访问不同数据源
- 支持 Hive、JSON、Parquet、ORC、JDBC 等
-
与 Spark 生态集成:
- 与 Spark Streaming、MLlib、GraphX 无缝集成
- 支持流式 SQL 查询
7. 获取数据示例
# 创建 DataFrame 的多种方式
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerTypespark = SparkSession.builder.appName("SparkSQLIntro").getOrCreate()# 方式1:从集合创建
data = [("Alice", 28), ("Bob", 35), ("Charlie", 42)]
columns = ["name", "age"]
df1 = spark.createDataFrame(data, columns)# 方式2:指定schema创建
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])
df2 = spark.createDataFrame(data, schema)# 方式3:从数据源读取
df3 = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)# 方式4:从Hive表读取
df4 = spark.sql("SELECT * FROM hive_table")# 使用 DataFrame API 进行操作
df_filtered = df1.filter(df1.age > 30)
df_selected = df_filtered.select("name")# 使用 SQL 查询
df1.createOrReplaceTempView("people")
sql_result = spark.sql("SELECT name FROM people WHERE age > 30")# 展示结果
print("DataFrame API 结果:")
df_selected.show()print("SQL 查询结果:")
sql_result.show()# 停止 SparkSession
spark.stop()
8. Spark SQL 的应用场景
- ETL 管道:从多种数据源提取、转换和加载数据
- 数据分析:使用 SQL 或 DataFrame API 进行探索性数据分析
- 机器学习:为 MLlib 准备和预处理特征数据
- 流处理:使用 Structured Streaming 处理实时数据流
- 数据仓库:构建在 Spark 上的数据仓库解决方案
9. 示例
9.1 一个简单查询的完整执行流程
让我们通过一个简单的示例来理解 Spark SQL 如何处理 HDFS 上的数据文件:
-- 简单查询:从 HDFS 上的用户数据中筛选年龄大于 30 的用户
SELECT name, age, department
FROM users
WHERE age > 30 AND department = 'Engineering'
9.2 执行流程详解
9.2.1 解析和计划生成阶段
代码层面:
# 用户提交查询
df = spark.sql("SELECT name, age, department FROM users WHERE age > 30 AND department = 'Engineering'")# Spark SQL 内部会构建如下逻辑计划
处理步骤:
- 语法解析:将 SQL 字符串解析为抽象语法树(AST)
- 逻辑计划生成:创建未优化的逻辑查询计划
- 逻辑优化:应用各种优化规则(谓词下推、列裁剪等)
- 物理计划生成:将逻辑计划转换为可在集群上执行的物理计划
9.2.2 HDFS 数据读取阶段
物理执行细节:
HDFS 交互细节:
- Spark 首先与 HDFS NameNode 通信,获取文件元数据(块位置、大小等)
- 根据数据本地性原则,优先将任务调度到存储数据块的节点上
- 每个 Task 读取一个 HDFS 数据块(默认128MB)
9.2.3 数据过滤与处理阶段
谓词下推(Predicate Pushdown)优化:
# 没有谓词下推的情况(低效)
df = spark.read.parquet("hdfs://path/to/users")
result = df.filter((df.age > 30) & (df.department == 'Engineering')).select("name", "age", "department")# 有谓词下推的情况(高效)
# Spark 会将过滤条件下推到数据读取层,减少数据传输量
列裁剪(Column Pruning)优化:
- Spark 只读取查询中需要的列(name, age, department)
- 忽略其他不必要的列,减少 I/O 和数据传输
9.2.4 分布式执行模型
9.2.5. 完整代码示例与说明
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("HDFS_Processing_Example") \.config("spark.sql.adaptive.enabled", "true") \ # 启用自适应查询执行.getOrCreate()# 读取 HDFS 上的 Parquet 文件
# 注意:此时并不会立即读取数据,只是创建了一个逻辑计划
df = spark.read.parquet("hdfs://namenode:8020/data/users")# 创建临时视图以便执行 SQL 查询
df.createOrReplaceTempView("users")# 执行查询 - 此时才会真正触发计算
result = spark.sql("""SELECT name, age, department FROM users WHERE age > 30 AND department = 'Engineering'
""")# 查看执行计划(调试用)
result.explain(True)
# 输出:
# == Parsed Logical Plan ==
# == Analyzed Logical Plan ==
# == Optimized Logical Plan == # 这里可以看到优化后的计划
# == Physical Plan == # 这里可以看到物理执行计划# 触发实际执行并获取结果
# 注意:collect() 会将所有数据收集到Driver端,生产环境中慎用
output = result.collect()# 更安全的方式是写入到HDFS或其他存储系统
result.write.parquet("hdfs://namenode:8020/results/engineering_staff")# 停止 SparkSession
spark.stop()
9.2.6 关键优化技术
-
谓词下推(Predicate Pushdown)
- 将过滤条件推送到数据源层面
- 在读取数据时尽早过滤,减少数据传输量
-
列裁剪(Column Pruning)
- 只读取查询中需要的列
- 显著减少 I/O 和内存使用
-
分区裁剪(Partition Pruning)
- 如果数据是分区的,只读取相关分区
- 例如:
WHERE date = '2023-01-01'
只会读取对应日期的分区
-
自适应查询执行(AQE)
- Spark 3.0+ 特性
- 运行时基于统计信息优化执行计划
- 自动处理数据倾斜和调整shuffle分区数
9.2.7 性能考量
影响性能的关键因素:
- 数据本地性:任务与数据的距离(PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY)
- 文件格式:列式格式(Parquet、ORC)通常比行式格式(CSV、JSON)更高效
- 压缩算法:Snappy、Zstandard、Gzip 等压缩方式的选择
- 集群配置:Executor内存、CPU核心数、网络带宽等
监控执行:
- 通过 Spark UI(通常为 http://driver-host:4040)监控任务执行
- 查看每个Stage的详细信息,包括数据读取量、shuffle数据量等
- 识别数据倾斜和性能瓶颈
通过这种分层处理方式,Spark SQL 能够高效地处理分布在 HDFS 上的大规模数据集,同时提供类似传统数据库的查询体验。
10. 总结
Spark SQL 是 Spark 生态系统中处理结构化数据的核心组件,它提供了:
- 熟悉的 SQL 接口和强大的 DataFrame API
- 高性能的查询执行得益于 Catalyst 优化器和 Tungsten 引擎
- 与多种数据源和 Spark 组件的无缝集成
这篇文章就以一个简单的sql查询作为结束,下一篇会介绍一些具体的算子