用 Spark Shell 做交互式数据分析从入门到自包含应用
为什么用 Spark Shell?
Spark Shell 既是学习 API 的最短路径,也是探索数据的强大交互工具。你可以在 Shell 里反复尝试、立即验证,然后把验证过的代码拷进正式作业或应用。
- 支持 Scala(运行在 JVM,上手 Java 生态很方便)
 - 支持 Python(PySpark)(语法简洁、数据科学生态友好)
 
环境与启动方式
方式 A:使用 Spark 自带脚本
# Python
./bin/pyspark# Scala
./bin/spark-shell
方式 B:已通过 pip 安装 PySpark(推荐给数据科学/本地开发)
pip install pyspark
pyspark
小贴士:
指定 Python 解释器:
export PYSPARK_PYTHON=python3指定 Spark 路径(必要时):
export SPARK_HOME=/path/to/spark
Spark 的核心抽象:Dataset / DataFrame
- Dataset:分布式元素集合。
 - 在 Python 中,由于动态类型特性,实际使用的是 
Dataset[Row],通常直接称为 DataFrame(与 Pandas/R 的 DataFrame 概念一致)。 
读取文本为 DataFrame
textFile = spark.read.text("README.md")  # 每行一条记录,列名为 "value"
常用 action
textFile.count()   # 行数
textFile.first()   # Row(value='...')
过滤与链式调用
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
linesWithSpark.count()  # 例如 15# 链式:过滤后直接计数
textFile.filter(textFile.value.contains("Spark")).count()
复杂一点:统计“每行最大词数”
关键修正:在 PySpark 用
.alias("...")(而非.name("...")),并使用原始字符串表达正则r"\s+",避免转义歧义。
from pyspark.sql import functions as Fresult = (textFile.select(F.size(F.split(F.col("value"), r"\s+")).alias("numWords")).agg(F.max(F.col("numWords")).alias("max_numWords")).collect()
)
result  # [Row(max_numWords=15)]
MapReduce 风格:做一次词频统计(Word Count)
from pyspark.sql import functions as FwordCounts = (textFile.select(F.explode(F.split(F.col("value"), r"\s+")).alias("word")).where(F.col("word") != "")         # 可选:剔除空字符串.groupBy("word").count()
)# 看看 Top 20
wordCounts.orderBy(F.col("count").desc()).show(20, truncate=False)# 也可以落到磁盘/HDFS
# wordCounts.write.mode("overwrite").csv("/tmp/wordcounts")
缓存与性能:cache / persist
当数据会被重复访问(比如小而热的数据集,或 PageRank 这类迭代算法),把它放进集群级内存缓存会极大提高效率。
linesWithSpark.cache()   # 默认 MEMORY_ONLY
linesWithSpark.count()   # 触发计算并物化缓存
linesWithSpark.count()   # 再次访问会更快# 指定存储级别(当数据较大时有用)
from pyspark.storagelevel import StorageLevel
linesWithSpark.persist(StorageLevel.MEMORY_AND_DISK)
实用调试:
linesWithSpark.explain()              # 物理/逻辑执行计划
wordCounts.explain("formatted")       # 更友好的格式(Spark 3+)
自包含应用:用 PySpark 编写并运行
如果你在做打包的 PySpark 应用/库,可以在 setup.py 里声明依赖:
install_requires = ["pyspark==4.0.1"  # 按你的实际版本固定
]
示例:SimpleApp.py
# SimpleApp.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as Fdef main():logFile = "YOUR_SPARK_HOME/README.md"  # 替换为实际文件路径spark = SparkSession.builder.appName("SimpleApp").getOrCreate()df = spark.read.text(logFile).cache()numAs = df.filter(F.col("value").contains("a")).count()numBs = df.filter(F.col("value").contains("b")).count()print("Lines with a: %i, lines with b: %i" % (numAs, numBs))spark.stop()if __name__ == "__main__":main()
运行方式
首选:spark-submit
$ ${SPARK_HOME}/bin/spark-submit \--master "local[4]" \SimpleApp.py# 输出示例:
# Lines with a: 46, lines with b: 23
如果已用 pip 安装 PySpark(可直接用 Python 运行):
$ python SimpleApp.py
# Lines with a: 46, lines with b: 23
引入第三方代码/依赖时,可用
--py-files传入 zip 包;引入连接器/JAR 时可用:spark-submit --packages groupId:artifactId:version SimpleApp.py
去哪里继续深入?
- 
RDD 编程指南 与 Spark SQL 编程指南:系统理解 API 与最佳实践
 - 
部署指南:把应用跑在真实集群(Standalone / YARN / Kubernetes)
 - 
examples 目录 含多语言示例:
# Python ./bin/spark-submit examples/src/main/python/pi.py# Scala / Java ./bin/run-example SparkPi# R ./bin/spark-submit examples/src/main/r/dataframe.R 
常见坑与实战建议(强烈建议一读)
- PySpark 别用 
.name(...)→ 用.alias("...")。 - 正则用原始字符串 → 
r"\s+",避免"\s"被当作普通s。 - 谨慎 
collect()→ 仅用于小结果;大结果用show()/take(n)或直接写出到存储。 - 只缓存“热数据” → 先 
explain()看执行计划、用 UI 观察 DAG 与缓存命中,不要“见谁 cache 谁”。 - 路径要清晰 → 本地绝对路径 vs HDFS/S3 URI(
hdfs://、s3a://)。 - 并行度与分区 → 
repartition(n)提升 shuffle 后并行度;小结果落盘可coalesce(1)合并为单文件(仅适用于小数据)。 - 版本统一 → PySpark 版本与集群 Spark 版本尽量匹配,避免协议/计划不兼容。
 
