当前位置: 首页 > news >正文

Spark专题-第二部分:Spark SQL 入门(1)-Spark SQL 简介

通过前两章的理论介绍,让读者能有一个简单的认知,接下来就是sql部分,很简单,你一定能理解并掌握

第二部分:Spark SQL 入门(1)

Spark SQL 简介

1. Spark SQL 是什么?

Spark SQL 是 Apache Spark 中用于处理结构化数据的模块。它提供了一个名为 DataFrame 的编程抽象,并且可以与 Spark 生态系统中的其他组件无缝集成。

核心价值主张

Spark SQL 让你能够使用 SQL 查询或 DataFrame API 来查询 Spark 程序内的结构化数据。

Spark Core
Spark SQL
DataFrame API
SQL 查询
数据源集成
优化器 Catalyst
Python/Java/Scala/R
标准 SQL 支持
Hive, JSON, Parquet, JDBC...
查询优化

2. Spark SQL 的演进历程

了解 Spark SQL 的历史有助于理解其设计理念:

  1. 早期阶段:Shark (Hive on Spark)

    • 尝试在 Spark 上运行 Hive
    • 兼容 Hive 查询语言、元存储和数据格式
    • 性能优于 MapReduce 但受限于 Hive 代码库
  2. Spark SQL 诞生

    • 重新设计的查询引擎
    • 引入 DataFrame API
    • 提供更好的性能和扩展性
  3. 统一入口点

    • 引入 SparkSession 替代 SQLContext 和 HiveContext
    • 提供与所有 Spark 功能交互的统一入口

3. 核心概念:DataFrame 与 Dataset

RDD
- partitions: List[Partition]
- dependencies: List[Dependency]
+map(func)
+filter(func)
+reduce(func)
Dataset
- encoder: Encoder[T]
- logicalPlan: LogicalPlan
+filter(func)
+map(func)
+select(cols: Column*)
DataFrame
- schema: StructType
+filter(condition: Column)
+select(cols: Column*)
+groupBy(cols: Column*)

DataFrame

  • 等同于关系型数据库中的表
  • 具有命名列和类型的分布式数据集合
  • 在 Python 和 R 中,DataFrame 是主要的编程抽象
  • 在 Scala 和 Java 中,DataFrame 是 Dataset[Row] 的类型别名

Dataset

  • 强类型的数据集合
  • 结合了 RDD 的优点和 Spark SQL 的优化执行引擎
  • 仅在 Scala 和 Java API 中可用

RDD vs DataFrame vs Dataset

特性RDDDataFrameDataset
类型安全
优化Catalyst 优化器Catalyst 优化器
序列化Java 序列化Tungsten 二进制格式Tungsten 二进制格式
语言支持所有所有Scala/Java
API 风格函数式声明式混合式

4. SparkSession:统一的入口点

在 Spark 2.0+ 中,SparkSession 取代了旧的 SQLContext 和 HiveContext,提供了与 Spark 所有功能交互的统一入口。

# 创建 SparkSession 的典型方式
from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("My Spark SQL App") \.config("spark.some.config.option", "some-value") \.enableHiveSupport() \  # 如果需要访问 Hive.getOrCreate()# SparkSession 提供了访问各种功能的入口
spark.sql("SELECT * FROM table")  # 执行 SQL
spark.read.csv("path/to/file")    # 读取数据
spark.udf.register("my_udf", my_function)  # 注册 UDF

SparkSession 的组件

SparkSession
SparkContext
SQLContext
StreamingContext
HiveContext
RDD 操作
DataFrame 操作
流处理
Hive 集成

5. Spark SQL 的架构与执行流程

Spark SQL 的核心是其优化器 Catalyst,它负责将用户查询转换为高效的执行计划。这一部分不过多展开,具体会在下文做示例解释

Catalyst 优化器
逻辑计划
逻辑优化
物理计划
物理优化
SQL 查询/DataFrame 代码
解析
代码生成
执行

Catalyst 优化器的工作阶段

  1. 解析:将 SQL 查询或 DataFrame 操作转换为抽象语法树(AST)
  2. 逻辑计划:将 AST 转换为逻辑计划
  3. 逻辑优化:应用规则优化逻辑计划(如谓词下推、常量折叠)
  4. 物理计划:将逻辑计划转换为物理计划
  5. 代码生成:生成高效的 Java 字节码

6. 为什么使用 Spark SQL?

  1. 性能优势

    • Catalyst 优化器自动优化查询
    • Tungsten 引擎提供内存管理和代码生成
    • 通常比直接使用 RDD 更高效
  2. 易用性

    • 提供熟悉的 SQL 接口
    • DataFrame API 直观易用
    • 与多种数据源集成
  3. 统一的数据访问

    • 相同的 API 访问不同数据源
    • 支持 Hive、JSON、Parquet、ORC、JDBC 等
  4. 与 Spark 生态集成

    • 与 Spark Streaming、MLlib、GraphX 无缝集成
    • 支持流式 SQL 查询

7. 获取数据示例

# 创建 DataFrame 的多种方式
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerTypespark = SparkSession.builder.appName("SparkSQLIntro").getOrCreate()# 方式1:从集合创建
data = [("Alice", 28), ("Bob", 35), ("Charlie", 42)]
columns = ["name", "age"]
df1 = spark.createDataFrame(data, columns)# 方式2:指定schema创建
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])
df2 = spark.createDataFrame(data, schema)# 方式3:从数据源读取
df3 = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)# 方式4:从Hive表读取
df4 = spark.sql("SELECT * FROM hive_table")# 使用 DataFrame API 进行操作
df_filtered = df1.filter(df1.age > 30)
df_selected = df_filtered.select("name")# 使用 SQL 查询
df1.createOrReplaceTempView("people")
sql_result = spark.sql("SELECT name FROM people WHERE age > 30")# 展示结果
print("DataFrame API 结果:")
df_selected.show()print("SQL 查询结果:")
sql_result.show()# 停止 SparkSession
spark.stop()

8. Spark SQL 的应用场景

  1. ETL 管道:从多种数据源提取、转换和加载数据
  2. 数据分析:使用 SQL 或 DataFrame API 进行探索性数据分析
  3. 机器学习:为 MLlib 准备和预处理特征数据
  4. 流处理:使用 Structured Streaming 处理实时数据流
  5. 数据仓库:构建在 Spark 上的数据仓库解决方案

9. 示例

9.1 一个简单查询的完整执行流程

让我们通过一个简单的示例来理解 Spark SQL 如何处理 HDFS 上的数据文件:

-- 简单查询:从 HDFS 上的用户数据中筛选年龄大于 30 的用户
SELECT name, age, department 
FROM users 
WHERE age > 30 AND department = 'Engineering'
9.2 执行流程详解
HDFS 数据存储
Block 1
part-00000
Block 2
part-00001
Block 3
part-00002
SQL 查询
解析与语法分析
生成逻辑计划
逻辑优化
谓词下推/列裁剪
生成物理计划
任务调度与执行
从HDFS读取数据
数据转换与过滤
结果返回
9.2.1 解析和计划生成阶段

代码层面:

# 用户提交查询
df = spark.sql("SELECT name, age, department FROM users WHERE age > 30 AND department = 'Engineering'")# Spark SQL 内部会构建如下逻辑计划

处理步骤:

  1. 语法解析:将 SQL 字符串解析为抽象语法树(AST)
  2. 逻辑计划生成:创建未优化的逻辑查询计划
  3. 逻辑优化:应用各种优化规则(谓词下推、列裁剪等)
  4. 物理计划生成:将逻辑计划转换为可在集群上执行的物理计划
9.2.2 HDFS 数据读取阶段

物理执行细节:

Executor执行过程
连接HDFS DataNode
Task启动
读取数据块
应用谓词下推过滤
仅返回需要的列
将结果放入内存
Driver程序
计算输入分割
根据HDFS块大小
为每个分割创建Task
将Task分发到Executor

HDFS 交互细节:

  • Spark 首先与 HDFS NameNode 通信,获取文件元数据(块位置、大小等)
  • 根据数据本地性原则,优先将任务调度到存储数据块的节点上
  • 每个 Task 读取一个 HDFS 数据块(默认128MB)
9.2.3 数据过滤与处理阶段

谓词下推(Predicate Pushdown)优化:

# 没有谓词下推的情况(低效)
df = spark.read.parquet("hdfs://path/to/users")
result = df.filter((df.age > 30) & (df.department == 'Engineering')).select("name", "age", "department")# 有谓词下推的情况(高效)
# Spark 会将过滤条件下推到数据读取层,减少数据传输量

列裁剪(Column Pruning)优化:

  • Spark 只读取查询中需要的列(name, age, department)
  • 忽略其他不必要的列,减少 I/O 和数据传输
9.2.4 分布式执行模型
HDFS集群
工作节点3
工作节点2
工作节点1
Driver节点
DataNode: block1
DataNode: block2
DataNode: block3
Executor进程
Task: 读取block3
内存数据
Executor进程
Task: 读取block2
内存数据
Executor进程
Task: 读取block1
内存数据
SparkSession
查询规划器
任务调度器
9.2.5. 完整代码示例与说明
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("HDFS_Processing_Example") \.config("spark.sql.adaptive.enabled", "true") \  # 启用自适应查询执行.getOrCreate()# 读取 HDFS 上的 Parquet 文件
# 注意:此时并不会立即读取数据,只是创建了一个逻辑计划
df = spark.read.parquet("hdfs://namenode:8020/data/users")# 创建临时视图以便执行 SQL 查询
df.createOrReplaceTempView("users")# 执行查询 - 此时才会真正触发计算
result = spark.sql("""SELECT name, age, department FROM users WHERE age > 30 AND department = 'Engineering'
""")# 查看执行计划(调试用)
result.explain(True)
# 输出:
# == Parsed Logical Plan ==
# == Analyzed Logical Plan ==  
# == Optimized Logical Plan ==  # 这里可以看到优化后的计划
# == Physical Plan ==          # 这里可以看到物理执行计划# 触发实际执行并获取结果
# 注意:collect() 会将所有数据收集到Driver端,生产环境中慎用
output = result.collect()# 更安全的方式是写入到HDFS或其他存储系统
result.write.parquet("hdfs://namenode:8020/results/engineering_staff")# 停止 SparkSession
spark.stop()
9.2.6 关键优化技术
  1. 谓词下推(Predicate Pushdown)

    • 将过滤条件推送到数据源层面
    • 在读取数据时尽早过滤,减少数据传输量
  2. 列裁剪(Column Pruning)

    • 只读取查询中需要的列
    • 显著减少 I/O 和内存使用
  3. 分区裁剪(Partition Pruning)

    • 如果数据是分区的,只读取相关分区
    • 例如:WHERE date = '2023-01-01' 只会读取对应日期的分区
  4. 自适应查询执行(AQE)

    • Spark 3.0+ 特性
    • 运行时基于统计信息优化执行计划
    • 自动处理数据倾斜和调整shuffle分区数
9.2.7 性能考量

影响性能的关键因素:

  1. 数据本地性:任务与数据的距离(PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY)
  2. 文件格式:列式格式(Parquet、ORC)通常比行式格式(CSV、JSON)更高效
  3. 压缩算法:Snappy、Zstandard、Gzip 等压缩方式的选择
  4. 集群配置:Executor内存、CPU核心数、网络带宽等

监控执行:

  • 通过 Spark UI(通常为 http://driver-host:4040)监控任务执行
  • 查看每个Stage的详细信息,包括数据读取量、shuffle数据量等
  • 识别数据倾斜和性能瓶颈

通过这种分层处理方式,Spark SQL 能够高效地处理分布在 HDFS 上的大规模数据集,同时提供类似传统数据库的查询体验。

10. 总结

Spark SQL 是 Spark 生态系统中处理结构化数据的核心组件,它提供了:

  • 熟悉的 SQL 接口和强大的 DataFrame API
  • 高性能的查询执行得益于 Catalyst 优化器和 Tungsten 引擎
  • 与多种数据源和 Spark 组件的无缝集成

这篇文章就以一个简单的sql查询作为结束,下一篇会介绍一些具体的算子


文章转载自:

http://U7qoW2Kv.gqcsd.cn
http://Vq7X1V78.gqcsd.cn
http://DeeicBCu.gqcsd.cn
http://aHP3cJ0I.gqcsd.cn
http://MuuJdlob.gqcsd.cn
http://3UAvUril.gqcsd.cn
http://08SmsD7y.gqcsd.cn
http://MmFWYi1e.gqcsd.cn
http://smLLTAW0.gqcsd.cn
http://5hHGyEmS.gqcsd.cn
http://GKl3mxgM.gqcsd.cn
http://EzopOlln.gqcsd.cn
http://kLZ7nLHH.gqcsd.cn
http://aeent42e.gqcsd.cn
http://2AmC6aT2.gqcsd.cn
http://yUFETtPp.gqcsd.cn
http://SZqFwAyA.gqcsd.cn
http://aT5Y5iII.gqcsd.cn
http://InBVUcxy.gqcsd.cn
http://L0O7c9Ft.gqcsd.cn
http://AlDniXM7.gqcsd.cn
http://iA0MZn2J.gqcsd.cn
http://HVqM8vKx.gqcsd.cn
http://NmRQ4vIW.gqcsd.cn
http://kItTuItw.gqcsd.cn
http://MPjeqQzV.gqcsd.cn
http://a3u7fuEE.gqcsd.cn
http://0wEC0V1T.gqcsd.cn
http://iVrSwGwc.gqcsd.cn
http://IQO2zcWF.gqcsd.cn
http://www.dtcms.com/a/385896.html

相关文章:

  • Spark源码学习分享之submit提交流程(1)
  • 5、二叉树-小堆
  • 技术奇点爆发周:2025 年 9 月科技突破全景扫描
  • 从Dubbo到SpringCloud Alibaba:大型项目迁移的实战手册(含成本分析与踩坑全记录)(一)
  • 【算法】C语言多组输入输出模板
  • 测试 Docker 的实时恢复功能
  • 系统中间件与云虚拟化-serverless-基于阿里云函数计算的云工作流CloudFlow设计与体验
  • springboot netty 客户端网络编程入门与实战
  • TCP/IP模型
  • 智慧用电安全管理系统的核心优势
  • flutter结合NestedScrollView+TabBar实现嵌套滚动
  • 基于定制开发开源AI智能名片S2B2C商城小程序的社群团购线上平台搭建研究
  • DEDECMS 小程序插件简介 2.0全新上线
  • 详解 Spring Boot 单元测试:@SpringBootTest 与 JUnit 依赖配置及环境注入
  • JMeter元件简介与JMeter测试计划
  • 陪诊小程序:让医疗关怀触手可及
  • n*n矩阵方程组Ax=b,使用Eigen矩阵库常用解法介绍
  • IvorySQL 4.6:DocumentDB+FerretDB 实现 MongoDB 兼容部署指南
  • UART,IIC,SPI总线(通信协议)
  • 记录一次小程序请求报错:600001
  • 光谱相机的新兴领域应用
  • GO学习记录十——发包
  • OpenLayers数据源集成 -- 章节十六:XML图层详解:OpenStreetMap数据的动态加载与智能样式渲染方案
  • vector 模拟实现 4 大痛点解析:从 memcpy 到模板嵌套的实战方案
  • tuple/dict/list 这三个数据类型在取值时候的区别
  • 用Python实现自动化的Web测试(Selenium)
  • Spring Boot 2.5.0 集成 Elasticsearch 7.12.0 实现 CRUD 完整指南(Windows 环境)
  • 第九章:使用Jmeter+Ant+Jenkins实现接口自动化测试持续集成
  • 使用IP的好处
  • 育碧确定《AC影》3月20日发售并分享系列游戏首发数据