部分Spark SQL编程要点
根据个人Spark项目学习经验,总结如下几点Spark项目中涉及Spark SQL编程要点,全是干货!!!
文章目录
- select常用操作
- 基本写法
- drop
- withColumn
- withColumnRenamed
- cast
- 使用 agg
- 空值异常值处理
- 时间日期函数
- 数据源建表
- txt文件
- csv文件
- json文件
- 读写MySQL数据库
- DF,DS对象上的SQL语句
**前言:**所有的内容基于使用Spark_Session对象读取数据之后,创建DataFrame对象
df
基本写法
常用的两种写法:
df.select("name","age","sal").show()
df.select(df["name"],df["age"]+1).show()
说明1:
这个show()默认输出20行,如果想显示26行,可以show(26),不建议全部显示
如果非要全部显示,也有如下方法# 方法一:完整显示所有行和列print("=== 完整显示 ===")df.show(truncate=False)#方法二:显示所有行(但每行可能被截断)print("\n=== 显示所有行 ===")df.show(numRows=-1)
说明2:
df.select("sal"+100).show
df.select("age+10").show
这两种写法也是错误的
以上问题可以使用 expr表达式
df.select(expr("age+1")).show
df.selectExpr("round(sal,2) as newsal").show() #保留四舍五入后的两位小数
drop
# 删除一个或多个列
df.drop("name","age")
withColumn
# 修改列值
# 将原来的sal列保留两位小数赋值给新列round_sal(通过sal的数据添加了新列)
df1 = df.withColumn("round_sal",F.round("sal",2))
withColumnRenamed
df.withColumnRenamed("sal","newsal")
cast
## 将 sal 列转为字符串类型
df_new = df.withColumn("Stringsal", df["sal"].cast("string"))
备注:drop,withColumn,withColumnRenamed返回的DF
来看一个例子
# 按年份分组,统计上涨和下跌的总天数
yearly_stats = df.groupBy("year").agg(func.sum("is_up").alias("up_days"), # 每年上涨天数func.sum("is_down").alias("down_days") # 每年下跌天数).orderBy("year") # 按年份排序
agg() 是一个用于多列聚合的核心方法
alias(“up_days”):将第一个聚合结果(sum(is_up))的列名重命名为 up_days
其他常用聚合函数:import pyspark.sql.functions as func func.count():计数非空值数量。 func.avg():计算平均值。 func.max()/func.min():最大/最小值。 func.collect_list():将值收集为列表。
一般来说使用到agg,就会用到groupBy,之后还会用到orderBy
涉及,空值,异常值例如0,空值异常值的填充处理,这里更多就是python的数据预处理
以下是金融大数据分析实战中的部分数据处理代码
# 统计每一列中的空值(NaN/NaT)总数
null_count = df.isnull().sum()
print(null_count)
print("\n=== 各字段空值数量 ===")
print(null_count[null_count > 0]) # 仅显示存在空值的字段及其数量。# 提取所有包含至少一个空值的完整行,对每一行(axis=1)进行逻辑或运算,只要该行有一个 True(即存在空值)
null_rows = df[df.isnull().any(axis=1)].copy() #null_rows 是一个新的 DataFrame,包含所有含空值的行
if len (null_rows) > 0:print (f"\n=== 包含空值的行(共 {len (null_rows)} 行)===")print(null_rows) # 数据有空值的行 (这里没有)# 查看核心字段中有异常值0的行
# o = 开盘价,h = 最高价,l = 最低价,c = 收盘价,v = 成交量,a = 成交额 ,pc = 前收盘价
fields = ['o', 'h', 'l', 'c', 'v', 'a','pc']
zero_count = {}
for field in fields:if field in df.columns:zero_count[field] = (df[field] == 0).sum()print("\n=== 核心字段 0 值数量 ===")for field, count in zero_count.items():print(f"{field}:{count} 行")# eq (0):生成布尔型 DataFrame,标记哪些位置的值等于 0,any (axis=1):对每一行进行逻辑或运算,只要该行有一个核心字段为 0,整行为 True
zero_rows = df [df[fields].eq (0).any(axis=1)].copy()
print(zero_rows) #zero_rows 是包含异常零值的行的新 DataFrame
代码的详细解释已经附上了,最主要的是如何使用
上述代码中,df = pd.read_csv("**.csv")
,遇到任何数据,我们创建DataFrame对象之后,就可以直接使用上述代码进行空值异常值处理,将你的数据字段修改一下即可
空值和异常值的填充问题
# 部分字段存在缺失现象,根据股票数据的特点,使用前值填充的方式填充缺失值
hist_data['v'].fillna(method='ffill', inplace=True)
# 部分股票数据存在异常0值(这些值不应该为0),仍使用前值填充的方式填充0值
hist_data['a'].replace(0, method='ffill', inplace=True)
其他填充方法还有
前向填充(ffill)
或向后填充(bfill)
还有linear线性填充
这个也很重要,因为大多数据都要涉及时间,以及对年月日的处理
Pandas提供了to_datetime()
方法,可以将不同的日期格式转换为datetime 类型
(默认格式是:y-M-d)。设置errors='coerce'
参数,遇到不能转换的格式时将其置为NaN。
python处理方式
# 转换日期格式
rawData['Structed Date'] = pd.to_datetime(rawData['Date'], format = "%m/%d/%Y", errors='coerce').dt.date
rawData['Structed Date'].loc[rawData['Structed Date'].isnull()] # 然后再查看值为NaN的情况# 对时间进行格式转换
rawData['Structed Time'] = pd.to_datetime(rawData['Time'], format = "%H:%M:%S", errors='coerce').dt.time
rawData['Structed Time'].loc[rawData['Structed Time'].isnull()]
spark处理方式
还是以金融大数据处理为例
# 从加载数据开始就可以处理时间格式
schema = StructType([StructField('t', DateType(), False),#DateType 的默认显示格式:yyyy-MM-ddStructField("o", FloatType(), False),StructField("h", FloatType(), False),StructField("l", FloatType(), False)])
# 读取 HDFS CSV 并应用 schema
stockDF = (spark.read.format("csv").option("header", "true")# 若 CSV 无表头,设为 "false" 并通过 schema 定义字段名.schema(schema)# 应用自定义 schema.option("dateFormat","yyyyMMdd")# 会按你定义的格式去解析.load("/user/hadoop/stock/process_data.csv"))# 在使用数据时
# 将日期字段转换为年份
df = stockDF.withColumn("year", func.year("t")) #可以.month .day
# 1. 计算每年的平均交易量
yearly_avg_volume = df.groupBy("year").agg({"v": "avg"}).orderBy("year")#使用Spark的系统函数from_unixtime,将时间戳类型的create_time格式化成时间字符串
# 使用.sql()执行sql语句下面会谈
df.createTempView("user_info")
spark.sql("SELECT name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
txt文件
// 基础加载(每行为一个字符串)
val textDF = spark.read.textFile("input/data.txt")
textDF.show(5, truncate=false) // 显示前5行不截断// 指定编码格式(解决中文乱码)
val utf8DF = spark.read.option("charset", "UTF-8").textFile("input.txt")
csv文件
val configuredDF = spark.read.format("csv").option("header", "true") // 首行为列名.option("inferSchema", "true") // 也可以想上文一样自定义schema.option("delimiter", ";") // 自定义分隔符(如分号).load("data.csv")
json文件
// 自动展平嵌套结构,json显示一行
val jsonDF = spark.read.json("input/employees.json")
jsonDF.printSchema() // 查看推断出的 schema// 展开的json情况下加载
val nestedDF = spark.read.option("multiLine", "true") // 允许跨行 JSON 对象.json("input/complex_objects.json")
读写MySQL数据库
# 读MySQL数据库中的数据
from pyspark.sql import SparkSession
if __name__ == '__main__':spark = SparkSession. \Builder(). \appName('SparkReadMySQL'). \master('local[*]'). \getOrCreate()jdbcDF = spark.read \.format("jdbc") \.option("driver", "com.mysql.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \.option("dbtable", "student") \.option("user", "root") \.option("password", "123456") \.load()jdbcDF.show()spark.stop()# 向MySQL数据库中写入
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSessionspark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()#设置模式信息
schema = StructType([StructField("id", IntegerType(), True), \StructField("name", StringType(), True), \StructField("gender", StringType(), True), \StructField("age", IntegerType(), True)])#设置两条数据,表示两个学生的信息
studentRDD = spark \.sparkContext \.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \.map(lambda x:x.split(" "))#创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))#创建DataFrame
studentDF = spark.createDataFrame(rowRDD, schema)#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'zsh206'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)
首先需要注册临时视图,在临时视图的基础上执行sql 语句
下面来看代码演示
# 方法一
df1.createTempView("user")
spark.sql("select * from user").show()
# 方法二
df2.createOrReplaceTempView("student")
spark.sql("select * from student").show()
这里想表达的意思是,任何一个DataFrame对象都可以创建视图,使用SQL语句进行查询
备注:
- spark.sql返回的是DataFrame
- 如果是TemView已经存在,使用createTempView会报错
- SQL的语法与HQL兼容