当前位置: 首页 > news >正文

从 CSV文件的加载、分区和处理 来理解 Spark RDD

看了不少关于Spark RDD的介绍,其实看都可以看得懂,但是还是会有不少疑问。比如

  • RDD 是抽象的,那么RDD中的谈到的分区列表、依赖关系、计算函数又是存储在哪的?
  • 执行器是怎么就拿到了自己的 分区数据的?会不会多拿到其他分区的数据
  • RDD分区列表中如果不存储真实数据,那么这些数据又是怎么分配到执行器的?

这里我拆解一个 csv文件在 HDFS上 如何被 Spark RDD 加载、分区和存储的,就应该很方便理解 RDD了。


1. HDFS 存储层面:物理分块

  • 假设: 比如 CSV 文件 data.csv 大小为 256MB,HDFS 默认块大小(block size)为 128MB
  • HDFS 行为:
    • HDFS 会自动将文件切割成 2 个物理块:
      • Block 0: 0 - 128MB (存储在节点 Node1Node2 上,副本)
      • Block 1: 128MB - 256MB (存储在节点 Node3Node4 上,副本)
    • 关键点: 这是 物理存储级别 的分块,由 HDFS 控制,目的是容错和分布式存储。

2. Spark 读取:创建初始 RDD (textFile)

当我在 Spark 中执行:

JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv");  

Spark 会执行以下操作:

a) 确定 RDD 的分区数
  • 默认规则: 每个 HDFS Block 对应 1个 RDD 分区

    • 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (Partition 0, Partition 1)。
  • 手动指定分区数:

    val rdd = sc.textFile("hdfs://path/to/data.csv", 4) // 强制分成4个分区
    
    • 如果文件不可切分(如 GZIP 压缩文件),分区数 = 文件数。
    • 如果文件可切分(如 CSV),Spark 会尝试按字节划分成 4 份(可能不等分)。
b) 分区与 HDFS Block 的映射
RDD 分区负责的 HDFS Block 范围数据位置偏好 (Preferred Locations)
分区 00 - 128MB (Block 0)[Node1, Node2] (Block 0 的存储节点)
分区 1128MB - 256MB (Block 1)[Node3, Node4] (Block 1 的存储节点)

极其重要: 每个 RDD 分区知道它应该读取哪个 HDFS Block,并且知道该 Block 存储在哪些节点上。


3. 数据存储位置:运行时过程

执行阶段:
  1. Driver 分配任务:
    • 根据 RDD 分区的位置偏好,Driver 将任务分配给离数据最近的 Executor。
    • 例如:
      • Task 0 (处理分区0) → 优先调度到 Node1Node2 上的 Executor。
      • Task 1 (处理分区1) → 优先调度到 Node3Node4 上的 Executor。
  2. Executor 读取数据:
    • 每个 Task 启动后,通过 HDFS Client 读取对应的 Block。
    • 本地性优化:
      • 如果 Task 在 Node1 执行,它直接从本机的 HDFS DataNode 读取 Block 0 (本地读取,速度最快)。
      • 如果 Node1 繁忙,Task 可能调度到 Node2 → 从同一机架内的 Node1 读取 (机架本地性,速度中等)。
      • 最差情况:跨机架读取(如调度到 Node5 读取 Block 0)。
  3. 数据在 Executor 中的存储:
    • 读取的数据以 行(String) 的形式加载到内存(每行是 CSV 中的一条记录)。
    • 存储位置:Executor 的 JVM 堆内存 中(除非指定了持久化级别)。
    • 存储形式:按分区存储,每个 Task 处理的分区数据独立存在于执行它的 Executor 内存中。

4. 分区数据内容

假设 CSV 内容如下:

1,Alice,Beijing
2,Bob,Shanghai
1001,David,Shanghai
1002,Eve,Shenzhen
  • 分区 0 可能包含:

    ["1,Alice,Beijing", "2,Bob,Shanghai", "3,Charlie,Beijing", ...]  # 前128MB
    
  • 分区 1 可能包含:

    ["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]  # 后128MB
    

⚠️ 注意: 分区边界是按字节切割的,可能截断某一行(最后一行不完整)。Spark 会确保:

  • 分区0读取到第一个完整行 → 最后一个完整行
  • 分区1从上个分区未读完的位置开始 → 直到文件结束

5.详细执行过程

我们所编写的spark代码整体如下,刚刚说的是 加载csv部分,下面说的就是 数据清洗与聚合了。

JavaSparkContext sc = new JavaSparkContext();// 1. 加载 CSV(假设文件 256MB,2个HDFS Block)
JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv");  // 2个分区// 2. 数据清洗(过滤 + 提取字段)
JavaPairRDD<String, Integer> cleanedRdd = rdd.filter(new Function<String, Boolean>() {@Overridepublic Boolean call(String line) throws Exception {return !line.contains("id");  // 过滤标题行}
}).mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple2<>(arr[2], 1);  // (城市, 1)}
});// 3. Shuffle操作(按城市聚合)
JavaPairRDD<String, Integer> cityCountRdd = cleanedRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer a, Integer b) throws Exception {return a + b;  }
});// 4. 触发计算(行动操作)
List<Tuple2<String, Integer>> results = cityCountRdd.collect();// 输出结果
for (Tuple2<String, Integer> result : results) {System.out.println(result._1() + ": " + result._2());
}

阶段1:加载与初始分区(窄依赖)

读取
读取
HDFS Block 0
Executor1-Part0
HDFS Block 1
Executor2-Part1

阶段2:转换操作(窄依赖 - 流水线执行)

Part0原始数据
filter+map操作
Part0清洗后
Part1原始数据
filter+map操作
Part1清洗后
  • Executor1 处理 Partition 0

    # 输入
    ["1,Alice,Beijing", "2,Bob,Shanghai", ...]# 转换后
    [("Beijing",1), ("Shanghai",1), ("Beijing",1), ...]
    
  • Executor2 处理 Partition 1

    # 输入
    ["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]# 转换后
    [("Shanghai",1), ("Shenzhen",1), ...]
    

这里我们其实可以发现一个特点,就是每个 Executor 都整好处理的是 自己机器上的 Partition数据。

关键特点:无需跨节点通信,各分区独立处理

阶段3:Shuffle操作(宽依赖 - 数据重组)

步骤 1: 划分 Stage(Driver决策)
  • 识别 reduceByKey 是宽依赖 → 创建新 Stage
  • Stage 0: 所有窄依赖操作(textFile → filter → map)
  • Stage 1: reduceByKey
步骤 2:Stage 0 执行(Shuffle Write)
Part0清洗后
按城市分区
Part1清洗后
按城市分区
Executor1本地磁盘
Executor2本地磁盘
  • Shuffle 写操作

    • 每个 Task 将自己的数据 按 Key(城市)分组

    • 根据默认分区器(HashPartitioner)计算目标分区

      Partition 0: Beijing, Shenzhen → Hash值%2=0
      Partition 1: Shanghai → Hash值%2=1
      
    • 将数据写入 本地磁盘的 Shuffle 临时文件,并生成索引文件

  • Executor1 写入

    # 文件:shuffle_0_temp_0.data (Partition 0)
    ("Beijing",1), ("Beijing",1)# 文件:shuffle_0_temp_1.data (Partition 1)
    ("Shanghai",1)
    
  • Executor2 写入

    # 文件:shuffle_1_temp_0.data (Partition 0)
    ("Shenzhen",1)# 文件:shuffle_1_temp_1.data (Partition 1)
    ("Shanghai",1)
    
步骤 3: Stage 1 执行(Shuffle Read)
Shuffle文件
TaskA
Shuffle文件
TaskB
  • Driver 创建新 Task

    • Task A:处理最终 RDD 的 Partition 0(Key: Beijing, Shenzhen)
    • Task B:处理最终 RDD 的 Partition 1(Key: Shanghai)
  • Shuffle 读操作

    • Task A 从 所有 Executor 拉取属于 Partition 0 的数据:

      # 从 Executor1 拉取:("Beijing",1), ("Beijing",1)
      # 从 Executor2 拉取:("Shenzhen",1)
      
    • Task B 拉取 Partition 1 的数据:

      # 从 Executor1 拉取:("Shanghai",1)
      # 从 Executor2 拉取:("Shanghai",1)
      
  • 聚合计算

    • Task A 执行 reduceByKey

      Beijing: 1 + 1 = 2
      Shenzhen: 1
      
    • Task B 执行:

      Shanghai: 1 + 1 = 2
      

阶段 4: 结果收集

TaskA结果
Driver
TaskB结果

最终数据到达 Driver:

[("Beijing",2), ("Shenzhen",1), ("Shanghai",2)]

Shuffle 数据流全景

Stage 1
Stage 0
Shuffle Write
Shuffle Write
Part0数据
Part0数据
Part1数据
Part1数据
TaskA
TaskB
磁盘文件
Executor1
磁盘文件
Executor2
最终结果

为什么需要 Shuffle?因为有些操作必须要将数据进行重新分区才好进行计算、统计。

操作类型数据移动网络开销典型操作
窄依赖数据不动 Task移动map, filter
宽依赖数据按Key重组 跨节点传输groupByKey, reduceByKey

通过这个完整示例,就可以看到:

  1. 分区如何从物理存储映射到计算单元
  2. 窄依赖如何实现零数据传输的流水线
  3. 宽依赖如何通过Shuffle重组数据
  4. Stage划分如何驱动分布式计算

这正是 RDD 抽象的核心价值——通过清晰的阶段划分和依赖关系,在分布式环境中高效执行复杂计算。不需要拘束于通过概念理解 RDD。

5. 核心问题解答

Q1:分区数据存储在哪里?
  • 原始数据: 物理存储在 HDFS DataNode(如 Node1, Node2, Node3, Node4)。
  • RDD 处理时:
    • 计算中:数据加载到 Executor 的 JVM 堆内存
    • 持久化后:可缓存到 Executor 内存/磁盘(通过 rdd.persist())。
Q2:会多获取数据吗?
  • 绝对不会! 每个 Task 只读取自己分区映射的 HDFS Block 范围

  • 优化机制:

    机制作用
    位置感知调度Task 优先在存有数据的节点执行
    HDFS 块定位Executor 直接读取本地或邻近节点的数据块
    精确的字节范围读取每个 Task 只读取分配给它的连续字节区间

6. 总结:

DriverExecutorHDFSLocalDiskOtherExecutor初始化阶段1. 创建SparkContext2. 构建RDD依赖图 (DAG)3. 划分Stage (根据Shuffle边界)4. 计算分区/任务列表(构建DAG)Stage 0 执行 (ShuffleMapStage)5. 发送Stage0任务(含分区计算逻辑)6. 读取指定数据块7. 执行流水线操作:- Map- Filter- Partitioning8. 将结果写入内存缓冲区9. Shuffle Write:- 排序- 按分区写入本地磁盘文件10. 汇报状态和Shuffle元数据Stage 1 执行 (ResultStage)11. 发送Stage1任务(含聚合逻辑)12. Shuffle Read:- 根据元数据拉取对应分区的数据13. 合并数据(可能溢写到磁盘)14. 执行聚合操作:- reduceByKey- combine15. 返回最终结果分区数据16. 汇总所有分区的结果DriverExecutorHDFSLocalDiskOtherExecutor

关键原则:

  1. 移动计算,而非数据:将 Task 发送到数据所在的节点。简而言之就是让计算尽量贴近数据。我们可能习惯于将数据传输到计算端进行处理,比如查询mysql、es、mongo。但是对于大数据处理来说,计算逻辑好移动,而数据难移动。

  2. 分而治之:大文件被划分成小分区,并行处理。这其实就是类似于后端常用的多线程,大文件划区之后,并行处理,那么整体速度就得到极大提升。

  3. 精准映射:RDD 分区与 HDFS Block 一一对应,无重复读取。不光HDFS天然分块,其实很多分布式文件系统都是有分块机制的,比如S3。对于不天然支持分块的数据源,比如mysql 是可以人工分区的。

    // 并行读取 MySQL (按照分区键进行划分区,此处就是1-1000000是分区0,1000001-2000000是分区1 以此类推...)
    val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://host:3306/db").option("dbtable", "orders").option("partitionColumn", "order_id")   // 分区键.option("lowerBound", 1)                // 最小值.option("upperBound", 1000000)          // 最大值.option("numPartitions", 10)            // 分区数.load()
    

通过这种设计,Spark 能高效处理远大于内存的分布式文件,而开发者只需写几行代码。

http://www.dtcms.com/a/282077.html

相关文章:

  • 设计模式—初识设计模式
  • 【kubernetes】--安全认证机制
  • Linux4:线程
  • 前端技术之---应用国际化(vue-i18n)
  • UE5多人MOBA+GAS 24、创建属性UI(一)
  • ubuntu24 c++ 自定义目录编译opencv4.12
  • Ubuntu GRUB菜单密码重置教程
  • 电脑安装 Win10 提示无法在当前分区上安装Windows的解决办法
  • WPF+CEF 执行JS报错
  • 从零开始的云计算生活——番外3,LVS+KeepAlived+Nginx高可用实现方案
  • [1-01-01].第43节:常用类 - 比较器类 Comparator接口
  • 【DataWhale】快乐学习大模型 | 202507,Task02笔记
  • Grok 系列大模型:xAI 的智能宇宙探秘
  • web前端用MVP模式搭建项目
  • DNS防护实战:用ipset自动拦截异常解析与群联AI云防护集成
  • 用PyTorch手写透视变换
  • 【unitrix】 6.4 类型化数特征(t_number.rs)
  • Rust 基础大纲
  • AI产品经理面试宝典第27天:AI+农业精准养殖与智能决策相关面试题解答指导
  • 疗愈之手的智慧觉醒:Deepoc具身智能如何重塑按摩机器人的触觉神经
  • mongoDB集群
  • Jmeter+ant+jenkins接口自动化测试框架
  • 汽车功能安全-相关项集成和测试(系统集成测试系统合格性测试)-12
  • LabVIEW液压机智能监控
  • 【游戏引擎之路】登神长阶(十九):3D物理引擎——岁不寒,无以知松柏;事不难,无以知君子
  • WSL2更新后Ubuntu 24.04打不开(终端卡住,没有输出)
  • 模型上下文协议(MCP)的工作流程、安全威胁与未来发展方向
  • 海康线扫相机通过采集卡的取图设置
  • 作业06-文本工单调优
  • UE5 相机后处理材质与动态参数修改