Spark-3.5.7文档5 - Spark Streaming 编程指南
注意
Spark Streaming 是 Spark 流处理引擎的上一代产品。Spark Streaming 不再有更新,它是一个遗留项目。Spark 中有一个更新且更易于使用的流处理引擎,称为 Structured Streaming。您应该为您的流处理应用程序和管道使用 Spark Structured Streaming。请参阅《Structured Streaming 编程指南》。
概述
Spark Streaming 是核心 Spark API 的扩展,能够对实时数据流进行可扩展、高吞吐量、容错的流处理。数据可以从许多来源(如 Kafka、Kinesis 或 TCP 套接字)摄取,并可以使用通过高级函数(如 map、reduce、join 和 window)表达的复杂算法进行处理。最后,处理后的数据可以被推送到文件系统、数据库和实时仪表板。实际上,您可以在数据流上应用 Spark 的机器学习和图处理算法。

在内部,它的工作方式如下。Spark Streaming 接收实时输入数据流并将数据分成批次,然后由 Spark 引擎处理这些批次,以分批生成最终的结果流。

Spark Streaming 提供了一个称为离散化流或 DStream 的高级抽象,它表示连续的数据流。DStream 可以从源(如 Kafka 和 Kinesis)的输入数据流创建,也可以通过在其他 DStream 上应用高级操作来创建。在内部,一个 DStream 表示为一个 RDD 序列。
本指南向您展示如何开始使用 DStream 编写 Spark Streaming 程序。您可以使用 Scala、Java 或 Python(在 Spark 1.2 中引入)编写 Spark Streaming 程序,本指南将介绍所有这些语言。您将在本指南中找到选项卡,让您在不同语言的代码片段之间进行选择。
注意: 有一些 API 在 Python 中要么不同,要么不可用。在本指南中,您将找到 Python API 标签,突出显示这些差异。
一个简单示例
在我们详细介绍如何编写自己的 Spark Streaming 程序之前,让我们先快速看一下一个简单的 Spark Streaming 程序是什么样子的。假设我们想要计算从监听 TCP 套接字的数据服务器接收的文本数据中的单词数。您需要做的如下所示。
Python
首先,我们导入 StreamingContext,它是所有流处理功能的主要入口点。我们创建一个具有两个执行线程、批处理间隔为 1 秒的本地 StreamingContext。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建一个具有两个工作线程、批处理间隔为 1 秒的本地 StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
使用此上下文,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。
# 创建一个将连接到 hostname:port 的 DStream,如 localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
这个 lines DStream 表示将从数据服务器接收的数据流。此 DStream 中的每条记录是一行文本。接下来,我们想按空格将行分割成单词。
# 将每一行分割成单词
words = lines.flatMap(lambda line: line.split(" "))
flatMap 是一个一对多的 DStream 操作,它通过从源 DStream 中的每条记录生成多个新记录来创建一个新的 DStream。在这种情况下,每一行将被分割成多个单词,单词流表示为 words DStream。接下来,我们想统计这些单词。
# 统计每个批次中每个单词的数量
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)# 将此 DStream 中生成的每个 RDD 的前十个元素打印到控制台
wordCounts.pprint()
words DStream 被进一步映射(一对一转换)为一个 (word, 1) 对的 DStream,然后进行归约以获取每批数据中单词的频率。最后,wordCounts.pprint() 将每秒打印生成的一些计数。
请注意,当这些行被执行时,Spark Streaming 只设置了它启动后将执行的计算,并没有开始真正的处理。为了在所有转换设置完成后开始处理,我们最后调用
ssc.start() # 开始计算
ssc.awaitTermination() # 等待计算终止
完整代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
如果您已经下载并构建了 Spark,您可以按如下方式运行此示例。您首先需要运行 Netcat(在大多数类 Unix 系统中找到的一个小实用程序)作为数据服务器,使用命令:
$ nc -lk 9999
Scala
首先,我们将 Spark Streaming 类的名称和一些隐式转换从 StreamingContext 导入到我们的环境中,以便为我们需要的其他类(如 DStream)添加有用的方法。StreamingContext 是所有流处理功能的主要入口点。我们创建一个具有两个执行线程、批处理间隔为 1 秒的本地 StreamingContext。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // 自 Spark 1.3 起不再必要// 创建一个具有两个工作线程、批处理间隔为 1 秒的本地 StreamingContext。
// master 需要 2 个核心以防止饥饿场景。val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
使用此上下文,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。
// 创建一个将连接到 hostname:port 的 DStream,如 localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
这个 lines DStream 表示将从数据服务器接收的数据流。此 DStream 中的每条记录是一行文本。接下来,我们想按空格字符将行分割成单词。
// 将每一行分割成单词
val words = lines.flatMap(_.split(" "))
flatMap 是一个一对多的 DStream 操作,它通过从源 DStream 中的每条记录生成多个新记录来创建一个新的 DStream。在这种情况下,每一行将被分割成多个单词,单词流表示为 words DStream。接下来,我们想统计这些单词。
import org.apache.spark.streaming.StreamingContext._ // 自 Spark 1.3 起不再必要
// 统计每个批次中每个单词的数量
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)// 将此 DStream 中生成的每个 RDD 的前十个元素打印到控制台
wordCounts.print()
words DStream 被进一步映射(一对一转换)为一个 (word, 1) 对的 DStream,然后进行归约以获取每批数据中单词的频率。最后,wordCounts.print() 将每秒打印生成的一些计数。
请注意,当这些行被执行时,Spark Streaming 只设置了它启动后将执行的计算,并没有开始真正的处理。为了在所有转换设置完成后开始处理,我们最后调用
ssc.start() // 开始计算
ssc.awaitTermination() // 等待计算终止
完整代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
如果您已经下载并构建了 Spark,您可以按如下方式运行此示例。您首先需要运行 Netcat(在大多数类 Unix 系统中找到的一个小实用程序)作为数据服务器,使用命令:
$ nc -lk 9999
Java
首先,我们创建一个 JavaStreamingContext 对象,它是所有流处理功能的主要入口点。我们创建一个具有两个执行线程、批处理间隔为 1 秒的本地 StreamingContext。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;// 创建一个具有两个工作线程、批处理间隔为 1 秒的本地 StreamingContext
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
使用此上下文,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。
// 创建一个将连接到 hostname:port 的 DStream,如 localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
这个 lines DStream 表示将从数据服务器接收的数据流。此流中的每条记录是一行文本。然后,我们想按空格将行分割成单词。
// 将每一行分割成单词
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
flatMap 是一个 DStream 操作,它通过从源 DStream 中的每条记录生成多个新记录来创建一个新的 DStream。在这种情况下,每一行将被分割成多个单词,单词流表示为 words DStream。请注意,我们使用 FlatMapFunction 对象定义了转换。随着我们的深入,我们将发现 Java API 中有许多这样的便利类,有助于定义 DStream 转换。
接下来,我们想统计这些单词。
// 统计每个批次中每个单词的数量
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);// 将此 DStream 中生成的每个 RDD 的前十个元素打印到控制台
wordCounts.print();
words DStream 使用 PairFunction 对象被进一步映射(一对一转换)为一个 (word, 1) 对的 DStream。然后,使用 Function2 对象对其进行归约,以获取每批数据中单词的频率。最后,wordCounts.print() 将每秒打印生成的一些计数。
请注意,当这些行被执行时,Spark Streaming 只设置了它启动后将执行的计算,并没有开始真正的处理。为了在所有转换设置完成后开始处理,我们最后调用 start 方法。
jssc.start(); // 开始计算
jssc.awaitTermination(); // 等待计算终止
完整代码可以在 Spark Streaming 示例 JavaNetworkWordCount 中找到。
如果您已经下载并构建了 Spark,您可以按如下方式运行此示例。您首先需要运行 Netcat(在大多数类 Unix 系统中找到的一个小实用程序)作为数据服务器,使用命令:
$ nc -lk 9999
然后,在另一个终端中,您可以通过以下方式启动示例:
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
然后,在运行 netcat 服务器的终端中键入的任何行都将被计数并每秒打印在屏幕上。它看起来将类似于以下内容。
# 终端 1:
# 运行 Netcat$ nc -lk 9999hello world# 终端 2: 运行 network_wordcount.py$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
# 终端 1:
# 运行 Netcat$ nc -lk 9999hello world# 终端 2: 运行 NetworkWordCount$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# 终端 1:
# 运行 Netcat$ nc -lk 9999hello world# 终端 2: 运行 JavaNetworkWordCount$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
基本概念
接下来,我们超越简单示例,详细阐述 Spark Streaming 的基础知识。
链接
与 Spark 类似,Spark Streaming 可通过 Maven Central 获得。要编写您自己的 Spark Streaming 程序,您必须将以下依赖项添加到您的 SBT 或 Maven 项目中。
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.5.7</version><scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.7" % "provided"
对于从 Spark Streaming 核心 API 中不存在的源(如 Kafka 和 Kinesis)摄取数据,您必须将相应的构件 spark-streaming-xyz_2.12 添加到依赖项中。例如,一些常见的如下所示。
| 源 | 构件 |
|---|---|
| Kafka | spark-streaming-kafka-0-10_2.12 |
| Kinesis | spark-streaming-kinesis-asl_2.12 [Amazon Software License] |
有关最新列表,请参阅 Maven 存储库以获取支持的源和构件的完整列表。
初始化 StreamingContext
要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,它是所有 Spark Streaming 功能的主要入口点。
可以从 SparkContext 对象创建 StreamingContext 对象。
Python
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
Scala
import org.apache.spark._
import org.apache.spark.streaming._val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
Java
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
appName 参数是您的应用程序在集群 UI 上显示的名称。master 是 Spark、Mesos 或 YARN 集群 URL,或者是特殊的 “local[]" 字符串以在本地模式下运行。实际上,在集群上运行时,您不会希望在程序中硬编码 master,而是使用 spark-submit 启动应用程序并在那里接收它。但是,对于本地测试和单元测试,您可以传递 "local[]” 以在进程内运行 Spark Streaming(检测本地系统中的核心数)。
批处理间隔必须根据应用程序的延迟要求和可用集群资源进行设置。有关更多详细信息,请参阅性能调优部分。
定义上下文后,您必须执行以下操作:
- 通过创建输入 DStream 来定义输入源。
- 通过对 DStream 应用转换和输出操作来定义流式计算。
- 使用
streamingContext.start()开始接收数据并进行处理。 - 使用
streamingContext.awaitTermination()等待处理停止(手动或由于任何错误)。 - 可以使用
streamingContext.stop()手动停止处理。
需要记住的要点:
- 一旦上下文启动,就不能设置或向其添加新的流式计算。
- 一旦上下文停止,它就不能重新启动。
- 一个 JVM 中同时只能有一个活跃的
StreamingContext。 StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。- 一个
SparkContext可以被重用来创建多个StreamingContext,只要在创建下一个StreamingContext之前停止了前一个StreamingContext(而不停止SparkContext)。
离散化流(DStreams)
离散化流或 DStream 是 Spark Streaming 提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,一个 DStream 由一系列连续的 RDD 表示,RDD 是 Spark 对不可变分布式数据集的抽象(有关更多详细信息,请参阅《Spark 编程指南》)。DStream 中的每个 RDD 包含来自某个时间间隔的数据,如下图所示。

应用于 DStream 的任何操作都会转换为对底层 RDD 的操作。例如,在之前将行流转换为单词的示例中,flatMap 操作应用于 lines DStream 中的每个 RDD,以生成 words DStream 的 RDD。如下图所示。

这些底层的 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了大部分这些细节,并为开发人员提供了更高级别的 API 以方便使用。这些操作将在后面的部分中详细讨论。
输入 DStream 和接收器
输入 DStream 是表示从流源接收的输入数据流的 DStream。在快速示例中,lines 是一个输入 DStream,因为它表示从 netcat 服务器接收的数据流。每个输入 DStream(除了文件流,本节后面讨论)都与一个接收器(Scala 文档, Java 文档)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以便处理。
Spark Streaming 提供两类内置的流源:
- 基本源:直接在
StreamingContextAPI 中可用的源。例如:文件系统和套接字连接。 - 高级源:如 Kafka、Kinesis 等源可通过额外的实用程序类获得。这些需要链接额外的依赖项,如链接部分所述。
我们将在本节后面讨论每个类别中存在的一些源。
请注意,如果您想在流处理应用程序中并行接收多个数据流,您可以创建多个输入 DStream(在性能调优部分进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker/executor 是一个长时间运行的任务,因此它占用分配给 Spark Streaming 应用程序的一个核心。因此,重要的是要记住,需要为 Spark Streaming 应用程序分配足够的核心(如果在本地运行,则为线程)来处理接收到的数据,以及运行接收器。
需要记住的要点:
- 在本地运行 Spark Streaming 程序时,不要使用 “local” 或 “local[1]” 作为 master URL。这两种方式都意味着只有一个线程将用于在本地运行任务。如果您使用的是基于接收器的输入 DStream(例如套接字、Kafka 等),那么该单线程将用于运行接收器,而没有线程用于处理接收到的数据。因此,在本地运行时,始终使用 “local[n]” 作为 master URL,其中 n > 要运行的接收器数量(有关如何设置 master 的信息,请参阅 Spark 属性)。
- 将逻辑扩展到在集群上运行,分配给 Spark Streaming 应用程序的核心数必须大于接收器的数量。否则系统将接收数据,但无法处理它。
基本源
我们已经在快速示例中查看了 ssc.socketTextStream(...),它从通过 TCP 套接字连接接收的文本数据创建了一个 DStream。除了套接字,StreamingContext API 还提供了从文件作为输入源创建 DStream 的方法。
文件流
为了从任何与 HDFS API 兼容的文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 创建 DStream。
文件流不需要运行接收器,因此无需为接收文件数据分配任何核心。
对于简单的文本文件,最简单的方法是 StreamingContext.textFileStream(dataDirectory)。
注意: fileStream 在 Python API 中不可用;只有 textFileStream 可用。
Scala
streamingContext.textFileStream(dataDirectory)streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
对于文本文件
streamingContext.textFileStream(dataDirectory)
Java
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
对于文本文件
streamingContext.textFileStream(dataDirectory);
如何监控目录
Spark Streaming 将监控目录 dataDirectory 并处理在该目录中创建的任何文件。
- 可以监控一个简单的目录,例如 “hdfs://namenode:8040/logs/”。该路径下的所有文件在被发现时将被处理。
- 可以提供 POSIX glob 模式,例如 “hdfs://namenode:8040/logs/2017/*”。这里,DStream 将由匹配该模式的目录中的所有文件组成。也就是说:它是目录的模式,而不是目录中文件的模式。
- 所有文件必须采用相同的数据格式。
- 文件基于其修改时间(而非创建时间)被视为某个时间段的一部分。
- 一旦处理完毕,当前窗口内对文件的更改不会导致文件被重新读取。也就是说:更新被忽略。
- 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
- 如果使用通配符来标识目录,例如 “hdfs://namenode:8040/logs/2016-*”,将整个目录重命名以匹配该路径会将目录添加到受监控目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
- 调用
FileSystem.setTimes()来修复时间戳是一种让文件在后续窗口中被拾取的方法,即使其内容未更改。
使用对象存储作为数据源
"完整"的文件系统(如 HDFS)倾向于在创建输出流时立即设置其文件的修改时间。当文件被打开时,甚至在数据完全写入之前,它可能被包含在 DStream 中——之后同一窗口内对文件的更新将被忽略。也就是说:可能会错过更改,数据会从流中省略。
为了保证在窗口中拾取更改,请将文件写入未监控的目录,然后在输出流关闭后立即将其重命名到目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将拾取新数据。
相反,对象存储(如 Amazon S3 和 Azure Storage)通常具有缓慢的重命名操作,因为数据实际上是被复制的。此外,重命名的对象可能将 rename() 操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。
需要对目标对象存储进行仔细测试,以验证该存储的时间戳行为与 Spark Streaming 预期的行为一致。对于通过所选对象存储的流数据,直接写入目标目录可能是适当的策略。
有关此主题的更多详细信息,请查阅 Hadoop 文件系统规范。
基于自定义接收器的流
可以使用通过自定义接收器接收的数据流创建 DStream。有关更多详细信息,请参阅《自定义接收器指南》。
作为流的 RDD 队列
为了使用测试数据测试 Spark Streaming 应用程序,还可以基于 RDD 队列创建 DStream,使用 streamingContext.queueStream(queueOfRDDs)。推入队列的每个 RDD 将被视为 DStream 中的一批数据,并像流一样被处理。
有关来自套接字和文件的流的更多详细信息,请参阅 StreamingContext(Scala)、JavaStreamingContext(Java)和 StreamingContext(Python)中相关函数的 API 文档。
高级源
Python API 注意: 截至 Spark 3.5.7,在这些源中,Kafka 和 Kinesis 在 Python API 中可用。
此类别的源需要与外部非 Spark 库接口,其中一些具有复杂的依赖关系(例如 Kafka)。因此,为了最小化与依赖项版本冲突相关的问题,从这些源创建 DStream 的功能已移至单独的库,可以在必要时显式链接。
请注意,这些高级源在 Spark shell 中不可用,因此基于这些高级源的应用程序无法在 shell 中测试。如果您确实想在 Spark shell 中使用它们,则必须下载相应的 Maven 构件的 JAR 及其依赖项,并将其添加到类路径中。
其中一些高级源如下:
- Kafka:Spark Streaming 3.5.7 与 Kafka 代理版本 0.10 或更高版本兼容。有关更多详细信息,请参阅《Kafka 集成指南》。
- Kinesis:Spark Streaming 3.5.7 与 Kinesis Client Library 1.2.1 兼容。有关更多详细信息,请参阅《Kinesis 集成指南》。
自定义源
Python API 注意: 这在 Python 中尚不支持。
输入 DStream 也可以从自定义数据源创建。您所要做的就是实现一个用户定义的接收器(参见下一节了解这是什么),该接收器可以从自定义源接收数据并将其推送到 Spark 中。有关详细信息,请参阅《自定义接收器指南》。
接收器可靠性
根据其可靠性,可以有两种数据源。源(如 Kafka)允许对传输的数据进行确认。如果从这些可靠源接收数据的系统正确确认接收到的数据,则可以确保不会因任何类型的故障而丢失数据。这导致两种接收器:
- 可靠接收器 - 可靠接收器在数据已被接收并存储在 Spark 中(带有复制)时,向可靠源正确发送确认。
- 不可靠接收器 - 不可靠接收器不向源发送确认。这可用于不支持确认的源,或者即使对于可靠源,当人们不希望或不需要处理确认的复杂性时。
有关如何编写可靠接收器的详细信息,在《自定义接收器指南》中讨论。
DStream 上的转换
与 RDD 类似,转换允许修改来自输入 DStream 的数据。DStream 支持普通 Spark RDD 上可用的许多转换。一些常见的如下所示。
| 转换 | 含义 |
|---|---|
map(func) | 通过将源 DStream 的每个元素传递给函数 func 来返回一个新的 DStream。 |
flatMap(func) | 类似于 map,但每个输入项可以映射到 0 个或多个输出项。 |
filter(func) | 通过仅选择源 DStream 中 func 返回 true 的记录来返回一个新的 DStream。 |
repartition(numPartitions) | 通过创建更多或更少的分区来更改此 DStream 中的并行级别。 |
union(otherStream) | 返回一个新的 DStream,其中包含源 DStream 和 otherDStream 中元素的并集。 |
count() | 通过计算源 DStream 的每个 RDD 中的元素数,返回一个单元素 RDD 的新 DStream。 |
reduce(func) | 通过使用函数 func(接受两个参数并返回一个)聚合源 DStream 的每个 RDD 中的元素,返回一个单元素 RDD 的新 DStream。该函数应该是关联和交换的,以便可以并行计算。 |
countByValue() | 当在类型为 K 的元素的 DStream 上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是其在源 DStream 的每个 RDD 中的频率。 |
reduceByKey(func, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值使用给定的归约函数进行聚合。注意: 默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,集群模式下由配置属性 spark.default.parallelism 确定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同数量的任务。 |
join(otherStream, [numTasks]) | 当在两个 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中包含每个键的所有元素对。 |
cogroup(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个新的 (K, Seq[V], Seq[W]) 元组的 DStream。 |
transform(func) | 通过对源 DStream 的每个 RDD 应用一个 RDD-to-RDD 函数来返回一个新的 DStream。这可用于在 DStream 上进行任意的 RDD 操作。 |
updateStateByKey(func) | 返回一个新的"状态" DStream,其中每个键的状态通过将给定函数应用于键的先前状态和键的新值来更新。这可用于维护每个键的任意状态数据。 |
其中一些转换值得更详细地讨论。
UpdateStateByKey 操作
updateStateByKey 操作允许您维护任意状态,同时使用新信息不断更新它。要使用此功能,您必须执行两个步骤:
- 定义状态 - 状态可以是任意数据类型。
- 定义状态更新函数 - 指定一个函数,说明如何使用先前的状态和输入流中的新值来更新状态。
在每个批次中,Spark 将为所有现有键应用状态更新函数,无论它们在该批次中是否有新数据。如果更新函数返回 None,则键值对将被消除。
让我们用一个例子来说明。假设您想要维护在文本数据流中看到的每个单词的运行计数。这里,运行计数是状态,它是一个整数。我们将更新函数定义为:
Python
def updateFunction(newValues, runningCount):if runningCount is None:runningCount = 0return sum(newValues, runningCount) # 将新值与先前的运行计数相加得到新计数
Scala
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = ... // 将新值与先前的运行计数相加得到新计数Some(newCount)
}
Java
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =(values, state) -> {Integer newSum = ... // 将新值与先前的运行计数相加得到新计数return Optional.of(newSum);};
这应用于包含单词的 DStream(例如,前面示例中包含 (word, 1) 对的 pairs DStream)。
Python
runningCounts = pairs.updateStateByKey(updateFunction)
更新函数将为每个单词调用,newValues 具有一个 1 的序列(来自 (word, 1) 对),runningCount 具有先前的计数。有关完整的 Python 代码,请查看示例 stateful_network_wordcount.py。
注意: 使用 updateStateByKey 需要配置检查点目录,这将在检查点部分详细讨论。
Transform 操作
transform 操作(及其变体,如 transformWith)允许在 DStream 上应用任意的 RDD-to-RDD 函数。它可用于应用 DStream API 中未公开的任何 RDD 操作。例如,将数据流中的每个批次与另一个数据集连接的功能在 DStream API 中没有直接公开。但是,您可以轻松使用 transform 来实现这一点。这实现了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(可能也是用 Spark 生成的)连接,然后基于其进行过滤来进行实时数据清理。
Python
spamInfoRDD = sc.pickleFile(...) # 包含垃圾邮件信息的 RDD# 将数据流与垃圾邮件信息连接以进行数据清理
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
Scala
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾邮件信息的 RDDval cleanedDStream = wordCounts.transform { rdd =>rdd.join(spamInfoRDD).filter(...) // 将数据流与垃圾邮件信息连接以进行数据清理...
}
Java
import org.apache.spark.streaming.api.java.*;
// 包含垃圾邮件信息的 RDD
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {rdd.join(spamInfoRDD).filter(...); // 将数据流与垃圾邮件信息连接以进行数据清理...
});
请注意,提供的函数在每个批处理间隔都会被调用。这允许您进行随时间变化的 RDD 操作,也就是说,RDD 操作、分区数量、广播变量等可以在批次之间更改。
窗口操作
Spark Streaming 还提供窗口计算,允许您对数据的滑动窗口应用转换。下图说明了这个滑动窗口。

如图所示,每次窗口在源 DStream 上滑动时,落在窗口内的源 RDD 被组合并操作以生成窗口化 DStream 的 RDD。在这个特定情况下,操作应用于最后 3 个时间单位的数据,并以 2 个时间单位滑动。这表明任何窗口操作都需要指定两个参数:
- 窗口长度 - 窗口的持续时间(图中为 3)。
- 滑动间隔 - 执行窗口操作的间隔(图中为 2)。
这两个参数必须是源 DStream 的批处理间隔(图中为 1)的倍数。
让我们用一个例子来说明窗口操作。假设您想通过每 10 秒生成过去 30 秒数据的单词计数来扩展前面的示例。为此,我们必须对 (word, 1) 对的 pairs DStream 在过去 30 秒的数据上应用 reduceByKey 操作。这是使用 reduceByKeyAndWindow 操作完成的。
Python
# 每 10 秒归约过去 30 秒的数据
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
Scala
// 每 10 秒归约过去 30 秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
Java
// 每 10 秒归约过去 30 秒的数据
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一些常见的窗口操作如下。所有这些操作都采用上述两个参数 - windowLength 和 slideInterval。
| 转换 | 含义 |
|---|---|
window(windowLength, slideInterval) | 返回一个新的 DStream,该 DStream 基于源 DStream 的窗口化批次计算。 |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口计数。 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素流,通过使用函数 func 在滑动间隔内聚合流中的元素创建。该函数应该是关联和交换的,以便可以正确并行计算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值使用给定的归约函数 func 在滑动窗口中的批次上进行聚合。注意: 默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,集群模式下由配置属性 spark.default.parallelism 确定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同数量的任务。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上述 reduceByKeyAndWindow() 的更高效版本,其中每个窗口的归约值是使用前一个窗口的归约值增量计算的。这是通过归约进入滑动窗口的新数据,并"反向归约"离开窗口的旧数据来完成的。一个例子是当窗口滑动时"加"和"减"键的计数。然而,它仅适用于"可逆归约函数",即那些具有相应"逆归约"函数(作为参数 invFunc)的归约函数。与 reduceByKeyAndWindow 一样,归约任务的数量可通过可选参数配置。注意: 使用此操作必须启用检查点。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在滑动窗口内的频率。与 reduceByKeyAndWindow 一样,归约任务的数量可通过可选参数配置。 |
连接操作
最后,值得强调的是在 Spark Streaming 中执行不同类型的连接是多么容易。
流-流连接
流可以非常容易地与其他流连接。
Python
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
Scala
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
Java
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
这里,在每个批处理间隔中,由 stream1 生成的 RDD 将与由 stream2 生成的 RDD 连接。您还可以执行 leftOuterJoin、rightOuterJoin、fullOuterJoin。此外,在流的窗口上进行连接通常非常有用。这也很容易。
Python
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Scala
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
Java
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
流-数据集连接
这在前面解释 DStream.transform 操作时已经展示过。这是另一个将窗口化流与数据集连接的例子。
Python
dataset = ... # 某个 RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
Scala
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
Java
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
实际上,您还可以动态更改要连接的数据集。提供给 transform 的函数在每个批处理间隔都会被评估,因此将使用 dataset 引用指向的当前数据集。
DStream 转换的完整列表可在 API 文档中找到。对于 Scala API,请参阅 DStream 和 PairDStreamFunctions。对于 Java API,请参阅 JavaDStream 和 JavaPairDStream。对于 Python API,请参阅 DStream。
DStream 上的输出操作
输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统消费转换后的数据,因此它们会触发所有 DStream 转换的实际执行(类似于 RDD 的 action)。目前定义了以下输出操作:
| 输出操作 | 含义 |
|---|---|
print() | 在运行流应用程序的驱动节点上打印 DStream 中每批数据的前十个元素。这对于开发和调试很有用。 Python API 注意: 在 Python API 中,这称为 pprint()。 |
saveAsTextFiles(prefix, [suffix]) | 将此 DStream 的内容保存为文本文件。每个批处理间隔的文件名基于 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。 |
saveAsObjectFiles(prefix, [suffix]) | 将此 DStream 的内容保存为序列化 Java 对象的 SequenceFiles。每个批处理间隔的文件名基于 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。Python API 注意: 这在 Python API 中不可用。 |
saveAsHadoopFiles(prefix, [suffix]) | 将此 DStream 的内容保存为 Hadoop 文件。每个批处理间隔的文件名基于 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。Python API 注意: 这在 Python API 中不可用。 |
foreachRDD(func) | 最通用的输出操作符,将一个函数 func 应用于从流生成的每个 RDD。此函数应将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或通过网络将其写入数据库。请注意,函数 func 在运行流应用程序的驱动进程中执行,并且通常内部包含 RDD action,这些 action 将强制对流式 RDD 进行计算。 |
使用 foreachRDD 的设计模式
dstream.foreachRDD 是一个强大的原语,允许将数据发送到外部系统。然而,了解如何正确且高效地使用此原语非常重要。以下是一些需要避免的常见错误。
通常,将数据写入外部系统需要创建一个连接对象(例如,到远程服务器的 TCP 连接)并使用它向远程系统发送数据。为此,开发人员可能会无意中尝试在 Spark 驱动程序中创建连接对象,然后尝试在 Spark worker 中使用它来保存 RDD 中的记录。例如(在 Scala 中),
Python
def sendRecord(rdd):connection = createNewConnection() # 在驱动节点上执行rdd.foreach(lambda record: connection.send(record))connection.close()dstream.foreachRDD(sendRecord)
Scala
dstream.foreachRDD { rdd =>val connection = createNewConnection() // 在驱动节点上执行rdd.foreach { record =>connection.send(record) // 在 worker 节点上执行}
}
Java
dstream.foreachRDD(rdd -> {Connection connection = createNewConnection(); // 在驱动节点上执行rdd.foreach(record -> {connection.send(record); // 在 worker 节点上执行});
});
这是不正确的,因为这需要连接对象被序列化并从驱动程序发送到 worker。这样的连接对象很少能在机器间传输。此错误可能表现为序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在 worker 节点上初始化)等。正确的解决方案是在 worker 节点上创建连接对象。
然而,这可能导致另一个常见错误——为每条记录创建一个新连接。例如,
Python
def sendRecord(record):connection = createNewConnection()connection.send(record)connection.close()dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
Scala
dstream.foreachRDD { rdd =>rdd.foreach { record =>val connection = createNewConnection()connection.send(record)connection.close()}
}
Java
dstream.foreachRDD(rdd -> {rdd.foreach(record -> {Connection connection = createNewConnection();connection.send(record);connection.close();});
});
通常,创建连接对象具有时间和资源开销。因此,为每条记录创建和销毁连接对象可能带来不必要的高开销,并显著降低系统的整体吞吐量。更好的解决方案是使用 rdd.foreachPartition——创建一个连接对象,并使用该连接发送 RDD 分区中的所有记录。
Python
def sendPartition(iter):connection = createNewConnection()for record in iter:connection.send(record)connection.close()dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
Scala
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()}
}
Java
dstream.foreachRDD(rdd -> {rdd.foreachPartition(partitionOfRecords -> {Connection connection = createNewConnection();while (partitionOfRecords.hasNext()) {connection.send(partitionOfRecords.next());}connection.close();});
});
这将在多条记录上分摊连接创建的开销。
最后,可以通过在多个 RDD/批次之间重用连接对象来进一步优化。可以维护一个静态连接池,当多个批次的 RDD 被推送到外部系统时可以重用这些连接,从而进一步减少开销。
Python
def sendPartition(iter):# ConnectionPool 是一个静态的、延迟初始化的连接池connection = ConnectionPool.getConnection()for record in iter:connection.send(record)# 返回池中以供将来重用ConnectionPool.returnConnection(connection)dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
Scala
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>// ConnectionPool 是一个静态的、延迟初始化的连接池val connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection) // 返回池中以供将来重用}
}
Java
dstream.foreachRDD(rdd -> {rdd.foreachPartition(partitionOfRecords -> {// ConnectionPool 是一个静态的、延迟初始化的连接池Connection connection = ConnectionPool.getConnection();while (partitionOfRecords.hasNext()) {connection.send(partitionOfRecords.next());}ConnectionPool.returnConnection(connection); // 返回池中以供将来重用});
});
请注意,池中的连接应按需延迟创建,如果一段时间未使用则应超时。这实现了向外部系统最高效的数据发送。
其他需要记住的要点:
- DStream 由输出操作延迟执行,就像 RDD 由 RDD action 延迟执行一样。具体来说,DStream 输出操作内部的 RDD action 会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者有像
dstream.foreachRDD()这样的输出操作但内部没有任何 RDD action,那么什么也不会执行。系统将只是接收数据并丢弃它。 - 默认情况下,输出操作一次执行一个。并且它们按照在应用程序中定义的顺序执行。
DataFrame 和 SQL 操作
您可以轻松地在流数据上使用 DataFrame 和 SQL 操作。您必须使用 StreamingContext 正在使用的 SparkContext 来创建一个 SparkSession。此外,必须这样做以便在驱动程序故障时可以重新启动。这是通过创建一个延迟实例化的 SparkSession 单例实例来完成的。下面的示例展示了这一点。它修改了之前的单词计数示例,使用 DataFrame 和 SQL 生成单词计数。每个 RDD 被转换为 DataFrame,注册为临时表,然后使用 SQL 查询。
Python
# 延迟实例化的全局 SparkSession 实例
def getSparkSessionInstance(sparkConf):if ("sparkSessionSingletonInstance" not in globals()):globals()["sparkSessionSingletonInstance"] = SparkSession \.builder \.config(conf=sparkConf) \.getOrCreate()return globals()["sparkSessionSingletonInstance"]...# 流程序内部的 DataFrame 操作words = ... # 字符串的 DStreamdef process(time, rdd):print("========= %s =========" % str(time))try:# 获取 SparkSession 的单例实例spark = getSparkSessionInstance(rdd.context.getConf())# 将 RDD[String] 转换为 RDD[Row] 再到 DataFramerowRdd = rdd.map(lambda w: Row(word=w))wordsDataFrame = spark.createDataFrame(rowRdd)# 使用 DataFrame 创建临时视图wordsDataFrame.createOrReplaceTempView("words")# 使用 SQL 对表进行单词计数并打印wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")wordCountsDataFrame.show()except:passwords.foreachRDD(process)
Scala
/** 流程序内部的 DataFrame 操作 */val words: DStream[String] = ...words.foreachRDD { rdd =>// 获取 SparkSession 的单例实例val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()import spark.implicits._// 将 RDD[String] 转换为 DataFrameval wordsDataFrame = rdd.toDF("word")// 创建临时视图wordsDataFrame.createOrReplaceTempView("words")// 使用 SQL 对 DataFrame 进行单词计数并打印val wordCountsDataFrame =spark.sql("select word, count(*) as total from words group by word")wordCountsDataFrame.show()
}
Java
/** 用于将 RDD 转换为 DataFrame 的 Java Bean 类 */
public class JavaRow implements java.io.Serializable {private String word;public String getWord() {return word;}public void setWord(String word) {this.word = word;}
}.../** 流程序内部的 DataFrame 操作 */JavaDStream<String> words = ...words.foreachRDD((rdd, time) -> {// 获取 SparkSession 的单例实例SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();// 将 RDD[String] 转换为 RDD[case class] 再到 DataFrameJavaRDD<JavaRow> rowRDD = rdd.map(word -> {JavaRow record = new JavaRow();record.setWord(word);return record;});DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);// 使用 DataFrame 创建临时视图wordsDataFrame.createOrReplaceTempView("words");// 使用 SQL 对表进行单词计数并打印DataFrame wordCountsDataFrame =spark.sql("select word, count(*) as total from words group by word");wordCountsDataFrame.show();
});
查看完整源代码。
您还可以在来自不同线程(即,与运行的 StreamingContext 异步)的流数据上定义的表上运行 SQL 查询。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,StreamingContext(不知道任何异步 SQL 查询)将在查询完成之前删除旧的流数据。例如,如果您想查询最后一个批次,但您的查询可能需要 5 分钟才能运行,那么请调用 streamingContext.remember(Minutes(5))(在 Scala 中,或其他语言中的等效方法)。
有关 DataFrames 的更多信息,请参阅《DataFrames 和 SQL 指南》。
MLlib 操作
您还可以轻松使用 MLlib 提供的机器学习算法。首先,有流式机器学习算法(例如 Streaming Linear Regression、Streaming KMeans 等),它们可以同时从流数据中学习并将模型应用于流数据。除此之外,对于更广泛的机器学习算法类别,您可以离线学习模型(即使用历史数据),然后在线将模型应用于流数据。有关更多详细信息,请参阅《MLlib 指南》。
缓存 / 持久化
与 RDD 类似,DStream 也允许开发人员将流数据持久化在内存中。也就是说,在 DStream 上使用 persist() 方法会自动将该 DStream 的每个 RDD 持久化在内存中。如果 DStream 中的数据将被多次计算(例如,对相同数据的多个操作),这将非常有用。对于基于窗口的操作(如 reduceByWindow 和 reduceByKeyAndWindow)和基于状态的操作(如 updateStateByKey),这一点是隐含的。因此,由基于窗口的操作生成的 DStream 会自动持久化在内存中,无需开发人员调用 persist()。
对于通过网络接收数据的输入流(例如,Kafka、套接字等),默认的持久化级别设置为将数据复制到两个节点以实现容错。
注意: 与 RDD 不同,DStream 的默认持久化级别将数据序列化后保存在内存中。这将在性能调优部分进一步讨论。有关不同持久化级别的更多信息,请参阅《Spark 编程指南》。
检查点
流应用程序必须 24/7 运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)具有弹性。为了实现这一点,Spark Streaming 需要将足够的信息检查点到容错存储系统,以便能够从故障中恢复。有两种类型的数据被检查点。
- 元数据检查点 - 将定义流计算的信息保存到容错存储(如 HDFS)中。这用于从运行流应用程序驱动程序的节点故障中恢复(稍后详细讨论)。元数据包括:
- 配置 - 用于创建流应用程序的配置。
- DStream 操作 - 定义流应用程序的 DStream 操作集。
- 未完成的批次 - 作业已排队但尚未完成的批次。
- 数据检查点 - 将生成的 RDD 保存到可靠存储。这在一些跨多个批次组合数据的有状态转换中是必需的。在此类转换中,生成的 RDD 依赖于先前批次的 RDD,这导致依赖链的长度随时间不断增加。为了避免恢复时间(与依赖链成正比)的这种无限制增加,有状态转换的中间 RDD 会定期检查点到可靠存储(例如 HDFS)以切断依赖链。
总之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用了有状态转换,则数据或 RDD 检查点即使是基本功能也是必需的。
何时启用检查点
对于具有以下任何要求的应用程序,必须启用检查点:
- 使用有状态转换 - 如果在应用程序中使用了
updateStateByKey或reduceByKeyAndWindow(带逆函数),则必须提供检查点目录以允许定期 RDD 检查点。 - 从运行应用程序的驱动程序故障中恢复 - 元数据检查点用于恢复进度信息。
注意: 没有上述有状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,从驱动程序故障中恢复将是部分的(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,许多 Spark Streaming 应用程序以这种方式运行。预计未来对非 Hadoop 环境的支持将得到改进。
如何配置检查点
可以通过在容错、可靠的文件系统(例如,HDFS、S3 等)中设置一个目录来启用检查点,检查点信息将保存到该目录。这是通过使用 streamingContext.checkpoint(checkpointDirectory) 完成的。这将允许您使用上述有状态转换。此外,如果您希望应用程序从驱动程序故障中恢复,您应该重写您的流应用程序以具有以下行为。
- 当程序首次启动时,它将创建一个新的
StreamingContext,设置所有流,然后调用start()。 - 当程序在故障后重新启动时,它将从检查点目录中的检查点数据重新创建一个
StreamingContext。
通过使用 StreamingContext.getOrCreate 可以简化此行为。使用方式如下。
Python
# 创建和设置新 StreamingContext 的函数
def functionToCreateContext():sc = SparkContext(...) # 新上下文ssc = StreamingContext(...)lines = ssc.socketTextStream(...) # 创建 DStreams...ssc.checkpoint(checkpointDirectory) # 设置检查点目录return ssc# 从检查点数据获取 StreamingContext 或创建一个新的
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)# 在上下文上执行需要完成的额外设置,无论它是正在启动还是重新启动
context. ...# 启动上下文
context.start()
context.awaitTermination()
Scala
// 创建和设置新 StreamingContext 的函数
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...) // 新上下文val lines = ssc.socketTextStream(...) // 创建 DStreams...ssc.checkpoint(checkpointDirectory) // 设置检查点目录ssc
}// 从检查点数据获取 StreamingContext 或创建一个新的
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// 在上下文上执行需要完成的额外设置,无论它是正在启动还是重新启动
context. ...// 启动上下文
context.start()
context.awaitTermination()
Java
// 创建可以创建和设置新 JavaStreamingContext 的工厂对象
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {@Override public JavaStreamingContext create() {JavaStreamingContext jssc = new JavaStreamingContext(...); // 新上下文JavaDStream<String> lines = jssc.socketTextStream(...); // 创建 DStreams...jssc.checkpoint(checkpointDirectory); // 设置检查点目录return jssc;}
};// 从检查点数据获取 JavaStreamingContext 或创建一个新的
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);// 在上下文上执行需要完成的额外设置,无论它是正在启动还是重新启动
context. ...// 启动上下文
context.start();
context.awaitTermination();
如果 checkpointDirectory 存在,则将从检查点数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数 functionToCreateContext 来创建新上下文并设置 DStream。请参阅 Python 示例 recoverable_network_wordcount.py。此示例将网络数据的单词计数附加到文件中。
您还可以使用 StreamingContext.getOrCreate(checkpointDirectory, None) 从检查点数据显式创建 StreamingContext 并开始计算。
除了使用 getOrCreate 之外,还需要确保驱动程序进程在故障时自动重启。这只能由用于运行应用程序的部署基础设施来完成。这将在部署部分进一步讨论。
注意: RDD 的检查点会带来保存到可靠存储的成本。这可能会导致那些 RDD 被检查点的批次处理时间增加。因此,需要仔细设置检查点间隔。在较小的批次大小(比如 1 秒)下,每批都进行检查点可能会显著降低操作吞吐量。相反,检查点过于不频繁会导致血缘关系和任务大小增长,这可能产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是批处理间隔的倍数,至少为 10 秒。可以通过使用 dstream.checkpoint(checkpointInterval) 来设置。通常,将检查点间隔设置为 DStream 的 5-10 个滑动间隔是一个不错的尝试设置。
累加器、广播变量和检查点
累加器和广播变量无法从 Spark Streaming 的检查点中恢复。如果您启用了检查点并且也使用了累加器或广播变量,则必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序因故障重启后可以重新实例化它们。下面的示例展示了这一点。
Python
def getWordExcludeList(sparkContext):if ("wordExcludeList" not in globals()):globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])return globals()["wordExcludeList"]def getDroppedWordsCounter(sparkContext):if ("droppedWordsCounter" not in globals()):globals()["droppedWordsCounter"] = sparkContext.accumulator(0)return globals()["droppedWordsCounter"]def echo(time, rdd):# 获取或注册 excludeList 广播变量excludeList = getWordExcludeList(rdd.context)# 获取或注册 droppedWordsCounter 累加器droppedWordsCounter = getDroppedWordsCounter(rdd.context)# 使用 excludeList 丢弃单词,并使用 droppedWordsCounter 对它们进行计数def filterFunc(wordCount):if wordCount[0] in excludeList.value:droppedWordsCounter.add(wordCount[1])Falseelse:Truecounts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())wordCounts.foreachRDD(echo)
Scala
object WordExcludeList {@volatile private var instance: Broadcast[Seq[String]] = nulldef getInstance(sc: SparkContext): Broadcast[Seq[String]] = {if (instance == null) {synchronized {if (instance == null) {val wordExcludeList = Seq("a", "b", "c")instance = sc.broadcast(wordExcludeList)}}}instance}
}object DroppedWordsCounter {@volatile private var instance: LongAccumulator = nulldef getInstance(sc: SparkContext): LongAccumulator = {if (instance == null) {synchronized {if (instance == null) {instance = sc.longAccumulator("DroppedWordsCounter")}}}instance}
}wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>// 获取或注册 excludeList 广播变量val excludeList = WordExcludeList.getInstance(rdd.sparkContext)// 获取或注册 droppedWordsCounter 累加器val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)// 使用 excludeList 丢弃单词,并使用 droppedWordsCounter 对它们进行计数val counts = rdd.filter { case (word, count) =>if (excludeList.value.contains(word)) {droppedWordsCounter.add(count)false} else {true}}.collect().mkString("[", ", ", "]")val output = "Counts at time " + time + " " + counts
})
Java
class JavaWordExcludeList {private static volatile Broadcast<List<String>> instance = null;public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {if (instance == null) {synchronized (JavaWordExcludeList.class) {if (instance == null) {List<String> wordExcludeList = Arrays.asList("a", "b", "c");instance = jsc.broadcast(wordExcludeList);}}}return instance;}
}class JavaDroppedWordsCounter {private static volatile LongAccumulator instance = null;public static LongAccumulator getInstance(JavaSparkContext jsc) {if (instance == null) {synchronized (JavaDroppedWordsCounter.class) {if (instance == null) {instance = jsc.sc().longAccumulator("DroppedWordsCounter");}}}return instance;}
}wordCounts.foreachRDD((rdd, time) -> {// 获取或注册 excludeList 广播变量Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));// 获取或注册 droppedWordsCounter 累加器LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));// 使用 excludeList 丢弃单词,并使用 droppedWordsCounter 对它们进行计数String counts = rdd.filter(wordCount -> {if (excludeList.value().contains(wordCount._1())) {droppedWordsCounter.add(wordCount._2());return false;} else {return true;}}).collect().toString();String output = "Counts at time " + time + " " + counts;
}
查看完整源代码。
部署应用程序
本节讨论部署 Spark Streaming 应用程序的步骤。
要求
要运行 Spark Streaming 应用程序,您需要具备以下条件:
- 具有集群管理器的集群 - 这是任何 Spark 应用程序的通用要求,并在部署指南中详细讨论。
- 打包应用程序 JAR - 您必须将流应用程序编译成 JAR。如果您使用
spark-submit启动应用程序,则无需在 JAR 中提供 Spark 和 Spark Streaming。但是,如果您的应用程序使用高级源(例如 Kafka),则您必须将它们链接的额外构件及其依赖项打包到用于部署应用程序的 JAR 中。例如,使用KafkaUtils的应用程序必须在应用程序 JAR 中包含spark-streaming-kafka-0-10_2.12及其所有传递依赖项。 - 为执行器配置足够的内存 - 由于接收到的数据必须存储在内存中,因此必须为执行器配置足够的内存来保存接收到的数据。请注意,如果您正在进行 10 分钟的窗口操作,系统必须在内存中至少保留最近 10 分钟的数据。因此,应用程序的内存要求取决于其中使用的操作。
- 配置检查点 - 如果流应用程序需要,则必须将 Hadoop API 兼容的容错存储(例如 HDFS、S3 等)中的目录配置为检查点目录,并且流应用程序的编写方式应使检查点信息可用于故障恢复。有关更多详细信息,请参阅检查点部分。
- 配置应用程序驱动程序的自动重启 - 为了从驱动程序故障中自动恢复,用于运行流应用程序的部署基础设施必须监控驱动程序进程,并在其失败时重新启动它。不同的集群管理器有不同的工具来实现这一点。
- Spark Standalone - 可以将 Spark 应用程序驱动程序提交到 Spark Standalone 集群中运行(请参阅集群部署模式),即应用程序驱动程序本身在其中一个工作节点上运行。此外,可以指示 Standalone 集群管理器监督驱动程序,并在驱动程序因非零退出代码或运行驱动程序的节点故障而失败时重新启动它。有关更多详细信息,请参阅 Spark Standalone 指南中的集群模式和监督。
- YARN - Yarn 支持类似的自动重启应用程序的机制。请参阅 YARN 文档以了解更多详细信息。
- Mesos - Marathon 已用于在 Mesos 上实现此功能。
- 配置预写日志 - 从 Spark 1.2 开始,我们引入了预写日志以实现强容错保证。如果启用,从接收器接收的所有数据都会写入配置的检查点目录中的预写日志。这可以防止驱动程序恢复时的数据丢失,从而确保零数据丢失(在容错语义部分详细讨论)。可以通过将配置参数
spark.streaming.receiver.writeAheadLog.enable设置为true来启用此功能。然而,这些更强的语义可能会以单个接收器的接收吞吐量为代价。可以通过并行运行更多接收器来增加总吞吐量来纠正这一点。此外,建议在启用预写日志时禁用 Spark 内接收数据的复制,因为日志已经存储在复制的存储系统中。可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来完成。当使用 S3(或任何不支持刷新的文件系统)作为预写日志时,请记住启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关更多详细信息,请参阅 Spark Streaming 配置。请注意,当启用 I/O 加密时,Spark 不会加密写入预写日志的数据。如果需要对预写日志数据进行加密,应将其存储在原生支持加密的文件系统中。 - 设置最大接收速率 - 如果集群资源不够大,无法让流应用程序以接收数据的速度处理数据,则可以通过设置记录/秒的最大速率限制来限制接收器的速率。请参阅接收器的配置参数
spark.streaming.receiver.maxRate和 Direct Kafka 方法的spark.streaming.kafka.maxRatePerPartition。在 Spark 1.5 中,我们引入了一个称为反压的功能,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动找出速率限制并在处理条件变化时动态调整它们。可以通过将配置参数spark.streaming.backpressure.enabled设置为true来启用此反压。
升级应用程序代码
如果正在运行的 Spark Streaming 应用程序需要使用新的应用程序代码进行升级,则有两种可能的机制。
- 升级后的 Spark Streaming 应用程序与现有应用程序并行启动和运行。一旦新的应用程序(接收与旧应用程序相同的数据)已经预热并准备好投入生产,就可以关闭旧的应用程序。请注意,这可以用于支持将数据发送到两个目的地(即,早期和升级后的应用程序)的数据源。
- 优雅地关闭现有应用程序(请参阅
StreamingContext.stop(...)或JavaStreamingContext.stop(...)以获取优雅关闭选项),确保在关闭之前已完全处理接收到的数据。然后可以启动升级后的应用程序,它将从早期应用程序停止的同一点开始处理。请注意,这只能用于支持源端缓冲的输入源(如 Kafka),因为在前一个应用程序关闭而升级后的应用程序尚未启动时需要缓冲数据。并且无法从升级前代码的早期检查点信息重新启动。检查点信息本质上包含序列化的 Scala/Java/Python 对象,尝试使用新的、修改过的类反序列化对象可能导致错误。在这种情况下,要么使用不同的检查点目录启动升级后的应用程序,要么删除以前的检查点目录。
监控应用程序
除了 Spark 的监控功能之外,还有特定于 Spark Streaming 的附加功能。当使用 StreamingContext 时,Spark Web UI 会显示一个额外的 Streaming 选项卡,该选项卡显示有关运行中的接收器(接收器是否活动、接收的记录数、接收器错误等)和已完成的批次(批次处理时间、排队延迟等)的统计信息。这可用于监控流应用程序的进度。
Web UI 中的以下两个指标尤其重要:
- 处理时间 - 处理每批数据的时间。
- 调度延迟 - 一个批次在队列中等待前一批次处理完成的时间。
如果批处理时间持续超过批处理间隔,和/或排队延迟持续增加,则表明系统无法像生成批次那样快地处理批次,并且正在落后。在这种情况下,请考虑减少批处理时间。
Spark Streaming 程序的进度也可以使用 StreamingListener 接口进行监控,该接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员 API,预计未来会改进(即,报告更多信息)。
性能调优
在集群上获得 Spark Streaming 应用程序的最佳性能需要进行一些调优。本节解释了许多可以调整的参数和配置,以提高应用程序的性能。在高层面上,您需要考虑两件事:
- 通过高效使用集群资源来减少每批数据的处理时间。
- 设置正确的批次大小,使得数据批次能够以接收的速度被处理(即,数据处理跟上数据摄入)。
减少批处理时间
在 Spark 中可以进行许多优化以最小化每批的处理时间。这些已在调优指南中详细讨论。本节重点介绍其中一些最重要的。
数据接收中的并行级别
通过网络接收数据(如 Kafka、套接字等)需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统的瓶颈,则考虑并行化数据接收。请注意,每个输入 DStream 创建一个单独的接收器(运行在工作机器上),接收单个数据流。因此,可以通过创建多个输入 DStream 并配置它们从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个数据主题的单个 Kafka 输入 DStream 可以拆分为两个 Kafka 输入流,每个仅接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个 DStream 可以合并在一起以创建单个 DStream。然后,可以将应用于单个输入 DStream 的转换应用于统一流。操作如下:
Python
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
Scala
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
Java
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
另一个应考虑的参数是接收器的块间隔,它由配置参数 spark.streaming.blockInterval 决定。对于大多数接收器,接收到的数据在存储在 Spark 内存之前会被合并成数据块。每批中的块数决定了将用于在类似 map 的转换中处理接收到的数据的任务数。每个接收器每批的任务数大约为(批处理间隔 / 块间隔)。例如,200 毫秒的块间隔将为 2 秒的批次创建 10 个任务。如果任务数太少(即,少于每台机器的核心数),那么效率会很低,因为所有可用的核心将不会被用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是,推荐的块间隔最小值约为 50 毫秒,低于此值,任务启动开销可能成为问题。
使用多个输入流/接收器接收数据的另一种方法是显式地对输入数据流进行重新分区(使用 inputStream.repartition(<number of partitions>))。这将在进一步处理之前将接收到的数据批次分布在集群中指定数量的机器上。
对于直接流,请参阅 Spark Streaming + Kafka 集成指南
数据处理中的并行级别
如果计算任何阶段中使用的并行任务数不够高,则集群资源可能未得到充分利用。例如,对于像 reduceByKey 和 reduceByKeyAndWindow 这样的分布式归约操作,默认的并行任务数由 spark.default.parallelism 配置属性控制。您可以将并行级别作为参数传递(请参阅 PairDStreamFunctions 文档),或设置 spark.default.parallelism 配置属性来更改默认值。
数据序列化
可以通过调整序列化格式来减少数据序列化的开销。在流式处理的情况下,有两种类型的数据正在被序列化。
- 输入数据:默认情况下,通过接收器接收的输入数据以
StorageLevel.MEMORY_AND_DISK_SER_2存储在执行器的内存中。也就是说,数据被序列化为字节以减少 GC 开销,并复制以容忍执行器故障。此外,数据首先保存在内存中,只有当内存不足以容纳流计算所需的所有输入数据时才会溢出到磁盘。这种序列化显然有开销——接收器必须反序列化接收到的数据,并使用 Spark 的序列化格式重新序列化它。 - 流操作生成的持久化 RDD:由流计算生成的 RDD 可能会持久化在内存中。例如,窗口操作将数据持久化在内存中,因为它们会被处理多次。然而,与 Spark Core 默认的
StorageLevel.MEMORY_ONLY不同,由流计算生成的持久化 RDD 默认使用StorageLevel.MEMORY_ONLY_SER(即序列化)进行持久化,以最小化 GC 开销。
在这两种情况下,使用 Kryo 序列化可以减少 CPU 和内存开销。有关更多详细信息,请参阅 Spark 调优指南。对于 Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅配置指南中与 Kryo 相关的配置)。
在特定情况下,如果流应用程序需要保留的数据量不大,则可能可以将数据(两种类型)作为反序列化对象持久化,而不会产生过多的 GC 开销。例如,如果您使用几秒钟的批处理间隔并且没有窗口操作,那么您可以尝试通过显式设置相应的存储级别来禁用持久化数据中的序列化。这将减少由于序列化带来的 CPU 开销,可能在不过多 GC 开销的情况下提高性能。
任务启动开销
如果每秒启动的任务数很高(例如,每秒 50 个或更多),那么将任务发送给执行器的开销可能很显著,并且将难以实现亚秒级延迟。可以通过以下更改减少开销:
- 执行模式:在 Standalone 模式或粗粒度 Mesos 模式下运行 Spark 比细粒度 Mesos 模式能带来更好的任务启动时间。请参阅在 Mesos 上运行指南以了解更多详细信息。
这些更改可能将批处理时间减少几百毫秒,从而使亚秒级批次大小变得可行。
设置正确的批处理间隔
对于在集群上运行的 Spark Streaming 应用程序要稳定,系统应该能够以接收数据的速度处理数据。换句话说,数据批次应该以生成的速度被处理。对于应用程序是否满足这一点,可以通过监控流 Web UI 中的处理时间来发现,其中批处理时间应小于批处理间隔。
根据流计算的性质,使用的批处理间隔可能对应用程序在固定集群资源集上可以维持的数据速率产生重大影响。例如,让我们考虑之前的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够跟上每 2 秒报告一次单词计数(即,批处理间隔为 2 秒),但不能跟上每 500 毫秒报告一次。因此,需要设置批处理间隔,使得在生产中预期的数据速率可以维持。
为您的应用程序找出正确批次大小的一个好方法是使用保守的批处理间隔(例如,5-10 秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,您可以检查每个已处理批次经历的端到端延迟值(可以在 Spark 驱动程序 log4j 日志中查找"Total delay",或使用 StreamingListener 接口)。如果延迟保持与批次大小相当,则系统是稳定的。否则,如果延迟持续增加,则意味着系统无法跟上,因此不稳定。一旦您有了稳定配置的概念,可以尝试增加数据速率和/或减少批次大小。请注意,由于临时数据速率增加导致的延迟瞬时增加可能是可以的,只要延迟降低回一个低值(即,小于批次大小)。
内存调优
Spark 应用程序的内存使用和 GC 行为的调优已在调优指南中详细讨论。强烈建议您阅读该指南。在本节中,我们讨论一些特定于 Spark Streaming 应用程序上下文的调优参数。
Spark Streaming 应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对最后 10 分钟的数据使用窗口操作,那么您的集群应该有足够的内存来在内存中保存 10 分钟的数据。或者,如果您想对大量键使用 updateStateByKey,那么所需的内存将会很高。相反,如果您想进行简单的 map-filter-store 操作,那么所需的内存将会很低。
通常,由于通过接收器接收的数据以 StorageLevel.MEMORY_AND_DISK_SER_2 存储,不适合内存的数据将溢出到磁盘。这可能会降低流应用程序的性能,因此建议根据流应用程序的要求提供足够的内存。最好在小规模上尝试并查看内存使用情况,并据此进行估算。
内存调优的另一个方面是垃圾收集。对于需要低延迟的流应用程序,由 JVM 垃圾收集引起的大暂停是不可取的。
有几个参数可以帮助您调优内存使用和 GC 开销:
- DStream 的持久化级别:如数据序列化部分前面提到的,输入数据和 RDD 默认持久化为序列化字节。与反序列化持久化相比,这减少了内存使用和 GC 开销。启用 Kryo 序列化进一步减少了序列化大小和内存使用。可以通过压缩进一步减少内存使用(请参阅 Spark 配置
spark.rdd.compress),但代价是 CPU 时间。 - 清理旧数据:默认情况下,所有输入数据和由 DStream 转换生成的持久化 RDD 都会自动清理。Spark Streaming 根据使用的转换决定何时清理数据。例如,如果您使用 10 分钟的窗口操作,那么 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据。可以通过设置
streamingContext.remember将数据保留更长时间(例如,交互式查询旧数据)。 - CMS 垃圾收集器:强烈建议使用并发标记清除 GC 来保持与 GC 相关的暂停持续较低。尽管已知并发 GC 会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用
spark-submit中的--driver-java-options)和执行器(使用 Spark 配置spark.executor.extraJavaOptions)上都设置 CMS GC。 - 其他提示:为了进一步减少 GC 开销,这里还有一些可以尝试的提示。
- 使用
OFF_HEAP存储级别持久化 RDD。有关更多详细信息,请参阅 Spark 编程指南。 - 使用更多具有较小堆大小的执行器。这将减少每个 JVM 堆内的 GC 压力。
- 使用
需要记住的要点:
- 一个 DStream 与一个接收器相关联。为了达到读取并行性,需要创建多个接收器,即多个 DStream。一个接收器在一个执行器内运行。它占用一个核心。确保在预订接收器插槽后有足够的核心进行处理,即
spark.cores.max应考虑接收器插槽。接收器以轮询方式分配给执行器。 - 当从流源接收数据时,接收器创建数据块。每
blockInterval毫秒生成一个新的数据块。在batchInterval期间创建 N 个数据块,其中 N = batchInterval / blockInterval。这些块由当前执行器的 BlockManager 分发给其他执行器的块管理器。之后,运行在驱动程序上的 Network Input Tracker 会被告知块的位置以进行进一步处理。 - 为在
batchInterval期间创建的块在驱动程序上创建一个 RDD。在batchInterval期间生成的块是 RDD 的分区。每个分区是 spark 中的一个任务。blockInterval == batchinterval将意味着创建单个分区,并且可能它在本地处理。 - 块上的 map 任务在处理拥有块的执行器(一个接收了块,另一个块被复制到的执行器)中处理,无论块间隔如何,除非非本地调度启动。拥有较大的 blockinterval 意味着较大的块。较高的
spark.locality.wait值增加了在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保较大的块在本地处理。 - 不依赖于
batchInterval和blockInterval,您可以通过调用inputDstream.repartition(n)来定义分区数。这将随机重新洗牌 RDD 中的数据以创建 n 个分区。是的,为了更大的并行性。尽管会带来洗牌的代价。RDD 的处理由驱动程序的 jobscheduler 作为一个作业进行调度。在给定的时间点,只有一个作业是活动的。所以,如果一个作业正在执行,其他作业就会排队。 - 如果您有两个 dstream,将形成两个 RDD,并将创建两个作业,这些作业将按顺序调度。为了避免这种情况,您可以合并两个 dstream。这将确保为 dstream 的两个 RDD 形成一个单一的 unionRDD。然后这个 unionRDD 被视为一个作业。然而,RDD 的分区不会受到影响。
- 如果批处理时间超过 batchinterval,那么显然接收器的内存将开始填满,并最终抛出异常(很可能是
BlockNotFoundException)。目前,没有办法暂停接收器。使用 SparkConf 配置spark.streaming.receiver.maxRate,可以限制接收器的速率。
容错语义
在本节中,我们将讨论 Spark Streaming 应用程序在发生故障时的行为。
背景
要理解 Spark Streaming 提供的语义,让我们记住 Spark RDD 的基本容错语义。
- 一个 RDD 是一个不可变的、确定性地可重新计算的分布式数据集。每个 RDD 都记住了用于在容错输入数据集上创建它的确定性操作的血缘关系。
- 如果 RDD 的任何分区由于工作节点故障而丢失,那么可以使用操作的血缘关系从容错的原始数据集中重新计算该分区。
- 假设所有的 RDD 转换都是确定性的,那么无论 Spark 集群中发生什么故障,最终转换后的 RDD 中的数据将始终相同。
Spark 在容错文件系统(如 HDFS 或 S3)上操作数据。因此,所有从容错数据生成的 RDD 也是容错的。然而,对于 Spark Streaming 来说,情况并非如此,因为在大多数情况下数据是通过网络接收的(除了使用 fileStream 时)。为了对所有生成的 RDD 实现相同的容错属性,接收到的数据在集群的工作节点中的多个 Spark 执行器之间进行复制(默认复制因子为 2)。这导致系统中有两种数据在发生故障时需要恢复:
- 已接收并复制的数据 - 这些数据在单个工作节点故障时能够幸存,因为它的副本存在于其他节点上。
- 已接收但缓冲以待复制的数据 - 由于这些数据未被复制,恢复这些数据的唯一方法是从源再次获取。
此外,我们应该关注两种故障:
- 工作节点故障 - 任何运行执行器的工作节点都可能发生故障,这些节点上的所有内存中的数据都将丢失。如果任何接收器在故障节点上运行,那么它们缓冲的数据将丢失。
- 驱动节点故障 - 如果运行 Spark Streaming 应用程序的驱动节点发生故障,那么显然
SparkContext会丢失,所有执行器及其内存中的数据都会丢失。
有了这些基础知识,让我们来了解 Spark Streaming 的容错语义。
定义
流系统的语义通常根据每条记录可以被系统处理多少次来捕获。系统在所有可能的操作条件下(尽管有故障等)可以提供三种类型的保证:
- 至多一次:每条记录将被处理一次或根本不处理。
- 至少一次:每条记录将被处理一次或多次。这比至多一次更强,因为它确保不会丢失数据。但可能有重复。
- 精确一次:每条记录将被精确处理一次——不会丢失数据,也不会多次处理数据。这显然是三种保证中最强的。
基本语义
在任何流处理系统中,广义上说,处理数据有三个步骤。
- 接收数据:使用接收器或其他方式从源接收数据。
- 转换数据:使用 DStream 和 RDD 转换对接收到的数据进行转换。
- 输出数据:将最终转换后的数据推送到外部系统,如文件系统、数据库、仪表板等。
如果一个流应用程序要实现端到端的精确一次保证,那么每个步骤都必须提供精确一次保证。也就是说,每条记录必须被精确接收一次,精确转换一次,并精确推送到下游系统一次。让我们在 Spark Streaming 的上下文中理解这些步骤的语义。
- 接收数据:不同的输入源提供不同的保证。这将在下一小节中详细讨论。
- 转换数据:所有已接收的数据将被精确处理一次,这得益于 RDD 提供的保证。即使出现故障,只要接收到的输入数据是可访问的,最终转换后的 RDD 将始终具有相同的内容。
- 输出数据:输出操作默认确保至少一次语义,因为它取决于输出操作的类型(幂等与否)和下游系统的语义(是否支持事务)。但用户可以实现自己的事务机制来实现精确一次语义。这将在本节后面更详细地讨论。
接收数据的语义
不同的输入源提供不同的保证,范围从至少一次到精确一次。详细信息。
基于文件的源
如果所有输入数据已经存在于像 HDFS 这样的容错文件系统中,Spark Streaming 总是可以从任何故障中恢复并处理所有数据。这提供了精确一次语义,意味着无论发生什么故障,所有数据都将被精确处理一次。
基于接收器的源
对于基于接收器的输入源,容错语义取决于故障场景和接收器的类型。正如我们之前讨论的,有两种类型的接收器:
- 可靠接收器 - 这些接收器仅在确保接收到的数据已被复制后才向可靠源发送确认。如果这样的接收器发生故障,源将不会收到缓冲(未复制)数据的确认。因此,如果接收器重新启动,源将重新发送数据,并且不会因故障而丢失数据。
- 不可靠接收器 - 此类接收器不发送确认,因此当它们因工作节点或驱动程序故障而失败时可能会丢失数据。
根据使用的接收器类型,我们实现以下语义。如果工作节点发生故障,那么使用可靠接收器不会丢失数据。使用不可靠接收器,已接收但未复制的数据可能会丢失。如果驱动节点发生故障,那么除了这些损失之外,所有过去接收并在内存中复制的数据都将丢失。这将影响有状态转换的结果。
为了避免这种过去接收数据的丢失,Spark 1.2 引入了预写日志,它将接收到的数据保存到容错存储中。启用预写日志和可靠接收器后,可以实现零数据丢失。在语义方面,它提供了至少一次保证。
下表总结了故障下的语义:
| 部署场景 | 工作节点故障 | 驱动节点故障 |
|---|---|---|
| Spark 1.1 或更早版本,或 Spark 1.2 或更高版本但未启用预写日志 | 使用不可靠接收器时缓冲数据丢失 使用可靠接收器时零数据丢失 至少一次语义 | 使用不可靠接收器时缓冲数据丢失 所有接收器的过去数据丢失 未定义的语义 |
| Spark 1.2 或更高版本并启用预写日志 | 使用可靠接收器时零数据丢失 至少一次语义 | 使用可靠接收器和文件时零数据丢失 至少一次语义 |
使用 Kafka Direct API
在 Spark 1.3 中,我们引入了新的 Kafka Direct API,它可以确保所有 Kafka 数据被 Spark Streaming 精确接收一次。与此相结合,如果您实现精确一次输出操作,则可以实现端到端的精确一次保证。这种方法在 Kafka 集成指南中进一步讨论。
输出操作的语义
输出操作(如 foreachRDD)具有至少一次语义,也就是说,在工作节点故障的情况下,转换后的数据可能会多次写入外部实体。虽然这对于使用 saveAs***Files 操作保存到文件系统是可以接受的(因为文件只会被相同的数据覆盖),但可能需要额外的努力来实现精确一次语义。有两种方法。
- 幂等更新:多次尝试总是写入相同的数据。例如,
saveAs***Files总是将相同的数据写入生成的文件。 - 事务性更新:所有更新都以事务方式进行,以便更新以原子方式精确执行一次。一种方法如下:
- 使用批次时间(在
foreachRDD中可用)和 RDD 的分区索引创建一个标识符。该标识符唯一标识流应用程序中的一个数据块。 - 使用此标识符以事务方式(即,精确一次,原子地)更新外部系统。也就是说,如果该标识符尚未提交,则原子地提交分区数据和标识符。否则,如果已经提交过,则跳过更新。
- 使用批次时间(在
dstream.foreachRDD { (rdd, time) =>rdd.foreachPartition { partitionIterator =>val partitionId = TaskContext.get.partitionId()val uniqueId = generateUniqueId(time.milliseconds, partitionId)// 使用此 uniqueId 以事务方式提交 partitionIterator 中的数据}
}
