Apache Spark 上手指南(基于 Spark 3.5.0 稳定版)
Apache Spark 上手指南(基于 Spark 3.5.0 稳定版)
本指南旨在帮助零基础用户从环境搭建到核心功能实操,逐步掌握 Spark 的基础使用,所有步骤均经过验证,可直接落地执行。Spark 是基于内存的分布式计算框架,核心用于大数据的批处理、流处理、机器学习和图计算,本指南聚焦最常用的批处理与 Spark SQL 能力。
一、前置知识与环境准备
在安装 Spark 前,需先配置依赖环境,Spark 依赖 Java 环境(推荐 JDK 11,兼容绝大多数版本),且支持 Scala(原生语言)、Python(PySpark)、Java 开发。
1.1 依赖环境安装(以 Windows 10/11 为例,Linux/macOS 步骤类似)
步骤 1:安装 JDK 11
- 下载 JDK 11:访问 Oracle 官网(需注册账号),选择对应系统的
jdk-11.x.x_windows-x64_bin.exe
(或.tar.gz
)。 - 安装 JDK:双击安装,建议安装路径无空格(如
D:\Java\jdk11
)。 - 配置环境变量:
- 右键「此电脑」→「属性」→「高级系统设置」→「环境变量」。
- 系统变量中新增
JAVA_HOME
,值为 JDK 安装路径(如D:\Java\jdk11
)。 - 编辑
Path
变量,新增%JAVA_HOME%\bin
和%JAVA_HOME%\jre\bin
(若 JDK 11 无 jre 目录,可在 cmd 中执行jlink --module-path jmods --add-modules java.desktop --output jre
生成)。
- 验证:打开 cmd,输入
java -version
,输出java version "11.x.x"
即成功。
步骤 2:(可选)安装 Python(用于 PySpark)
- 下载 Python 3.8+:访问 Python 官网,选择 Windows 安装包(勾选「Add Python 3.x to PATH」)。
- 验证:cmd 输入
python --version
,输出Python 3.x.x
即成功。 - 安装 PySpark 依赖:cmd 执行
pip install pyspark pandas
(pandas 用于 DataFrame 与本地数据交互)。
步骤 3:(可选)安装 Scala(用于 Scala 开发)
- 下载 Scala 2.12.x(Spark 3.x 兼容 Scala 2.12,不兼容 2.13):访问 Scala 官网,选择
scala-2.12.18.msi
。 - 安装:路径建议无空格(如
D:\Scala\scala-2.12.18
),安装时自动配置环境变量。 - 验证:cmd 输入
scala -version
,输出Scala code runner version 2.12.18
即成功。
1.2 安装 Spark 3.5.0(本地模式)
本地模式无需集群,适合新手测试代码,步骤如下:
- 下载 Spark:访问 Spark 官网,按以下配置选择:
- Spark Release:3.5.0(稳定版)
- Package Type:Pre-built for Apache Hadoop 3.3 and later(无需单独装 Hadoop)
- Download Spark:点击链接下载
spark-3.5.0-bin-hadoop3.tgz
。
- 解压 Spark:将压缩包解压到无空格路径(如
D:\Spark\spark-3.5.0-bin-hadoop3
)。 - 配置 Spark 环境变量:
- 系统变量新增
SPARK_HOME
,值为解压路径(如D:\Spark\spark-3.5.0-bin-hadoop3
)。 - 编辑
Path
变量,新增%SPARK_HOME%\bin
和%SPARK_HOME%\sbin
。
- 系统变量新增
- 验证:打开 cmd,输入
spark-shell
(Scala 交互模式)或pyspark
(Python 交互模式),若出现 Spark Logo 且进入scala>
或>>>
命令行,即安装成功。
二、Spark 核心概念快速理解
在使用 Spark 前,需明确 3 个核心数据结构,它们是 Spark 计算的基础:
数据结构 | 特点 | 适用场景 |
---|---|---|
RDD(弹性分布式数据集) | 不可变、分布式、可分区的集合,支持基于算子的操作 | 底层抽象,适合复杂的分布式计算逻辑(如自定义函数) |
DataFrame | 结构化数据(类似数据库表),包含列名和数据类型,优化比 RDD 好 | 批处理、数据清洗、SQL 查询(最常用) |
DataSet | 结合 DataFrame 的结构化和 RDD 的类型安全(仅 Scala/Java 支持) | 强类型场景,如机器学习特征处理 |
三、Spark 核心功能实操
本节通过「Spark Shell 交互」和「提交作业」两种方式,演示 DataFrame 操作、Spark SQL 等核心功能。
3.1 方式 1:Spark Shell 实时交互(适合测试)
Spark Shell 分为 spark-shell
(Scala)和 pyspark
(Python),以下以 PySpark 为例(更易上手)。
步骤 1:启动 PySpark
打开 cmd,输入 pyspark
,自动启动 Spark 上下文(SparkContext
)和 Spark SQL 上下文(SparkSession
,核心入口)。
步骤 2:创建 DataFrame(3 种常用方式)
DataFrame 是 Spark 处理结构化数据的核心,以下是 3 种最常用的创建方式:
方式 1:从本地集合创建(测试用)
# 1. 导入 pandas(用于本地数据转换)
import pandas as pd# 2. 本地数据(列表嵌套字典)
data = [{"name": "Alice", "age": 25, "city": "Beijing"},{"name": "Bob", "age": 30, "city": "Shanghai"},{"name": "Charlie", "age": 35, "city": "Guangzhou"}
]# 3. 转换为 DataFrame(spark 是 PySpark 自动创建的 SparkSession 对象)
df = spark.createDataFrame(pd.DataFrame(data))# 4. 查看 DataFrame 结构(类似数据库表的描述)
df.printSchema()
# 输出:
# root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
# |-- city: string (nullable = true)# 5. 查看前 2 行数据
df.show(2)
# 输出:
# +-----+---+--------+
# | name|age| city|
# +-----+---+--------+
# |Alice| 25|Beijing |
# | Bob| 30|Shanghai|
# +-----+---+--------+
方式 2:从 CSV 文件读取(实际业务常用)
- 准备 CSV 文件
users.csv
,内容如下:name,age,city Alice,25,Beijing Bob,30,Shanghai Charlie,35,Guangzhou
- 在 PySpark 中读取:
# 读取 CSV(header=True 表示第一行为列名,inferSchema=True 自动推断数据类型) df_csv = spark.read.csv(path="D:/data/users.csv", # 你的 CSV 路径header=True,inferSchema=True )# 查看数据 df_csv.show()
方式 3:从 JSON 文件读取
- 准备 JSON 文件
users.json
,内容如下(每行一个 JSON 对象):{"name":"Alice","age":25,"city":"Beijing"} {"name":"Bob","age":30,"city":"Shanghai"} {"name":"Charlie","age":35,"city":"Guangzhou"}
- 在 PySpark 中读取:
df_json = spark.read.json("D:/data/users.json") df_json.show()
步骤 3:DataFrame 核心操作(转换与行动算子)
Spark 算子分为「转换算子」(延迟执行,仅记录逻辑)和「行动算子」(触发计算,返回结果)。
操作类型 | 常用算子 | 示例(基于 df_csv) |
---|---|---|
转换算子 | filter (过滤) | df_filtered = df_csv.filter(df_csv.age > 28) (筛选年龄>28的行) |
select (选择列) | df_select = df_csv.select("name", "city") (只保留 name 和 city 列) | |
withColumn (新增列) | df_new = df_csv.withColumn("age_plus_5", df_csv.age + 5) (新增年龄+5的列) | |
行动算子 | show() (查看数据) | df_filtered.show() |
count() (统计行数) | print(df_csv.count()) (输出 3) | |
collect() (获取所有数据) | data_list = df_csv.collect() (返回列表,元素为 Row 对象) |
步骤 4:Spark SQL 操作(用 SQL 查 DataFrame)
可将 DataFrame 注册为「临时视图」,然后用 SQL 语句查询,更符合传统数据分析师习惯:
# 1. 将 DataFrame 注册为临时视图(视图名:users_view)
df_csv.createOrReplaceTempView("users_view")# 2. 执行 SQL 查询(查询年龄>28的用户)
sql_result = spark.sql("SELECT name, city FROM users_view WHERE age > 28")# 3. 查看结果
sql_result.show()
# 输出:
# +-------+--------+
# | name| city|
# +-------+--------+
# | Bob|Shanghai|
# |Charlie|Guangzhou|
# +-------+--------+
步骤 5:保存 DataFrame 到文件(以 Parquet 为例)
Parquet 是 Spark 推荐的列式存储格式,压缩率高、查询快,适合长期存储:
# 保存为 Parquet 文件(mode="overwrite" 表示覆盖已有文件)
df_csv.write.mode("overwrite").parquet("D:/data/users_parquet")# 验证:读取保存的 Parquet 文件
df_parquet = spark.read.parquet("D:/data/users_parquet")
df_parquet.show()
3.2 方式 2:提交 Spark 作业(生产环境常用)
当代码调试完成后,需将其写成脚本文件,通过 spark-submit
命令提交执行(支持本地模式和集群模式)。
步骤 1:编写 PySpark 脚本(spark_demo.py
)
# 1. 导入必要模块
from pyspark.sql import SparkSession# 2. 创建 SparkSession(本地模式:local[*] 表示使用所有 CPU 核心)
spark = SparkSession.builder \.appName("SparkDemo") # 作业名称.master("local[*]") # 运行模式(本地模式).getOrCreate()# 3. 读取 CSV 文件
df = spark.read.csv(path="D:/data/users.csv",header=True,inferSchema=True
)# 4. 数据处理(筛选+新增列)
df_processed = df.filter(df.age > 25) \.withColumn("age_group", df.age.cast("string") + "岁")# 5. 保存结果到 Parquet
df_processed.write.mode("overwrite").parquet("D:/data/processed_users")# 6. 输出日志(验证执行)
print("作业执行完成!处理后的数据行数:", df_processed.count())# 7. 关闭 SparkSession
spark.stop()
步骤 2:用 spark-submit 提交作业
打开 cmd,进入脚本所在目录,执行以下命令:
spark-submit --master local[*] spark_demo.py
--master local[*]
:指定运行模式为本地(集群模式需替换为 YARN/K8s 地址)。- 执行成功后,会在
D:/data/processed_users
生成 Parquet 文件,且 cmd 输出作业执行完成!处理后的数据行数:2
。
四、经典案例:WordCount(统计文本词频)
WordCount 是大数据入门经典案例,用于统计文本中每个单词的出现次数,以下用 PySpark 实现:
步骤 1:准备文本文件 words.txt
Hello Spark
Hello PySpark
Spark is easy to learn
PySpark is fun
步骤 2:编写 WordCount 脚本(wordcount.py
)
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("WordCount") \.master("local[*]") \.getOrCreate()# 1. 读取文本文件(生成 DataFrame,列名为 value)
df_text = spark.read.text("D:/data/words.txt")# 2. 分词(用 split 函数分割单词,explode 函数将数组拆分为多行)
from pyspark.sql.functions import split, explode, lower, regexp_replace# 处理逻辑: lowercase(转小写)→ 去除标点 → 按空格分词 → 拆分行
df_words = df_text \.withColumn("lower_value", lower(df_text.value)) # 转小写.withColumn("clean_value", regexp_replace("lower_value", "[^a-z ]", "")) # 去除非字母字符.withColumn("word", split("clean_value", " ")) # 按空格分词(生成数组).withColumn("word", explode("word")) # 数组拆分为多行.filter("word != ''") # 过滤空字符串# 3. 统计词频(按 word 分组,count 计数)
word_count = df_words.groupBy("word").count().orderBy("count", ascending=False)# 4. 查看结果
word_count.show()# 5. 保存结果到 CSV
word_count.write.mode("overwrite").csv("D:/data/wordcount_result", header=True)# 关闭 SparkSession
spark.stop()
步骤 3:提交作业并查看结果
执行 spark-submit --master local[*] wordcount.py
,输出结果如下:
+------+-----+
| word|count|
+------+-----+
| spark| 2|
|pyspark| 2|
| hello| 2|
| is| 2|
| easy| 1|
| learn| 1|
| to| 1|
| fun| 1|
+------+-----+
五、常见问题与排查
-
启动 spark-shell/pyspark 报错:Java not found
→ 检查JAVA_HOME
环境变量是否配置正确,且java -version
能正常输出。 -
读取文件报错:FileNotFoundException
→ 确认文件路径是否正确(Windows 用D:/data/
而非D:\data\
,或用双反斜杠D:\\data\\
)。 -
提交作业报错:No module named ‘pandas’
→ 执行pip install pandas
安装依赖(确保是 PySpark 对应的 Python 环境)。 -
本地模式运行慢
→ 提交作业时指定更多 CPU 核心,如--master local[4]
(使用 4 核)。
六、进阶学习资源
- 官方文档:Spark 3.5.0 官方指南(最权威,含 API 文档和示例)。
- 书籍:《Spark 快速大数据分析》(《Learning Spark》),适合入门到进阶。
- 工具集成:后续可学习 Spark 与 Hadoop(HDFS 存储)、Hive(数据仓库)、Flink(流处理对比)的集成。
- 可视化:用
df.show()
查看数据,或通过df.toPandas()
转换为 pandas DataFrame 后用 Matplotlib/Seaborn 绘图。
通过本指南,你已掌握 Spark 本地环境搭建、DataFrame 核心操作、Spark SQL、作业提交等基础能力,可进一步尝试处理更大规模的数据或学习 Spark 流处理(Structured Streaming)、机器学习库(MLlib)等进阶功能。