当前位置: 首页 > news >正文

RDD的特点、算子与创建方法

《Spark大数据开发与应用案例(视频教学版)(大数据技术丛书)》(段海涛,杨忠良,余辉)【摘要 书评 试读】- 京东图书

本节将对RDD的基本概念、特点、分类、使用方法进行详细讲解。RDD作为Spark的核心数据结构,承载着弹性分布式数据集的特性。本节将深入探讨RDD的特点、算子的精细分类以及多样化的创建方法,为Spark数据处理奠定坚实基础。

4.1.1  RDD的特点

RDD(Resilient Distributed Dataset,弹性分布式数据集)是Apache Spark中的一个核心概念,具有以下特点。

(1)不可变性(Immutability):RDD一旦创建,就不能被修改。这种不可变性确保了数据的一致性和容错性。如果需要修改数据,可以创建一个新的RDD。

(2)分布式存储:RDD中的数据是分布式存储的,可以跨多个节点进行存储和处理。这种分布式存储方式使得Spark能够处理大规模数据集。

(3)容错性(Fault Tolerance):由于RDD是不可变的,Spark可以记录RDD的创建过程(即lineage,血统)。当某个RDD的分区丢失时,Spark可以通过重新计算其依赖的RDD来恢复丢失的数据,而无需重新计算整个数据集。

(4)惰性计算(Lazy Evaluation):RDD的操作是惰性的,即只有在实际需要计算结果时(如调用collect()、count()等行动操作)才会执行。这种惰性计算机制使得Spark能够优化执行计划,提高计算效率。

(5)多种操作:RDD支持两种类型的操作,即转换(transformation)和行动(action)。转换操作会返回一个新的RDD,而行动操作会触发计算并返回结果到驱动程序。

(6)自定义分区:RDD允许用户自定义分区策略,以优化数据分布和计算性能。用户可以通过实现Partitioner接口来自定义分区方式。

(7)与存储系统的集成:RDD可以与多种存储系统(如HDFS、S3、Cassandra等)进行集成,方便地从这些存储系统中读取数据和写入数据。

(8)高效的内存计算:RDD支持将数据缓存在内存中,从而加速后续的计算操作。这种内存计算模式使得Spark在处理迭代计算、机器学习等任务时具有显著的性能优势。

(9)灵活性:RDD提供了丰富的API,允许用户以灵活的方式对数据进行处理。用户可以使用高阶函数(如map、filter、reduce等)来定义复杂的计算逻辑。

(10)扩展性:RDD的抽象层次较低,允许开发者根据需要创建自定义的RDD类型,以支持特定的数据处理需求。例如,Spark SQL中的DataFrame和Dataset都是基于RDD构建的更高层次的抽象。

这些特点使得RDD成为处理大规模数据集的强大工具,特别适用于需要高效、容错和灵活数据处理的应用场景。

RDD和RDD之间存在一系列依赖关系(参见图4-1)。RDD调用Transformation后会生成一个新的RDD,子RDD会记录父RDD的依赖关系,包括宽依赖(有shuffle)和窄依赖(没有shuffle)。下一节会对依赖关系中的算子分类进行详细讲解。

图4-1  RDD的依赖关系

4.1.2  RDD的算子分类

RDD中算子分为两大类,Transformation(即转换算子)和Action(即行动算子)。

  • Transformation:即转换算子,调用转换算子会生成一个新的RDD,Transformation是Lazy的,不会触发job执行。
  • Action:即行动算子,调用行动算子会触发job执行,本质上是调用了sc.runJob方法,该方法从最后一个RDD,根据其依赖关系,从后往前划分Stage,生成TaskSet。

RDD的算子通过转换和行动两类算子的组合,实现了对分布式数据的复杂处理。转换算子定义了数据的转换逻辑;而行动算子则触发了这些转换逻辑的执行,并产生了最终结果。这种设计使得Spark能够高效地处理大规模数据集。

4.1.3  RDD的创建方法

在Apache Spark中,RDD是一个容错、并行的数据结构,可以让用户在高层次上执行大规模的数据集操作。RDD可以通过多种方式在Spark应用程序中创建,以下介绍一些常见的RDD创建方法。

1. 从集合中创建RDD

在Spark的Scala API中,你可以从一个已经存在的Scala集合(如List、Array等)中直接创建RDD。这是通过调用SparkContext的parallelize方法实现的。

// 定义数组val data = Array(1, 2, 3, 4, 5)// 创建RDDval rdd = sc.parallelize(data)

这里,sc是SparkContext的实例,data是一个包含整数的数组,rdd是由data转换而来的RDD。

2. 从外部文件系统中读取数据创建RDD

Spark提供了多种从外部存储系统(如HDFS、Amazon S3、本地文件系统等)读取数据并创建RDD的方法。这些方法包括textFile(读取文本文件)、sequenceFile(读取Hadoop的SequenceFile文件)、wholeTextFiles(读取整个文件作为键值对,其中键是文件名,值是文件内容)等。

下面这行代码会读取指定HDFS路径下的文本文件,并将文件内容作为RDD中的元素。

// 读取HDFS文件为RDDval rdd = sc.textFile("hdfs://path/to/textfile.txt")

3. 从其他RDD转换而来

RDD支持丰富的转换(Transformations)操作,这些操作会返回一个新的RDD。这些转换操作包括map、filter、flatMap、groupByKey等。因此,你可以通过在一个已存在的RDD上应用这些转换操作来创建新的RDD。

// 创建原始RDDval originalRDD = sc.parallelize(Array(1, 2, 3, 4, 5))// 映射转换RDDval transformedRDD = originalRDD.map(x => x * 2)

4. 使用SparkContext的makeRDD方法

SparkContext的makeRDD方法允许你直接从Scala的并行集合(如ParSeq)或迭代器(Iterator)中创建RDD。这种方法不常用,但是对于从非标准数据源创建RDD特别有用。

// 本地数据集合data = [1, 2, 3, 4, 5]// 使用makeRDD方法从本地数据集合创建一个 RDDrdd = sc.makeRDD(data)

5. 从数据库读取数据

虽然Spark SQL和DataFrame API通常是处理数据库数据的首选方法,但你也可以通过JDBC等接口从数据库中读取数据并转换为RDD。这通常涉及到使用JdbcRDD(在Spark 1.x版本中可用,但在Spark 2.x及更高版本中可能需要自定义实现或使用DataFrame API)。

下面示例演示Spark通过JDBC调用MySQL。

代码4-1  SparkJDBCReadMySQL.scala

import org.apache.spark.sql.SparkSessionobject SparkJDBCReadMySQL {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder().appName("Spark JDBC Read MySQL").config("spark.master", "local[*]") // 本地模式,使用所有可用核心.getOrCreate()// MySQL JDBC URL、用户名和密码,读者需要分局自己的数据库信息修改这些配置val jdbcUrl = "jdbc:mysql://<hostname>:<port>/<database>"val jdbcUsername = "<username>"val jdbcPassword = "<password>"// JDBC查询,可以根据MySQL 自带的示例数据库。修改下面SQL语句val jdbcQuery = "(SELECT * FROM your_table) as table_alias" // 通常需要一个别名// 读取MySQL数据为DataFrameval df = spark.read.format("jdbc").option("url", jdbcUrl).option("dbtable", jdbcQuery) // 注意:这里使用jdbcQuery而不是直接表名.option("user", jdbcUsername).option("password", jdbcPassword).load()// 显示DataFrame内容df.show()// 停止SparkSessionspark.stop()}}

请读者自行安装MySQL,并运行代码测试一下。RDD的创建通常与SparkContext实例紧密相关,因为大多数创建RDD的方法都是通过SparkContext的实例调用的。

在Spark 2.x及更高版本中,DataFrame和Dataset API提供了更加高级和灵活的数据处理功能,因此在可能的情况下,推荐使用这些API。然而,对于某些特定场景或遗留代码,RDD仍然是一个有用的选项。

http://www.dtcms.com/a/572799.html

相关文章:

  • 删除小目标 cocojson
  • 汽车EDI:基于知行之桥的 Gnotec EDI解决方案
  • 垂直行业门户网站建设方案自己做的网站被黑了怎么办
  • 地图可视化实践录:leaflet学习之综合示例工程设计
  • 《Python工匠》第二章 数值与字符串 关键知识摘录与梳理
  • QuickSSO 与 ECreator 实操应用案例手册
  • Flutter Android Kotlin 插件编译错误完整解决方案
  • 网站设计问题网站开发包含的项目和分工
  • FPGA—ZYNQ学习UART环回(五)
  • 电动汽车充电云与移动应用基础设施的漏洞识别、攻击实验及缓解策略
  • PFMEA中的预防措施和探测措施区别
  • 做钢材的网站php 7 wordpress
  • 告别繁琐手工录入:智能银行票据套打软件,让制单效率飞跃
  • 【Java】理解 Java 中的 this 关键字
  • 在编译OpenHarmony遇到third_party/libnl编译报错的修复办法
  • 建c2c网站俄罗斯乌克兰为什么打仗
  • vue在获取某一个div的大小,怎么确保div渲染好,内容撑开后才去获取大小
  • ITIL 4 复习练习及解题思路
  • 数据结构:单链表(2)
  • MYSQL数据库--索引、视图练习
  • 2.2.5 运算方法和运算电路【2011统考真题】
  • 前端上传图片加裁剪功能
  • DevEco Studio 鸿蒙 引入lib中的文件
  • 简述数据库设计--范式、ER图
  • 【Linux】权限(1):初识权限与使用理解
  • 深圳专业做网站的公司河南企业建站系统信息
  • 企业门户网站设计报告wordpress接口
  • 基于大数据的天气分析与应用系统
  • spark读取table中的数据【hive】
  • 最后一轮征稿开启 | ACM出版 | 第二届大数据分析与人工智能应用学术会议(BDAIA2025)