当前位置: 首页 > news >正文

在scala中使用sparkSQL读入csv文件

以下是使用 Spark SQL(Scala)读取 CSV 文件的完整代码示例:

scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._object CSVReadExample {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder.appName("CSVReadExample").config("spark.master", "local[*]").getOrCreate()try {// 方法1:自动推断模式val df1 = spark.read.option("header", "true")  // 第一行是否包含列名.option("inferSchema", "true")  // 自动推断数据类型.csv("path/to/your/file.csv")println("方法1:自动推断模式")df1.printSchema()df1.show()// 方法2:指定自定义模式val customSchema = StructType(Array(StructField("id", IntegerType, nullable = true),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true),StructField("salary", DoubleType, nullable = true)))val df2 = spark.read.option("header", "true").schema(customSchema)  // 使用自定义模式.csv("path/to/your/file.csv")println("方法2:指定自定义模式")df2.printSchema()df2.show()// 方法3:读取多文件val df3 = spark.read.option("header", "true").csv("path/to/your/files/*.csv")  // 读取目录下所有CSV文件println("方法3:读取多文件")df3.count()df3.show()// 执行SQL查询示例df2.createOrReplaceTempView("people")val result = spark.sql("SELECT name, age FROM people WHERE age > 30")result.show()} catch {case e: Exception =>println(s"读取CSV失败: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()}}
}

常用 CSV 读取选项:

scala

spark.read.option("header", "true")  // 是否有表头.option("delimiter", ",")  // 分隔符,默认为逗号.option("quote", "\"")    // 引号字符,默认为双引号.option("escape", "\\")   // 转义字符.option("nullValue", "null")  // 指定空值表示.option("dateFormat", "yyyy-MM-dd")  // 日期格式.option("inferSchema", "true")  // 是否自动推断模式.csv("path/to/file.csv")

处理特殊情况:

  1. 处理引号包含的分隔符

    scala

    .option("quote", "\"")
    .option("escape", "\"")
    
  2. 处理包含换行符的字段

    scala

    .option("multiline", "true")
    
  3. 处理不同编码的文件

    scala

    .option("charset", "UTF-8")
    

执行步骤:

  1. 准备示例 CSV 文件 people.csv

    csv

    id,name,age,salary
    1,Alice,25,5000.0
    2,Bob,30,6000.0
    3,Charlie,35,7500.0
    
  2. 运行 Spark 应用:

    bash

    spark-submit --class CSVReadExample \--master local[*] \your-application.jar
    
  3. 也可以在 Spark Shell 中交互式运行:

    bash

    spark-shell
    
     

    然后粘贴代码片段执行

性能优化建议:

  1. 禁用自动推断模式(如果已知模式):

    scala

    .schema(customSchema)
    .option("inferSchema", "false")  // 提高性能
    
  2. 分区并行读取

    scala

    // 增加分区数提高并行度
    val df = spark.read.csv("path/to/file.csv").repartition(10)
    
  3. 使用列剪枝

    scala

    // 只选择需要的列
    df.select("name", "age")
    
  4. 过滤数据

    scala

    // 尽早过滤数据减少内存占用
    df.filter($"age" > 30)
    

错误处理:

  1. 处理格式错误

    scala

    .option("mode", "DROPMALFORMED")  // 丢弃格式错误的记录
    .option("mode", "PERMISSIVE")    // 将错误字段设为null
    .option("mode", "FAILFAST")      // 遇到错误立即失败
    
  2. 自定义错误处理

    scala

    import org.apache.spark.sql.Rowval df = spark.read.csv("path/to/file.csv")
    val validRows = df.rdd.filter { row =>try {// 自定义验证逻辑row.getString(1).length > 0} catch {case e: Exception => false}
    }
    

相关文章:

  • 前端面试每日三题 - Day 33
  • Vue 2 和 Vue 3的比较(二、语法差异)
  • 新一代动态可重构处理器技术,用于加速嵌入式 AI 应用
  • 索尼(sony)摄像机格式化后mp4的恢复方法
  • CAElinux系统详解
  • Retrofit vs Feign: 介绍、对比及示例
  • Spring Boot 跨域问题全解:原理、解决方案与最佳实践
  • Java GUI 开发之旅:Swing 组件与布局管理的实战探索
  • EBS 段值安全性配置
  • 【软件测试】第一章·软件测试概述
  • Spring AI 开发本地deepseek对话快速上手笔记
  • 理解计算机系统_并发编程(5)_基于线程的并发(二):线程api和基于线程的并发服务器
  • 正点原子T80烙铁拆解学习
  • 服务器制造业中,L2、L6、L10等表示什么意思
  • iVX 研发基座:大型系统开发的协作与安全架构实践
  • XA协议和Tcc
  • IP协议的特性
  • Java的进制转换
  • 通义灵码 - HTML智能编码辅助AI工具
  • OrangePi Zero 3学习笔记(Android篇)8 - OpenOCD
  • 5吨煤炭“瞬间蒸发”?掺水炭致企业损失千万,腐败窝案曝光
  • “异常”只停留在医院里,用艺术为“泡泡宝贝”加油
  • 远如《月球背面》,近似你我内心
  • 陕西河南山西等地将现“干热风”灾害,小麦产区如何防范?
  • 京东CEO许冉:外卖日单量接近2000万单,看到外卖对平台拉动和转化效应
  • 优化营商环境,服务上海“五个中心”建设,北外滩有何举措?