当前位置: 首页 > news >正文

pyspark 数据处理的三种方式RDD、DataFrame、Spark SQL案例

目录

  • 一、浅语
  • 二、三种数据处理方式比较
    • 2.1 RDD
    • 2.2 DataFrame
    • 2.3 Spark SQL
  • 三、三种方法的创建方式
    • 3.1 创建RDD
    • 3.2 创建DataFrame
      • 3.2.1 创建sqlContext
      • 3.2.2 定义Schema
      • 3.2.3 创建DataFrame
    • 3.3 创建SparkSQL
      • 3.3.1 登录临时表
      • 3.3.2 使用sparkSQL
  • 四、三种方法显示部分字段
    • 4.1 使用RDD选取显示部分字段
    • 4.2 使用DataFrame选取显示字段
      • 4.2.1 select方法输入[字段名]
      • 4.2.2 select方法输入[dataframe名称].[字段名]
      • 4.2.3 select方法输入[dataframe别名].[字段名]
      • 4.2.4 通过dataframe中括号方式
    • 4.3 使用SparkSQL选取显示字段
  • 五、三种方法增加计算字段
    • 5.1 RDD增加计算字段
    • 5.2 DataFrame增加计算字段
    • 5.3 SparkSQL增加计算字段
  • 六、三种方法筛选数据
    • 6.1 RDD筛选数据
    • 6.2 DataFrame筛选数据
      • 6.2.1 使用多个filter
      • 6.2.2 使用单个filter
      • 6.2.3 使用[dataframe 名称].[字段名]指定条件
      • 6.2.4 使用[]指定筛选条件
    • 6.3 SparkSQL筛选数据
  • 七、三种方法按单个字段给数据排序
    • 7.1 RDD:takeOrdered方法
    • 7.2 DataFrame
    • 7.3 SQparkSQL
  • 八、三种方法按多个字段排序
    • 8.1 RDD
    • 8.2 DataFrame
    • 8.3 SparkSQL
  • 九、三种方法显示不重复的数据
    • 9.1 RDD
    • 9.2 DataFrame
    • 9.3 SparkSQL
  • 十、三种方法分组统计数据
    • 10.1 RDD:map/reduce
    • 10.2 DataFrame
      • 10.2.1 crosstab:长表变宽表
    • 10.3 SparkSQL
  • 参考资料


在这里插入图片描述

一、浅语

上一篇(《pyspark RDD相关常用函数使用案例》) 对pyspark的一些常用函数做了梳理,这篇主要是针对RDD、DataFrame、SparkSql三种实现同一功能需要的方式做一梳理,通过实际动手,体会不同方式在数据处理过程中的差异性、便利性。

import findspark
findspark.init() 
from pyspark.sql import SparkSession

# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("Test PySpark") \
    .master("local[*]") \
    .getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'

二、三种数据处理方式比较

2.1 RDD

  • rdd的数据</font color-“green”>只能使用位置来指定每一个字段。
  • rdd功能最强,能完成所有spark功能。

2.2 DataFrame

  • Spark DataFrame被创建时</font color=“green”>必须定义Schema,定义每个字段名和数据类型。
  • 定义了很多类似SQL的方法
  • 比起RDD更容易使用

2.3 Spark SQL

  • 有DataFrame派生出来,所以</font color=“green”>必须先创建DataFrame,进而转化使用。
  • 最简单

三、三种方法的创建方式

3.1 创建RDD

# 读取本地数据文件
mRDD = sc.textFile("test.txt")
print('查看数据的行数:',mRDD.count())
# 按照空格分隔数据字段
sRDD = mRDD.map(lambda x:x.split(' '))
print('查看分隔后的结果:',sRDD.collect())
查看数据的行数: 8
查看分隔后的结果: [['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F'], ['black', '4', 'F'], ['red', '5', 'M'], ['red', '5', 'M'], ['blue', '3', 'M'], ['blue', '7', 'M']]

3.2 创建DataFrame

3.2.1 创建sqlContext

sqlContext = SparkSession.builder.getOrCreate()

3.2.2 定义Schema

from pyspark.sql import Row
sRows = sRDD.map(lambda x:Row(color=x[0],
                             num = int(x[1]),
                             sex = x[2])
                )
# 查看schema
sRows.collect()
[Row(color='yellow', num=1, sex='F'),
 Row(color='blue', num=2, sex='M'),
 Row(color='yellow', num=3, sex='F'),
 Row(color='black', num=4, sex='F'),
 Row(color='red', num=5, sex='M'),
 Row(color='red', num=5, sex='M'),
 Row(color='blue', num=3, sex='M'),
 Row(color='blue', num=7, sex='M')]

3.2.3 创建DataFrame

# 创建DataFrame¶
df = sqlContext.createDataFrame(sRows)
# 使用.printSchema()查看DataFrame的schema
df.printSchema()
root
 |-- color: string (nullable = true)
 |-- num: long (nullable = true)
 |-- sex: string (nullable = true)
# 查看数据
df.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow|  1|  F|
|  blue|  2|  M|
|yellow|  3|  F|
| black|  4|  F|
|   red|  5|  M|
|   red|  5|  M|
|  blue|  3|  M|
|  blue|  7|  M|
+------+---+---+
### 为DataFrame创建别名
dataf = df.alias('dataf')
dataf.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow|  1|  F|
|  blue|  2|  M|
|yellow|  3|  F|
| black|  4|  F|
|   red|  5|  M|
|   red|  5|  M|
|  blue|  3|  M|
|  blue|  7|  M|
+------+---+---+

3.3 创建SparkSQL

3.3.1 登录临时表

dataf.registerTempTable('temp_tb')
D:\bigdataenv\spark-3.5.0-bin-hadoop3\python\pyspark\sql\dataframe.py:329: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.
  warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)

3.3.2 使用sparkSQL

查看数据使用show()方法,默认显示前20行数据

sqlContext.sql('select * from temp_tb').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow|  1|  F|
|  blue|  2|  M|
|yellow|  3|  F|
| black|  4|  F|
|   red|  5|  M|
|   red|  5|  M|
|  blue|  3|  M|
|  blue|  7|  M|
+------+---+---+

四、三种方法显示部分字段

4.1 使用RDD选取显示部分字段

rdd = sRDD.map(lambda x:(x[0],x[2],x[1]))
rdd.take(2)
[('yellow', 'F', '1'), ('blue', 'M', '2')]

4.2 使用DataFrame选取显示字段

下面四种方法显示的结果相同。

4.2.1 select方法输入[字段名]

df.select('color','sex').show()
+------+---+
| color|sex|
+------+---+
|yellow|  F|
|  blue|  M|
|yellow|  F|
| black|  F|
|   red|  M|
|   red|  M|
|  blue|  M|
|  blue|  M|
+------+---+

4.2.2 select方法输入[dataframe名称].[字段名]

df.select(df.color,df.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow|  F|
|  blue|  M|
|yellow|  F|
| black|  F|
|   red|  M|
|   red|  M|
|  blue|  M|
|  blue|  M|
+------+---+

4.2.3 select方法输入[dataframe别名].[字段名]

dataf.select(dataf.color,dataf.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow|  F|
|  blue|  M|
|yellow|  F|
| black|  F|
|   red|  M|
|   red|  M|
|  blue|  M|
|  blue|  M|
+------+---+

4.2.4 通过dataframe中括号方式

df[df['color'],df['sex']].show()
+------+---+
| color|sex|
+------+---+
|yellow|  F|
|  blue|  M|
|yellow|  F|
| black|  F|
|   red|  M|
|   red|  M|
|  blue|  M|
|  blue|  M|
+------+---+

4.3 使用SparkSQL选取显示字段

sqlContext.sql('select color,sex from temp_tb').show()
+------+---+
| color|sex|
+------+---+
|yellow|  F|
|  blue|  M|
|yellow|  F|
| black|  F|
|   red|  M|
|   red|  M|
|  blue|  M|
|  blue|  M|
+------+---+

五、三种方法增加计算字段

5.1 RDD增加计算字段

sRDD.map(lambda x:(x[0],x[1],x[2],10-int(x[1]))).collect()
[('yellow', '1', 'F', 9),
 ('blue', '2', 'M', 8),
 ('yellow', '3', 'F', 7),
 ('black', '4', 'F', 6),
 ('red', '5', 'M', 5),
 ('red', '5', 'M', 5),
 ('blue', '3', 'M', 7),
 ('blue', '7', 'M', 3)]

5.2 DataFrame增加计算字段

df.select('color','num','sex',10-df['num']).show()
+------+---+---+----------+
| color|num|sex|(10 - num)|
+------+---+---+----------+
|yellow|  1|  F|         9|
|  blue|  2|  M|         8|
|yellow|  3|  F|         7|
| black|  4|  F|         6|
|   red|  5|  M|         5|
|   red|  5|  M|         5|
|  blue|  3|  M|         7|
|  blue|  7|  M|         3|
+------+---+---+----------+
# 为计算字段取一个别名
df.select('color','num','sex',(10-df['num']).alias('diff_num')).show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow|  1|  F|       9|
|  blue|  2|  M|       8|
|yellow|  3|  F|       7|
| black|  4|  F|       6|
|   red|  5|  M|       5|
|   red|  5|  M|       5|
|  blue|  3|  M|       7|
|  blue|  7|  M|       3|
+------+---+---+--------+

5.3 SparkSQL增加计算字段

sqlContext.sql('select color,num,sex,10-num as diff_num from temp_tb').show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow|  1|  F|       9|
|  blue|  2|  M|       8|
|yellow|  3|  F|       7|
| black|  4|  F|       6|
|   red|  5|  M|       5|
|   red|  5|  M|       5|
|  blue|  3|  M|       7|
|  blue|  7|  M|       3|
+------+---+---+--------+

六、三种方法筛选数据

6.1 RDD筛选数据

sRDD.filter(lambda x:int(x[1])>2).collect()
[['yellow', '3', 'F'],
 ['black', '4', 'F'],
 ['red', '5', 'M'],
 ['red', '5', 'M'],
 ['blue', '3', 'M'],
 ['blue', '7', 'M']]

6.2 DataFrame筛选数据

四种筛选方式,执行结果相同。

6.2.1 使用多个filter

df.filter("color='blue'").filter('num=2').show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue|  2|  M|
+-----+---+---+

6.2.2 使用单个filter

df.filter("color='blue' and num=2 ").show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue|  2|  M|
+-----+---+---+

6.2.3 使用[dataframe 名称].[字段名]指定条件

注意:

  • 必须使用“&”,不能使用“and”
  • 必须使用“==”,不能使用“=”
df.filter((df.color=='blue') & (df.num==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue|  2|  M|
+-----+---+---+

6.2.4 使用[]指定筛选条件

df.filter((df['color']=='blue') & (df['num']==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue|  2|  M|
+-----+---+---+

6.3 SparkSQL筛选数据

sqlContext.sql("""select * from temp_tb 
        where color='blue' and num=2    """).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue|  2|  M|
+-----+---+---+

七、三种方法按单个字段给数据排序

7.1 RDD:takeOrdered方法

takeOrdered(num,key=None):

  • num:要显示的项数
  • key:使用lambda语句设置要排序的字段
# 升序示例
sRDD.takeOrdered(3,key=lambda x:int(x[1]))
[['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F']]
# 降序示例
sRDD.takeOrdered(3,key=lambda x: -1 * int(x[1]))
[['blue', '7', 'M'], ['red', '5', 'M'], ['red', '5', 'M']]

7.2 DataFrame

# 升序
df.orderBy('num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow|  1|  F|
|  blue|  2|  M|
|  blue|  3|  M|
|yellow|  3|  F|
| black|  4|  F|
|   red|  5|  M|
|   red|  5|  M|
|  blue|  7|  M|
+------+---+---+
# 降序
df.orderBy('num',ascending=0).show()
+------+---+---+
| color|num|sex|
+------+---+---+
|  blue|  7|  M|
|   red|  5|  M|
|   red|  5|  M|
| black|  4|  F|
|  blue|  3|  M|
|yellow|  3|  F|
|  blue|  2|  M|
|yellow|  1|  F|
+------+---+---+

7.3 SQparkSQL

# 升序
sqlContext.sql('select * from temp_tb order by num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow|  1|  F|
|  blue|  2|  M|
|  blue|  3|  M|
|yellow|  3|  F|
| black|  4|  F|
|   red|  5|  M|
|   red|  5|  M|
|  blue|  7|  M|
+------+---+---+
# 降序

sqlContext.sql('select * from temp_tb order by num desc').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|  blue|  7|  M|
|   red|  5|  M|
|   red|  5|  M|
| black|  4|  F|
|  blue|  3|  M|
|yellow|  3|  F|
|  blue|  2|  M|
|yellow|  1|  F|
+------+---+---+

八、三种方法按多个字段排序

8.1 RDD

# 先num降序,color升序
sRDD.takeOrdered(3,key = lambda x: (-1*x[1],x[0]))
[['black', '4', 'F'], ['blue', '2', 'M'], ['blue', '3', 'M']]

8.2 DataFrame

df.orderBy(['num','color'],ascending=[0,1]).show()
+------+---+---+
| color|num|sex|
+------+---+---+
|  blue|  7|  M|
|   red|  5|  M|
|   red|  5|  M|
| black|  4|  F|
|  blue|  3|  M|
|yellow|  3|  F|
|  blue|  2|  M|
|yellow|  1|  F|
+------+---+---+
df.orderBy(df.num.desc(),df.color).show()
+------+---+---+
| color|num|sex|
+------+---+---+
|  blue|  7|  M|
|   red|  5|  M|
|   red|  5|  M|
| black|  4|  F|
|  blue|  3|  M|
|yellow|  3|  F|
|  blue|  2|  M|
|yellow|  1|  F|
+------+---+---+

8.3 SparkSQL

sqlContext.sql("select * from temp_tb order by num desc , color ").show()
+------+---+---+
| color|num|sex|
+------+---+---+
|  blue|  7|  M|
|   red|  5|  M|
|   red|  5|  M|
| black|  4|  F|
|  blue|  3|  M|
|yellow|  3|  F|
|  blue|  2|  M|
|yellow|  1|  F|
+------+---+---+

九、三种方法显示不重复的数据

9.1 RDD

sRDD.map(lambda x:x[2]).distinct().collect()
['F', 'M']
sRDD.map(lambda x:(x[1],x[2])).distinct().collect()
[('3', 'F'),
 ('4', 'F'),
 ('5', 'M'),
 ('3', 'M'),
 ('7', 'M'),
 ('1', 'F'),
 ('2', 'M')]

9.2 DataFrame

df.select('sex').distinct().show()
+---+
|sex|
+---+
|  F|
|  M|
+---+
df.select('num','sex').distinct().show()
+---+---+
|num|sex|
+---+---+
|  2|  M|
|  3|  F|
|  4|  F|
|  1|  F|
|  5|  M|
|  3|  M|
|  7|  M|
+---+---+

9.3 SparkSQL

sqlContext.sql("select distinct sex from temp_tb").show()
+---+
|sex|
+---+
|  F|
|  M|
+---+
sqlContext.sql("select distinct num,sex from temp_tb").show()
+---+---+
|num|sex|
+---+---+
|  2|  M|
|  3|  F|
|  4|  F|
|  1|  F|
|  5|  M|
|  3|  M|
|  7|  M|
+---+---+

十、三种方法分组统计数据

10.1 RDD:map/reduce

在RDD中进行数据的分组统计,必须使用map/reduce

# 单字段:eg:按照sex分组统计
sRDD.map(lambda x:(x[2],int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[('F', 8), ('M', 22)]
# 多字段
sRDD.map(lambda x:((x[2],x[0]),int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[(('F', 'yellow'), 4),
 (('M', 'blue'), 12),
 (('F', 'black'), 4),
 (('M', 'red'), 10)]

10.2 DataFrame

df.select(['sex','num']).groupBy('sex').sum().show()
+---+--------+
|sex|sum(num)|
+---+--------+
|  F|       8|
|  M|      22|
+---+--------+
df.select(['sex','color','num']).groupBy(['sex','color']).sum().orderBy(['sex','color']).show()
+---+------+--------+
|sex| color|sum(num)|
+---+------+--------+
|  F| black|       4|
|  F|yellow|       4|
|  M|  blue|      12|
|  M|   red|      10|
+---+------+--------+

10.2.1 crosstab:长表变宽表

df.crosstab('color','sex').show()
+---------+---+---+
|color_sex|  F|  M|
+---------+---+---+
|   yellow|  2|  0|
|      red|  0|  2|
|    black|  1|  0|
|     blue|  0|  3|
+---------+---+---+

10.3 SparkSQL

sqlContext.sql('select sex,sum(num) from temp_tb group by sex').show()
+---+--------+
|sex|sum(num)|
+---+--------+
|  F|       8|
|  M|      22|
+---+--------+

参考资料

《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739

相关文章:

  • 大模型中的微调LoRA是什么
  • 多视图几何--对极几何--从0-1理解对极几何
  • 个人记录的一个插件,Unity-RuntimeMonitor
  • static 用法,函数递归与迭代详解
  • Spring Cloud之远程调用OpenFeign参数传递
  • Unity单例模式更新金币数据
  • CI/CD—Jenkins配置Poll SCM触发自动构建
  • DETR详解
  • 基于SpringBoot实现旅游酒店平台功能六
  • 【C#学习笔记02】基本元素与数据类型
  • mac本地部署Qwq-32b记录
  • 供应链工作效率如何提升
  • Java常见面试技术点整理讲解——后端框架(整理中,未完成)
  • 什么是一致性模型,在实践中如何选择?
  • 程序化广告行业(3/89):深度剖析行业知识与数据处理实践
  • MOM成功实施分享(七)电力电容制造MOM工艺分析与解决方案(第二部分)
  • 菜鸟打印机组件安装后重启显示“Windows 找不到文件‘CNPrintClient,exe‘。请确定文件名是否正确后,再试一次。”的正确解决方案
  • JavaScript性能优化:DOM操作优化实战
  • 2025-03-10 吴恩达机器学习1——机器学习概述
  • Python的函数
  • 全国治安管理工作视频会召开
  • 上海地铁:一孩童鞋子卡于电梯梯级处,其间未造成人员受伤
  • 住建部:目前已累计建设改造各类市政管网50万公里
  • 荷兰外交大臣费尔德坎普将访华
  • 著名文学评论家、原伊犁师范学院院长吴孝成逝世
  • 外交部:将持续便利中外人员往来,让“中国游”金字招牌更加闪耀