当前位置: 首页 > news >正文

数据加载与保存

通用方式

SparkSQL提供了通用的数据加载方式,使用spark.read.loa方法,并可通过format指定数据类型(如csv、jdbc、json、orc、parquet、textFile)。

load方法后需传入数据路径(针对csv、jdbc、json、orc、parquet、textFile格式)。

option方法用于设置特定格式的参数,如jdbc的url、user、password、dbtable。

特定格式加载

Parquet‌:Spark SQL的默认数据源,无需指定format即可载。

JSON‌:Spark SQL能自动推测JSON数据集结构,使用spark.read.json(path)加载。注意,每行应为一个JSON串。

val path = "/opt/module/spark-local/people.json"

val peopleDF = spark.read.json(path)

查询数据:可以通过SQL语句查询JSON数据。

val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"

CSV‌:需指定format为csv,并可通过option设置分隔符、是否推断schema、是否包含表头等信息。

MySQL‌:通过JDBC从关系型数据库读取数据,使用spark.read.format("jdbc").option(...)方式,并传入数据库连接信息。

数据保存

通用方式

使用df.write.save方法保存数据,同样可通过format指定数据类型。

save方法后需传入保存路径(针对csv、orc、parquet、textFile格式)。

option方法用于设置特定格式的参数。

保存操作可使用SaveMode来指明如何处理数据,如覆盖(overwrite)、追加(append)等,通过mode方法设置。

特定格式保存

与加载类似,Parquet、JSON、CSV等格式均可通过指定format进行保存。

MySQL等关系型数据库的写入也通过JDBC实现,需指定format为jdbc,并传入数据库连接信息及表名。

注意事项

在处理JSON数据时,需确保文件格式符合Spark的要求,即每行一个JSON串。

在读取CSV文件时,可通过设置option来指定分隔符、是否推断schema等信息,以便正确解析文件内容。

在通过JDBC连接数据库时,需确保数据库驱动已正确导入,并正确配置数据库连接信息。

在保存数据时,需根据实际需求选择合适的SaveMode,以避免数据覆盖或丢失。

Spark SQLHive的集成

Spark SQL可以编译时包含Hive支持,从而提供对Hive表访问、UDF(用户自定义函数)、Hive查询语言(HQL)等特性的支持。在使用时,无需事先安装Hive,但最好在编译Spark SQL时引入Hive支持。

IDEA通过JDBC对MySQL进行操作:

读取数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

//通用的load方式读取

spark.read.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user")

  .load().show()

spark.stop()

//通用的load方法的另一种形式

spark.read.format("jdbc")

  .options(

    Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))

  .load().show()

//通过JDBC

val pros :Properties = new Properties()

pros.setProperty("user","root")

pros.setProperty("password","123456")

val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)

df.show()

 写入数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),

  Stu("zs", 30)))

val ds:Dataset[Stu] = rdd.toDS()

ds.write.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user2")

  .mode(SaveMode.Append)

  .save()

spark.stop()

相关文章:

  • Ubuntu服务器中了木马且处于局域网内无法直接通过公网正向连接
  • Mac OS系统下kernel_task占用大量CPU资源导致系统卡顿
  • Linux:Makefile
  • 数字电子技术基础(四十七)——使用Mutlisim软件来模拟74LS85芯片
  • STM32基础教程——DMA+ADC多通道
  • 【后端】【python】利用反射器----动态设置装饰器
  • 智能语音处理+1.1下载需要的库(100%实现)
  • 【Lerobot】加载本地数据LeRobotDataset数据、读取并解析parquet
  • 【c语言】深入理解指针1
  • 排序(java)
  • 任务的状态
  • 投资理财_从0到1:如何用1000元开启你的二级市场投资之旅?
  • 实战5:Python使用循环神经网络生成诗歌
  • 解决virtualbox7.1无法启动3d加速的问题
  • 大数据人工智能
  • 算法的时间复杂度
  • L37.【LeetCode题解】三数之和(双指针思想)
  • Java练习——day2(集合嵌套)
  • Nginx:轻量级高性能的Web服务器与反向代理服务器
  • 开源推荐#6:可爱的临时邮箱服务
  • 山西太原网站建设/我国网络营销现状分析
  • 秦皇岛市做网站优化/百度提交网址多久才会收录
  • 门户网站系统源码/广州seo成功案例
  • 口碑好的镇江网站建设/市场营销推广策略
  • 哈尔滨网站建设学校/软文客
  • wordpress怎么添加网盘下载/南昌seo外包公司