在scala中使用sparkSQL读入csv文件
以下是使用 Spark SQL(Scala)读取 CSV 文件的完整代码示例:
scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._object CSVReadExample {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder.appName("CSVReadExample").config("spark.master", "local[*]").getOrCreate()try {// 方法1:自动推断模式val df1 = spark.read.option("header", "true") // 第一行是否包含列名.option("inferSchema", "true") // 自动推断数据类型.csv("path/to/your/file.csv")println("方法1:自动推断模式")df1.printSchema()df1.show()// 方法2:指定自定义模式val customSchema = StructType(Array(StructField("id", IntegerType, nullable = true),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true),StructField("salary", DoubleType, nullable = true)))val df2 = spark.read.option("header", "true").schema(customSchema) // 使用自定义模式.csv("path/to/your/file.csv")println("方法2:指定自定义模式")df2.printSchema()df2.show()// 方法3:读取多文件val df3 = spark.read.option("header", "true").csv("path/to/your/files/*.csv") // 读取目录下所有CSV文件println("方法3:读取多文件")df3.count()df3.show()// 执行SQL查询示例df2.createOrReplaceTempView("people")val result = spark.sql("SELECT name, age FROM people WHERE age > 30")result.show()} catch {case e: Exception =>println(s"读取CSV失败: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()}}
}
常用 CSV 读取选项:
scala
spark.read.option("header", "true") // 是否有表头.option("delimiter", ",") // 分隔符,默认为逗号.option("quote", "\"") // 引号字符,默认为双引号.option("escape", "\\") // 转义字符.option("nullValue", "null") // 指定空值表示.option("dateFormat", "yyyy-MM-dd") // 日期格式.option("inferSchema", "true") // 是否自动推断模式.csv("path/to/file.csv")
处理特殊情况:
-
处理引号包含的分隔符:
scala
.option("quote", "\"") .option("escape", "\"")
-
处理包含换行符的字段:
scala
.option("multiline", "true")
-
处理不同编码的文件:
scala
.option("charset", "UTF-8")
执行步骤:
-
准备示例 CSV 文件
people.csv
:csv
id,name,age,salary 1,Alice,25,5000.0 2,Bob,30,6000.0 3,Charlie,35,7500.0
-
运行 Spark 应用:
bash
spark-submit --class CSVReadExample \--master local[*] \your-application.jar
-
也可以在 Spark Shell 中交互式运行:
bash
spark-shell
然后粘贴代码片段执行
性能优化建议:
-
禁用自动推断模式(如果已知模式):
scala
.schema(customSchema) .option("inferSchema", "false") // 提高性能
-
分区并行读取:
scala
// 增加分区数提高并行度 val df = spark.read.csv("path/to/file.csv").repartition(10)
-
使用列剪枝:
scala
// 只选择需要的列 df.select("name", "age")
-
过滤数据:
scala
// 尽早过滤数据减少内存占用 df.filter($"age" > 30)
错误处理:
-
处理格式错误:
scala
.option("mode", "DROPMALFORMED") // 丢弃格式错误的记录 .option("mode", "PERMISSIVE") // 将错误字段设为null .option("mode", "FAILFAST") // 遇到错误立即失败
-
自定义错误处理:
scala
import org.apache.spark.sql.Rowval df = spark.read.csv("path/to/file.csv") val validRows = df.rdd.filter { row =>try {// 自定义验证逻辑row.getString(1).length > 0} catch {case e: Exception => false} }