当前位置: 首页 > news >正文

用 Spark Shell 做交互式数据分析从入门到自包含应用

为什么用 Spark Shell?

Spark Shell 既是学习 API 的最短路径,也是探索数据的强大交互工具。你可以在 Shell 里反复尝试、立即验证,然后把验证过的代码拷进正式作业或应用。

  • 支持 Scala(运行在 JVM,上手 Java 生态很方便)
  • 支持 Python(PySpark)(语法简洁、数据科学生态友好)

环境与启动方式

方式 A:使用 Spark 自带脚本

# Python
./bin/pyspark# Scala
./bin/spark-shell

方式 B:已通过 pip 安装 PySpark(推荐给数据科学/本地开发)

pip install pyspark
pyspark

小贴士:

  • 指定 Python 解释器:

    export PYSPARK_PYTHON=python3
    
  • 指定 Spark 路径(必要时):

    export SPARK_HOME=/path/to/spark
    

Spark 的核心抽象:Dataset / DataFrame

  • Dataset:分布式元素集合。
  • Python 中,由于动态类型特性,实际使用的是 Dataset[Row],通常直接称为 DataFrame(与 Pandas/R 的 DataFrame 概念一致)。

读取文本为 DataFrame

textFile = spark.read.text("README.md")  # 每行一条记录,列名为 "value"

常用 action

textFile.count()   # 行数
textFile.first()   # Row(value='...')

过滤与链式调用

linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
linesWithSpark.count()  # 例如 15# 链式:过滤后直接计数
textFile.filter(textFile.value.contains("Spark")).count()

复杂一点:统计“每行最大词数”

关键修正:在 PySpark.alias("...")(而非 .name("...")),并使用原始字符串表达正则 r"\s+",避免转义歧义。

from pyspark.sql import functions as Fresult = (textFile.select(F.size(F.split(F.col("value"), r"\s+")).alias("numWords")).agg(F.max(F.col("numWords")).alias("max_numWords")).collect()
)
result  # [Row(max_numWords=15)]

MapReduce 风格:做一次词频统计(Word Count)

from pyspark.sql import functions as FwordCounts = (textFile.select(F.explode(F.split(F.col("value"), r"\s+")).alias("word")).where(F.col("word") != "")         # 可选:剔除空字符串.groupBy("word").count()
)# 看看 Top 20
wordCounts.orderBy(F.col("count").desc()).show(20, truncate=False)# 也可以落到磁盘/HDFS
# wordCounts.write.mode("overwrite").csv("/tmp/wordcounts")

缓存与性能:cache / persist

当数据会被重复访问(比如小而热的数据集,或 PageRank 这类迭代算法),把它放进集群级内存缓存会极大提高效率。

linesWithSpark.cache()   # 默认 MEMORY_ONLY
linesWithSpark.count()   # 触发计算并物化缓存
linesWithSpark.count()   # 再次访问会更快# 指定存储级别(当数据较大时有用)
from pyspark.storagelevel import StorageLevel
linesWithSpark.persist(StorageLevel.MEMORY_AND_DISK)

实用调试:

linesWithSpark.explain()              # 物理/逻辑执行计划
wordCounts.explain("formatted")       # 更友好的格式(Spark 3+)

自包含应用:用 PySpark 编写并运行

如果你在做打包的 PySpark 应用/库,可以在 setup.py 里声明依赖:

install_requires = ["pyspark==4.0.1"  # 按你的实际版本固定
]

示例:SimpleApp.py

# SimpleApp.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as Fdef main():logFile = "YOUR_SPARK_HOME/README.md"  # 替换为实际文件路径spark = SparkSession.builder.appName("SimpleApp").getOrCreate()df = spark.read.text(logFile).cache()numAs = df.filter(F.col("value").contains("a")).count()numBs = df.filter(F.col("value").contains("b")).count()print("Lines with a: %i, lines with b: %i" % (numAs, numBs))spark.stop()if __name__ == "__main__":main()

运行方式

首选:spark-submit

$ ${SPARK_HOME}/bin/spark-submit \--master "local[4]" \SimpleApp.py# 输出示例:
# Lines with a: 46, lines with b: 23

如果已用 pip 安装 PySpark(可直接用 Python 运行):

$ python SimpleApp.py
# Lines with a: 46, lines with b: 23

引入第三方代码/依赖时,可用 --py-files 传入 zip 包;引入连接器/JAR 时可用:

spark-submit --packages groupId:artifactId:version SimpleApp.py

去哪里继续深入?

  • RDD 编程指南Spark SQL 编程指南:系统理解 API 与最佳实践

  • 部署指南:把应用跑在真实集群(Standalone / YARN / Kubernetes)

  • examples 目录 含多语言示例:

    # Python
    ./bin/spark-submit examples/src/main/python/pi.py# Scala / Java
    ./bin/run-example SparkPi# R
    ./bin/spark-submit examples/src/main/r/dataframe.R
    

常见坑与实战建议(强烈建议一读)

  1. PySpark 别用 .name(...) → 用 .alias("...")
  2. 正则用原始字符串r"\s+",避免 "\s" 被当作普通 s
  3. 谨慎 collect() → 仅用于小结果;大结果用 show()/take(n) 或直接写出到存储。
  4. 只缓存“热数据” → 先 explain() 看执行计划、用 UI 观察 DAG 与缓存命中,不要“见谁 cache 谁”。
  5. 路径要清晰 → 本地绝对路径 vs HDFS/S3 URI(hdfs://s3a://)。
  6. 并行度与分区repartition(n) 提升 shuffle 后并行度;小结果落盘可 coalesce(1) 合并为单文件(仅适用于小数据)。
  7. 版本统一 → PySpark 版本与集群 Spark 版本尽量匹配,避免协议/计划不兼容。
http://www.dtcms.com/a/564528.html

相关文章:

  • WindowsXP Window7等老系统和Linux Ubuntu等系统在VM虚拟机中安装VM Toools工具实现宿主机虚拟机共用粘贴板
  • 第十二章:终极叩问:我是谁,我往何方?(3)
  • 校园网站建设的缺陷怎么做考试资料网站
  • 【Android Studio】webview 组件在android TV中进行加载,始终是客户端网页的方式进行加载,解决?
  • 应对不规则负载的异步ML模型服务AWS架构设计
  • Docker、Kubernetes与AWS中控机是什么?
  • AWS Bedrock + DeepSeek-R1:开启企业级 AI 开发的新篇章
  • C++ 类似pytorch的库,工具包,或者机器学习的生态
  • 关于手表的网站精品课程网站的建设
  • 正点原子【第四期】Linux之驱动开发学习笔记-10.1 Linux 内核定时器实验
  • Go语言设计模式:命令模式详解
  • Dropout提升模型泛化能力【动手学深度学习:PyTorch版 4.6 暂退法】
  • 网站开发用什么软件有哪些安徽安庆
  • 能够沟通业务的网站彩票网站开发 违法
  • 【机器学习13】异常检测优化、推荐系统、协同过滤
  • can‘t read /etc/apt/sources.list: No such file or directory
  • 深入理解 DNS 与 ICMP:网络世界的地址解析与连通性探测
  • MCU中的RC电路(Resistor-Capacitor Circuit)
  • Flink SQL 调优
  • CISP-PTE认证考试靶场
  • RDPWD!MCSAttachUserRequest函数分析之RDPWD!Domain结构中的ChannelList和UserAttachmentList
  • 细数Java中List的10个坑
  • 泉州手机网站开发怎么看一个网站是什么程序做的
  • PyTorch图像分割训练全流程解析
  • 无人机 - 关于无人机电池
  • 音视频播放的核心处理流程
  • 基于EasyExcel实现Excel导出功能
  • 【SpringBoot】31 核心功能 - 单元测试 - JUnit5 单元测试中的断言机制——验证你的代码是否按预期执行了
  • kafka问题解决
  • Parasoft C/C++test如何在CCS3环境下进行F2812项目的单元测试