SparkSQL读取普通文件的方式
SparkSQL 读取普通文件(如 CSV、JSON、文本文件等)主要通过 SparkSession 的 read API 实现。以下是详细步骤和示例:
- 类型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
- 语法:spark.read.format(格式).load(读取的地址)
方式一:给定读取数据源的类型和地址
spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)
方式二:直接调用对应数据源类型的方法
spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)
特殊参数:option,用于指定读取时的一些配置选项
spark.read.format("csv").option("sep", "\t").load(path)
jdbcDF = spark.read \
.format("jdbc") \
.option("网址", "jdbc:postgresql:dbserver") \
.option("数据库名", "schema.tablename") \
.option("用户名", "username") \
.option("密码", "password") \
.load()
1. 基础语法
val df = spark.read.format("文件格式") // 如 csv, json, text, parquet.option("参数名", "参数值") // 可选配置.load("文件路径")
2. 读取不同格式的文件
(1) 读取 CSV 文件
// Scala 示例
val csvDF = spark.read.format("csv").option("header", "true") // 包含表头.option("inferSchema", "true") // 自动推断数据类型.load("/path/to/file.csv")// Python 示例 (PySpark)
csvDF = spark.read \.format("csv") \.option("header", "true") \.option("inferSchema", "true") \.load("/path/to/file.csv")
(2) 读取 JSON 文件
val jsonDF = spark.read.format("json").load("/path/to/file.json")
(3) 读取纯文本文件
val textDF = spark.read.format("text").load("/path/to/file.txt") // 每行作为字符串存入 DataFrame
3. 关键配置参数
| 参数 | 说明 |
|---|---|
header | 是否包含列头(默认 false) |
inferSchema | 是否自动推断数据类型(默认 false,建议小文件开启) |
delimiter | 指定分隔符(例如 ,、;、\t) |
quote | 定义引号字符(默认 ") |
escape | 定义转义字符(默认 \) |
multiLine | 是否允许单字段跨多行(适用于复杂 JSON) |
4. 路径指定方式
- 本地文件:
file:///absolute/path/to/file.csv - HDFS 文件:
hdfs://namenode:port/path/to/file.csv - S3 文件:
s3a://bucket-name/path/to/file.csv
📌 示例:
.load("hdfs://localhost:9000/data/sales.json")
5. 直接使用格式简写
// 读取 CSV(等价于 .format("csv"))
val df = spark.read.csv("/path/to/file.csv") // 读取 JSON
val df = spark.read.json("/path/to/file.json")
6. 查看数据与 Schema
df.show(5) // 显示前5行
df.printSchema() // 打印表结构
输出示例:
root|-- name: string (nullable = true)|-- age: integer (nullable = true)|-- city: string (nullable = true)
7. 注意事项
- 文件路径:
集群环境下需使用分布式存储路径(如 HDFS/S3),避免本地路径。 - 性能优化:
- 大文件避免使用
inferSchema(显式定义 Schema 更快) - 优先选择列式存储格式(如 Parquet/ORC)
- 大文件避免使用
- 依赖库:
读取特定格式(如 Excel)需添加额外依赖包。
完整示例(读取 CSV)
// 创建 SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("ReadExample").getOrCreate()// 读取 CSV 并定义 Schema
val df = spark.read.option("header", "true").option("delimiter", ";").schema("name STRING, age INT, city STRING") // 显式定义 Schema.csv("hdfs:///data/users.csv")// 执行操作
df.filter($"age" > 30).show()
通过上述方法,可灵活处理各类普通文件数据。
