当前位置: 首页 > news >正文

Apache Spark 上手指南(基于 Spark 3.5.0 稳定版)

Apache Spark 上手指南(基于 Spark 3.5.0 稳定版)

本指南旨在帮助零基础用户从环境搭建核心功能实操,逐步掌握 Spark 的基础使用,所有步骤均经过验证,可直接落地执行。Spark 是基于内存的分布式计算框架,核心用于大数据的批处理、流处理、机器学习和图计算,本指南聚焦最常用的批处理与 Spark SQL 能力。

一、前置知识与环境准备

在安装 Spark 前,需先配置依赖环境,Spark 依赖 Java 环境(推荐 JDK 11,兼容绝大多数版本),且支持 Scala(原生语言)、Python(PySpark)、Java 开发。

1.1 依赖环境安装(以 Windows 10/11 为例,Linux/macOS 步骤类似)

步骤 1:安装 JDK 11
  1. 下载 JDK 11:访问 Oracle 官网(需注册账号),选择对应系统的 jdk-11.x.x_windows-x64_bin.exe(或 .tar.gz)。
  2. 安装 JDK:双击安装,建议安装路径无空格(如 D:\Java\jdk11)。
  3. 配置环境变量:
    • 右键「此电脑」→「属性」→「高级系统设置」→「环境变量」。
    • 系统变量中新增 JAVA_HOME,值为 JDK 安装路径(如 D:\Java\jdk11)。
    • 编辑 Path 变量,新增 %JAVA_HOME%\bin%JAVA_HOME%\jre\bin(若 JDK 11 无 jre 目录,可在 cmd 中执行 jlink --module-path jmods --add-modules java.desktop --output jre 生成)。
  4. 验证:打开 cmd,输入 java -version,输出 java version "11.x.x" 即成功。
步骤 2:(可选)安装 Python(用于 PySpark)
  1. 下载 Python 3.8+:访问 Python 官网,选择 Windows 安装包(勾选「Add Python 3.x to PATH」)。
  2. 验证:cmd 输入 python --version,输出 Python 3.x.x 即成功。
  3. 安装 PySpark 依赖:cmd 执行 pip install pyspark pandas(pandas 用于 DataFrame 与本地数据交互)。
步骤 3:(可选)安装 Scala(用于 Scala 开发)
  1. 下载 Scala 2.12.x(Spark 3.x 兼容 Scala 2.12,不兼容 2.13):访问 Scala 官网,选择 scala-2.12.18.msi
  2. 安装:路径建议无空格(如 D:\Scala\scala-2.12.18),安装时自动配置环境变量。
  3. 验证:cmd 输入 scala -version,输出 Scala code runner version 2.12.18 即成功。

1.2 安装 Spark 3.5.0(本地模式)

本地模式无需集群,适合新手测试代码,步骤如下:

  1. 下载 Spark:访问 Spark 官网,按以下配置选择:
    • Spark Release:3.5.0(稳定版)
    • Package Type:Pre-built for Apache Hadoop 3.3 and later(无需单独装 Hadoop)
    • Download Spark:点击链接下载 spark-3.5.0-bin-hadoop3.tgz
  2. 解压 Spark:将压缩包解压到无空格路径(如 D:\Spark\spark-3.5.0-bin-hadoop3)。
  3. 配置 Spark 环境变量:
    • 系统变量新增 SPARK_HOME,值为解压路径(如 D:\Spark\spark-3.5.0-bin-hadoop3)。
    • 编辑 Path 变量,新增 %SPARK_HOME%\bin%SPARK_HOME%\sbin
  4. 验证:打开 cmd,输入 spark-shell(Scala 交互模式)或 pyspark(Python 交互模式),若出现 Spark Logo 且进入 scala>>>> 命令行,即安装成功。

二、Spark 核心概念快速理解

在使用 Spark 前,需明确 3 个核心数据结构,它们是 Spark 计算的基础:

数据结构特点适用场景
RDD(弹性分布式数据集)不可变、分布式、可分区的集合,支持基于算子的操作底层抽象,适合复杂的分布式计算逻辑(如自定义函数)
DataFrame结构化数据(类似数据库表),包含列名和数据类型,优化比 RDD 好批处理、数据清洗、SQL 查询(最常用)
DataSet结合 DataFrame 的结构化和 RDD 的类型安全(仅 Scala/Java 支持)强类型场景,如机器学习特征处理

三、Spark 核心功能实操

本节通过「Spark Shell 交互」和「提交作业」两种方式,演示 DataFrame 操作、Spark SQL 等核心功能。

3.1 方式 1:Spark Shell 实时交互(适合测试)

Spark Shell 分为 spark-shell(Scala)和 pyspark(Python),以下以 PySpark 为例(更易上手)。

步骤 1:启动 PySpark

打开 cmd,输入 pyspark,自动启动 Spark 上下文(SparkContext)和 Spark SQL 上下文(SparkSession,核心入口)。

步骤 2:创建 DataFrame(3 种常用方式)

DataFrame 是 Spark 处理结构化数据的核心,以下是 3 种最常用的创建方式:

方式 1:从本地集合创建(测试用)
# 1. 导入 pandas(用于本地数据转换)
import pandas as pd# 2. 本地数据(列表嵌套字典)
data = [{"name": "Alice", "age": 25, "city": "Beijing"},{"name": "Bob", "age": 30, "city": "Shanghai"},{"name": "Charlie", "age": 35, "city": "Guangzhou"}
]# 3. 转换为 DataFrame(spark 是 PySpark 自动创建的 SparkSession 对象)
df = spark.createDataFrame(pd.DataFrame(data))# 4. 查看 DataFrame 结构(类似数据库表的描述)
df.printSchema()
# 输出:
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- city: string (nullable = true)# 5. 查看前 2 行数据
df.show(2)
# 输出:
# +-----+---+--------+
# | name|age|    city|
# +-----+---+--------+
# |Alice| 25|Beijing |
# |  Bob| 30|Shanghai|
# +-----+---+--------+
方式 2:从 CSV 文件读取(实际业务常用)
  1. 准备 CSV 文件 users.csv,内容如下:
    name,age,city
    Alice,25,Beijing
    Bob,30,Shanghai
    Charlie,35,Guangzhou
    
  2. 在 PySpark 中读取:
    # 读取 CSV(header=True 表示第一行为列名,inferSchema=True 自动推断数据类型)
    df_csv = spark.read.csv(path="D:/data/users.csv",  # 你的 CSV 路径header=True,inferSchema=True
    )# 查看数据
    df_csv.show()
    
方式 3:从 JSON 文件读取
  1. 准备 JSON 文件 users.json,内容如下(每行一个 JSON 对象):
    {"name":"Alice","age":25,"city":"Beijing"}
    {"name":"Bob","age":30,"city":"Shanghai"}
    {"name":"Charlie","age":35,"city":"Guangzhou"}
    
  2. 在 PySpark 中读取:
    df_json = spark.read.json("D:/data/users.json")
    df_json.show()
    
步骤 3:DataFrame 核心操作(转换与行动算子)

Spark 算子分为「转换算子」(延迟执行,仅记录逻辑)和「行动算子」(触发计算,返回结果)。

操作类型常用算子示例(基于 df_csv)
转换算子filter(过滤)df_filtered = df_csv.filter(df_csv.age > 28)(筛选年龄>28的行)
select(选择列)df_select = df_csv.select("name", "city")(只保留 name 和 city 列)
withColumn(新增列)df_new = df_csv.withColumn("age_plus_5", df_csv.age + 5)(新增年龄+5的列)
行动算子show()(查看数据)df_filtered.show()
count()(统计行数)print(df_csv.count())(输出 3)
collect()(获取所有数据)data_list = df_csv.collect()(返回列表,元素为 Row 对象)
步骤 4:Spark SQL 操作(用 SQL 查 DataFrame)

可将 DataFrame 注册为「临时视图」,然后用 SQL 语句查询,更符合传统数据分析师习惯:

# 1. 将 DataFrame 注册为临时视图(视图名:users_view)
df_csv.createOrReplaceTempView("users_view")# 2. 执行 SQL 查询(查询年龄>28的用户)
sql_result = spark.sql("SELECT name, city FROM users_view WHERE age > 28")# 3. 查看结果
sql_result.show()
# 输出:
# +-------+--------+
# |   name|    city|
# +-------+--------+
# |    Bob|Shanghai|
# |Charlie|Guangzhou|
# +-------+--------+
步骤 5:保存 DataFrame 到文件(以 Parquet 为例)

Parquet 是 Spark 推荐的列式存储格式,压缩率高、查询快,适合长期存储:

# 保存为 Parquet 文件(mode="overwrite" 表示覆盖已有文件)
df_csv.write.mode("overwrite").parquet("D:/data/users_parquet")# 验证:读取保存的 Parquet 文件
df_parquet = spark.read.parquet("D:/data/users_parquet")
df_parquet.show()

3.2 方式 2:提交 Spark 作业(生产环境常用)

当代码调试完成后,需将其写成脚本文件,通过 spark-submit 命令提交执行(支持本地模式和集群模式)。

步骤 1:编写 PySpark 脚本(spark_demo.py
# 1. 导入必要模块
from pyspark.sql import SparkSession# 2. 创建 SparkSession(本地模式:local[*] 表示使用所有 CPU 核心)
spark = SparkSession.builder \.appName("SparkDemo")  # 作业名称.master("local[*]")    # 运行模式(本地模式).getOrCreate()# 3. 读取 CSV 文件
df = spark.read.csv(path="D:/data/users.csv",header=True,inferSchema=True
)# 4. 数据处理(筛选+新增列)
df_processed = df.filter(df.age > 25) \.withColumn("age_group", df.age.cast("string") + "岁")# 5. 保存结果到 Parquet
df_processed.write.mode("overwrite").parquet("D:/data/processed_users")# 6. 输出日志(验证执行)
print("作业执行完成!处理后的数据行数:", df_processed.count())# 7. 关闭 SparkSession
spark.stop()
步骤 2:用 spark-submit 提交作业

打开 cmd,进入脚本所在目录,执行以下命令:

spark-submit --master local[*] spark_demo.py
  • --master local[*]:指定运行模式为本地(集群模式需替换为 YARN/K8s 地址)。
  • 执行成功后,会在 D:/data/processed_users 生成 Parquet 文件,且 cmd 输出 作业执行完成!处理后的数据行数:2

四、经典案例:WordCount(统计文本词频)

WordCount 是大数据入门经典案例,用于统计文本中每个单词的出现次数,以下用 PySpark 实现:

步骤 1:准备文本文件 words.txt

Hello Spark
Hello PySpark
Spark is easy to learn
PySpark is fun

步骤 2:编写 WordCount 脚本(wordcount.py

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("WordCount") \.master("local[*]") \.getOrCreate()# 1. 读取文本文件(生成 DataFrame,列名为 value)
df_text = spark.read.text("D:/data/words.txt")# 2. 分词(用 split 函数分割单词,explode 函数将数组拆分为多行)
from pyspark.sql.functions import split, explode, lower, regexp_replace# 处理逻辑: lowercase(转小写)→ 去除标点 → 按空格分词 → 拆分行
df_words = df_text \.withColumn("lower_value", lower(df_text.value))  # 转小写.withColumn("clean_value", regexp_replace("lower_value", "[^a-z ]", ""))  # 去除非字母字符.withColumn("word", split("clean_value", " "))  # 按空格分词(生成数组).withColumn("word", explode("word"))  # 数组拆分为多行.filter("word != ''")  # 过滤空字符串# 3. 统计词频(按 word 分组,count 计数)
word_count = df_words.groupBy("word").count().orderBy("count", ascending=False)# 4. 查看结果
word_count.show()# 5. 保存结果到 CSV
word_count.write.mode("overwrite").csv("D:/data/wordcount_result", header=True)# 关闭 SparkSession
spark.stop()

步骤 3:提交作业并查看结果

执行 spark-submit --master local[*] wordcount.py,输出结果如下:

+------+-----+
|  word|count|
+------+-----+
| spark|    2|
|pyspark|    2|
| hello|    2|
|    is|    2|
|  easy|    1|
|  learn|    1|
|    to|    1|
|   fun|    1|
+------+-----+

五、常见问题与排查

  1. 启动 spark-shell/pyspark 报错:Java not found
    → 检查 JAVA_HOME 环境变量是否配置正确,且 java -version 能正常输出。

  2. 读取文件报错:FileNotFoundException
    → 确认文件路径是否正确(Windows 用 D:/data/ 而非 D:\data\,或用双反斜杠 D:\\data\\)。

  3. 提交作业报错:No module named ‘pandas’
    → 执行 pip install pandas 安装依赖(确保是 PySpark 对应的 Python 环境)。

  4. 本地模式运行慢
    → 提交作业时指定更多 CPU 核心,如 --master local[4](使用 4 核)。

六、进阶学习资源

  1. 官方文档:Spark 3.5.0 官方指南(最权威,含 API 文档和示例)。
  2. 书籍:《Spark 快速大数据分析》(《Learning Spark》),适合入门到进阶。
  3. 工具集成:后续可学习 Spark 与 Hadoop(HDFS 存储)、Hive(数据仓库)、Flink(流处理对比)的集成。
  4. 可视化:用 df.show() 查看数据,或通过 df.toPandas() 转换为 pandas DataFrame 后用 Matplotlib/Seaborn 绘图。

通过本指南,你已掌握 Spark 本地环境搭建、DataFrame 核心操作、Spark SQL、作业提交等基础能力,可进一步尝试处理更大规模的数据或学习 Spark 流处理(Structured Streaming)、机器学习库(MLlib)等进阶功能。

http://www.dtcms.com/a/465458.html

相关文章:

  • COA学习,Chain of Agents
  • winform本地上位机-ModbusRTC1.上位机控制台与数据监控(数据监控架构思维与图表系列)
  • 如何建立“长期主义+短期收益”并存的商业闭环?
  • 敏捷管理之看板方法:可视化管理的流程设计与优化技巧
  • Linux学习笔记--查询_唤醒方式读取输入数据
  • 信道编码定理和信道编码逆定理
  • 订餐网站开发流程wordpress显示运行时间
  • ubuntu 24.04 FFmpeg编译 带Nvidia 加速记录
  • 关于springboot定时任务和websocket的思考
  • 做文字logo的网站我国网络营销现状分析
  • STM32F103RCT6+STM32CubeMX+keil5(MDK-ARM)+Flymcu实现简单的通信协议
  • 昂瑞微:踏浪前行,铸就射频芯片领域新辉煌
  • Roo Code系统提示覆写功能详解
  • 时钟周期约束(三)
  • 基于Hadoop的京东电商平台手机推荐系统的设计与实现
  • 没有logo可以做网站的设计吗卡密网站怎么做
  • 做侵权视频网站网站规划问题
  • 鸿蒙:用Toggle组件实现选择框、开关样式
  • html css js网页制作成品——YSL口红红色 html+css (6 页)(老版)附源码
  • CSS中的选择器有哪些?相对定位和绝对定位是相对于谁的?
  • 发布企业信息的网站大连推广
  • 详解istio mtls双向身份认证
  • 国外创意网站公司如何进行网络推广
  • 软考中级习题与解答——第十五章_数据结构与算法应用(2)
  • 在JavaScript / HTML / Node.js中,post方式的Content-Type属性的text的三种编码
  • Linux gdb
  • 源码阅读 LeakCanary
  • Java 网络流式编程
  • java后端工程师进修ing(研一版‖day51)
  • JavaScript Promise 详解:从入门到精通