从 CSV文件的加载、分区和处理 来理解 Spark RDD
看了不少关于Spark RDD的介绍,其实看都可以看得懂,但是还是会有不少疑问。比如
- RDD 是抽象的,那么RDD中的谈到的分区列表、依赖关系、计算函数又是存储在哪的?
- 执行器是怎么就拿到了自己的 分区数据的?会不会多拿到其他分区的数据
- RDD分区列表中如果不存储真实数据,那么这些数据又是怎么分配到执行器的?
这里我拆解一个 csv文件在 HDFS上 如何被 Spark RDD 加载、分区和存储的,就应该很方便理解 RDD了。
1. HDFS 存储层面:物理分块
- 假设: 比如 CSV 文件
data.csv
大小为 256MB,HDFS 默认块大小(block size)为 128MB。 - HDFS 行为:
- HDFS 会自动将文件切割成 2 个物理块:
Block 0
: 0 - 128MB (存储在节点Node1
和Node2
上,副本)Block 1
: 128MB - 256MB (存储在节点Node3
和Node4
上,副本)
- 关键点: 这是 物理存储级别 的分块,由 HDFS 控制,目的是容错和分布式存储。
- HDFS 会自动将文件切割成 2 个物理块:
2. Spark 读取:创建初始 RDD (textFile
)
当我在 Spark 中执行:
JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv");
Spark 会执行以下操作:
a) 确定 RDD 的分区数
-
默认规则: 每个 HDFS Block 对应 1个 RDD 分区。
- 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (
Partition 0
,Partition 1
)。
- 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (
-
手动指定分区数:
val rdd = sc.textFile("hdfs://path/to/data.csv", 4) // 强制分成4个分区
- 如果文件不可切分(如 GZIP 压缩文件),分区数 = 文件数。
- 如果文件可切分(如 CSV),Spark 会尝试按字节划分成 4 份(可能不等分)。
b) 分区与 HDFS Block 的映射
RDD 分区 | 负责的 HDFS Block 范围 | 数据位置偏好 (Preferred Locations) |
---|---|---|
分区 0 | 0 - 128MB (Block 0) | [Node1, Node2] (Block 0 的存储节点) |
分区 1 | 128MB - 256MB (Block 1) | [Node3, Node4] (Block 1 的存储节点) |
✅ 极其重要: 每个 RDD 分区知道它应该读取哪个 HDFS Block,并且知道该 Block 存储在哪些节点上。
3. 数据存储位置:运行时过程
执行阶段:
- Driver 分配任务:
- 根据 RDD 分区的位置偏好,Driver 将任务分配给离数据最近的 Executor。
- 例如:
Task 0
(处理分区0) → 优先调度到Node1
或Node2
上的 Executor。Task 1
(处理分区1) → 优先调度到Node3
或Node4
上的 Executor。
- Executor 读取数据:
- 每个 Task 启动后,通过 HDFS Client 读取对应的 Block。
- 本地性优化:
- 如果 Task 在
Node1
执行,它直接从本机的 HDFS DataNode 读取Block 0
(本地读取,速度最快)。 - 如果
Node1
繁忙,Task 可能调度到Node2
→ 从同一机架内的Node1
读取 (机架本地性,速度中等)。 - 最差情况:跨机架读取(如调度到
Node5
读取Block 0
)。
- 如果 Task 在
- 数据在 Executor 中的存储:
- 读取的数据以 行(String) 的形式加载到内存(每行是 CSV 中的一条记录)。
- 存储位置:Executor 的 JVM 堆内存 中(除非指定了持久化级别)。
- 存储形式:按分区存储,每个 Task 处理的分区数据独立存在于执行它的 Executor 内存中。
4. 分区数据内容
假设 CSV 内容如下:
1,Alice,Beijing
2,Bob,Shanghai
1001,David,Shanghai
1002,Eve,Shenzhen
-
分区 0 可能包含:
["1,Alice,Beijing", "2,Bob,Shanghai", "3,Charlie,Beijing", ...] # 前128MB
-
分区 1 可能包含:
["1001,David,Shanghai", "1002,Eve,Shenzhen", ...] # 后128MB
⚠️ 注意: 分区边界是按字节切割的,可能截断某一行(最后一行不完整)。Spark 会确保:
- 分区0读取到第一个完整行 → 最后一个完整行
- 分区1从上个分区未读完的位置开始 → 直到文件结束
5.详细执行过程
我们所编写的spark代码整体如下,刚刚说的是 加载csv部分,下面说的就是 数据清洗与聚合了。
JavaSparkContext sc = new JavaSparkContext();// 1. 加载 CSV(假设文件 256MB,2个HDFS Block)
JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv"); // 2个分区// 2. 数据清洗(过滤 + 提取字段)
JavaPairRDD<String, Integer> cleanedRdd = rdd.filter(new Function<String, Boolean>() {@Overridepublic Boolean call(String line) throws Exception {return !line.contains("id"); // 过滤标题行}
}).mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple2<>(arr[2], 1); // (城市, 1)}
});// 3. Shuffle操作(按城市聚合)
JavaPairRDD<String, Integer> cityCountRdd = cleanedRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer a, Integer b) throws Exception {return a + b; }
});// 4. 触发计算(行动操作)
List<Tuple2<String, Integer>> results = cityCountRdd.collect();// 输出结果
for (Tuple2<String, Integer> result : results) {System.out.println(result._1() + ": " + result._2());
}
阶段1:加载与初始分区(窄依赖)
阶段2:转换操作(窄依赖 - 流水线执行)
-
Executor1 处理 Partition 0:
# 输入 ["1,Alice,Beijing", "2,Bob,Shanghai", ...]# 转换后 [("Beijing",1), ("Shanghai",1), ("Beijing",1), ...]
-
Executor2 处理 Partition 1:
# 输入 ["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]# 转换后 [("Shanghai",1), ("Shenzhen",1), ...]
这里我们其实可以发现一个特点,就是每个 Executor 都整好处理的是 自己机器上的 Partition数据。
✅ 关键特点:无需跨节点通信,各分区独立处理
阶段3:Shuffle操作(宽依赖 - 数据重组)
步骤 1: 划分 Stage(Driver决策)
- 识别
reduceByKey
是宽依赖 → 创建新 Stage - Stage 0: 所有窄依赖操作(textFile → filter → map)
- Stage 1: reduceByKey
步骤 2:Stage 0 执行(Shuffle Write)
-
Shuffle 写操作:
-
每个 Task 将自己的数据 按 Key(城市)分组
-
根据默认分区器(
HashPartitioner
)计算目标分区Partition 0: Beijing, Shenzhen → Hash值%2=0 Partition 1: Shanghai → Hash值%2=1
-
将数据写入 本地磁盘的 Shuffle 临时文件,并生成索引文件
-
-
Executor1 写入:
# 文件:shuffle_0_temp_0.data (Partition 0) ("Beijing",1), ("Beijing",1)# 文件:shuffle_0_temp_1.data (Partition 1) ("Shanghai",1)
-
Executor2 写入:
# 文件:shuffle_1_temp_0.data (Partition 0) ("Shenzhen",1)# 文件:shuffle_1_temp_1.data (Partition 1) ("Shanghai",1)
步骤 3: Stage 1 执行(Shuffle Read)
-
Driver 创建新 Task:
- Task A:处理最终 RDD 的 Partition 0(Key: Beijing, Shenzhen)
- Task B:处理最终 RDD 的 Partition 1(Key: Shanghai)
-
Shuffle 读操作:
-
Task A 从 所有 Executor 拉取属于 Partition 0 的数据:
# 从 Executor1 拉取:("Beijing",1), ("Beijing",1) # 从 Executor2 拉取:("Shenzhen",1)
-
Task B 拉取 Partition 1 的数据:
# 从 Executor1 拉取:("Shanghai",1) # 从 Executor2 拉取:("Shanghai",1)
-
-
聚合计算:
-
Task A 执行
reduceByKey
:Beijing: 1 + 1 = 2 Shenzhen: 1
-
Task B 执行:
Shanghai: 1 + 1 = 2
-
阶段 4: 结果收集
最终数据到达 Driver:
[("Beijing",2), ("Shenzhen",1), ("Shanghai",2)]
Shuffle 数据流全景
为什么需要 Shuffle?因为有些操作必须要将数据进行重新分区才好进行计算、统计。
操作类型 | 数据移动 | 网络开销 | 典型操作 |
---|---|---|---|
窄依赖 | 数据不动 Task移动 | 低 | map , filter |
宽依赖 | 数据按Key重组 跨节点传输 | 高 | groupByKey , reduceByKey |
通过这个完整示例,就可以看到:
- 分区如何从物理存储映射到计算单元
- 窄依赖如何实现零数据传输的流水线
- 宽依赖如何通过Shuffle重组数据
- Stage划分如何驱动分布式计算
这正是 RDD 抽象的核心价值——通过清晰的阶段划分和依赖关系,在分布式环境中高效执行复杂计算。不需要拘束于通过概念理解 RDD。
5. 核心问题解答
Q1:分区数据存储在哪里?
- 原始数据: 物理存储在 HDFS DataNode(如
Node1
,Node2
,Node3
,Node4
)。 - RDD 处理时:
- 计算中:数据加载到 Executor 的 JVM 堆内存。
- 持久化后:可缓存到 Executor 内存/磁盘(通过
rdd.persist()
)。
Q2:会多获取数据吗?
-
绝对不会! 每个 Task 只读取自己分区映射的 HDFS Block 范围。
-
优化机制:
机制 作用 位置感知调度 Task 优先在存有数据的节点执行 HDFS 块定位 Executor 直接读取本地或邻近节点的数据块 精确的字节范围读取 每个 Task 只读取分配给它的连续字节区间
6. 总结:
关键原则:
-
移动计算,而非数据:将 Task 发送到数据所在的节点。简而言之就是让计算尽量贴近数据。我们可能习惯于将数据传输到计算端进行处理,比如查询mysql、es、mongo。但是对于大数据处理来说,计算逻辑好移动,而数据难移动。
-
分而治之:大文件被划分成小分区,并行处理。这其实就是类似于后端常用的多线程,大文件划区之后,并行处理,那么整体速度就得到极大提升。
-
精准映射:RDD 分区与 HDFS Block 一一对应,无重复读取。不光HDFS天然分块,其实很多分布式文件系统都是有分块机制的,比如S3。对于不天然支持分块的数据源,比如mysql 是可以人工分区的。
// 并行读取 MySQL (按照分区键进行划分区,此处就是1-1000000是分区0,1000001-2000000是分区1 以此类推...) val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://host:3306/db").option("dbtable", "orders").option("partitionColumn", "order_id") // 分区键.option("lowerBound", 1) // 最小值.option("upperBound", 1000000) // 最大值.option("numPartitions", 10) // 分区数.load()
通过这种设计,Spark 能高效处理远大于内存的分布式文件,而开发者只需写几行代码。