当前位置: 首页 > news >正文

部分Spark SQL编程要点

根据个人Spark项目学习经验,总结如下几点Spark项目中涉及Spark SQL编程要点,全是干货!!!

文章目录

    • select常用操作
      • 基本写法
      • drop
      • withColumn
      • withColumnRenamed
      • cast
    • 使用 agg
    • 空值异常值处理
    • 时间日期函数
    • 数据源建表
      • txt文件
      • csv文件
      • json文件
      • 读写MySQL数据库
    • DF,DS对象上的SQL语句

**前言:**所有的内容基于使用Spark_Session对象读取数据之后,创建DataFrame对象 df

  1. select常用操作

基本写法

常用的两种写法:

df.select("name","age","sal").show()
df.select(df["name"],df["age"]+1).show()

说明1:
这个show()默认输出20行,如果想显示26行,可以show(26),不建议全部显示
如果非要全部显示,也有如下方法

  # 方法一:完整显示所有行和列print("=== 完整显示 ===")df.show(truncate=False)#方法二:显示所有行(但每行可能被截断)print("\n=== 显示所有行 ===")df.show(numRows=-1)

说明2:
df.select("sal"+100).show df.select("age+10").show这两种写法也是错误的

以上问题可以使用 expr表达式

df.select(expr("age+1")).show
df.selectExpr("round(sal,2) as newsal").show() #保留四舍五入后的两位小数

drop

# 删除一个或多个列
df.drop("name","age")

withColumn

# 修改列值
# 将原来的sal列保留两位小数赋值给新列round_sal(通过sal的数据添加了新列)
df1 = df.withColumn("round_sal",F.round("sal",2))

withColumnRenamed

df.withColumnRenamed("sal","newsal")

cast

## 将 sal 列转为字符串类型
df_new = df.withColumn("Stringsal", df["sal"].cast("string"))

备注:drop,withColumn,withColumnRenamed返回的DF

  1. 使用 agg

来看一个例子

# 按年份分组,统计上涨和下跌的总天数
yearly_stats = df.groupBy("year").agg(func.sum("is_up").alias("up_days"),    # 每年上涨天数func.sum("is_down").alias("down_days")  # 每年下跌天数).orderBy("year")  # 按年份排序

agg() 是一个用于多列聚合的核心方法
alias(“up_days”):将第一个聚合结果(sum(is_up))的列名重命名为 up_days
其他常用聚合函数:

import pyspark.sql.functions as func
func.count():计数非空值数量。 			
func.avg():计算平均值。
func.max()/func.min():最大/最小值。 			
func.collect_list():将值收集为列表。

一般来说使用到agg,就会用到groupBy,之后还会用到orderBy

  1. 空值异常值处理

涉及,空值,异常值例如0,空值异常值的填充处理,这里更多就是python的数据预处理

以下是金融大数据分析实战中的部分数据处理代码

# 统计每一列中的空值(NaN/NaT)总数
null_count = df.isnull().sum()
print(null_count)
print("\n=== 各字段空值数量 ===")
print(null_count[null_count > 0])  # 仅显示存在空值的字段及其数量。# 提取所有包含至少一个空值的完整行,对每一行(axis=1)进行逻辑或运算,只要该行有一个 True(即存在空值)
null_rows = df[df.isnull().any(axis=1)].copy() #null_rows 是一个新的 DataFrame,包含所有含空值的行
if len (null_rows) > 0:print (f"\n=== 包含空值的行(共 {len (null_rows)} 行)===")print(null_rows) # 数据有空值的行 (这里没有)# 查看核心字段中有异常值0的行
# o = 开盘价,h = 最高价,l = 最低价,c = 收盘价,v = 成交量,a = 成交额 ,pc = 前收盘价
fields = ['o', 'h', 'l', 'c', 'v', 'a','pc']
zero_count = {}
for field in fields:if field in df.columns:zero_count[field] = (df[field] == 0).sum()print("\n=== 核心字段 0 值数量 ===")for field, count in zero_count.items():print(f"{field}{count} 行")# eq (0):生成布尔型 DataFrame,标记哪些位置的值等于 0,any (axis=1):对每一行进行逻辑或运算,只要该行有一个核心字段为 0,整行为 True
zero_rows = df [df[fields].eq (0).any(axis=1)].copy()
print(zero_rows) #zero_rows 是包含异常零值的行的新 DataFrame

代码的详细解释已经附上了,最主要的是如何使用
上述代码中,df = pd.read_csv("**.csv"),遇到任何数据,我们创建DataFrame对象之后,就可以直接使用上述代码进行空值异常值处理,将你的数据字段修改一下即可

空值和异常值的填充问题

# 部分字段存在缺失现象,根据股票数据的特点,使用前值填充的方式填充缺失值
hist_data['v'].fillna(method='ffill', inplace=True)
# 部分股票数据存在异常0值(这些值不应该为0),仍使用前值填充的方式填充0值
hist_data['a'].replace(0, method='ffill', inplace=True)

其他填充方法还有 前向填充(ffill)向后填充(bfill)还有linear线性填充

  1. 时间日期函数

这个也很重要,因为大多数据都要涉及时间,以及对年月日的处理
Pandas提供了to_datetime()方法,可以将不同的日期格式转换为datetime 类型(默认格式是:y-M-d)。设置errors='coerce'参数,遇到不能转换的格式时将其置为NaN。

python处理方式

# 转换日期格式
rawData['Structed Date'] = pd.to_datetime(rawData['Date'], format = "%m/%d/%Y", errors='coerce').dt.date
rawData['Structed Date'].loc[rawData['Structed Date'].isnull()] # 然后再查看值为NaN的情况# 对时间进行格式转换
rawData['Structed Time'] = pd.to_datetime(rawData['Time'], format = "%H:%M:%S", errors='coerce').dt.time
rawData['Structed Time'].loc[rawData['Structed Time'].isnull()]

spark处理方式
还是以金融大数据处理为例

# 从加载数据开始就可以处理时间格式
schema = StructType([StructField('t', DateType(), False),#DateType 的默认显示格式:yyyy-MM-ddStructField("o", FloatType(), False),StructField("h", FloatType(), False),StructField("l", FloatType(), False)])
# 读取 HDFS CSV 并应用 schema
stockDF = (spark.read.format("csv").option("header", "true")# 若 CSV 无表头,设为 "false" 并通过 schema 定义字段名.schema(schema)# 应用自定义 schema.option("dateFormat","yyyyMMdd")# 会按你定义的格式去解析.load("/user/hadoop/stock/process_data.csv"))# 在使用数据时
# 将日期字段转换为年份
df = stockDF.withColumn("year", func.year("t")) #可以.month .day
# 1. 计算每年的平均交易量
yearly_avg_volume = df.groupBy("year").agg({"v": "avg"}).orderBy("year")#使用Spark的系统函数from_unixtime,将时间戳类型的create_time格式化成时间字符串
# 使用.sql()执行sql语句下面会谈
df.createTempView("user_info")
spark.sql("SELECT name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM user_info").show()
  1. 数据源建表

txt文件

// 基础加载(每行为一个字符串)
val textDF = spark.read.textFile("input/data.txt")
textDF.show(5, truncate=false) // 显示前5行不截断// 指定编码格式(解决中文乱码)
val utf8DF = spark.read.option("charset", "UTF-8").textFile("input.txt")

csv文件

val configuredDF = spark.read.format("csv").option("header", "true")       // 首行为列名.option("inferSchema", "true")  // 也可以想上文一样自定义schema.option("delimiter", ";")       // 自定义分隔符(如分号).load("data.csv")

json文件

// 自动展平嵌套结构,json显示一行
val jsonDF = spark.read.json("input/employees.json")
jsonDF.printSchema() // 查看推断出的 schema// 展开的json情况下加载
val nestedDF = spark.read.option("multiLine", "true") // 允许跨行 JSON 对象.json("input/complex_objects.json")

读写MySQL数据库

# 读MySQL数据库中的数据
from pyspark.sql import SparkSession
if __name__ == '__main__':spark = SparkSession. \Builder(). \appName('SparkReadMySQL'). \master('local[*]'). \getOrCreate()jdbcDF = spark.read \.format("jdbc") \.option("driver", "com.mysql.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false") \.option("dbtable", "student") \.option("user", "root") \.option("password", "123456") \.load()jdbcDF.show()spark.stop()# 向MySQL数据库中写入
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSessionspark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()#设置模式信息 
schema = StructType([StructField("id", IntegerType(), True), \StructField("name", StringType(), True), \StructField("gender", StringType(), True), \StructField("age", IntegerType(), True)])#设置两条数据,表示两个学生的信息
studentRDD = spark \.sparkContext \.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \.map(lambda x:x.split(" "))#创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))#创建DataFrame
studentDF = spark.createDataFrame(rowRDD, schema)#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'zsh206'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)
  1. DF,DS对象上的SQL语句

首先需要注册临时视图,在临时视图的基础上执行sql 语句
下面来看代码演示

# 方法一
df1.createTempView("user")
spark.sql("select * from user").show()
# 方法二
df2.createOrReplaceTempView("student")
spark.sql("select * from student").show()

这里想表达的意思是,任何一个DataFrame对象都可以创建视图,使用SQL语句进行查询
备注:

  • spark.sql返回的是DataFrame
  • 如果是TemView已经存在,使用createTempView会报错
  • SQL的语法与HQL兼容
http://www.dtcms.com/a/473995.html

相关文章:

  • 【完整源码+数据集+部署教程】 飞机表面缺陷检测系统源码和数据集:改进yolo11-EfficientFormerV2
  • 工作做ppt课件的网站广州抖音seo
  • Java并发编程实战深度解析线程池ThreadPoolExecutor的设计原理与性能优化策略
  • 烟台建设公司网站兰州新区网站建设
  • OpenWrt之ipv6防火墙配置放行局域网设备的公网ipv6
  • 第一个爬虫程序:用 Requests+BeautifulSoup 抓取豆瓣电影 Top250
  • JavaScript 企业面试与学习难度拆解:从0到中高级的阶梯式路线图
  • 北京互联网公司有多少家seo词条
  • 网站项目建设所需成本网站前端建设需要学会什么
  • 拌合站软件开发(25) 替换海康LED屏幕可行性分析及方案
  • 外贸公司网站改版思路汉中网站网站建设
  • 物联网和嵌入式开发中使用16进制的原因
  • 自己制作网站的方法是服务器怎样做网站呢
  • 制作网站注册登录模块的思维导图今天的新闻联播
  • 映诗:基于视觉编码与自然语言生成的作诗平台
  • 《深入理解 SQLAlchemy 引擎与会话:从 Core 到 ORM 的全景解析》
  • Redis渐进式遍历:安全高效的键扫描术
  • Java-集合练习2
  • sql优化之联合索引
  • 基于51单片机无线八路抢答器
  • 网站怎么做白色字阿里巴巴网站官网
  • 2.3进程同步与互斥
  • 计算机组成原理之第一章计算机系统概述
  • 无服务器架构下的ACID特性实现方案
  • 四平方和定理
  • 搜索郑州网站服装网站建设
  • 广西临桂建设局网站如何做家乡网站
  • Leetcode2166-设计位集
  • 三种方法解——力扣206.反转链表
  • 企业网站广告网站响应式是什么意思