Spark,数据提取和保存
以下是使用 Spark 进行数据提取(读取)和保存(写入)的常见场景及代码示例(基于 Scala/Java/Python,不含图片操作):
一、数据提取(读取)
1. 读取文件数据(文本/CSV/JSON/Parquet 等)
Scala
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Data Read")
.getOrCreate()
// 读取 CSV(含表头)
val csvDf = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true") // 自动推断数据类型
.load("path/to/csv/file.csv")
// 读取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")
// 读取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
Python
python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Read").getOrCreate()
# 读取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
# 读取 JSON
json_df = spark.read.json("path/to/json/file.json")
# 读取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
2. 读取数据库数据(如 MySQL/Hive)
Scala(以 MySQL 为例)
scala
val jdbcDf = spark.read.format("jdbc")
.option("url", "jdbc:mysql://host:port/db?useSSL=false")
.option("dbtable", "table_name")
.option("user", "username")
.option("password", "password")
.load()
Python(以 Hive 为例,需启用 Hive 支持)
python
# 读取 Hive 表(需在 SparkSession 中启用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
二、数据保存(写入)
1. 保存为文件(CSV/JSON/Parquet 等)
Scala
scala
// 保存为 CSV(覆盖模式,含表头)
csvDf.write.format("csv")
.option("header", "true")
.mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
.save("output/csv_result")
// 保存为 Parquet(分区存储,提升查询性能)
parquetDf.write.partitionBy("category") // 按字段分区
.mode("append")
.parquet("output/parquet_result")
Python
python
# 保存为 JSON
json_df.write.json("output/json_result", mode="overwrite")
# 保存为 Parquet(指定压缩格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
2. 保存到数据库(如 MySQL/Hive)
Scala(以 MySQL 为例)
scala
jdbcDf.write.format("jdbc")
.option("url", "jdbc:mysql://host:port/db?useSSL=false")
.option("dbtable", "target_table")
.option("user", "username")
.option("password", "password")
.mode("append") // 追加模式
.save()
Python(以 Hive 为例)
python
# 保存为 Hive 表(需启用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
三、关键参数说明
1. 读取模式(文件)
- inferSchema : 是否自动推断数据类型(适用于 CSV/JSON,需读取少量数据,影响性能)。
- header : CSV 是否包含表头( true/false )。
2. 写入模式( mode )
- overwrite : 覆盖已有数据。
- append : 追加到现有数据。
- ignore : 忽略写入(不报错)。
- errorIfExists : 存在则报错(默认)。
3. 数据库连接
- 需添加对应数据库驱动(如 MySQL 的 mysql-connector-java )。
- 对于大规模数据,建议使用分区并行写入(如 option("numPartitions", "4") )。
四、典型场景示例
场景:从 MySQL 读取数据,清洗后保存为 Parquet
scala
// 读取 MySQL 数据
val mysqlDf = spark.read.jdbc(
url = "jdbc:mysql://host:port/source_db",
dbtable = "source_table",
properties = Map("user" -> "u", "password" -> "p")
)
// 数据清洗(示例:过滤空值)
val cleanedDf = mysqlDf.na.drop("any")
// 保存为 Parquet(按日期分区)
cleanedDf.write.partitionBy("date")
.parquet("output/cleaned_data")
通过以上方法,可灵活使用 Spark 完成数据提取和保存任务,支持多种数据源和格式。