spark数据的提取和保存
Spark数据提取和保存
一、数据提取(读取数据)
1. 读取文件(文本、CSV、JSON等)
scala
// 读取文本文件
val textData = spark.read.text("路径/文件.txt")
// 读取CSV文件(带表头)
val csvData = spark.read
.option("header", "true")
.csv("路径/文件.csv")
// 读取JSON文件
val jsonData = spark.read.json("路径/文件.json")
2. 读取数据库(如MySQL)
scala
import spark.implicits._
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://host:port/db")
.option("dbtable", "表名")
.option("user", "用户名")
.option("password", "密码")
.load()
3. 读取Hive表
scala
val hiveDF = spark.sql("SELECT * FROM hive表名")
二、数据保存(写入数据)
1. 保存为文件(文本、CSV、JSON等)
scala
// 保存为CSV文件(覆盖模式)
csvData.write.mode("overwrite").csv("路径/输出.csv")
// 保存为JSON文件(分区存储)
jsonData.write.partitionBy("字段").json("路径/输出.json")
2. 保存到数据库(如MySQL)
scala
jdbcDF.write.format("jdbc")
.option("url", "jdbc:mysql://host:port/db")
.option("dbtable", "表名")
.option("user", "用户名")
.option("password", "密码")
.mode("append") // 追加模式
.save()
3. 保存到Hive表
scala
hiveDF.write.saveAsTable("hive表名")
关键参数说明
- mode :写入模式( overwrite 覆盖、 append 追加、 ignore 忽略已存在数据等)。
- option :配置数据源参数(如表头、分隔符、数据库连接信息等)。
根据具体数据源类型选择对应格式( format ),如需处理非结构化数据(如图片、日志),可结合 binaryFile 或自定义解析逻辑。