Spark 中数据读取方式详解:SparkSQL(DataFrame)与 SparkCore(RDD)方法对比及实践
在PySpark中,读取数据是数据处理流程的关键步骤。SparkCore(基于RDD)和SparkSQL(基于DataFrame)提供了不同的读取方式。下面我将以结构化的方式逐步解释这些方法,确保内容清晰易懂。所有代码示例均使用PySpark语法,并基于您提供的SparkSession创建模板(SparkSession.builder...getOrCreate())。以下内容分为两部分:SparkSQL读取方式(推荐,适用于结构化数据)和SparkCore读取方式(底层,适用于非结构化数据)。注意,实际应用中SparkSQL更常用,因为它优化了性能并提供丰富API。
1. SparkSQL读取数据方式(使用DataFrame API)
SparkSQL通过spark.read接口读取数据,支持多种数据源。核心方法是先创建SparkSession(如您示例所示),然后调用对应方法。参数包括数据源地址、类型和选项(通过option配置)。以下是常见场景的代码示例:
(1) 给定读取数据源的类型和地址,直接调用对应方法
- 根据文件格式(如CSV、JSON、Parquet)直接调用方法,指定文件路径。
 - 示例:读取CSV文件
from pyspark.sql import SparkSession# 创建SparkSession(参考您的模板) with SparkSession.builder.master("local[2]").appName("Demo").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 直接调用csv方法,指定路径df_csv = spark.read.csv("path/to/file.csv") # 基本读取df_csv.show()# 类似地,读取JSON文件 with SparkSession.builder...getOrCreate() as spark: # 省略重复代码df_json = spark.read.json("path/to/file.json")df_json.show()# 读取Parquet文件 with SparkSession.builder...getOrCreate() as spark:df_parquet = spark.read.parquet("path/to/file.parquet")df_parquet.show() 
(2) 使用option参数指定读取配置
option用于添加额外选项,如处理头信息、推断模式等。- 示例:读取CSV文件时启用头信息和模式推断
with SparkSession.builder...getOrCreate() as spark:# 使用option设置header和inferSchemadf = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/file.csv")df.printSchema() # 显示模式 - 常用选项:
header:是否使用第一行作为列名(默认false)。inferSchema:是否自动推断数据类型(默认false)。delimiter:指定分隔符(如,)。- 多个选项可链式调用:
.option("key1", "value1").option("key2", "value2")... 
 
(3) 通过JDBC读取数据库数据
- 使用
spark.read.jdbc方法连接关系型数据库(如MySQL、PostgreSQL)。 - 需提供JDBC URL、表名、连接属性(用户名、密码等)。
 - 示例:读取PostgreSQL数据库表
with SparkSession.builder...getOrCreate() as spark:# JDBC连接参数jdbc_url = "jdbc:postgresql://localhost:5432/mydatabase"properties = {"user": "your_username","password": "your_password","driver": "org.postgresql.Driver" # 需确保驱动在classpath}# 读取表数据df_jdbc = spark.read.jdbc(url=jdbc_url, table="tablename", properties=properties)df_jdbc.show() 
(4) 读取Hive表数据
- SparkSession默认支持Hive集成(需配置Hive元存储)。
 - 使用
spark.read.table读取表,或直接执行SQL查询。 - 示例:读取Hive表
with SparkSession.builder...getOrCreate() as spark:# 方法1: 直接读取表df_hive = spark.read.table("database_name.table_name") # 表名需完整df_hive.show()# 方法2: 使用SQL查询(更灵活)spark.sql("SELECT * FROM database_name.table_name").show() 
2. SparkCore读取数据方式(使用RDD API)
SparkCore基于RDD(Resilient Distributed Dataset),适用于非结构化数据(如文本文件)。它通过SparkContext读取,但功能较基础,不直接支持选项或结构化查询。核心方法是textFile读取文本文件为RDD。
- 创建SparkContext:在SparkSession中获取
SparkContext。 - 读取文本文件:使用
sc.textFile方法,指定文件路径。 - 示例:读取文本文件为RDD
from pyspark import SparkContext# 创建SparkSession(先获取SparkContext) with SparkSession.builder...getOrCreate() as spark:sc = spark.sparkContext # 从SparkSession获取SparkContext# 读取文本文件rdd = sc.textFile("path/to/file.txt") # 返回RDD[String]# 处理RDD(例如,打印前10行)print(rdd.take(10)) - 注意:
- RDD方式不支持
option参数,所有解析需手动处理。 - 对于结构化数据(如CSV),推荐先使用SparkSQL读取为DataFrame,再转换为RDD(
df.rdd),以获得更好性能。 
 - RDD方式不支持
 
总结
- SparkSQL(DataFrame API):首选方式,支持丰富数据源(文件、JDBC、Hive表)和
option参数,适合结构化数据处理。代码简洁高效。 - SparkCore(RDD API):底层方式,仅适合文本文件读取,功能有限,但灵活性强。
 - 最佳实践:优先使用SparkSQL,除非需要低级控制。在您的SparkSession模板基础上,结合数据源类型选择合适方法。确保路径和选项正确,以避免读取错误。如果您有具体场景(如文件格式或数据库类型),我可以提供更针对性的示例!
 
