当前位置: 首页 > news >正文

Spark,数据提取和保存

以下是使用 Spark 进行数据提取(读取)和保存(写入)的常见场景及代码示例(基于 Scala/Java/Python,不含图片操作):
 
一、数据提取(读取)
 
1. 读取文件数据(文本/CSV/JSON/Parquet 等)
 
Scala
 
scala   
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Data Read")
  .getOrCreate()

// 读取 CSV(含表头)
val csvDf = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true") // 自动推断数据类型
  .load("path/to/csv/file.csv")

// 读取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")

// 读取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
 
 
Python
 
python   
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Read").getOrCreate()

# 读取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

# 读取 JSON
json_df = spark.read.json("path/to/json/file.json")

# 读取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
 
 
2. 读取数据库数据(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
val jdbcDf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "table_name")
  .option("user", "username")
  .option("password", "password")
  .load()
 
 
Python(以 Hive 为例,需启用 Hive 支持)
 
python   
# 读取 Hive 表(需在 SparkSession 中启用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
 
 
二、数据保存(写入)
 
1. 保存为文件(CSV/JSON/Parquet 等)
 
Scala
 
scala   
// 保存为 CSV(覆盖模式,含表头)
csvDf.write.format("csv")
  .option("header", "true")
  .mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
  .save("output/csv_result")

// 保存为 Parquet(分区存储,提升查询性能)
parquetDf.write.partitionBy("category") // 按字段分区
  .mode("append")
  .parquet("output/parquet_result")
 
 
Python
 
python   
# 保存为 JSON
json_df.write.json("output/json_result", mode="overwrite")

# 保存为 Parquet(指定压缩格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
 
 
2. 保存到数据库(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
jdbcDf.write.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "target_table")
  .option("user", "username")
  .option("password", "password")
  .mode("append") // 追加模式
  .save()
 
 
Python(以 Hive 为例)
 
python   
# 保存为 Hive 表(需启用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
 
 
三、关键参数说明
 
1. 读取模式(文件)
 
-  inferSchema : 是否自动推断数据类型(适用于 CSV/JSON,需读取少量数据,影响性能)。
 
-  header : CSV 是否包含表头( true/false )。
 
2. 写入模式( mode )
 
-  overwrite : 覆盖已有数据。
 
-  append : 追加到现有数据。
 
-  ignore : 忽略写入(不报错)。
 
-  errorIfExists : 存在则报错(默认)。
 
3. 数据库连接
 
- 需添加对应数据库驱动(如 MySQL 的  mysql-connector-java )。
 
- 对于大规模数据,建议使用分区并行写入(如  option("numPartitions", "4") )。
 
四、典型场景示例
 
场景:从 MySQL 读取数据,清洗后保存为 Parquet
 
scala   
// 读取 MySQL 数据
val mysqlDf = spark.read.jdbc(
  url = "jdbc:mysql://host:port/source_db",
  dbtable = "source_table",
  properties = Map("user" -> "u", "password" -> "p")
)

// 数据清洗(示例:过滤空值)
val cleanedDf = mysqlDf.na.drop("any")

// 保存为 Parquet(按日期分区)
cleanedDf.write.partitionBy("date")
  .parquet("output/cleaned_data")
 
 
通过以上方法,可灵活使用 Spark 完成数据提取和保存任务,支持多种数据源和格式。

相关文章:

  • 数组随机重排与维度转换算法
  • 深入解析Python中的Vector2d类:从基础实现到特殊方法的应用
  • ngx_http_random_index_module 模块概述
  • LoadBarWorks:一款赛博风加载动画生成器的构建旅程
  • linux下的 xargs命令使用详解
  • 墨水屏显示模拟器程序解读
  • jqGrid冻结列错行问题,将冻结表格(悬浮表格)与 正常表格进行高度同步
  • HarmonyOS AVPlayer 音频播放器
  • MyBatis 核心技术详解:从连接池到多表查询
  • 聊天室项目总结
  • 主成分分析的应用之sklearn.decomposition模块的PCA函数
  • [Android] 青木扫描全能文档3.0,支持自动扫描功能
  • 从0开始学linux韦东山教程第四章问题小结(1)
  • 单片机设计_停车场车位管理系统(AT89C52、LCD1602)
  • NDRange(OpenCL)和 Grid/Block(CUDA)对比
  • oppo手机安装APK失败报错:安装包异常
  • ngx_http_referer_module 模块概述
  • HTTPS的工作过程
  • 2025/5/18
  • 卷积神经网络进阶:转置卷积与棋盘效应详解
  • 三件珍贵标本开箱!中国恐龙大展5月26日在沪开幕,明星标本汇聚一堂
  • 意德首脑会谈,梅洛尼警告欧盟绿色政策面临“工业荒漠化”
  • “南昌航空一号”成功发射,赣江鄱阳湖有了专属卫星守护
  • 人民网:激发博物馆创新活力,让“过去”拥有“未来”
  • 英国警方再逮捕一名涉嫌参与首相住宅纵火案嫌疑人
  • 工人日报:应对“职场肥胖”,健康与减重同受关注