pyspark入门实操
1、 Spark简单介绍
Spark是一个快速、通用的大数据处理引擎,可以进行分布式类数据处理和分析。与Hadoop的MapReduce相比,Spark具有更高的性能和更丰富的功能。Spark支持多种编程语言(如Scala、Java和Python(pyspark)),并提供了一组丰富的API,包括用于数据处理、机学习和图计算的库。
据我了解,大部分公司,都会对于数据的预处理+模型的推理都会用pyspark来做分布式处理,如模型的分布式推理(tensorflow和torch只支持分布式训练,不支持分布式预测)。
一般用的最多的是spark-scala(追求高性能,强于工程)和pyspark(AI算法场景,方便和其他python库配合,强于分析)。此外spark-scala支持spark graphx图计算模块,而pyspark是不支持的。
2、 spark架构
Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。
SparkContext是spark功能的主要入口。其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。
每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个spark application的生命周期,一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量,并目可以通过SparkContext访问Spark的服务,运行任务。SparkContext设置内部服务,并建立与spark执行环境的连接。
3、 driver是啥
在 Spark 应用程序中,Driver 这个组件发挥着非常重要的作用。它负责应用程序的启动,控制 Spark 集群、执行任务,以及处理应用程序的结果并将它们呈现给用户。
具体地说,Driver 执行以下任务:
1.启动 Soark 应用程序:在启动时,Driver 创建一个 SoarkContext对象,SoarkContext是一个与 Soark 集群交互的主要入口
点,同时向集群申请执行资源。
2.解析和计算每个 RDD 的依赖关系:在应用程序定义 RDD 及其转换操作后,Driver 确定 RDD 之间依赖关系以构建一个有向无环
图(DAG),并将该图提交给集群的调度程序。
3.将 Spark 作业转换为任务:一旦 DAG 构建完成,Driver 将 DAG 拆分成任务并将其提交到集群
4.协调任务的执行: Driver 与 Spark 集群中的资源管理器协同工作,根据可用性和资源限制决定任务在哪个工作节点或 Executor
上运行。
5.管理结果的收集和管理:一旦任务完成,Driver 将收集到的数据汇总,并决定如何将它们呈现给客户端(例如,写入到
HDFS/Hive等外部系统,或者将它们发送到用户的应用程序)。
总之,Driver 是整个 Spark 应用程序的中央调度器,它将应用程序逻辑转换为可以在集群上执行的任务,并协调所有 Executor 的工作。
Driver 在 Spark 应用程序的执行过程中不断地接收、处理、调度和监视数据。
总结:执行 filter 和 map 时,控制台没有任何输出、没有计算发生;只有执行 collect()(Action)时,Spark 才会串联所有 Transformation 逻辑,分布式计算后返回结果。所以,transformation操作只是生成rdd之间的DAG依赖关系图,action操作才是真正的执行操作。shuffle操作就类似于sql的排序操作,或者筛选重输出的操作。
以下是一些pyspark的实操!
# coding=utf-8from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import Row #在pyspark中,Row是一个类似于元组的类型,用于表示Spark DataFrame、DataSet或者rdd的条目进行操作
import datetime
import logging spark = SparkSession.builder. \config('spark.hadoop.hive.exec.dynamic.partition.mode', 'nonstrict'). \config('spark.hadoop.hive.exec.dynamic.partition', 'true'). \config('spark.sql.crossJoin.enabled', 'true'). \appName('temp_2025110402'). \enableHiveSupport(). \getOrCreate()spark.sql('use dwh')
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sc=spark.sparkContextdt=${yyyyMMdd}
print(dt)
##1、计算hive表数据输出结果到hive表
# df=spark.sql("select phone_num,cust_code,openacct_date,first_pc_source,dt from dwh.dm_pc_user_all_df where dt={}".format(dt)) #返回的df是spark分布式结构,与RDD同源,支持分布式计算,而非本地内存中的pandas df
# def filter_data(df):
# df1=df.filter((df["first_pc_source"]=="互联网引流") & (df["openacct_date"].isNotNull())).select(['phone_num','cust_code','dt'])
# return df1
# df1=filter_data(df)
# print({df1.count()})
# df1.write.format("hive").option('fileFormat','orcfile').mode("append").partitionBy("dt").saveAsTable("temp_2025110402")#有分区的orc表##2、pandas的df转spark的df # pdf=pd.DataFrame([("lilei",18),("hanmeimei",17)],columns=["name","age"]) #本地生成df
# df=spark.createDataFrame(pdf) #生成分布式df
# df.show()#show为action操作,立即输出 ##3、rdd转df
# rdd=sc.parallelize([("lilei",15),("hanmeimei",16)]) #生成rdd数据
# df=rdd.toDF(["name","age"]) #将rdd数据转化成spark的df
# df.show()
# df.printSchema() #输出各列信息 ##4、读取hdfs文件到spark的df,另存df为hdfs文件、另存df到hive表
# df=spark.read.option("header","true").option("inferSchema","true").option("delimiter",",").csv("/project/DWH/ef/20250922返回的30万数据.csv")
# df.show(5)
# df.write.format("csv").option("header","true").save("/project/DWH/ef/temp.csv") #存到hdfs文件
# df.write.format("hive").option('fileFormat','textfile').mode("append").saveAsTable("temp_20251105") #无分区的textfile表##5、类rdd操作
#pdf=pd.DataFrame([("Hello World"),("Hello Spark")],columns=["value"])
#df=spark.createDataFrame(pdf)
#1)、map
# rdd=df.rdd.map(lambda x:Row(x[0].upper())) #对df第一列进行upper操作
# dfmap=rdd.toDF(["value"]).show()
#2)、flatmap
#df_flat=df.rdd.flatMap(lambda x:x[0].split(" ")).map(lambda x:Row(x)).toDF(["value"]) #对df进行split后放在一列,并转化df
# df_flat.show()
#3)、filter&broadcast
# broads=sc.broadcast(["Hello","World"])
# df_filter_broad=df_flat.filter(~col("value").isin(broads.value)) #过滤掉不在broads里的取值
# df_filter_broad.show()
#4)、distinct
# df_distinct=df_flat.distinct() #去重
# df_distinct.show()
#5)、intersect
# pdf2=pd.DataFrame([("Hello World"),("Hello Scala")],columns=["value"])
# df2=spark.createDataFrame(pdf2)
# dfintersect=df.intersect(df2) #取交集##6、类excel操作
# pdf=pd.DataFrame([("LiLei",15,"male"),
# ("HanMeiMei",16,"female"),
# ("DaChui",17,"male"),
# ("RuHua",16,None)],columns=["name","age","gender"])
# df=spark.createDataFrame(pdf)
# #1)、withColumn
# dfnew=df.withColumn("birthyear",2025-df["age"])
# dfnew.show()
# #2)、select
# dfupdate=dfnew.select("name","age","birthyear","gender") #字段重新排序
# dfupdate.show()
# #3)、withColumnRenamed
# dfrename=df.withColumnRenamed("gender","sex")
# dfrename.show()
# #4)、sort
# dfsorted=df.sort(df["age"].desc()) #根据年龄降序排序
# dfsorted.show()
# #5)、orderBy
# dfordered=df.orderBy(df["age"].desc(),df["gender"].desc())
# dfordered.show()
# #6)、drop、fill、replace、dropDuplicates、agg
# dfnotnan = df.na.drop()#去除nan值的行
# dfnotnan.show()
# df_fill = df.na.fill("female")#填充nan值
# df_fill.show()
# df_replace = df.na.replace({"":"female","RuHua":"SiYu"})#替换某些字段的值
# df_replace.show()
# df2 = df.unionAll(df) #合并再去重
# df2.show()
# dfunique = df2.dropDuplicates()
# dfunique.show()
# dfunique_part = df.dropDuplicates(["age"])#去重,根据部分字段
# dfunique_part.show()
# dfagg = df.agg(F.count('name').alias('count'), F.max('age').alias('max_age'))#求用户个数和最大年龄数
# # dfagg = df.agg({"name":"count","age":"max"})# 也可这样
# dfagg.show()##7、类sql操作
# df = spark.createDataFrame([
# ("LiLei",15,"male"),
# ("HanMeiMei",16,"female"),
# ("DaChui",17,"male"),
# ("RuHua",16,None)]).toDF("name","age","gender") #也可以这样直接生成spark df # dftest = df.select("name").limit(2)#表查询select
# dftest.show()
# dftest = df.select("name",df["age"]-2020).toDF("name","birth_year")# .toDF换个名字
# dftest.show()
# spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
# dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )#表查询selectExpr,可以使用UDF函数,指定别名等
# dftest.show()
# dftest = df.where("gender='male' and age>15")#表查询where, 指定SQL中的where子句表达式
# dftest.show()
# dftest = df.filter(df["age"]>16)#表查询filter
# #或者
# dftest = df.filter("age>16")
# dftest.show()##8、groupBy&pivot
# dfstudent = spark.createDataFrame([("LiLei",18,"male",1),("HanMeiMei",16,"female",1),
# ("Jim",17,"male",2),("DaChui",20,"male",2)]).toDF("name","age","gender","class")
# dfstudent.show()
# dfstudent.groupBy("class").pivot("gender").max("age").show() #class和gender透视表取最大年龄
# ##9、selectExpr&row_number() over()
# df = spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
# ("DaChui",65,"class2"),("RuHua",55,"class2")]) \
# .toDF("name","score","class")# dforder = df.selectExpr("name","score","class",
# "row_number() over (partition by class order by score desc) as order") #row_number() over()之后按照顺序保留name、score、class
# dforder.show()##9、spark df与sql的交互
# df = spark.createDataFrame([("LiLei",18,"male"),("HanMeiMei",17,"female"),("Jim",16,"male")],
# ("name","age","gender"))
# df.createOrReplaceTempView("student") #注册一张临时表视图
# dfmale = spark.sql("select * from student where gender='male'") #进行sql操作
# dfmale.show()##10、action操作
##collect、take、takeSample、first、count、reduce
# rdd = sc.parallelize(range(1,11),2)
# rdd.collect()# collect操作将数据汇集到Driver,数据过大时有超内存风险
# rdd = sc.parallelize(range(10),5)
# part_data = rdd.take(3)#take操作将前若干个数据汇集到Driver,相比collect安全
# rdd = sc.parallelize(range(10),5)
# sample_data = rdd.takeSample(withReplacement=False, num=10, seed=0)#takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
# rdd = sc.parallelize(range(10),5)
# first_data = rdd.first()#first取第一个数据
# print(first_data) # 0
# rdd = sc.parallelize(range(10),5)
# data_count = rdd.count()#count查看RDD元素数量
# print(data_count) # 10
# rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)
# rdd.reduce(lambda x,y:x+y)#reduce利用二元函数对数据进行规约##11、transformation操作
# rdd = sc.parallelize(range(10),3)
# new_rdd = rdd.map(lambda x:x**2)
# new_rdd.collect()
# # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# # filter应用过滤条件过滤掉一些数据
# rdd = sc.parallelize(range(10),2)
# rdd.filter(lambda x:x>5).collect()
# # [6, 7, 8, 9]
# #flatMap操作执行将每个元素生成一个Array后压平
# rdd = sc.parallelize(["hello world","hello China"])
# # map
# rdd.map(lambda x:x.split(" ")).collect()
# # [['hello', 'world'], ['hello', 'China']]
# # flatMap
# rdd.flatMap(lambda x:x.split(" ")).collect()
# # ['hello', 'world', 'hello', 'China']
# #sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
# rdd = sc.parallelize(range(10),1)
# rdd.sample(withReplacement=False, fraction=0.5, seed=0).collect()
# # [1, 4, 9]
# rdd = sc.parallelize(range(10), 1)
# sample_list = rdd.takeSample(False, 5, 0)
# #distinct去重
# rdd = sc.parallelize([1,1,2,2,3,3,4,5])
# rdd.distinct().collect()
# # [4, 1, 5, 2, 3]
# #subtract找到属于前一个rdd而不属于后一个rdd的元素
# a = sc.parallelize(range(10))
# b = sc.parallelize(range(5,15))
# a.subtract(b).collect()
# # [0, 1, 2, 3, 4]
# #union合并数据
# a = sc.parallelize(range(5))
# b = sc.parallelize(range(3,8))
# a.union(b).collect()
# # [0, 1, 2, 3, 4, 3, 4, 5, 6, 7]
# #intersection求交集
# a = sc.parallelize(range(1,6))
# b = sc.parallelize(range(3,9))
# a.intersection(b).collect()
# # [3, 4, 5]
# #按照某种方式进行排序
# #指定按照第3个元素大小进行排序
# rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
# rdd.sortBy(lambda x:x[2]).collect()
# # [(4, 1, 1), (3, 2, 2), (1, 2, 3)]
# #按照拉链方式连接两个RDD,效果类似python的zip函数
# #需要两个RDD具有相同的分区,每个分区元素数量相同
# rdd1 = sc.parallelize(["LiLei","Hanmeimei","Lily"])
# rdd2 = sc.parallelize([19,18,20])
# rdd_zip = rdd1.zip(rdd2)
# rdd_zip.collect()
# # [('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
# #将RDD和一个从0开始的递增序列按照拉链方式连接。
# rdd = sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
# rdd_index = rdd.zipWithIndex()
# rdd_index.collect()
# # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]
# spark.stop()