当前位置: 首页 > news >正文

在scala中sparkSQL读入csv文件

以下是 Scala 中使用 Spark SQL 读取 CSV 文件的核心步骤和代码示例(纯文本):

 

1. 创建 SparkSession

 

scala

import org.apache.spark.sql.SparkSession  

 

val spark = SparkSession.builder()  

  .appName("Spark SQL Read CSV")  

  .master("local[*]") // 单机模式,集群改为 "yarn" 等  

  .getOrCreate()  

 

 

2. 基础读取(默认配置)

 

scala

// 读取 CSV 文件(默认:逗号分隔,无表头,字段类型为 String)  

val dfDefault = spark.read.csv("file:///path/to/your/data.csv")  

 

// 示例输出:显示前 20 行数据  

dfDefault.show()  

 

 

3. 指定表头和自动推断类型

 

scala

val dfWithSchema = spark.read  

  .option("header", "true") // 首行作为表头  

  .option("inferSchema", "true") // 自动推断数据类型(如 Int、Double)  

  .csv("file:///path/to/your/data_with_header.csv")  

 

// 查看表结构  

dfWithSchema.printSchema()  

 

 

4. 自定义分隔符和空值处理

 

scala

val dfCustom = spark.read  

  .option("header", "true")  

  .option("delimiter", ";") // 分隔符改为分号(适用于欧洲格式 CSV)  

  .option("nullValue", "NA") // 将 "NA" 识别为空值  

  .csv("file:///path/to/your/custom_separator.csv")  

 

 

5. 读取带引号的复杂文本

 

scala

val dfQuoted = spark.read  

  .option("header", "true")  

  .option("quote", "\"") // 文本字段引号(默认双引号)  

  .option("escape", "\"") // 转义符(处理嵌套引号)  

  .csv("file:///path/to/your/quoted_data.csv")  

 

 

6. 读取远程文件(如 HDFS/S3)

 

scala

// HDFS 路径示例  

val dfHdfs = spark.read.csv("hdfs://namenode:8020/user/data.csv")  

 

// S3 路径示例(需配置 AWS 凭证)  

val dfS3 = spark.read.csv("s3a://bucket-name/path/data.csv")  

 

 

7. 读取多个文件或目录

 

scala

// 读取目录下所有 CSV 文件  

val dfDirectory = spark.read.csv("file:///path/to/csv_directory/")  

 

// 按通配符匹配文件(如读取以 "part-" 开头的文件)  

val dfWildcard = spark.read.csv("file:///path/to/part-*.csv")  

 

 

关键参数说明

 

表格

参数名 说明 

 header  是否包含表头( true / false ,默认  false ) 

 inferSchema  是否自动推断数据类型(需  header=true ,默认  false ) 

 delimiter  字段分隔符(默认  , ,可改为  ; 、 \t  等) 

 quote  文本字段引号(默认  " ,用于包裹含分隔符的文本) 

 escape  转义符(处理引号内的特殊字符) 

 nullValue  空值标识(如  NA 、 NULL ,默认空字符串视为 null) 

 

完整代码示例

 

scala

import org.apache.spark.sql.SparkSession  

 

object SparkReadCSV {  

  def main(args: Array[String]): Unit = {  

    val spark = SparkSession.builder()  

      .appName("Read CSV with Spark SQL")  

      .master("local[*]")  

      .getOrCreate()  

 

    // 读取带表头的 CSV 并推断类型  

    val df = spark.read  

      .option("header", "true")  

      .option("inferSchema", "true")  

      .csv("file:///user/data/products.csv")  

 

    // 打印数据和结构  

    println("Data:")  

    df.show(false)  

    println("\nSchema:")  

    df.printSchema()  

 

    spark.stop()  

  }  

}  

 

 

执行时直接运行代码即可,Spark 会自动处理 CSV 解析(无需额外依赖包)。

相关文章:

  • maven工程跳过@SpringTest
  • Linux干货(一)
  • 需求实现与测试验证脱节,如何确保产品质量
  • 下周,Coinbase将被纳入标普500指数
  • 解锁 CPFR 潜力:电商智能补货优化算法的全链路设计与实战指南
  • 二叉树、红黑树与 B 树的对比
  • arctanx 导数 泰勒展开式证明
  • 代码随想录算法训练营第三十九天
  • 日常学习开发记录-rate评价组件
  • docker-compose部署thingsboard/tb-cassandra
  • MySQL:关系模型的基本理论
  • 这类物种组织heatmap有点东西
  • 贪心算法:最小生成树
  • idea2021创建web项目及其整合tomcat
  • base64形式的图片数据保存方法
  • 深入解析 I/O 模型:原理、区别与 Java 实践
  • 前端——布局方式
  • 【老飞飞源码】新版高清飞飞源码+数据库+客户端+服务器端完整文件打包
  • 易境通海外仓系统:中转业务管理的智能化解决方案!
  • 北三短报文数传终端:筑牢水利防汛“智慧防线”,守护江河安澜
  • 四部门:强化汛期农业防灾减灾,奋力夺取粮食和农业丰收
  • 李强会见巴西总统卢拉
  • 沈阳卫健委通报“健康证”办理乱象:涉事医院已被立案查处
  • 专访|西蒙·斯特朗格:以“辞典”的方式讲述二战家族史
  • 金科股份重整方案通过,正式进入重整计划执行环节
  • 耿军强任陕西延安市领导,此前任陕西省公安厅机场公安局局长