当前位置: 首页 > news >正文

Spark-3.5.7文档4 - Structured Streaming 编程指南

概述

结构化流处理(Structured Streaming) 是一个基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以使用与处理静态数据的批处理计算相同的方式来表达流计算。Spark SQL 引擎会负责以增量、连续的方式运行计算,并在流数据持续到达时更新最终结果。您可以使用 Scala、Java、Python 或 R 中的 Dataset/DataFrame API 来表达流式聚合、事件时间窗口、流批连接等操作。计算会在同一经过优化的 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志(Write-Ahead Logs)确保端到端的精确一次(exactly-once)容错保证。简而言之,结构化流处理提供了快速、可扩展、容错、端到端精确一次的流处理能力,而无需用户关心流处理的底层细节。

在内部,默认情况下,结构化流查询使用微批处理(micro-batch processing)引擎进行处理,该引擎将数据流作为一系列小批量作业来处理,从而实现低至 100 毫秒的端到端延迟以及精确一次的容错保证。然而,自 Spark 2.3 起,我们引入了一种称为持续处理(Continuous Processing) 的新型低延迟处理模式,该模式可以实现低至 1 毫秒的端到端延迟,但提供至少一次(at-least-once)的保证。无需更改查询中的 Dataset/DataFrame 操作,您就可以根据应用程序需求选择处理模式。

在本指南中,我们将引导您了解编程模型和 API。我们将主要使用默认的微批处理模型来解释相关概念,然后讨论持续处理模型。首先,让我们从一个简单的结构化流查询示例开始——一个流式单词计数程序。

快速示例

假设您希望维护一个从监听 TCP 套接字的数据服务器接收到的文本数据的运行单词计数。让我们看看如何使用结构化流处理来表达这一点。您可以在 Scala/Java/Python/R 中查看完整代码。如果您下载了 Spark,可以直接运行该示例。无论如何,让我们逐步了解这个示例,并理解其工作原理。首先,我们需要导入必要的类并创建一个本地 SparkSession,这是所有 Spark 相关功能的起点。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import splitspark = SparkSession \.builder \.appName("StructuredNetworkWordCount") \.getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;import java.util.Arrays;
import java.util.Iterator;SparkSession spark = SparkSession.builder().appName("JavaStructuredNetworkWordCount").getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")

接下来,我们创建一个流式 DataFrame,表示从监听 localhost:9999 的服务器接收到的文本数据,并转换该 DataFrame 以计算单词计数。

# 创建 DataFrame,表示连接到 localhost:9999 的输入行流
lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()# 将行拆分为单词
words = lines.select(explode(split(lines.value, " ")).alias("word")
)# 生成运行单词计数
wordCounts = words.groupBy("word").count()
// 创建 DataFrame,表示连接到 localhost:9999 的输入行流
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 将行拆分为单词
val words = lines.as[String].flatMap(_.split(" "))// 生成运行单词计数
val wordCounts = words.groupBy("value").count()
// 创建 DataFrame,表示连接到 localhost:9999 的输入行流
Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();// 将行拆分为单词
Dataset<String> words = lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());// 生成运行单词计数
Dataset<Row> wordCounts = words.groupBy("value").count();
# 创建 DataFrame,表示连接到 localhost:9999 的输入行流
lines <- read.stream("socket", host = "localhost", port = 9999)# 将行拆分为单词
words <- selectExpr(lines, "explode(split(value, ' ')) as word")# 生成运行单词计数
wordCounts <- count(group_by(words, "word"))

这个 lines DataFrame 表示一个包含流式文本数据的无界表。该表包含一个名为 “value” 的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,目前它还没有接收任何数据,因为我们只是在设置转换,尚未启动它。接下来,我们使用了两个内置的 SQL 函数——splitexplode,将每一行拆分为多行,每行一个单词。此外,我们使用 alias 函数将新列命名为 “word”。最后,我们通过对 Dataset 中的唯一值进行分组并计数,定义了 wordCounts DataFrame。请注意,这是一个流式 DataFrame,表示流的运行单词计数。

现在我们已经设置好了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为每次更新时将完整的计数集(由 outputMode("complete") 指定)打印到控制台。然后使用 start() 启动流式计算。

# 启动查询,将运行计数打印到控制台
query = wordCounts \.writeStream \.outputMode("complete") \.format("console") \.start()query.awaitTermination()
// 启动查询,将运行计数打印到控制台
val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()
// 启动查询,将运行计数打印到控制台
StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();
# 启动查询,将运行计数打印到控制台
query <- write.stream(wordCounts, "console", outputMode = "complete")awaitTermination(query)

此代码执行后,流式计算将在后台启动。query 对象是该活动流查询的句柄,我们决定使用 awaitTermination() 等待查询终止,以防止在查询活动时进程退出。

要实际执行此示例代码,您可以在自己的 Spark 应用程序中编译代码,或者在下载 Spark 后直接运行该示例。我们展示后者。您首先需要运行 Netcat(大多数类 Unix 系统中找到的一个小工具)作为数据服务器,使用以下命令:

$ nc -lk 9999

然后,在另一个终端中,您可以使用以下命令启动示例:

# Python
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999# Scala
./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999# Java
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999# R
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

接着,在运行 netcat 服务器的终端中键入的任何行都将被计数,并每秒在屏幕上打印一次。输出将类似于以下内容。

终端 1:运行 Netcat

$ nc -lk 9999
apache spark
apache hadoop
...

终端 2:运行示例

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

编程模型

结构化流处理的关键思想是将实时数据流视为一个不断追加的表。这导致了一种新的流处理模型,与批处理模型非常相似。您可以将流计算表达为对静态表的标准批处理式查询,而 Spark 将其作为对无界输入表的增量查询运行。让我们更详细地理解这个模型。

基本概念

将输入数据流视为"输入表"。流上到达的每个数据项都像是追加到输入表的新行。

流即表

对输入的查询将生成"结果表"。每个触发间隔(例如,每 1 秒),新行会追加到输入表,最终更新结果表。每当结果表更新时,我们希望将更改的结果行写入外部接收器。

模型

"输出"定义为写入外部存储的内容。输出可以以不同的模式定义:

  • 完全模式(Complete Mode) - 整个更新的结果表将写入外部存储。由存储连接器决定如何处理整个表的写入。
  • 追加模式(Append Mode) - 仅将自上次触发以来追加到结果表中的新行写入外部存储。这仅适用于结果表中现有行不期望更改的查询。
  • 更新模式(Update Mode) - 仅将自上次触发以来在结果表中更新的行写入外部存储(自 Spark 2.1.1 起可用)。请注意,这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。如果查询不包含聚合,则等效于追加模式。

请注意,每种模式适用于特定类型的查询。这将在后面详细讨论。

为了说明此模型的使用,让我们在上述快速示例的上下文中理解该模型。第一个 lines DataFrame 是输入表,最终的 wordCounts DataFrame 是结果表。请注意,对流式 lines DataFrame 生成 wordCounts 的查询与静态 DataFrame 的查询完全相同。但是,当此查询启动时,Spark 将连续检查套接字连接的新数据。如果有新数据,Spark 将运行"增量"查询,将之前的运行计数与新数据结合以计算更新后的计数,如下所示。

模型示例

请注意,结构化流处理不会物化整个表。它从流数据源读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它仅保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。

此模型与许多其他流处理引擎显著不同。许多流系统要求用户自己维护运行聚合,因此必须考虑容错和数据一致性(至少一次、至多一次或精确一次)。在此模型中,Spark 负责在有新数据时更新结果表,从而使用户无需考虑这些问题。例如,让我们看看此模型如何处理基于事件时间的处理和迟到数据。

处理事件时间和迟到数据

事件时间是嵌入数据本身的时间。对于许多应用程序,您可能希望基于此事件时间进行操作。例如,如果您想获取 IoT 设备每分钟生成的事件数,则可能希望使用数据生成的时间(即数据中的事件时间),而不是 Spark 接收数据的时间。在此模型中,事件时间非常自然地表达——来自设备的每个事件是表中的一行,事件时间是行中的列值。这允许基于窗口的聚合(例如每分钟事件数)仅仅是对事件时间列进行的一种特殊类型的分组和聚合——每个时间窗口是一个组,每行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以一致地定义在静态数据集(例如从收集的设备事件日志)和数据流上,使用户的生活更加轻松。

此外,此模型自然地处理基于其事件时间晚于预期到达的数据。由于 Spark 正在更新结果表,它可以完全控制在有迟到数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自 Spark 2.1 起,我们支持水印(watermarking),允许用户指定迟到数据的阈值,并允许引擎相应地清理旧状态。这些将在后面的窗口操作部分更详细地解释。

容错语义

提供端到端精确一次语义是结构化流处理设计的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎,以可靠地跟踪处理的确切进度,从而可以通过重启和/或重新处理来处理任何类型的故障。每个流源都假定具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志记录每个触发器中正在处理的数据的偏移量范围。流接收器被设计为幂等的,以处理重新处理。结合使用可重放源和幂等接收器,结构化流可以在任何故障下确保端到端精确一次语义。

使用 Dataset 和 DataFrame 的 API

自 Spark 2.0 起,DataFrame 和 Dataset 可以表示静态、有界数据,以及流式、无界数据。与静态 Dataset/DataFrame 类似,您可以使用公共入口点 SparkSession 从流源创建流式 DataFrame/Dataset,并对它们应用与静态 DataFrame/Dataset 相同的操作。如果您不熟悉 Dataset/DataFrame,强烈建议您使用 DataFrame/Dataset 编程指南熟悉它们。

创建流式 DataFrame 和流式 Dataset

流式 DataFrame 可以通过 SparkSession.readStream() 返回的 DataStreamReader 接口创建。在 R 中,使用 read.stream() 方法。与创建静态 DataFrame 的读取接口类似,您可以指定源的详细信息——数据格式、模式、选项等。

输入源

有几个内置源。

  • 文件源(File source) - 将写入目录的文件作为数据流读取。文件将按文件修改时间的顺序处理。如果设置了 latestFirst,顺序将反转。支持的文件格式有文本、CSV、JSON、ORC、Parquet。有关最新列表和每种文件格式支持的选项,请参阅 DataStreamReader 接口的文档。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作实现。
  • Kafka 源(Kafka source) - 从 Kafka 读取数据。与 Kafka 代理版本 0.10.0 或更高版本兼容。有关更多详细信息,请参阅 Kafka 集成指南。
  • 套接字源(Socket source,用于测试) - 从套接字连接读取 UTF8 文本数据。监听服务器套接字位于驱动程序上。请注意,这仅应用于测试,因为它不提供端到端容错保证。
  • 速率源(Rate source,用于测试) - 以指定的每秒行数生成数据,每个输出行包含时间戳和值。其中时间戳是包含消息分发时间的 Timestamp 类型,值是包含消息计数的 Long 类型,从 0 开始作为第一行。此源用于测试和基准测试。
  • 每微批速率源(Rate Per Micro-Batch source,用于测试) - 以指定的每微批行数生成数据,每个输出行包含时间戳和值。其中时间戳是包含消息分发时间的 Timestamp 类型,值是包含消息计数的 Long 类型,从 0 开始作为第一行。与速率数据源不同,此数据源每微批提供一致的输入行集,无论查询执行如何(触发器配置、查询滞后等),例如,批处理 0 将生成 0~999,批处理 1 将生成 1000~1999,依此类推。同样适用于生成的时间。此源用于测试和基准测试。

某些源不是容错的,因为它们不保证在故障后可以使用检查点偏移量重放数据。请参阅前面关于容错语义的部分。以下是 Spark 中所有源的详细信息。

选项容错说明
文件源path:输入目录的路径,对所有文件格式通用。
maxFilesPerTrigger:每个触发器中要考虑的新文件的最大数量(默认:无最大值)
latestFirst:是否首先处理最新的新文件,当有大量文件积压时有用(默认:false)
fileNameOnly:是否仅基于文件名而不是完整路径检查新文件(默认:false)。当设置为 true 时,以下文件将被视为同一文件,因为它们的文件名 “dataset.txt” 相同:
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge:在此目录中可以找到的文件的最大年龄,超过此年龄将被忽略。对于第一批,所有文件都将被视为有效。如果 latestFirst 设置为 true 并且设置了 maxFilesPerTrigger,则此参数将被忽略,因为有效且应处理的旧文件可能被忽略。最大年龄是相对于最新文件的时间戳指定的,而不是当前系统的时间戳。(默认:1 周)
cleanSource:处理完成后清理文件的选项。可用选项有 “archive”、“delete”、“off”。如果未提供选项,默认值为 “off”。
当提供 “archive” 时,还必须提供附加选项 sourceArchiveDir。“sourceArchiveDir” 的值不得在深度(从根目录开始的目录数)上与源模式匹配,其中深度是两个路径上深度的最小值。这将确保归档的文件永远不会作为新源文件包含在内。
例如,假设您提供 ‘/hello?/spark/’ 作为源模式,‘/hello1/spark/archive/dir’ 不能用作 “sourceArchiveDir” 的值,因为 '/hello?/spark/’ 和 ‘/hello1/spark/archive’ 将匹配。‘/hello1/spark’ 也不能用作 “sourceArchiveDir” 的值,因为 ‘/hello?/spark’ 和 ‘/hello1/spark’ 将匹配。‘/archived/here’ 可以,因为它不匹配。
Spark 将根据文件自身的路径移动源文件。例如,如果源文件的路径是 /a/b/dataset.txt,归档目录的路径是 /archived/here,文件将被移动到 /archived/here/a/b/dataset.txt。
注意:归档(通过移动)或删除完成文件将在每个微批中引入开销(减速,即使在单独线程中发生),因此在启用此选项之前,您需要了解文件系统中每个操作的成本。另一方面,启用此选项将减少列出源文件的成本,这可能是一个昂贵的操作。
完成文件清理器使用的线程数可以使用 spark.sql.streaming.fileSource.cleaner.numThreads 配置(默认:1)。
注意 2:启用此选项时,源路径不应从多个源或查询中使用。同样,您必须确保源路径与文件流接收器的输出目录中的任何文件不匹配。
注意 3:删除和移动操作都是尽力而为的。删除或移动文件失败不会使流查询失败。在某些情况下,Spark 可能不会清理一些源文件——例如,应用程序未正常关闭,太多文件排队等待清理。
有关文件格式特定选项,请参阅 DataStreamReader 中的相关方法。例如,对于 “parquet” 格式选项,请参阅 DataStreamReader.parquet()
此外,有会话配置会影响某些文件格式。有关更多详细信息,请参阅 SQL 编程指南。例如,对于 “parquet”,请参阅 Parquet 配置部分。
支持通配符路径,但不支持多个逗号分隔的路径/通配符。
套接字源host:要连接的主机,必须指定
port:要连接的端口,必须指定
速率源rowsPerSecond(例如 100,默认:1):每秒应生成的行数。
rampUpTime(例如 5s,默认:0s):在生成速度达到 rowsPerSecond 之前的预热时间。使用比秒更细的粒度将被截断为整数秒。
numPartitions(例如 10,默认:Spark 的默认并行度):生成行的分区数。
源将尽力达到 rowsPerSecond,但查询可能受资源限制,可以调整 numPartitions 以达到所需速度。
每微批速率源(格式:rate-micro-batch)rowsPerBatch(例如 100):每个微批应生成的行数。
numPartitions(例如 10,默认:Spark 的默认并行度):生成行的分区数。
startTimestamp(例如 1000,默认:0):生成时间的起始值。
advanceMillisPerBatch(例如 1000,默认:1000):每个微批生成时间前进的毫秒数。
Kafka 源请参阅 Kafka 集成指南。

以下是一些示例。

spark = SparkSession. ...# 从套接字读取文本
socketDF = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()socketDF.isStreaming()    # 对于有流源的 DataFrame 返回 TruesocketDF.printSchema()# 读取以原子方式写入目录的所有 csv 文件
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \.readStream \.option("sep", ";") \.schema(userSchema) \.csv("/path/to/directory")  # 等效于 format("csv").load("/path/to/directory")
val spark: SparkSession = ...// 从套接字读取文本
val socketDF = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()socketDF.isStreaming    // 对于有流源的 DataFrame 返回 TruesocketDF.printSchema// 读取以原子方式写入目录的所有 csv 文件
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ";").schema(userSchema)      // 指定 csv 文件的模式.csv("/path/to/directory")    // 等效于 format("csv").load("/path/to/directory")
SparkSession spark = ...// 从套接字读取文本
Dataset<Row> socketDF = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();socketDF.isStreaming();    // 对于有流源的 DataFrame 返回 TruesocketDF.printSchema();// 读取以原子方式写入目录的所有 csv 文件
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark.readStream().option("sep", ";").schema(userSchema)      // 指定 csv 文件的模式.csv("/path/to/directory");    // 等效于 format("csv").load("/path/to/directory")
sparkR.session(...)# 从套接字读取文本
socketDF <- read.stream("socket", host = hostname, port = port)isStreaming(socketDF)    # 对于有流源的 SparkDataFrame 返回 TRUEprintSchema(socketDF)# 读取以原子方式写入目录的所有 csv 文件
schema <- structType(structField("name", "string"),structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

这些示例生成无类型的流式 DataFrame,意味着 DataFrame 的模式在编译时不检查,仅在提交查询时在运行时检查。某些操作如 mapflatMap 等需要在编译时知道类型。要进行这些操作,您可以使用与静态 DataFrame 相同的方法将这些无类型流式 DataFrame 转换为有类型流式 Dataset。有关更多详细信息,请参阅 SQL 编程指南。此外,文档后面将讨论支持的流源的更多详细信息。

自 Spark 3.1 起,您还可以使用 DataStreamReader.table() 从表创建流式 DataFrame。有关更多详细信息,请参阅流式表 API。

流式 DataFrame/Dataset 的模式推断和分区

默认情况下,基于文件的流式处理需要您指定模式,而不是依赖 Spark 自动推断。此限制确保即使发生故障,流查询也将使用一致的模式。对于临时用例,您可以通过设置 spark.sql.streaming.schemaInferencetrue 重新启用模式推断。

当存在名为 /key=value/ 的子目录时,会发生分区发现,列表将自动递归到这些目录中。如果这些列出现在用户提供的模式中,Spark 将根据正在读取的文件路径填充它们。组成分区方案的目录在查询启动时必须存在且必须保持静态。例如,当存在 /data/year=2015/ 时添加 /data/year=2016/ 是可以的,但更改分区列是无效的(即通过创建目录 /data/date=2016-04-17/)。

流式 DataFrame/Dataset 上的操作

您可以对流式 DataFrame/Dataset 应用各种操作——从无类型的类似 SQL 的操作(如 selectwheregroupBy),到有类型的类似 RDD 的操作(如 mapfilterflatMap)。有关更多详细信息,请参阅 SQL 编程指南。让我们看几个可以使用的示例操作。

基本操作 - 选择、投影、聚合

DataFrame/Dataset 上的大多数常见操作都支持流式处理。本节后面将讨论不支持的一些操作。

df = ...  # 具有模式 { device: string, deviceType: string, signal: double, time: DateType } 的 IOT 设备数据流式 DataFrame# 选择信号大于 10 的设备
df.select("device").where("signal > 10")# 每种设备类型的更新次数的运行计数
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)val df: DataFrame = ... // 具有模式 { device: string, deviceType: string, signal: double, time: string } 的 IOT 设备数据流式 DataFrame
val ds: Dataset[DeviceData] = df.as[DeviceData]    // 具有 IOT 设备数据的流式 Dataset// 选择信号大于 10 的设备
df.select("device").where("signal > 10")      // 使用无类型 API
ds.filter(_.signal > 10).map(_.device)         // 使用有类型 API// 每种设备类型的更新次数的运行计数
df.groupBy("deviceType").count()                          // 使用无类型 API// 每种设备类型的运行平均信号
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // 使用有类型 API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;public class DeviceData {private String device;private String deviceType;private Double signal;private java.sql.Date time;...// 每个字段的 Getter 和 Setter 方法
}Dataset<Row> df = ...;    // 具有模式 { device: string, type: string, signal: double, time: DateType } 的 IOT 设备数据流式 DataFrame
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // 具有 IOT 设备数据的流式 Dataset// 选择信号大于 10 的设备
df.select("device").where("signal > 10"); // 使用无类型 API
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10).map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());// 每种设备类型的更新次数的运行计数
df.groupBy("deviceType").count(); // 使用无类型 API// 每种设备类型的运行平均信号
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING()).agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ...  # 具有模式 { device: string, deviceType: string, signal: double, time: DateType } 的 IOT 设备数据流式 DataFrame# 选择信号大于 10 的设备
select(where(df, "signal > 10"), "device")# 每种设备类型的更新次数的运行计数
count(groupBy(df, "deviceType"))

您还可以将流式 DataFrame/Dataset 注册为临时视图,然后对其应用 SQL 命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # 返回另一个流式 DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // 返回另一个流式 DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // 返回另一个流式 DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

注意,您可以使用 df.isStreaming 标识 DataFrame/Dataset 是否有流数据。

df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)

您可能希望检查查询的查询计划,因为 Spark 可能在解释针对流式数据集的 SQL 语句期间注入有状态操作。一旦有状态操作注入查询计划,您可能需要考虑有状态操作来检查查询(例如输出模式、水印、状态存储大小维护等)。

事件时间上的窗口操作

在滑动事件时间窗口上的聚合使用结构化流处理很简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合情况下,为行的

连接操作

结构化流处理支持将流式 Dataset/DataFrame 与静态 Dataset/DataFrame 以及另一个流式 Dataset/DataFrame 进行连接。流式连接的结果是增量生成的,类似于上一节中流式聚合的结果。在本节中,我们将探讨上述情况下支持哪些类型的连接(例如内连接、外连接、半连接等)。请注意,在所有支持的连接类型中,与流式 Dataset/DataFrame 的连接结果将与包含流中相同数据的静态 Dataset/DataFrame 的连接结果完全相同。

流-静态连接

自 Spark 2.0 引入以来,结构化流处理就支持流式 DataFrame/Dataset 与静态 DataFrame/Dataset 之间的连接(内连接和某些类型的外连接)。以下是一个简单示例。

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # 与静态 DF 进行内部等值连接
streamingDf.join(staticDf, "type", "left_outer")  # 与静态 DF 进行左外连接
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...streamingDf.join(staticDf, "type")          // 与静态 DF 进行内部等值连接
streamingDf.join(staticDf, "type", "left_outer")  // 与静态 DF 进行左外连接
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // 与静态 DF 进行内部等值连接
streamingDf.join(staticDf, "type", "left_outer");  // 与静态 DF 进行左外连接
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # 与静态 DF 进行内部等值连接
joined <- join(streamingDf,staticDf,streamingDf$value == staticDf$value,"left_outer")  # 与静态 DF 进行左外连接

请注意,流-静态连接是无状态的,因此不需要状态管理。但是,某些类型的流-静态外连接尚不支持。这些在本节末尾的连接部分列出。

流-流连接

在 Spark 2.3 中,我们增加了对流-流连接的支持,也就是说,您可以连接两个流式 Dataset/DataFrame。在两个数据流之间生成连接结果的挑战在于,在任何时间点,数据集的视图对于连接的两侧都是不完整的,这使得在输入之间找到匹配变得更加困难。从一个输入流接收的任何行都可能与另一个输入流中未来的、尚未接收的行匹配。因此,对于两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来输入与过去输入匹配,并相应地生成连接结果。此外,与流式聚合类似,我们自动处理迟到、乱序的数据,并可以使用水印限制状态。让我们讨论支持的不同类型的流-流连接以及如何使用它们。

带可选水印的内连接

支持在任何类型的列上以及任何类型的连接条件下进行内连接。但是,随着流的运行,流状态的大小将无限增长,因为所有过去的输入都必须保存,因为任何新输入都可能与过去的任何输入匹配。为了避免无界状态,您必须定义额外的连接条件,使得无限旧的输入无法与未来输入匹配,因此可以从状态中清除。换句话说,您必须在连接中执行以下附加步骤。

  1. 在两个输入上定义水印延迟,以便引擎知道输入可能延迟多长时间(类似于流式聚合)。
  2. 在两个输入之间定义事件时间约束,以便引擎可以确定一个输入的旧行何时不再需要(即不满足时间约束)与另一个输入匹配。可以通过以下两种方式之一定义此约束。
    • 时间范围连接条件(例如 ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
    • 在事件时间窗口上连接(例如 ...JOIN ON leftTimeWindow = rightTimeWindow)。

让我们通过一个例子来理解这一点。

假设我们想要将广告展示流(显示广告时)与另一个广告用户点击流连接起来,以关联展示何时导致可货币化的点击。为了允许在此流-流连接中进行状态清理,您必须指定水印延迟和时间约束,如下所示。

  • 水印延迟:假设展示和相应的点击在事件时间上最多可能延迟/乱序 2 小时和 3 小时。
  • 事件时间范围条件:假设点击可能发生在相应展示后的 0 秒到 1 小时的时间范围内。

代码将如下所示。

from pyspark.sql.functions import exprimpressions = spark.readStream. ...
clicks = spark.readStream. ...# 在事件时间列上应用水印
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")# 使用事件时间约束进行连接
impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour""")
)
import org.apache.spark.sql.functions.exprval impressions = spark.readStream. ...
val clicks = spark.readStream. ...// 在事件时间列上应用水印
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")// 使用事件时间约束进行连接
impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour""")
)
import static org.apache.spark.sql.functions.exprDataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...// 在事件时间列上应用水印
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");// 使用事件时间约束进行连接
impressionsWithWatermark.join(clicksWithWatermark,expr("clickAdId = impressionAdId AND " +"clickTime >= impressionTime AND " +"clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)# 在事件时间列上应用水印
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")# 使用事件时间约束进行连接
joined <- join(impressionsWithWatermark,clicksWithWatermark,expr(paste("clickAdId = impressionAdId AND","clickTime >= impressionTime AND","clickTime <= impressionTime + interval 1 hour"
)))
带水印的流-流内连接的语义保证

这与水印在聚合上提供的保证类似。"2 小时"的水印延迟保证引擎永远不会丢弃任何延迟少于 2 小时的数据。但延迟超过 2 小时的数据可能会也可能不会被处理。

带水印的外连接

虽然水印 + 事件时间约束对于内连接是可选的,但对于外连接,它们必须指定。这是因为为了在外连接中生成 NULL 结果,引擎必须知道一个输入行何时在未来不会与任何内容匹配。因此,必须指定水印 + 事件时间约束以生成正确的结果。因此,带有外连接的查询将非常像前面的广告货币化示例,只是会有一个指定它为外连接的附加参数。

impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour"""),"leftOuter"                 # 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour"""),joinType = "leftOuter"      // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi")
impressionsWithWatermark.join(clicksWithWatermark,expr("clickAdId = impressionAdId AND " +"clickTime >= impressionTime AND " +"clickTime <= impressionTime + interval 1 hour "),"leftOuter"                 // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(impressionsWithWatermark,clicksWithWatermark,expr(paste("clickAdId = impressionAdId AND","clickTime >= impressionTime AND","clickTime <= impressionTime + interval 1 hour"),"left_outer"                 # 可以是 "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))

带水印的流-流外连接的语义保证
外连接在关于水印延迟以及数据是否会被丢弃方面具有与内连接相同的保证。

注意事项
关于外部结果如何生成,有几个重要特性需要注意。

  • 外部 NULL 结果的生成会有延迟,这取决于指定的水印延迟和时间范围条件。这是因为引擎必须等待那么长时间以确保没有匹配,并且未来也不会有更多匹配。
  • 在微批处理引擎的当前实现中,水印在微批处理结束时前进,下一个微批处理使用更新的水印来清理状态和输出外部结果。由于我们仅在有待处理的新数据时才触发微批处理,如果流中没有接收到新数据,外部结果的生成可能会延迟。简而言之,如果被连接的两个输入流中的任何一个在一段时间内没有接收到数据,外部(左或右)输出可能会延迟。

带水印的半连接
半连接返回关系左侧具有与右侧匹配的值。它也被称为左半连接。与外连接类似,半连接必须指定水印 + 事件时间约束。这是为了驱逐左侧不匹配的输入行,引擎必须知道左侧的输入行何时在未来不会与右侧的任何内容匹配。

带水印的流-流半连接的语义保证
半连接在关于水印延迟以及数据是否会被丢弃方面具有与内连接相同的保证。

流查询中连接的支持矩阵

左输入右输入连接类型支持情况
静态静态所有类型支持,因为即使它可能出现在流查询中,也不是在流数据上
静态内连接支持,无状态
左外连接支持,无状态
右外连接不支持
全外连接不支持
左半连接支持,无状态
静态内连接支持,无状态
左外连接不支持
右外连接支持,无状态
全外连接不支持
左半连接不支持
内连接支持,可选择在两侧指定水印 + 时间约束以进行状态清理
左外连接有条件支持,必须指定右侧水印 + 时间约束以获得正确结果,可选择指定左侧水印以进行所有状态清理
右外连接有条件支持,必须指定左侧水印 + 时间约束以获得正确结果,可选择指定右侧水印以进行所有状态清理
全外连接有条件支持,必须指定一侧水印 + 时间约束以获得正确结果,可选择指定另一侧水印以进行所有状态清理
左半连接有条件支持,必须指定右侧水印 + 时间约束以获得正确结果,可选择指定左侧水印以进行所有状态清理

关于支持连接的附加细节:

  • 连接可以级联,也就是说,您可以执行 df1.join(df2, ...).join(df3, ...).join(df4, ....)
  • 从 Spark 2.4 开始,您只能在查询处于追加输出模式时使用连接。尚不支持其他输出模式。
  • 不能在连接之前和之后使用 mapGroupsWithStateflatMapGroupsWithState
  • 在追加输出模式下,您可以在连接之前/之后构建具有非类似映射操作的查询,例如聚合、去重、流-流连接。

例如,以下是在两个流中进行时间窗口聚合,然后进行事件时间窗口的流-流连接的示例:

clicksWindow = clicksWithWatermark.groupBy(clicksWithWatermark.clickAdId,window(clicksWithWatermark.clickTime, "1 hour")
).count()impressionsWindow = impressionsWithWatermark.groupBy(impressionsWithWatermark.impressionAdId,window(impressionsWithWatermark.impressionTime, "1 hour")
).count()clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark.groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour")).count();Dataset<Row> impressionsWindow = impressionsWithWatermark.groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour")).count();clicksWindow.join(impressionsWindow, "window", "inner");
val clicksWindow = clicksWithWatermark.groupBy(window("clickTime", "1 hour")).count()val impressionsWindow = impressionsWithWatermark.groupBy(window("impressionTime", "1 hour")).count()clicksWindow.join(impressionsWindow, "window", "inner")

以下是另一个带有时间范围连接条件的流-流连接,然后进行时间窗口聚合的示例:

joined = impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour"""),"leftOuter"                 # 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)joined.groupBy(joined.clickAdId,window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(clicksWithWatermark,expr("""clickAdId = impressionAdId ANDclickTime >= impressionTime ANDclickTime <= impressionTime + interval 1 hour"""),joinType = "leftOuter"      // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)joined.groupBy($"clickAdId", window($"clickTime", "1 hour")).count()
Dataset<Row> joined = impressionsWithWatermark.join(clicksWithWatermark,expr("clickAdId = impressionAdId AND " +"clickTime >= impressionTime AND " +"clickTime <= impressionTime + interval 1 hour "),"leftOuter"                 // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);joined.groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour")).count();

流式去重
您可以使用事件中的唯一标识符对数据流中的记录进行去重。这与在静态数据上使用唯一标识符列进行去重完全相同。查询将存储来自先前记录的必要数据量,以便过滤重复记录。与聚合类似,您可以使用带水印或不带水印的去重。

  • 带水印 - 如果重复记录可能到达的延迟时间有上限,那么您可以在事件时间列上定义水印,并使用 guid 和事件时间列进行去重。查询将使用水印来移除过去记录的旧状态数据,这些记录预计不再有任何重复。这限制了查询必须维护的状态量。
  • 不带水印 - 由于重复记录可能到达的时间没有限制,查询将存储所有过去记录的数据作为状态。
streamingDf = spark.readStream. ...# 不使用水印,使用 guid 列
streamingDf.dropDuplicates("guid")# 使用水印,使用 guid 和 eventTime 列
streamingDf \.withWatermark("eventTime", "10 seconds") \.dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ...  // 列: guid, eventTime, ...// 不使用水印,使用 guid 列
streamingDf.dropDuplicates("guid")// 使用水印,使用 guid 和 eventTime 列
streamingDf.withWatermark("eventTime", "10 seconds").dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // 列: guid, eventTime, ...// 不使用水印,使用 guid 列
streamingDf.dropDuplicates("guid");// 使用水印,使用 guid 和 eventTime 列
streamingDf.withWatermark("eventTime", "10 seconds").dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)# 不使用水印,使用 guid 列
streamingDf <- dropDuplicates(streamingDf, "guid")# 使用水印,使用 guid 和 eventTime 列
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

特别针对流式处理,您可以使用事件中的唯一标识符,在水印的时间范围内对数据流中的记录进行去重。例如,如果您将水印的延迟阈值设置为"1 小时",那么发生在 1 小时内的重复事件可以被正确去重。(更多细节,请参考 dropDuplicatesWithinWatermark 的 API 文档。)

这可以用于处理事件时间列不能作为唯一标识符一部分的用例,主要是由于相同记录的事件时间可能不同的情况。(例如,非幂等写入器,在写入时发生事件时间)

鼓励用户将水印的延迟阈值设置得比重复事件之间的最大时间戳差异更长。

此功能需要在流式 DataFrame/Dataset 中设置带有延迟阈值的水印。

streamingDf = spark.readStream. ...# 基于 eventTime 列的水印,使用 guid 列进行去重
streamingDf \.withWatermark("eventTime", "10 hours") \.dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ...  // 列: guid, eventTime, ...// 基于 eventTime 列的水印,使用 guid 列进行去重
streamingDf.withWatermark("eventTime", "10 hours").dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...;  // 列: guid, eventTime, ...// 基于 eventTime 列的水印,使用 guid 列进行去重
streamingDf.withWatermark("eventTime", "10 hours").dropDuplicatesWithinWatermark("guid");

处理多个水印的策略
一个流查询可以有多个输入流,这些输入流被联合或连接在一起。每个输入流对于有状态操作可能需要容忍不同的迟到数据阈值。您可以使用每个输入流上的 withWatermark("eventTime", delay) 来指定这些阈值。例如,考虑一个在 inputStream1inputStream2 之间进行流-流连接的查询。

inputStream1.withWatermark("eventTime1", "1 hour").join(inputStream2.withWatermark("eventTime2", "2 hours"),joinCondition)

在执行查询时,结构化流处理会单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择单个全局水印用于有状态操作。默认情况下,选择最小值作为全局水印,因为这确保了如果一个流落后于其他流(例如,一个流由于上游故障而停止接收数据),不会意外地将数据丢弃为太迟。换句话说,全局水印将以最慢流的速度安全地移动,查询输出将相应地延迟。

但是,在某些情况下,您可能希望获得更快的结果,即使这意味着从最慢的流中丢弃数据。自 Spark 2.4 起,您可以通过将 SQL 配置 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认为 min)来设置多水印策略以选择最大值作为全局水印。这使得全局水印以最快流的速度移动。然而,副作用是来自较慢流的数据将被积极丢弃。因此,请谨慎使用此配置。

任意有状态操作
许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须从事件数据流中跟踪会话。要进行这种会话化,您必须将任意类型的数据保存为状态,并在每个触发器中利用数据流事件对状态执行任意操作。自 Spark 2.2 起,可以使用操作 mapGroupsWithState 和更强大的操作 flatMapGroupsWithState 来完成。这两种操作都允许您在分组的 Dataset 上应用用户定义的代码来更新用户定义的状态。有关更具体的细节,请查看 API 文档(Scala/Java)和示例(Scala/Java)。

尽管 Spark 无法检查和强制执行,但状态函数的实现应尊重输出模式的语义。例如,在更新模式下,Spark 不期望状态函数会发出比当前水印加上允许的迟到记录延迟更旧的行,而在追加模式下,状态函数可以发出这些行。

不支持的操作
有一些 DataFrame/Dataset 操作不支持流式 DataFrame/Dataset。其中一些如下。

  • 不支持对流式 Dataset 进行限制和获取前 N 行。
  • 不支持对流式 Dataset 进行去重操作。
  • 排序操作仅在聚合之后并且在完全输出模式下才支持在流式 Dataset 上使用。
  • 流式 Dataset 上的少数类型的外连接不受支持。有关更多详细信息,请参阅连接操作部分中的支持矩阵。
  • 在更新和完全模式下,不支持在流式 Dataset 上链接多个有状态操作。
    • 一个已知的解决方法是将您的流查询拆分为多个查询,每个查询只有一个有状态操作,并确保每个查询的端到端精确一次。确保最后一个查询的端到端精确一次是可选的。
  • 此外,有一些 Dataset 方法在流式 Dataset 上不起作用。它们是会立即运行查询并返回结果的操作,这在流式 Dataset 上没有意义。相反,这些功能可以通过显式启动流查询来完成(请参阅下一节)。
    • count() - 无法从流式 Dataset 返回单个计数。相反,使用 ds.groupBy().count(),它返回一个包含运行计数的流式 Dataset。
    • foreach() - 改用 ds.writeStream.foreach(...)(参见下一节)。
    • show() - 改用控制台接收器(参见下一节)。

如果您尝试任何这些操作,您将看到一个 AnalysisException,如"操作 XYZ 不支持流式 DataFrame/Dataset"。虽然其中一些可能在 Spark 的未来版本中得到支持,但其他一些在流数据上高效实现从根本上说是困难的。例如,不支持对输入流进行排序,因为它需要跟踪流中接收的所有数据。因此,这从根本上难以高效执行。

状态存储
状态存储是一个版本化的键值存储,提供读写操作。在结构化流处理中,我们使用状态存储提供程序来处理跨批次的有状态操作。有两个内置的状态存储提供程序实现。最终用户也可以通过扩展 StateStoreProvider 接口来实现自己的状态存储提供程序。

HDFS 状态存储提供程序
HDFS 后端状态存储提供程序是 [[StateStoreProvider]][[StateStore]] 的默认实现,其中所有数据首先存储在内存映射中,然后由 HDFS 兼容文件系统中的文件支持。对存储的所有更新都必须以事务集的形式完成,每组更新都会增加存储的版本。这些版本可用于在正确的存储版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。

RocksDB 状态存储实现
自 Spark 3.2 起,我们添加了一个新的内置状态存储实现,RocksDB 状态存储提供程序。

如果您的流查询中有有状态操作(例如,流式聚合、流式去重、流-流连接、mapGroupsWithStateflatMapGroupsWithState),并且您希望在状态中维护数百万个键,那么您可能会遇到与大型 JVM 垃圾收集(GC)暂停相关的问题,导致微批处理时间高度波动。这是因为,根据 HDFSBackedStateStore 的实现,状态数据保存在执行器的 JVM 内存中,大量的状态对象给 JVM 带来内存压力,导致高 GC 暂停。

在这种情况下,您可以选择使用基于 RocksDB 的更优化的状态管理解决方案。此解决方案不是将状态保存在 JVM 内存中,而是使用 RocksDB 在本机内存和本地磁盘中高效管理状态。此外,对此状态的任何更改都会由结构化流处理自动保存到您提供的检查点位置,从而提供完全的容错保证(与默认状态管理相同)。

要启用新的内置状态存储实现,请将 spark.sql.streaming.stateStore.providerClass 设置为 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

以下是有关状态存储提供程序的 RocksDB 实例的配置:

配置名称描述默认值
spark.sql.streaming.stateStore.rocksdb.compactOnCommit我们是否在提交操作时对 RocksDB 实例执行范围压缩False
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled在 RocksDB StateStore 提交期间是否上传变更日志而不是快照False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB对于 RocksDB 的 BlockBasedTable(RocksDB 的默认 SST 文件格式),每个块打包的用户数据的近似大小(KB)。4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB块缓存的大小容量(MB)。8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs在 RocksDB 实例的加载操作中获取锁的等待时间(毫秒)。60000
spark.sql.streaming.stateStore.rocksdb.maxOpenFilesRocksDB 实例可以使用的打开文件数。值为 -1 表示打开的文件始终保持打开状态。如果达到打开文件限制,RocksDB 将从打开文件缓存中逐出条目,关闭那些文件描述符并从缓存中移除条目。-1
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad我们是否在加载时重置 RocksDB 的所有计数器统计信息和直方图统计信息。True
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows我们是否跟踪状态存储中的总行数。请参阅性能方面的考虑中的详细信息。True
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBRocksDB 中 MemTable 的最大大小。值为 -1 表示将使用 RocksDB 内部默认值。-1
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumberRocksDB 中 MemTable 的最大数量,包括活动和不可变的。值为 -1 表示将使用 RocksDB 内部默认值。-1
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage单个节点上 RocksDB 状态存储实例的总内存使用是否受限。false
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB单个节点上 RocksDB 状态存储实例的总内存限制(MB)。500
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio写缓冲区占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上所有 RocksDB 实例分配内存的一部分。0.5
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio高优先级池中块占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上所有 RocksDB 实例分配内存的一部分。0.1

RocksDB 状态存储内存管理
RocksDB 为不同的对象分配内存,例如 memtables、块缓存和过滤器/索引块。如果不加以限制,跨多个实例的 RocksDB 内存使用量可能会无限增长,并可能导致 OOM(内存不足)问题。RocksDB 通过使用写缓冲区管理器功能,提供了一种限制在单个节点上运行的所有 DB 实例内存使用的方法。如果您想在 Spark 结构化流部署中限制 RocksDB 内存使用,可以通过将 spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 配置设置为 true 来启用此功能。您还可以通过将 spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 值设置为静态数字或节点上可用物理内存的一部分来确定 RocksDB 实例的最大允许内存。还可以通过将 spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBspark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber 设置为所需值来配置单个 RocksDB 实例的限制。默认情况下,这些设置使用 RocksDB 内部默认值。

RocksDB 状态存储变更日志检查点
在 Spark 的新版本中,为 RocksDB 状态存储引入了变更日志检查点。RocksDB 状态存储的传统检查点机制是增量快照检查点,其中 RocksDB 实例的清单文件和新生成的 RocksDB SST 文件被上传到持久存储。变更日志检查点不是上传 RocksDB 实例的数据文件,而是上传自上次检查点以来对状态所做的更改以实现持久性。快照在后台定期持久化,以实现可预测的故障恢复和变更日志修剪。变更日志检查点避免了捕获和上传 RocksDB 实例快照的成本,并显著降低了流查询延迟。

变更日志检查点默认禁用。您可以通过将 spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 配置设置为 true 来启用 RocksDB 状态存储变更日志检查点。变更日志检查点设计为与传统检查点机制向后兼容。RocksDB 状态存储提供程序在两个检查点机制之间的双向转换中提供无缝支持。这使您可以利用变更日志检查点的性能优势,而无需丢弃旧的状态检查点。在支持变更日志检查点的 Spark 版本中,您可以通过在 Spark 会话中启用变更日志检查点,将流查询从旧版本的 Spark 迁移到变更日志检查点。反之,您可以在新版本的 Spark 中安全地禁用变更日志检查点,然后任何已经使用变更日志检查点运行的查询将切换回传统检查点。您需要重新启动流查询才能使检查点机制的更改生效,但在此过程中您不会观察到任何性能下降。

性能方面的考虑
您可能希望禁用总行数的跟踪,以在 RocksDB 状态存储上获得更好的性能。
跟踪行数会在写操作上带来额外的查找 - 鼓励您在调整 RocksDB 状态存储时尝试关闭此配置,特别是当状态操作符的指标值很大时 - numRowsUpdated, numRowsRemoved

您可以在重新启动查询时更改配置,这使您可以改变"可观察性 vs 性能"的权衡决策。如果配置被禁用,状态中的行数(numTotalStateRows)将报告为 0。

状态存储和任务局部性
有状态操作将事件的状态存储在执行器的状态存储中。状态存储占用内存和磁盘空间等资源来存储状态。因此,在不同的流批次中保持状态存储提供程序在同一执行器上运行更有效。更改状态存储提供程序的位置需要加载检查点状态的额外开销。从检查点加载状态的开销取决于外部存储和状态的大小,这往往会损害微批运行的延迟。对于某些用例,例如处理非常大的状态数据,从检查点状态加载新的状态存储提供程序可能非常耗时且效率低下。

结构化流查询中的有状态操作依赖于 Spark RDD 的优选位置功能,以在同一执行器上运行状态存储提供程序。如果在下一个批次中相应的状态存储提供程序再次被调度到此执行器,它可以重用先前的状态并节省加载检查点状态的时间。

但是,通常优选位置不是硬性要求,Spark 仍然可能将任务调度到优选执行器以外的执行器。在这种情况下,Spark 将在新的执行器上从检查点状态加载状态存储提供程序。先前批次中运行的状态存储提供程序不会立即卸载。Spark 运行一个维护任务,检查并卸载执行器上不活动的状态存储提供程序。

通过更改与任务调度相关的 Spark 配置,例如 spark.locality.wait,用户可以配置 Spark 等待启动数据本地任务的时间。对于结构化流处理中的有状态操作,它可以用于让状态存储提供程序跨批次在同一执行器上运行。

特别针对内置的 HDFS 状态存储提供程序,用户可以检查状态存储指标,如 loadedMapCacheHitCountloadedMapCacheMissCount。理想情况下,最好最小化缓存未命中计数,这意味着 Spark 不会在加载检查点状态上浪费太多时间。用户可以增加 Spark 局部性等待配置,以避免跨批次在不同的执行器中加载状态存储提供程序。

启动流式查询
一旦您定义了最终的 DataFrame/Dataset 结果,剩下的就是启动流式计算。为此,您必须使用通过 Dataset.writeStream() 返回的 DataStreamWriter(Scala/Java/Python 文档)。您需要在此接口中指定以下一项或多项内容。

  • 输出接收器详情:数据格式、位置等。
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:可选地,为查询指定一个唯一名称用于标识。
  • 触发间隔:可选地,指定触发间隔。如果未指定,系统将在前一个处理完成后立即检查新数据的可用性。如果由于前一个处理未完成而错过了触发时间,则系统将立即触发处理。
  • 检查点位置:对于某些可以保证端到端容错的输出接收器,指定系统将写入所有检查点信息的位置。这应该是一个 HDFS 兼容的容错文件系统中的目录。检查点的语义将在下一节中详细讨论。

输出模式
有几种类型的输出模式。

  • 追加模式(默认) - 这是默认模式,其中只有自上次触发以来添加到结果表中的新行才会输出到接收器。这仅适用于那些添加到结果表中的行永远不会改变的查询。因此,此模式保证每行只输出一次(假设是容错的接收器)。例如,仅包含 selectwheremapflatMapfilterjoin 等的查询将支持追加模式。
  • 完全模式 - 整个结果表将在每次触发后输出到接收器。这适用于聚合查询。
  • 更新模式 -(自 Spark 2.1.1 起可用)只有自上次触发以来在结果表中更新的行才会输出到接收器。更多信息将在未来版本中添加。

不同类型的流式查询支持不同的输出模式。以下是兼容性矩阵。

查询类型支持的输出模式说明
带聚合的查询
    带水印的事件时间聚合Append, Update, CompleteAppend 模式使用水印来丢弃旧的聚合状态。但是,窗口聚合的输出会延迟 withWatermark() 中指定的迟到阈值,因为根据模式语义,行只有在最终确定后(即水印越过之后)才能添加到结果表中一次。有关更多详细信息,请参阅迟到数据部分。
Update 模式使用水印来丢弃旧的聚合状态。
Complete 模式不会丢弃旧的聚合状态,因为根据定义,此模式保留结果表中的所有数据。
    其他聚合Complete, Update由于未定义水印(仅在其他类别中定义),不会丢弃旧的聚合状态。
Append 模式不支持,因为聚合可能会更新,从而违反此模式的语义。
mapGroupsWithState 的查询Update在带有 mapGroupsWithState 的查询中不允许聚合。
flatMapGroupsWithState 的查询
    Append 操作模式AppendflatMapGroupsWithState 之后允许聚合。
    Update 操作模式Update在带有 flatMapGroupsWithState 的查询中不允许聚合。
带连接的查询AppendUpdate 和 Complete 模式尚不支持。有关支持哪些类型的连接的更多详细信息,请参阅连接操作部分中的支持矩阵。
其他查询Append, UpdateComplete 模式不支持,因为在结果表中保留所有未聚合的数据是不可行的。

输出接收器
有几种内置的输出接收器。

  • 文件接收器 - 将输出存储到目录。
    writeStream.format("parquet")        # 可以是 "orc"、"json"、"csv" 等.option("path", "path/to/destination/dir").start()
    
  • Kafka 接收器 - 将输出存储到 Kafka 中的一个或多个主题。
    writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()
    
  • Foreach 接收器 - 对输出中的记录运行任意计算。有关更多详细信息,请参阅后面的章节。
    writeStream.foreach(...).start()
    
  • 控制台接收器(用于调试) - 每次触发时将输出打印到控制台/stdout。支持 Append 和 Complete 输出模式。这应用于低数据量的调试目的,因为每次触发后整个输出都会被收集并存储在驱动程序的内存中。
    writeStream.format("console").start()
    
  • 内存接收器(用于调试) - 输出作为内存表存储在内存中。支持 Append 和 Complete 输出模式。这应用于低数据量的调试目的,因为整个输出都会被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    writeStream.format("memory").queryName("tableName").start()
    

某些接收器不是容错的,因为它们不保证输出的持久性,仅用于调试目的。请参阅前面关于容错语义的部分。以下是 Spark 中所有接收器的详细信息。

接收器支持输出模式选项容错说明
文件接收器Appendpath:输出目录的路径,必须指定。
retention:输出文件的生存时间(TTL)。提交批次早于 TTL 的输出文件最终将从元数据日志中排除。这意味着读取接收器输出目录的读取器查询可能不会处理它们。您可以提供字符串格式的时间值(如 “12h”、“7d” 等)。默认情况下禁用。
有关文件格式特定选项,请参阅 DataFrameWriter 中的相关方法(Scala/Java/Python/R)。例如,对于 “parquet” 格式选项,请参阅 DataFrameWriter.parquet()
是(精确一次)支持写入分区表。按时间分区可能很有用。
Kafka 接收器Append, Update, Complete请参阅 Kafka 集成指南是(至少一次)Kafka 集成指南中有更多详细信息
Foreach 接收器Append, Update, Complete是(至少一次)下一节有更多详细信息
ForeachBatch 接收器Append, Update, Complete取决于实现下一节有更多详细信息
控制台接收器Append, Update, CompletenumRows:每次触发要打印的行数(默认值:20)
truncate:如果输出过长是否截断(默认值:true)
内存接收器Append, Complete否。但在 Complete 模式下,重新启动的查询将重新创建整个表。表名是查询名。

请注意,您必须调用 start() 来实际启动查询的执行。这将返回一个 StreamingQuery 对象,它是持续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个例子来理解所有这些。

# ========== 无聚合的 DF ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")# 将新数据打印到控制台
noAggDF \.writeStream \.format("console") \.start()# 将新数据写入 Parquet 文件
noAggDF \.writeStream \.format("parquet") \.option("checkpointLocation", "path/to/checkpoint/dir") \.option("path", "path/to/destination/dir") \.start()# ========== 带聚合的 DF ==========
aggDF = df.groupBy("device").count()# 将更新的聚合打印到控制台
aggDF \.writeStream \.outputMode("complete") \.format("console") \.start()# 将所有聚合放在内存表中。查询名称将是表名
aggDF \.writeStream \.queryName("aggregates") \.outputMode("complete") \.format("memory") \.start()spark.sql("select * from aggregates").show()   # 交互式查询内存表
// ========== 无聚合的 DF ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")// 将新数据打印到控制台
noAggDF.writeStream.format("console").start()// 将新数据写入 Parquet 文件
noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()// ========== 带聚合的 DF ==========
val aggDF = df.groupBy("device").count()// 将更新的聚合打印到控制台
aggDF.writeStream.outputMode("complete").format("console").start()// 将所有聚合放在内存表中
aggDF.writeStream.queryName("aggregates")    // 此查询名称将是表名.outputMode("complete").format("memory").start()spark.sql("select * from aggregates").show()   // 交互式查询内存表
// ========== 无聚合的 DF ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");// 将新数据打印到控制台
noAggDF.writeStream().format("console").start();// 将新数据写入 Parquet 文件
noAggDF.writeStream().format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start();// ========== 带聚合的 DF ==========
Dataset<Row> aggDF = df.groupBy("device").count();// 将更新的聚合打印到控制台
aggDF.writeStream().outputMode("complete").format("console").start();// 将所有聚合放在内存表中
aggDF.writeStream().queryName("aggregates")    // 此查询名称将是表名.outputMode("complete").format("memory").start();spark.sql("select * from aggregates").show();   // 交互式查询内存表
# ========== 无聚合的 DF ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")# 将新数据打印到控制台
write.stream(noAggDF, "console")# 将新数据写入 Parquet 文件
write.stream(noAggDF,"parquet",path = "path/to/destination/dir",checkpointLocation = "path/to/checkpoint/dir")# ========== 带聚合的 DF ==========
aggDF <- count(groupBy(df, "device"))# 将更新的聚合打印到控制台
write.stream(aggDF, "console", outputMode = "complete")# 将所有聚合放在内存表中。查询名称将是表名
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")# 交互式查询内存表
head(sql("select * from aggregates"))

使用 Foreach 和 ForeachBatch
foreachforeachBatch 操作允许您在流查询的输出上应用任意操作和写入逻辑。它们的用例略有不同 - foreach 允许对每一行进行自定义写入逻辑,而 foreachBatch 允许对每个微批处理的输出进行任意操作和自定义逻辑。让我们更详细地了解它们的用法。

ForeachBatch
foreachBatch(...) 允许您指定一个函数,该函数在流查询的每个微批处理的输出数据上执行。自 Spark 2.4 起,Scala、Java 和 Python 均支持此功能。它接受两个参数:一个包含微批处理输出数据的 DataFrame 或 Dataset,以及微批处理的唯一 ID。

def foreach_batch_function(df, epoch_id):# 转换并写入 batchDFpassstreamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>// 转换并写入 batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(new VoidFunction2<Dataset<String>, Long>() {public void call(Dataset<String> dataset, Long batchId) {// 转换并写入 batchDF}}
).start();

使用 foreachBatch,您可以执行以下操作:

  • 重用现有的批处理数据源 - 对于许多存储系统,可能还没有可用的流式接收器,但可能已经存在用于批处理查询的数据写入器。使用 foreachBatch,您可以在每个微批处理的输出上使用批处理数据写入器。
  • 写入多个位置 - 如果您希望将流查询的输出写入多个位置,您可以简单地多次写入输出 DataFrame/Dataset。但是,每次写入尝试都可能导致输出数据被重新计算(包括可能重新读取输入数据)。为了避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入多个位置,然后取消缓存。以下是一个概要。
    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>batchDF.persist()batchDF.write.format(...).save(...)  // 位置 1batchDF.write.format(...).save(...)  // 位置 2batchDF.unpersist()
    }
    
  • 应用额外的 DataFrame 操作 - 许多 DataFrame 和 Dataset 操作在流式 DataFrame 中不受支持,因为 Spark 在这些情况下不支持生成增量计划。使用 foreachBatch,您可以在每个微批处理输出上应用其中一些操作。但是,您必须自行推理执行该操作的端到端语义。

注意:

  • 默认情况下,foreachBatch 仅提供至少一次写入保证。但是,您可以使用提供给函数的 batchId 作为去重输出的方式,以获得精确一次保证。
  • foreachBatch 不适用于连续处理模式,因为它从根本上依赖于流查询的微批处理执行。如果您在连续模式下写入数据,请改用 foreach

Foreach
如果 foreachBatch 不可行(例如,相应的批处理数据写入器不存在,或处于连续处理模式),则可以使用 foreach 表达您的自定义写入器逻辑。具体来说,您可以通过将数据写入逻辑划分为三个方法来表达:openprocessclose。自 Spark 2.4 起,Scala、Java 和 Python 均支持 foreach

Python
在 Python 中,您可以通过两种方式调用 foreach:在函数中或在对象中。函数提供了一种表达处理逻辑的简单方法,但不允许在故障导致某些输入数据重新处理时对生成的数据进行去重。对于这种情况,您必须在对象中指定处理逻辑。

首先,函数接受一行作为输入。

def process_row(row):# 将行写入存储passquery = streamingDF.writeStream.foreach(process_row).start()

其次,对象具有一个 process 方法和可选的 openclose 方法:

class ForeachWriter:def open(self, partition_id, epoch_id):# 打开连接。此方法在 Python 中是可选的。passdef process(self, row):# 将行写入连接。此方法在 Python 中不是可选的。passdef close(self, error):# 关闭连接。此方法在 Python 中是可选的。passquery = streamingDF.writeStream.foreach(ForeachWriter()).start()

Scala
在 Scala 中,您必须扩展 ForeachWriter 类(文档)。

streamingDatasetOfString.writeStream.foreach(new ForeachWriter[String] {def open(partitionId: Long, version: Long): Boolean = {// 打开连接}def process(record: String): Unit = {// 将字符串写入连接}def close(errorOrNull: Throwable): Unit = {// 关闭连接}}
).start()

Java
在 Java 中,您必须扩展 ForeachWriter 类(文档)。

streamingDatasetOfString.writeStream().foreach(new ForeachWriter<String>() {@Override public boolean open(long partitionId, long version) {// 打开连接}@Override public void process(String record) {// 将字符串写入连接}@Override public void close(Throwable errorOrNull) {// 关闭连接}}
).start();

执行语义 当流查询启动时,Spark 按以下方式调用函数或对象的方法:

  • 此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责以分布式方式处理生成的数据的一个分区。
  • 此对象必须是可序列化的,因为每个任务将获得所提供对象的新序列化-反序列化副本。因此,强烈建议任何用于写入数据的初始化(例如,打开连接或启动事务)在调用 open() 方法之后完成,这表明任务已准备好生成数据。
  • 方法的生命周期如下:
    • 对于每个具有 partition_id 的分区:
      • 对于具有 epoch_id 的每一批/轮次的流数据:
        • 调用方法 open(partitionId, epochId)
        • 如果 open(...) 返回 true,则对于分区和批/轮次中的每一行,调用方法 process(row)
        • 方法 close(error) 被调用,并带有处理行时看到的错误(如果有)。
    • 如果存在 open() 方法并且成功返回(无论返回值如何),则会调用 close() 方法(如果存在),除非 JVM 或 Python 进程在中间崩溃。

注意: Spark 不保证 (partitionId, epochId) 的输出相同,因此无法使用 (partitionId, epochId) 实现去重。例如,源由于某些原因提供不同数量的分区,Spark 优化更改分区数量等。有关更多详细信息,请参阅 SPARK-28650。如果您需要对输出进行去重,请尝试使用 foreachBatch

流式表 API
自 Spark 3.1 起,您还可以使用 DataStreamReader.table() 将表读取为流式 DataFrame,并使用 DataStreamWriter.toTable() 将流式 DataFrame 写入为表:

spark = ...  # spark session# 创建流式 DataFrame
df = spark.readStream \.format("rate") \.option("rowsPerSecond", 10) \.load()# 将流式 DataFrame 写入表
df.writeStream \.option("checkpointLocation", "path/to/checkpoint/dir") \.toTable("myTable")# 检查表结果
spark.read.table("myTable").show()# 转换源数据集并写入新表
spark.readStream \.table("myTable") \.select("value") \.writeStream \.option("checkpointLocation", "path/to/checkpoint/dir") \.format("parquet") \.toTable("newTable")# 检查新表结果
spark.read.table("newTable").show()
val spark: SparkSession = ...// 创建流式 DataFrame
val df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()// 将流式 DataFrame 写入表
df.writeStream.option("checkpointLocation", "path/to/checkpoint/dir").toTable("myTable")// 检查表结果
spark.read.table("myTable").show()// 转换源数据集并写入新表
spark.readStream.table("myTable").select("value").writeStream.option("checkpointLocation", "path/to/checkpoint/dir").format("parquet").toTable("newTable")// 检查新表结果
spark.read.table("newTable").show()
SparkSession spark = ...// 创建流式 DataFrame
Dataset<Row> df = spark.readStream().format("rate").option("rowsPerSecond", 10).load();// 将流式 DataFrame 写入表
df.writeStream().option("checkpointLocation", "path/to/checkpoint/dir").toTable("myTable");// 检查表结果
spark.read().table("myTable").show();// 转换源数据集并写入新表
spark.readStream().table("myTable").select("value").writeStream().option("checkpointLocation", "path/to/checkpoint/dir").format("parquet").toTable("newTable");// 检查新表结果
spark.read().table("newTable").show();

有关更多详细信息,请查阅 DataStreamReader(Scala/Java/Python 文档)和 DataStreamWriter(Scala/Java/Python 文档)的文档。

触发器
流查询的触发器设置定义了流数据处理的时序,即查询是以固定批间隔的微批处理查询执行,还是作为连续处理查询执行。以下是支持的不同类型的触发器。

触发器类型描述
未指定(默认)如果未显式指定触发器设置,则默认情况下,查询将以微批处理模式执行,其中前一个微批处理完成后将立即生成新的微批处理。
固定间隔微批处理查询将以微批处理模式执行,其中微批处理将在用户指定的间隔启动。
如果前一个微批处理在间隔内完成,则引擎将等待间隔结束后再启动下一个微批处理。
如果前一个微批处理完成所需的时间超过间隔(即错过了间隔边界),则下一个微批处理将在前一个完成后立即启动(即,它不会等待下一个间隔边界)。
如果没有新数据可用,则不会启动微批处理。
一次性微批处理(已弃用)查询将仅执行一个微批处理来处理所有可用数据,然后自行停止。这在您希望定期启动集群、处理自上一个周期以来所有可用数据然后关闭集群的场景中非常有用。在某些情况下,这可能会显著节省成本。请注意,此触发器已弃用,鼓励用户迁移到可用-现在微批处理触发器,因为它提供了更好的处理保证、更细粒度的批处理规模以及更好的水印前进渐进处理(包括无数据批次)。
可用-现在微批处理与一次性微批处理触发器类似,查询将处理所有可用数据,然后自行停止。不同之处在于,它将根据源选项(例如,文件源的 maxFilesPerTrigger)以(可能)多个微批处理来处理数据,这将导致更好的查询可扩展性。
此触发器提供了强大的处理保证:无论上次运行遗留了多少批次,它都确保在执行时所有可用数据在终止前得到处理。所有未提交的批次将首先被处理。
水印每批次前进,如果最后一个批次推进了水印,则在终止前执行无数据批次。这有助于维护更小且可预测的状态大小以及状态操作符输出上更小的延迟。
具有固定检查点间隔的连续处理(实验性)查询将以新的低延迟连续处理模式执行。在下面的连续处理部分相关内容。

以下是一些代码示例。

# 默认触发器(尽可能快地运行微批处理)
df.writeStream \.format("console") \.start()# 处理时间触发器,具有两秒的微批处理间隔
df.writeStream \.format("console") \.trigger(processingTime='2 seconds') \.start()# 一次性触发器(已弃用,鼓励使用可用-现在触发器)
df.writeStream \.format("console") \.trigger(once=True) \.start()# 可用-现在触发器
df.writeStream \.format("console") \.trigger(availableNow=True) \.start()# 连续触发器,具有一秒的检查点间隔
df.writeStream.format("console").trigger(continuous='1 second').start()
import org.apache.spark.sql.streaming.Trigger// 默认触发器(尽可能快地运行微批处理)
df.writeStream.format("console").start()// 处理时间触发器,具有两秒的微批处理间隔
df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start()// 一次性触发器(已弃用,鼓励使用可用-现在触发器)
df.writeStream.format("console").trigger(Trigger.Once()).start()// 可用-现在触发器
df.writeStream.format("console").trigger(Trigger.AvailableNow()).start()// 连续触发器,具有一秒的检查点间隔
df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()
import org.apache.spark.sql.streaming.Trigger// 默认触发器(尽可能快地运行微批处理)
df.writeStream.format("console").start();// 处理时间触发器,具有两秒的微批处理间隔
df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start();// 一次性触发器(已弃用,鼓励使用可用-现在触发器)
df.writeStream.format("console").trigger(Trigger.Once()).start();// 可用-现在触发器
df.writeStream.format("console").trigger(Trigger.AvailableNow()).start();// 连续触发器,具有一秒的检查点间隔
df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start();
# 默认触发器(尽可能快地运行微批处理)
write.stream(df, "console")# 处理时间触发器,具有两秒的微批处理间隔
write.stream(df, "console", trigger.processingTime = "2 seconds")# 一次性触发器
write.stream(df, "console", trigger.once = TRUE)# 连续触发器尚不支持

管理流查询
启动查询时创建的 StreamingQuery 对象可用于监视和管理查询。

query = df.writeStream.format("console").start()   # 获取查询对象query.id()          # 获取运行查询的唯一标识符,该标识符在从检查点数据重启时保持不变query.runId()       # 获取此查询运行的唯一 ID,该 ID 在每次启动/重启时生成query.name()        # 获取自动生成或用户指定的名称query.explain()   # 打印查询的详细说明query.stop()      # 停止查询query.awaitTermination()   # 阻塞直到查询终止,无论是通过 stop() 还是因错误终止query.exception()       # 如果查询因错误终止,则返回异常query.recentProgress  # 此查询最新进度更新的列表query.lastProgress    # 此流查询的最新进度更新
val query = df.writeStream.format("console").start()   // 获取查询对象query.id          // 获取运行查询的唯一标识符,该标识符在从检查点数据重启时保持不变query.runId       // 获取此查询运行的唯一 ID,该 ID 在每次启动/重启时生成query.name        // 获取自动生成或用户指定的名称query.explain()   // 打印查询的详细说明query.stop()      // 停止查询query.awaitTermination()   // 阻塞直到查询终止,无论是通过 stop() 还是因错误终止query.exception       // 如果查询因错误终止,则返回异常query.recentProgress  // 此查询最新进度更新的数组query.lastProgress    // 此流查询的最新进度更新
StreamingQuery query = df.writeStream().format("console").start();   // 获取查询对象query.id();          // 获取运行查询的唯一标识符,该标识符在从检查点数据重启时保持不变query.runId();       // 获取此查询运行的唯一 ID,该 ID 在每次启动/重启时生成query.name();        // 获取自动生成或用户指定的名称query.explain();   // 打印查询的详细说明query.stop();      // 停止查询query.awaitTermination();   // 阻塞直到查询终止,无论是通过 stop() 还是因错误终止query.exception();       // 如果查询因错误终止,则返回异常query.recentProgress();  // 此查询最新进度更新的数组query.lastProgress();    // 此流查询的最新进度更新
query <- write.stream(df, "console")  # 获取查询对象queryName(query)          # 获取自动生成或用户指定的名称explain(query)            # 打印查询的详细说明stopQuery(query)          # 停止查询awaitTermination(query)   # 阻塞直到查询终止,无论是通过 stop() 还是因错误终止lastProgress(query)       # 此流查询的最新进度更新

您可以在单个 SparkSession 中启动任意数量的查询。它们将同时运行,共享集群资源。您可以使用 sparkSession.streams() 获取 StreamingQueryManager(Scala/Java/Python 文档),该管理器可用于管理当前活动的查询。

spark = ...  # spark sessionspark.streams.active  # 获取当前活动流查询的列表spark.streams.get(id)  # 通过其唯一 ID 获取查询对象spark.streams.awaitAnyTermination()  # 阻塞直到其中任何一个终止
val spark: SparkSession = ...spark.streams.active    // 获取当前活动流查询的列表spark.streams.get(id)   // 通过其唯一 ID 获取查询对象spark.streams.awaitAnyTermination()   // 阻塞直到其中任何一个终止
SparkSession spark = ...spark.streams().active();    // 获取当前活动流查询的列表spark.streams().get(id);   // 通过其唯一 ID 获取查询对象spark.streams().awaitAnyTermination();   // 阻塞直到其中任何一个终止

监控流查询
有多种方法可以监控活动流查询。您可以使用 Spark 的 Dropwizard Metrics 支持将指标推送到外部系统,或者以编程方式访问它们。

交互式读取指标
您可以使用 streamingQuery.lastProgress()streamingQuery.status() 直接获取活动查询的当前状态和指标。lastProgress() 在 Scala 和 Java 中返回一个 StreamingQueryProgress 对象,在 Python 中返回一个具有相同字段的字典。它包含有关流在上次触发中所取得进度的所有信息——处理了哪些数据、处理速率、延迟等。还有 streamingQuery.recentProgress,它返回最近几个进度的数组。

此外,streamingQuery.status() 在 Scala 和 Java 中返回一个 StreamingQueryStatus 对象,在 Python 中返回一个具有相同字段的字典。它提供有关查询当前正在做什么的信息——触发器是否活动、数据是否正在处理等。

以下是一些示例。

query = ...  # 一个 StreamingQuery
print(query.lastProgress)'''
将打印类似以下内容。{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''print(query.status)
'''
将打印类似以下内容。{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...println(query.lastProgress)/* 将打印类似以下内容。{"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109","runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a","name" : "MyQuery","timestamp" : "2016-12-14T18:45:24.873Z","numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0,"durationMs" : {"triggerExecution" : 3,"getOffset" : 2},"eventTime" : {"watermark" : "2016-12-14T18:45:24.873Z"},"stateOperators" : [ ],"sources" : [ {"description" : "KafkaSource[Subscribe[topic-0]]","startOffset" : {"topic-0" : {"2" : 0,"4" : 1,"1" : 1,"3" : 1,"0" : 1}},"endOffset" : {"topic-0" : {"2" : 0,"4" : 115,"1" : 134,"3" : 21,"0" : 534}},"numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0} ],"sink" : {"description" : "MemorySink"}
}
*/println(query.status)/*  将打印类似以下内容。
{"message" : "Waiting for data to arrive","isDataAvailable" : false,"isTriggerActive" : false
}
*/
StreamingQuery query = ...System.out.println(query.lastProgress());
/* 将打印类似以下内容。{"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109","runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a","name" : "MyQuery","timestamp" : "2016-12-14T18:45:24.873Z","numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0,"durationMs" : {"triggerExecution" : 3,"getOffset" : 2},"eventTime" : {"watermark" : "2016-12-14T18:45:24.873Z"},"stateOperators" : [ ],"sources" : [ {"description" : "KafkaSource[Subscribe[topic-0]]","startOffset" : {"topic-0" : {"2" : 0,"4" : 1,"1" : 1,"3" : 1,"0" : 1}},"endOffset" : {"topic-0" : {"2" : 0,"4" : 115,"1" : 134,"3" : 21,"0" : 534}},"numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0} ],"sink" : {"description" : "MemorySink"}
}
*/System.out.println(query.status());
/*  将打印类似以下内容。
{"message" : "Waiting for data to arrive","isDataAvailable" : false,"isTriggerActive" : false
}
*/
query <- ...  # 一个 StreamingQuery
lastProgress(query)'''
将打印类似以下内容。{"id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9","runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16","name" : null,"timestamp" : "2017-04-26T08:27:28.835Z","numInputRows" : 0,"inputRowsPerSecond" : 0.0,"processedRowsPerSecond" : 0.0,"durationMs" : {"getOffset" : 0,"triggerExecution" : 1},"stateOperators" : [ {"numRowsTotal" : 4,"numRowsUpdated" : 0} ],"sources" : [ {"description" : "TextSocketSource[host: localhost, port: 9999]","startOffset" : 1,"endOffset" : 1,"numInputRows" : 0,"inputRowsPerSecond" : 0.0,"processedRowsPerSecond" : 0.0} ],"sink" : {"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"}
}
'''status(query)
'''
将打印类似以下内容。{"message" : "Waiting for data to arrive","isDataAvailable" : false,"isTriggerActive" : false
}
'''

使用异步 API 以编程方式报告指标
您还可以通过附加 StreamingQueryListener(Scala/Java/Python 文档)异步监视与 SparkSession 关联的所有查询。一旦您使用 sparkSession.streams.addListener() 附加了您的自定义 StreamingQueryListener 对象,您将在查询启动和停止时以及在活动查询取得进展时获得回调。以下是一个示例:

spark = ...class Listener(StreamingQueryListener):def onQueryStarted(self, event):print("Query started: " + event.id)def onQueryProgress(self, event):print("Query made progress: " + str(event.progress))def onQueryTerminated(self, event):print("Query terminated: " + event.id)spark.streams.addListener(Listener())
val spark: SparkSession = ...spark.streams.addListener(new StreamingQueryListener() {override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {println("Query started: " + queryStarted.id)}override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {println("Query terminated: " + queryTerminated.id)}override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {println("Query made progress: " + queryProgress.progress)}
})
SparkSession spark = ...spark.streams().addListener(new StreamingQueryListener() {@Overridepublic void onQueryStarted(QueryStartedEvent queryStarted) {System.out.println("Query started: " + queryStarted.id());}@Overridepublic void onQueryTerminated(QueryTerminatedEvent queryTerminated) {System.out.println("Query terminated: " + queryTerminated.id());}@Overridepublic void onQueryProgress(QueryProgressEvent queryProgress) {System.out.println("Query made progress: " + queryProgress.progress());}
});

使用 Dropwizard 报告指标
Spark 支持使用 Dropwizard 库报告指标。要同时启用结构化流查询的指标报告,您必须在 SparkSession 中显式启用配置 spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")

启用此配置后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 将指标报告到已配置的任何接收器(例如 Ganglia、Graphite、JMX 等)。

使用检查点从故障中恢复
在发生故障或有意关闭的情况下,您可以恢复先前查询的进度和状态,并从其停止的地方继续。这是通过使用检查点和预写日志来完成的。您可以使用检查点位置配置查询,查询将将所有进度信息(即每次触发中处理的偏移量范围)和运行中的聚合(例如,快速示例中的单词计数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时在 DataStreamWriter 中设置为一个选项。

aggDF \.writeStream \.outputMode("complete") \.option("checkpointLocation", "path/to/HDFS/dir") \.format("memory") \.start()
aggDF.writeStream.outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir").format("memory").start()
aggDF.writeStream().outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir").format("memory").start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

流查询更改后的恢复语义
在从同一检查点位置重新启动时,对流查询的更改存在限制。以下是一些不允许更改的类型,或者更改的效果未明确定义。对于所有这些:

  • "允许"一词意味着您可以进行指定的更改,但其效果的语义是否明确定义取决于查询和更改。
  • "不允许"一词意味着您不应进行指定的更改,因为重新启动的查询很可能会因不可预测的错误而失败。sdf 表示使用 sparkSession.readStream 生成的流式 DataFrame/Dataset。

更改类型

  1. 输入源数量或类型(即不同源)的更改:不允许。
  2. 输入源参数的更改:是否允许此更改以及更改的语义是否明确定义取决于源和查询。以下是一些示例。
    • 允许添加/删除/修改速率限制:spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • 不允许更改订阅的主题/文件,因为结果不可预测:spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "newTopic")
  3. 输出接收器类型的更改:允许在少数特定接收器组合之间进行更改。这需要根据具体情况验证。以下是一些示例。
    • 允许文件接收器更改为 Kafka 接收器。Kafka 将只看到新数据。
    • 不允许Kafka 接收器更改为文件接收器。
    • 允许Kafka 接收器更改为 foreach,反之亦然。
  4. 输出接收器参数的更改:是否允许此更改以及更改的语义是否明确定义取决于接收器和查询。以下是一些示例。
    • 不允许更改文件接收器的输出目录:sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 允许更改输出主题:sdf.writeStream.format("kafka").option("topic", "someTopic")sdf.writeStream.format("kafka").option("topic", "anotherTopic")
    • 允许更改用户定义的 foreach 接收器(即 ForeachWriter 代码),但更改的语义取决于代码。
  5. 投影/过滤/类似映射操作的更改:某些情况是允许的。例如:
    • 允许添加/删除过滤器:sdf.selectExpr("a")sdf.where(...).selectExpr("a").filter(...)
    • 允许具有相同输出模式的投影更改:sdf.selectExpr("stringColumn AS json").writeStreamsdf.selectExpr("anotherStringColumn AS json").writeStream
    • 有条件允许具有不同输出模式的投影更改:sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream 仅在输出接收器允许模式从 “a” 更改为 “b” 时才允许。
  6. 有状态操作的更改:流查询中的某些操作需要维护状态数据以持续更新结果。结构化流自动将状态数据检查点到容错存储(例如,HDFS、AWS S3、Azure Blob 存储)并在重新启动后恢复它。但是,这假设状态数据的模式在重新启动之间保持不变。这意味着在重新启动之间,不允许对流查询的有状态操作进行任何更改(即添加、删除或模式修改)。以下是有状态操作的列表,为了确保状态恢复,在重新启动之间不应更改其模式:
    • 流式聚合:例如,sdf.groupBy("a").agg(...)。不允许分组键或聚合的数量或类型的任何更改。
    • 流式去重:例如,sdf.dropDuplicates("a")。不允许去重列的数量或类型的任何更改。
    • 流-流连接:例如,sdf1.join(sdf2, ...)(即两个输入都是使用 sparkSession.readStream 生成的)。不允许模式或等值连接列的更改。不允许连接类型(外部或内部)的更改。连接条件中的其他更改定义不明确。
    • 任意有状态操作:例如,sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)。不允许对用户定义状态的模式和超时类型的任何更改。用户定义状态映射函数内的任何更改都是允许的,但更改的语义效果取决于用户定义的逻辑。如果您确实希望支持状态模式更改,则可以使用支持模式迁移的编码/解码方案将复杂状态数据结构显式编码/解码为字节。例如,如果您将状态保存为 Avro 编码的字节,那么您可以自由地在查询重新启动之间更改 Avro 状态模式,因为二进制状态将始终成功恢复。

异步进度跟踪
是什么?
异步进度跟踪允许流查询在微批处理内异步且与实际数据处理并行地进行进度检查点,从而减少与维护偏移量日志和提交日志相关的延迟。

异步进度跟踪

它是如何工作的?
结构化流依赖于持久化和管偏移量作为查询处理的进度指示器。偏移量管理操作直接影响处理延迟,因为在这些操作完成之前无法进行数据处理。异步进度跟踪使流查询能够进行进度检查点,而不受这些偏移量管理操作的影响。

如何使用它?
下面的代码片段提供了一个如何使用此功能的示例:

val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "in").load()
val query = stream.writeStream.format("kafka").option("topic", "out").option("checkpointLocation", "/tmp/checkpoint").option("asyncProgressTrackingEnabled", "true").start()

下表描述了此功能的配置及其关联的默认值。

选项默认值描述
asyncProgressTrackingEnabledtrue/falsefalse启用或禁用异步进度跟踪
asyncProgressTrackingCheckpointIntervalMs毫秒1000我们提交偏移量和完成提交的间隔

限制
该功能的初始版本有以下限制:

  • 异步进度跟踪仅在使用 Kafka 接收器的无状态查询中受支持。
  • 使用此异步进度跟踪将不支持端到端的精确一次处理,因为在故障情况下批处理的偏移量范围可能会改变。不过,许多接收器(例如 Kafka 接收器)本身就不支持精确一次写入。

关闭该设置
关闭异步进度跟踪可能会导致抛出以下异常:
java.lang.IllegalStateException: batch x doesn't exist

此外,驱动程序日志中可能会打印以下错误消息:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

这是因为当启用异步进度跟踪时,框架不会像未使用异步进度跟踪那样为每个批处理检查点进度。要解决此问题,只需重新启用 "asyncProgressTrackingEnabled" 并将 "asyncProgressTrackingCheckpointIntervalMs" 设置为 0,然后运行流查询直到至少处理了两个微批处理。现在可以安全地禁用异步进度跟踪,重新启动查询应该可以正常进行。

连续处理
[实验性]
连续处理是 Spark 2.3 中引入的一种新的实验性流执行模式,它能够实现低延迟(约 1 毫秒)的端到端处理,并具有至少一次的容错保证。相比之下,默认的微批处理引擎可以实现精确一次的保证,但最佳延迟也只能达到约 100 毫秒。对于某些类型的查询(下面讨论),您可以选择以哪种模式执行它们,而无需修改应用程序逻辑(即无需更改 DataFrame/Dataset 操作)。

要在连续处理模式下运行受支持的查询,您需要做的就是指定一个连续触发器,并将所需的检查点间隔作为参数。例如,

spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load() \.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "topic1") \.trigger(continuous="1 second") \     # 查询中唯一的更改.start()
import org.apache.spark.sql.streaming.Triggerspark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").trigger(Trigger.Continuous("1 second"))  // 查询中唯一的更改.start()
import org.apache.spark.sql.streaming.Trigger;spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").trigger(Trigger.Continuous("1 second"))  // 查询中唯一的更改.start();

1 秒的检查点间隔意味着连续处理引擎将每秒记录一次查询的进度。生成的检查点格式与微批处理引擎兼容,因此任何查询都可以使用任何触发器重新启动。例如,以微批处理模式启动的受支持查询可以以连续模式重新启动,反之亦然。请注意,任何时候切换到连续模式,您都将获得至少一次的容错保证。

支持的查询
截至 Spark 2.4,只有以下类型的查询在连续处理模式下受支持。

  • 操作:在连续模式下仅支持类似映射的 Dataset/DataFrame 操作,即仅投影(selectmapflatMapmapPartitions 等)和选择(wherefilter 等)。
  • 除聚合函数(因为聚合尚不支持)、current_timestamp()current_date()(使用时间进行确定性计算具有挑战性)外,所有 SQL 函数都受支持。
    • Kafka 源:支持所有选项。
    • Rate 源:适用于测试。在连续模式下支持的选项只有 numPartitionsrowsPerSecond
  • 接收器
    • Kafka 接收器:支持所有选项。
    • 内存接收器:适用于调试。
    • 控制台接收器:适用于调试。支持所有选项。请注意,控制台将打印您在连续触发器中指定的每个检查点间隔。

有关它们的更多详细信息,请参阅输入源和输出接收器部分。虽然控制台接收器适用于测试,但使用 Kafka 作为源和接收器可以最好地观察端到端的低延迟处理,因为这使得引擎能够在输入数据在输入主题中可用后的几毫秒内处理数据并使结果在输出主题中可用。

注意事项

  • 连续处理引擎启动多个长时间运行的任务,这些任务持续从源读取数据、处理数据并持续写入接收器。查询所需的任务数量取决于查询可以从源并行读取的分区数量。因此,在启动连续处理查询之前,必须确保集群中有足够的核心来并行运行所有任务。例如,如果您正在读取一个有 10 个分区的 Kafka 主题,那么集群必须至少有 10 个核心,查询才能取得进展。
  • 停止连续处理流可能会产生虚假的任务终止警告。这些可以安全地忽略。
  • 目前没有对失败任务的自动重试。任何故障都将导致查询停止,需要手动从检查点重新启动。

附加信息
说明

查询运行后,有几个配置是不可修改的。要更改它们,请丢弃检查点并启动新查询。这些配置包括:

  • spark.sql.shuffle.partitions
    • 这是由于状态的物理分区造成的:状态通过对键应用哈希函数进行分区,因此状态的分区数量应保持不变。
    • 如果您希望为有状态操作运行更少的任务,coalesce 有助于避免不必要的重新分区。
    • 合并后,(减少后的)任务数量将保持不变,除非发生另一次洗牌。
  • spark.sql.streaming.stateStore.providerClass:要正确读取查询的先前状态,状态存储提供程序的类应保持不变。
  • spark.sql.streaming.multipleWatermarkPolicy:修改此配置会导致查询包含多个水印时水印值不一致,因此策略应保持不变。

进一步阅读

  • 查看并运行 Scala/Java/Python/R 示例。
  • 如何运行 Spark 示例的说明
  • 在结构化流 Kafka 集成指南中阅读有关与 Kafka 集成的信息。
  • 在 Spark SQL 编程指南中阅读有关使用 DataFrames/Datasets 的更多详细信息。
  • 第三方博客文章
    • Apache Spark 2.1 中使用结构化流的实时流式 ETL (Databricks Blog)
    • Apache Spark 结构化流中与 Apache Kafka 的实时端到端集成 (Databricks Blog)
    • Apache Spark 结构化流中的事件时间聚合和水印 (Databricks Blog)
  • 演讲
    • Spark Summit Europe 2017
      • 使用 Apache Spark 中的结构化流进行简单、可扩展、容错的流处理 - 第 1 部分 幻灯片/视频, 第 2 部分 幻灯片/视频
      • 深入探讨结构化流中的有状态流处理 - 幻灯片/视频
    • Spark Summit 2016
      • 深入探讨结构化流 - 幻灯片/视频

迁移指南

迁移指南现已存档在此页面上。

http://www.dtcms.com/a/582107.html

相关文章:

  • 汽车OTA中的证书和证书链
  • 玩转Rust高级应用 怎么理解在标准库中,有一个std::intrinsics模块,它里面包含了一系列的编译器内置函数
  • fixedbug:Idea 项目启动Command line is too long
  • 乌兰察布网站制作互联网行业属于什么行业
  • 破解“用工难”!福欣精密借力金属3D打印重塑生产效率
  • 【剑斩OFFER】算法的暴力美学——二分查找
  • 找人做个网站大概多少钱做一款什么网站赚钱
  • 一个网站是如何建设中国十大seo公司
  • Java_HashMap底层机制与原码解读
  • 【ComfyUI】Wan2.2 CharacterMotion 单图角色关键词驱动视频生成
  • 网站学习流程北京朝阳区邮编
  • 河北响应式网站建设哪家有珠海编程培训机构
  • TypeScript核心类型系统完全指南
  • 做跨境电商,怎么用Facebook如何快速测品
  • 【ZeroRange WebRTC】RTP/SRTP 在 WebRTC 中的角色与工作原理(深入指南)
  • 做网站图注意事项买完域名接下来怎么弄
  • 襄阳做网站公司哪家好wordpress json 插件安装
  • 异常的回声——C++异常机制的堆栈回滚与性能真相
  • 【AI】人类思维方式
  • 公众号微信网站开发网站免费模版代码
  • 解决Unsupported characters for the charset ‘ISO-8859-1‘
  • 机器学习在供水管网阀门管理中的应用
  • React Native (RN)项目在web、Android和IOS上运行
  • 【信息安全毕业设计】基于zkSNARK与递归证明的数字签名验证方案研究
  • 研0不会总结文献核心科学问题?
  • pyside6常用控件: QProgressBar() 进度条显示
  • H5 移动端调试全流程指南,从浏览器模拟到真机 WebView 调试的完整实践
  • a4网站建设网站建站多少钱
  • 整合多平台消息:使用n8n的HTTP请求节点创建智能通知中心
  • 基于SpringBoot的动漫周边商场系统的设计与开发