当前位置: 首页 > news >正文

在IPython和PyCharm里通过PySpark实现词频统计

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 在虚拟机的IPython里玩PySpark
      • 2.1.1 更换pip3源为阿里源
      • 2.1.2 安装IPython库
      • 2.1.3 安装findspark库
      • 2.1.4 安装pyspark库
      • 2.1.5 在IPython里玩Spark
    • 2.2 在宿主机的PyCharm里玩PySpark
      • 2.2.1 更换pip3源为阿里源
      • 2.2.2 安装IPython库
      • 2.2.3 安装findspark库
      • 2.2.4 安装pyspark库
      • 2.2.5 在PyCharm里玩Spark
  • 3. 实战总结

1. 实战概述

  • 本次实战围绕 PySpark 的本地与远程开发环境搭建及词频统计应用展开。通过在虚拟机中配置 IPython 并结合 findspark 与 pyspark 库,分别使用 RDD 和 Spark SQL 实现了对本地及 HDFS 文件的词频统计;同时,在宿主机 PyCharm 中远程连接虚拟机,完成相同功能的脚本开发与运行,验证了 PySpark 在不同开发模式下的灵活性与一致性。

2. 实战步骤

2.1 在虚拟机的IPython里玩PySpark

2.1.1 更换pip3源为阿里源

  • 执行命令:mkdir ~/.pip
    在这里插入图片描述

  • 执行命令:vim ~/.pip/pip.conf
    在这里插入图片描述

    [global]
    index-url=http://mirrors.aliyun.com/pypi/simple
    [install]
    trusted-host=mirrors.aliyun.com
    

2.1.2 安装IPython库

  • 执行命令:pip3 install ipython
    在这里插入图片描述

2.1.3 安装findspark库

  • 执行命令:pip3 install findspark
    在这里插入图片描述

2.1.4 安装pyspark库

  • 执行命令:pip3 install pyspark
    在这里插入图片描述

2.1.5 在IPython里玩Spark

  1. 启动ipython,导入findspark库,初始化,导入pyspark库,定义sc变量
    在这里插入图片描述

  2. 使用PySpark基于Spark RDD实现词频统计

    • 数据源为本地文件:/root/words.txt

      for wc in sc.textFile("/root/words.txt") \.flatMap(lambda line: line.split(' ')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b) \.sortBy(lambda wc: wc[1], False) \.collect():print(wc)
      

      在这里插入图片描述

    • 数据源为HDFS文件:hdfs://master:9000/wordcount/input/test.txt

      for wc in sc.textFile("hdfs://master:9000/wordcount/input/test.txt") \.flatMap(lambda line: line.split(' ')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b) \.sortBy(lambda wc: wc[1], False) \.collect():print(wc)
      

      在这里插入图片描述

  3. 使用PySpark基于Spark SQL实现词频统计

    • 导入Spark会话,执行命令:from pyspark.sql import SparkSession
      在这里插入图片描述

    • 创建Spark会话对象,执行命令:spark = SparkSession.builder.appName('wc').getOrCreate()
      在这里插入图片描述

    • 读取HDFS文件生成数据帧,执行命令:lines_df = spark.read.text("hdfs://master:9000/wordcount/input/test.txt")
      在这里插入图片描述

    • 基于数据帧创建临时视图,执行命令:lines_df.createOrReplaceTempView("lines")
      在这里插入图片描述

    • 使用Spark SQL编写词频统计查询

      result = spark.sql("""SELECT word, COUNT(*) AS countFROM (SELECT explode(split(value, ' ')) AS wordFROM lines) tWHERE word != '' AND word IS NOT NULLGROUP BY wordORDER BY count DESC
      """)
      

      在这里插入图片描述

    • 执行命令:result.show()
      在这里插入图片描述

    • 收集并逐行打印结果

      for row in result.collect():print(f"('{row[0]}', {row[1]})")	
      

      在这里插入图片描述

2.2 在宿主机的PyCharm里玩PySpark

2.2.1 更换pip3源为阿里源

  • pip初始化文件:C:\Users\Administrator\pip\pip.ini
    在这里插入图片描述

2.2.2 安装IPython库

  • 执行命令:pip3 install ipython
    在这里插入图片描述

2.2.3 安装findspark库

  • 执行命令:pip3 install findspark
    在这里插入图片描述

2.2.4 安装pyspark库

  • 执行命令:pip3 install pyspark
    在这里插入图片描述

2.2.5 在PyCharm里玩Spark

  1. 使用Spark RDD实现词频统计

    • 创建Spark RDD词频统计.py文件
      在这里插入图片描述

      """
      功能:采用Spark RDD实现词频统计
      作者:华卫
      日期:2025年11月11日
      """import findsparkfindspark.init()
      from pyspark import SparkContext# ==============================
      # 1. 初始化 SparkContext
      # ==============================
      # 创建 Spark 上下文,指定应用名称为 "WordCount"
      # 注意:在本地测试时,若未配置 HDFS,可改用本地文件路径
      sc = SparkContext(appName="WordCount")try:# ==============================# 2. 读取输入文件(HDFS 路径)# ==============================# 从 HDFS 读取文本文件,返回一个 RDD,每个元素是一行字符串lines_rdd = sc.textFile("hdfs://master:9000/wordcount/input/test.txt")# ==============================# 3. 拆分每行为单词(扁平化)# ==============================# 对每一行使用 split(' ') 切分为单词列表,# flatMap 将所有列表“拍平”成一个单词的 RDDwords_rdd = lines_rdd.flatMap(lambda line: line.split(' '))# ==============================# 4. 映射为 (word, 1) 键值对# ==============================# 将每个单词转换为元组 (word, 1),用于后续计数word_pairs_rdd = words_rdd.map(lambda word: (word, 1))# ==============================# 5. 按键聚合,统计词频# ==============================# 对相同的 key(即单词)进行 reduce,累加其 value(即出现次数)word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)# ==============================# 6. 按词频降序排序# ==============================# sortBy 接收一个函数,提取每个元素的排序依据(这里是词频 wc[1])# ascending=False 表示降序(从高到低)sorted_word_counts_rdd = word_counts_rdd.sortBy(lambda wc: wc[1], ascending=False)# ==============================# 7. 收集结果并打印# ==============================# collect() 将分布式 RDD 的所有数据拉取到 Driver 端(仅适用于小结果集!)result = sorted_word_counts_rdd.collect()print("==词频统计结果(按频率降序)==")for word, count in result:print(f"{word}: {count}")finally:# ==============================# 8. 关闭 SparkContext(释放资源)# ==============================sc.stop()
      
    • 配置Python解释器(远程连接master虚拟机,使用Python3.7.7解释器)
      在这里插入图片描述

    • 运行程序,查看结果
      在这里插入图片描述

  2. 使用Spark SQL实现词频统计

    • 创建Spark SQL词频统计.py文件
      在这里插入图片描述

      """
      功能:采用 Spark SQL(基于临时视图 + SQL 查询)实现词频统计
      作者:华卫
      日期:2025年11月11日
      """import findspark
      findspark.init()
      from pyspark.sql import SparkSession# ==============================
      # 初始化 SparkSession
      # ==============================
      spark = SparkSession.builder \.appName("WordCountSQLWithView") \.getOrCreate()try:# ########################################################## 1. 读取HDFS文件生成数据帧# - 每行一个字符串,列名为 "value"# #########################################################lines_df = spark.read.text("hdfs://master:9000/wordcount/input/test.txt")# 此时 lines_df 包含一列:value(类型为 string)# ########################################################## 2. 基于数据帧创建临时视图,供SQL查询使用# #########################################################lines_df.createOrReplaceTempView("lines")# ########################################################## 3. 使用Spark SQL编写词频统计查询# - 拆分单词、炸裂数组、过滤空值和NULL、分组计数、降序排序# #########################################################result = spark.sql("""SELECT word, COUNT(*) AS countFROM (SELECT explode(split(value, ' ')) AS wordFROM lines) tWHERE word != '' AND word IS NOT NULLGROUP BY wordORDER BY count DESC""")result.show() # 表格形式输出结果# ########################################################## 4. 收集并逐行打印结果# - 格式:('word', count)# #########################################################print("==词频统计结果(按频率降序)==")for row in result.collect():print(f"('{row[0]}', {row[1]})")finally:# 关闭 SparkSessionspark.stop()
      
    • 运行程序,查看结果
      在这里插入图片描述

3. 实战总结

  • 本次实战系统地完成了 PySpark 开发环境的搭建与词频统计任务的实现。首先在虚拟机中配置阿里源,安装 IPython、findspark 和 pyspark,并通过 IPython 交互式环境成功运行基于 RDD 和 Spark SQL 的词频统计代码,验证了对本地文件和 HDFS 文件的处理能力。随后,在宿主机的 PyCharm 中配置远程 Python 解释器,连接虚拟机 Spark 环境,编写结构清晰的脚本分别使用 RDD 和 Spark SQL 实现相同功能,结果一致且运行稳定。整个过程不仅掌握了 PySpark 的核心 API 使用方法,也熟悉了本地开发与远程集群协同的工作模式,为后续大数据应用开发奠定了坚实基础。
http://www.dtcms.com/a/596380.html

相关文章:

  • 03-node.js webpack
  • 维护_其它进程间通信(IPC Inter-Process communication)和分布式通信框架列述
  • 【大模型训练】roll 调用megatron 计算损失函数有,会用到partial
  • 使用nestjs/cli创建nest.js新项目
  • 广州外贸网站建设公司平面设计主要做什么工资多少
  • 广东省建设工程交易中心网站网站关键词不稳定
  • 组建网站需多少钱微信网站模板
  • jfinal 支持mysql的json字段类型解决方案
  • Excel处理控件Aspose.Cells教程:如何使用C#在Excel中添加、编辑和更新切片器
  • Java 在 Excel 文件中添加或删除分节符
  • 电子电气架构 --- 车载OTA功能
  • Chrome HSTS(HTTP Strict Transport Security)
  • 【项目亮点】基于EasyExcel + 线程池解决POI文件导出时的内存溢出及超时问题
  • 【C++】链表算法习题
  • 搭建智能问答系统需要什么文档解析工具?
  • 【C++】(以及大多数编程语言)中常见的 六种基本位运算操作
  • (129页PPT)罗兰贝格银行风险预警管理体系规划(附下载方式)
  • 建设银行网站可以更改个人电话网址大全域名解析
  • 增删查改(其一) —— insert插入 与 select条件查询
  • JuiceSSH+cpolar解锁手机远程Linux新姿势,无需公网IP,固定地址稳定用
  • 传统生产制造企业手写单据数字化落地:旗讯 OCR 的技术实现与系统对接方案
  • 如何添加网站白名单广州建设网站的公司
  • nnUNet 训练与推理命令操作记录
  • 【C#】从一次异步锁逐渐展开浅谈服务器架构解决重复编码问题,我与AI的一次深度讨论得出的一些解决方案
  • PKHeX 宝可梦存档编辑工具 用户可自由修改宝可梦属性、技能、道具、图鉴完成度等信息
  • 深度解析:环形链表——手撕面试经典题
  • elasticsearch集群访问中的通信问题
  • 西安模板网站建设套餐佛山做网站费用
  • 什么是RKNN?
  • 《智元启示录》升级说明:从「AI 思考集」到「AI 决策内参」