当前位置: 首页 > news >正文

Spark-SQL(一)

Spark SQL

概述

Spark SQL是Apache Spark用于处理结构化数据的模块

特点

1 易整合。无缝的整合了 SQL 查询和 Spark 编程

2 统一的数据访问。使用相同的方式连接不同的数据源

3 兼容 Hive。在已有的仓库上直接运行 SQL 或者 HQL

4 标准数据连接。通过 JDBC 或者 ODBC 来连接

代码

创建 DataFrame

1. 首先,在spark的bin目录下,创建data目录,在里面创建一个json文件,名为 user.json 文件中的数据为:

{"username":"zhangsan","age":20}

{"username":"lisi","age":17}

2. 读取 json 文件创建 DataFrame--------df

 val df = spark.read.json("D:/spark/bin/data/user.json")

3. 展示数据:

 df.show

SQL语法

1. 读取数据-----df1

 val df1 = spark.read.json("D:/spark/bin/data/user.json")

2. 对 DataFrame 创建一个临时表

 df1.createOrReplaceTempView("people")

3. 通过 SQL 语句实现查询全表

 val sqlDF = spark.sql("select * from people")

4. 结果展示

sqlDF.show

DSL语法

1. 创建一个 DataFrame-----df2

val df2 = spark.read.json("D:/spark/bin/data/user.json")

2. 查看 DataFrame 的 Schema 信息

df2.printSchema

3. 只查看"username"列数据

 df2.select("username").show()

4. 查看"username"列数据以及"age+1"数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

df2.select($"username",$"age" + 1).show

df2.select('username ,'age + 1).show()

5. 查看"age"大于"18"的数据

df2.filter($"age" > 18).show

6. 按照"age"分组,查看数据条数

 df2.groupBy("age").count.show

RDD 转换为 DataFrame

DataFrame 转换为 RDD

val df3 = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF

val rdd = df.rdd

val array = rdd.collect

注意:此时得到的 RDD 存储类型为 Row

array(0)

array(0)(0)

array(0).getAs[String]("name")

DataSet

1. 使用样例类序列创建 DataSet

case class Person(name: String, age: Long)

val caseClassDS = Seq(Person("zhangsan",2)).toDS()

caseClassDS.show

2. 使用基本类型的序列创建 DataSet

val ds = Seq(1,2,3,4,5).toDS

ds.show

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

RDD 转换为 DataSet

case class User(name:String, age:Int)

sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

DataSet 转换为 RDD

case class User(name:String, age:Int)

val aaa = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

val rdd = aaa.rdd

rdd.collect

DataFrame 和 DataSet 转换

DataFrame 转换为 DataSet

case class User(name:String, age:Int)

val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")

val ds = df.as[User]

DataSet 转换为 DataFrame

val ds = df.as[User]

val df = ds.toDF

RDD、DataFrame、DataSet 三者的关系

三者的共性

RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数

据提供便利;

三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;

三者有许多共同的函数,如 filter,排序等;

在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:

import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)

三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

三者都有分区(partition)的概念

DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

三者的区别

1. RDD

RDD 一般和 spark mllib 同时使用

RDD 不支持 sparksql 操作

2. DataFrame

与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值

DataFrame 与 DataSet 一般不与 spark mllib 同时使用

DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作

DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然

3. DataSet

Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。

DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]

DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性里提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

三者可以通过上图的方式进行相互转换。


文章转载自:
http://achlorhydria.wanhuigw.com
http://ban.wanhuigw.com
http://aurum.wanhuigw.com
http://cache.wanhuigw.com
http://bacat.wanhuigw.com
http://ascolichen.wanhuigw.com
http://acousma.wanhuigw.com
http://chillsome.wanhuigw.com
http://aldermaston.wanhuigw.com
http://automania.wanhuigw.com
http://anker.wanhuigw.com
http://cbx.wanhuigw.com
http://artotype.wanhuigw.com
http://busk.wanhuigw.com
http://caffeinic.wanhuigw.com
http://bindin.wanhuigw.com
http://albite.wanhuigw.com
http://appetent.wanhuigw.com
http://charactron.wanhuigw.com
http://amputee.wanhuigw.com
http://brierroot.wanhuigw.com
http://ble.wanhuigw.com
http://amygdalotomy.wanhuigw.com
http://bagarre.wanhuigw.com
http://caelian.wanhuigw.com
http://angle.wanhuigw.com
http://centremost.wanhuigw.com
http://chlorinous.wanhuigw.com
http://appealable.wanhuigw.com
http://chiao.wanhuigw.com
http://www.dtcms.com/a/133364.html

相关文章:

  • docker方式项目部署(安装容器组件+配置文件导入Nacos+dockerCompose文件创建管理多个容器+私有镜像仓库Harbor)
  • 循环神经网络 - 门控循环单元网络
  • LanDiff:赋能视频创作,语言与扩散模型的融合力量
  • 波束形成(BF)从算法仿真到工程源码实现-第八节-波束图
  • 【云平台监控】安装应用Ansible服务
  • 【Ansible自动化运维】六、ansible 实践案例与最佳实践:经验总结与分享
  • 未来七轴机器人会占据主流?深度解析具身智能方向当前六轴机器人和七轴机器人的区别,七轴力控机器人发展会加快吗?
  • AndroidStudio编译报错 Duplicate class kotlin
  • Uniapp: 修改启动时的端口号
  • 聊透多线程编程-线程池-9.C# 线程同步实现方式
  • Windows系统docker desktop安装(学习记录)
  • C++23 新特性:[[assume(expression)]] 属性
  • FileWriter 详细解析与记忆方法
  • 用 Deepseek 写的uniapp血型遗传查询工具
  • VRoid-Blender-Unity个人工作流笔记
  • 相机内外参
  • 苍穹外卖3
  • 某车企面试备忘
  • 打造AI应用基础设施:Milvus向量数据库部署与运维
  • PyTorch梯度:深度学习的引擎与实战解析
  • Git报错remote: Verify fatal: Authentication failed for ***
  • 比特币不是solidity编写吗,比特币不是基于 Solidity
  • 【项目管理】第15章 项目风险管理-- 知识点整理
  • ASP.NET Core 性能优化:分布式缓存
  • ubunut24.04 bash和zsh同时使用conda
  • cocosCreator安卓隐私弹窗(链接版)
  • (二十四)安卓开发中的AppCompatActivity详解
  • QML ListView:列表视图的数据交互与样式定制
  • UnityUI:Canvas框架获取鼠标悬浮UI
  • CExercise_05_1伪随机数_2编写程序模拟掷骰子的游戏(每一次投掷,都投掷两个骰子)