kafka-3.3.1
1. 开始
1.1 简介
什么是事件流?
事件流是人体中枢神经系统的数字等价物。它是“永远在线”世界的技术基础,在这个世界中,企业越来越多地由软件定义和自动化,并且软件的用户更多是软件。
从技术上讲,事件流是以事件流的形式从数据库、传感器、移动设备、云服务和软件应用程序等事件源实时捕获数据的实践;持久存储这些事件流以供以后检索;实时和回顾性地操纵、处理和响应事件流;并根据需要将事件流路由到不同的目标技术。因此,事件流可确保数据的连续流动和解释,从而使正确的信息在正确的时间出现在正确的位置。
我可以使用事件流来做什么?
事件流应用于众多行业和组织的各种用例。它的许多例子包括:
- 实时处理支付和金融交易,例如证券交易所、银行和保险。
- 实时跟踪和监控汽车、卡车、车队和货物,例如物流和汽车行业。
- 持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电场。
- 收集客户互动和订单并立即做出反应,例如在零售、酒店和旅游业以及移动应用程序中。
- 监测住院患者并预测病情变化,确保在紧急情况下及时救治。
- 连接、存储和提供公司不同部门生成的数据。
- 作为数据平台、事件驱动架构和微服务的基础。
Apache Kafka® 是一个事件流平台。这意味着什么?
Kafka 结合了三个关键功能,因此您可以使用一个久经考验的解决方案实现端到端的事件流用例:
- 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
- 只要您愿意,就可以持久可靠地存储事件流。
- 在事件发生时或回顾性地处理事件流。
所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以选择自行管理 Kafka 环境和使用各种供应商提供的完全托管服务。
简而言之,Kafka 是如何工作的?
Kafka 是一个分布式系统,由通过高性能TCP 网络协议进行通信的服务器和客户端组成。它可以部署在本地和云环境中的裸机硬件、虚拟机和容器上。
服务器:Kafka 作为一个或多个服务器集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以事件流的形式持续导入和导出数据,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一台服务器发生故障,其他服务器将接管它们的工作以确保连续运行而不会丢失任何数据。
客户端:它们允许您编写分布式应用程序和微服务,即使在出现网络问题或机器故障的情况下,也能以容错的方式并行、大规模地读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的 数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。
主要概念和术语
事件记录了世界上或您的企业中“发生了某事”的事实。在文档中也称为记录或消息。当您从 Kafka 读取数据或向 Kafka 写入数据时,都是以事件的形式进行的。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。这是一个示例事件:
- 事件键(key):“Alice”
- 事件值(value):“向 Bob 支付了 200 美元”
- 事件时间戳:“2020 年 6 月 25 日下午 2:06”
生产者是那些向 Kafka 发布(写入)事件的客户端应用程序,而消费者是那些订阅(读取和处理)这些事件的客户端应用程序。在 Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是实现 Kafka 著名的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如能够精确地处理一次事件。
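下面是一个极简的示意代码(并非官方示例):假设本地代理运行在 localhost:9092,主题名 payments、消费者组 payments-reader 仅为演示值。它展示了生产者写入一个带键的事件,以及一个完全独立的消费者按分区内写入顺序读取该事件。
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConsumerSketch {
    public static void main(String[] args) {
        // 生产者:键为 "Alice" 的事件总是被写入 payments 主题的同一个分区
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("payments", "Alice", "向 Bob 支付了 200 美元"));
        }

        // 消费者:与生产者完全解耦,按分区内的写入顺序读取事件
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "payments-reader");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("payments"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("key=%s value=%s partition=%d offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}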
事件被组织并持久存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。示例主题名称可以是“付款”。Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统的消息系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该保留您的事件多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据完全没问题。
主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。这种数据的分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从/向许多代理读取和写入数据。当一个新事件被发布到一个主题时,它实际上被附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件将写入同一分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。
图:这个示例主题有四个分区 P1–P4。两个不同的生产者客户端通过网络将事件写入主题的分区,彼此独立地向主题发布新事件。具有相同键的事件(在图中用它们的颜色表示)被写入相同的分区。请注意,如果合适,两个生产者都可以写入同一分区。
为了使您的数据具有容错性和高可用性,可以复制每个主题,甚至可以跨地理区域或数据中心进行复制,以便始终有多个代理拥有数据副本,以防出现问题、需要对代理进行维护等等。一个常见的生产设置是复制因子 3,即你的数据总是有三个副本。这种复制是在主题分区级别执行的。
这本入门书应该足以进行介绍。如果您有兴趣,文档的设计部分详细解释了 Kafka 的各种概念。
Kafka APIs
除了用于管理和运维任务的命令行工具外,Kafka 还具有五个用于 Java 和 Scala 的核心 API:
- Admin API 用于管理和检查主题、代理和其他 Kafka 对象。
- Producer API 用于将事件流发布(写入)到一个或多个 Kafka 主题。
- Consumer API 用于订阅(读取)一个或多个主题,并处理为其生成的事件流。
- Kafka Streams API 用于实现流处理应用程序和微服务。它提供更高级别的功能来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。它从一个或多个主题读取输入并向一个或多个主题产生输出,有效地将输入流转换为输出流。
- Kafka Connect API 用于构建和运行可重复使用的数据导入/导出连接器,这些连接器从外部系统和应用程序消费(读取)事件流或向其生产(写入)事件流,以便它们可以与 Kafka 集成。例如,连接到关系数据库(如 PostgreSQL)的连接器可能会捕获对一组表的每个更改。但是在实践中,您通常不需要实现自己的连接器,因为 Kafka 社区已经提供了数百个现成的连接器。
怎样学习
- 要获得 Kafka 的实践经验,请遵循快速入门。
- 要更详细地了解 Kafka,请阅读文档。您还可以选择Kafka 书籍和学术论文。
- 浏览用例,了解我们全球社区中的其他用户如何从 Kafka 中获得价值。
- 加入当地的 Kafka 聚会小组, 观看Kafka 社区的主要会议 Kafka Summit 的演讲。
1.2 用例
以下是 Apache Kafka® 的一些流行用例的描述。有关其中一些领域的概述,请参阅此博客文章。
讯息
Kafka 可以很好地替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。
根据我们的经验,消息传递的使用通常吞吐量相对较低,但可能需要低端到端延迟,并且通常依赖于 Kafka 提供的强大持久性保证。
在这个领域中,Kafka 可与传统消息系统(如ActiveMQ或 RabbitMQ )相媲美。
网站活动跟踪
Kafka 的最初用例是能够将用户活动跟踪管道重建为一组实时发布-订阅提要。这意味着站点活动(页面浏览、搜索或用户可能采取的其他操作)将发布到中心主题,每个活动类型一个主题。这些提要可用于订阅一系列用例,包括实时处理、实时监控以及加载到 Hadoop 或离线数据仓库系统以进行离线处理和报告。
活动跟踪的量通常非常大,因为每个用户页面视图都会生成许多活动消息。
指标
Kafka常用于运营监控数据。这涉及汇总来自分布式应用程序的统计数据,以生成集中的运营数据提要。
日志聚合
许多人使用 Kafka 作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在一个中央位置(可能是文件服务器或 HDFS)进行处理。Kafka 抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理,并更容易支持多个数据源和分布式数据消费。与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka 提供了同样出色的性能、由于复制而带来的更强的持久性保证,以及更低的端到端延迟。
流处理
Kafka 的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从 Kafka 主题中使用,然后聚合、丰富或以其他方式转换为新主题以供进一步使用或后续处理。例如,用于推荐新闻文章的处理管道可能会从 RSS 提要中抓取文章内容并将其发布到“文章”主题;进一步处理可能会规范化或删除重复内容,并将清理后的文章内容发布到新主题;最后的处理阶段可能会尝试向用户推荐此内容。此类处理管道根据各个主题创建实时数据流图。从0.10.0.0开始,一个轻量级但强大的流处理库Kafka Streams 在 Apache Kafka 中可用以执行上述数据处理。除了 Kafka Streams,替代的开源流处理工具包括Apache Storm和 Apache Samza。
事件溯源
事件溯源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列。Kafka 对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。
提交日志
Kafka 可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka 中的日志压缩功能有助于支持这种用法。在这种用法中,Kafka 类似于Apache BookKeeper项目。
1.3 快速入门
第 1 步:获取 KAFKA
下载 最新的 Kafka 版本并解压:
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
第2步:启动KAFKA环境
注意:您的本地环境必须安装 Java 8+。
Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动。要开始使用任一配置,请遵循以下部分,但不要同时遵循这两个部分。
Kafka 与 ZooKeeper
运行以下命令以按正确顺序启动所有服务:
# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话并运行:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务都成功启动,您将拥有一个正在运行并可以使用的基本 Kafka 环境。注:自带的zookeeper不适合作为集群应用
Kafka 与 KRaft
修改配置文件 …/config/kraft/server.properties
# kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.88.139:9093
advertised.listeners=PLAINTEXT://192.168.88.139:9092
log.dirs=/opt/kafka_2.13-3.3.1/kafka_log
生成集群 UUID
$ bin/kafka-storage.sh random-uuid
该命令会输出一个集群 UUID,下一步格式化日志目录时将使用该值(本例中为 RbHFvk0wTTKMYnSRNpMTpA)。
格式化日志目录
$ bin/kafka-storage.sh format -t RbHFvk0wTTKMYnSRNpMTpA -c config/kraft/server.properties
启动 Kafka 服务器
$ bin/kafka-server-start.sh config/kraft/server.properties
Kafka 服务器成功启动后,您将拥有一个正在运行并可以使用的基本 Kafka 环境。
第 3 步:创建一个主题来存储您的事件
Kafka 是一个分布式事件流平台,可让您跨多台计算机 读取、写入、存储和处理 事件(在文档中也称为记录或 消息)。
示例事件包括支付交易、来自手机的地理位置更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。
因此,在您编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.88.134:9092
Kafka 的所有命令行工具都有额外的选项:不带任何参数运行 kafka-topics.sh 命令即可显示使用信息。例如,它还可以向您显示新主题的分区计数等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events   TopicId: NPmZHyhbR9y00wMglMH2sg   PartitionCount: 1   ReplicationFactor: 1   Configs:
    Topic: quickstart-events   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
第 4 步:将一些事件写入主题
Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理将以持久和容错的方式存储事件,只要您需要——甚至永远。
运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致一个单独的事件被写入主题。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以随时使用 Ctrl-C 停止生产者客户端。
第 5 步:读取事件
打开另一个终端会话并运行控制台消费者客户端以读取您刚刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以随时使用 Ctrl-C 停止消费者客户端。
随意尝试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在您的消费者终端中。
因为事件持久存储在 Kafka 中,所以它们可以被任意多次读取,并且可以被任意多的消费者读取。您可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。
第 6 步:使用 KAFKA CONNECT 将数据导入/导出为事件流
您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经在使用这些系统的应用程序。 Kafka Connect允许您不断地将数据从外部系统提取到 Kafka 中,反之亦然。它是一个运行 连接器的可扩展工具,连接器实现了与外部系统交互的自定义逻辑。因此很容易将现有系统与 Kafka 集成。为了使这个过程更容易,有数百个这样的连接器随时可用。
在本快速入门中,我们将了解如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka 主题并将数据从 Kafka 主题导出到文件。
首先,确保将 connect-file-3.3.1.jar 添加到 Connect worker 配置的 plugin.path 属性中。出于本快速入门的目的,我们将使用相对路径,并将连接器的包视为 uber jar,当从安装目录运行快速入门命令时即可生效。但是值得注意的是,对于生产部署,始终建议使用绝对路径。有关如何设置此配置的详细说明,请参阅 plugin.path。
编辑 config/connect-standalone.properties 文件,添加或更改 plugin.path 配置属性使其与以下内容匹配,然后保存文件:
> echo "plugin.path=libs/connect-file-3.3.1.jar"
然后,首先创建一些种子数据进行测试:
> echo -e "foo\nbar" > test.txt
或者在 Windows 上:
> echo foo> test.txt
> echo bar>> test.txt
接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka 代理和数据的序列化格式。其余配置文件分别指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件包含在 Kafka 中,使用您之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成到 Kafka 主题,第二个是接收器连接器它从 Kafka 主题读取消息并在输出文件中将每条消息生成为一行。
在启动过程中,您会看到许多日志消息,包括一些表明正在实例化连接器的消息。一旦 Kafka Connect 进程启动,源连接器应该开始从文件 test.txt 读取行并将它们生产到主题 connect-test,而接收器连接器应该开始从主题 connect-test 读取消息并将它们写入文件 test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传送:
> more test.sink.txt
foo
bar
请注意,数据存储在 Kafka 主题 connect-test 中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器继续处理数据,因此我们可以将数据添加到文件中,并看到数据在管道中移动:
> echo Another line>> test.txt
您应该会在控制台使用者输出和接收器文件中看到这一行。
第 7 步:使用 KAFKA STREAMS 处理您的事件
一旦您的数据作为事件存储在 Kafka 中,您就可以使用适用于 Java/Scala 的 Kafka Streams客户端库处理数据。它允许您实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式。该库支持恰好一次处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了给你一个初步的体验,下面是如何实现流行的 WordCount 算法:
KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
        .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
        .groupBy((keyIgnored, word) -> word)
        .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams 演示和应用程序开发教程演示了如何从头到尾编写和运行此类流应用程序。
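作为补充,下面是一个围绕上述片段、可独立运行的 WordCount 应用示意(非官方演示代码):假设本地代理运行在 localhost:9092,输入主题 quickstart-events 与输出主题 output-topic 已存在,application.id 取 wordcount-quickstart 仅为示例值。
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-quickstart"); // 也作为消费者组和内部主题的前缀
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("quickstart-events");
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
                .groupBy((keyIgnored, word) -> word)
                .count();
        wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();                                                   // 启动流处理拓扑
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  // 退出时优雅关闭
    }
}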
第8步:终止KAFKA环境
现在您已经完成了快速入门,可以随意拆除 Kafka 环境,或者继续尝试。
- 如果您还没有这样做,请使用 Ctrl-C 停止生产者和消费者客户端。
- 使用 Ctrl-C 停止 Kafka 代理。
- 最后,如果遵循了 Kafka 与 ZooKeeper 部分,请使用 Ctrl-C 停止 ZooKeeper 服务器。
如果您还想删除本地 Kafka 环境的任何数据,包括您在此过程中创建的任何事件,请运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
恭喜!
您已成功完成 Apache Kafka 快速入门。
为了了解更多信息,我们建议执行以下步骤:
- 通读简介,从宏观上了解 Kafka 的工作原理、主要概念,以及它与其他技术的比较。要更详细地了解 Kafka,请前往文档。
- 浏览用例,了解全球社区中的其他用户如何从 Kafka 中获得价值。
- 加入当地的 Kafka 聚会小组,观看 Kafka 社区的主要会议 Kafka Summit 的演讲。
1.4 生态系统
在主发行版之外,还有很多工具可以与 Kafka 集成。生态系统页面列出了其中许多,包括流处理系统、Hadoop 集成、监控和部署工具。
2. APIS
Kafka 包括五个核心 API:
- Producer API 允许应用程序向 Kafka 集群中的主题发送数据流。
- Consumer API 允许应用程序从 Kafka 集群中的主题读取数据流。
- Streams API 允许将数据流从输入主题转换为输出主题。
- Connect API 允许实现连接器,这些连接器不断地从某个源系统或应用程序拉取数据到 Kafka,或从 Kafka 推送到某个接收器(sink)系统或应用程序。
- Admin API 允许管理和检查主题、代理和其他 Kafka 对象。
Kafka 通过一个与语言无关的协议公开其所有功能,该协议有许多编程语言的客户端。然而,只有 Java 客户端作为 Kafka 主项目的一部分进行维护,其他客户端作为独立的开源项目提供。非 Java 客户端的列表见这里。
2.1 生产者 API
Producer API 允许应用程序向 Kafka 集群中的主题发送数据流。
javadocs 中提供了演示如何使用 producer 的示例。
要使用 producer,可以添加以下 Maven 依赖项:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
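下面是一个最小化的生产者示意(假设本地代理 localhost:9092、主题 quickstart-events,类名与循环内容仅为演示),展示异步发送与回调:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有同步副本确认,配合 min.insync.replicas 获得更强的持久性
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("quickstart-events", "key-" + i, "value-" + i);
                // 异步发送,回调中拿到写入的分区和偏移量
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("已写入 %s-%d@%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
            producer.flush();
        }
    }
}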
2.2 消费者 API
Consumer API 允许应用程序从 Kafka 集群中的主题读取数据流。
javadocs 中给出了如何使用 consumer 的示例。
要使用 consumer,可以添加以下 Maven 依赖项:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
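下面是一个最小化的消费者示意(假设同上的本地代理与主题,消费者组 quickstart-group 为假设值),展示 subscribe/poll 循环与手动提交偏移量:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");  // 同一组内的消费者分摊分区
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 没有已提交偏移量时从头读取
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("quickstart-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // 处理完成后手动提交偏移量
            }
        }
    }
}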
2.3 Streams API
Streams API 允许将数据流从输入主题转换为输出主题。
javadocs 中提供了演示如何使用此库的示例。
有关使用 Streams API 的其他文档见这里。
要使用 Kafka Streams,可以添加以下 Maven 依赖项:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.3.1</version>
</dependency>
使用 Scala 时,可以选择包含 kafka-streams-scala 库。关于在 Scala 中使用 Kafka Streams DSL 的其他文档见开发人员指南。
要在 Scala 2.13 中使用 Kafka Streams DSL for Scala,可以添加以下 Maven 依赖项:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_2.13</artifactId>
    <version>3.3.1</version>
</dependency>
2.4 Connect API
Connect API 允许实现连接器,这些连接器不断地从某个源数据系统拉取数据到 Kafka,或从 Kafka 推送到某个接收器(sink)数据系统。
许多 Connect 用户不需要直接使用这个 API,而是可以使用预先构建的连接器,无需编写任何代码。有关使用 Connect 的其他信息见这里。
希望实现自定义连接器的用户可以查看 javadoc。
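作为参考,下面是一个自定义源连接器的最小骨架示意(类名、配置项与主题名均为假设,并非官方示例):连接器负责描述配置并拆分任务,任务在 poll() 中产出 SourceRecord。
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class HeartbeatSourceConnector extends SourceConnector {
    private Map<String, String> props;

    @Override public void start(Map<String, String> props) { this.props = props; }
    @Override public Class<? extends Task> taskClass() { return HeartbeatSourceTask.class; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        // 此示例只有一个任务;多任务连接器会在此处划分工作
        return Collections.singletonList(new HashMap<>(props));
    }
    @Override public void stop() { }
    @Override public ConfigDef config() {
        return new ConfigDef().define("topic", ConfigDef.Type.STRING, "heartbeats",
                ConfigDef.Importance.HIGH, "写入事件的目标主题");
    }
    @Override public String version() { return "0.0.1"; }

    public static class HeartbeatSourceTask extends SourceTask {
        private String topic;

        @Override public void start(Map<String, String> props) {
            topic = props.getOrDefault("topic", "heartbeats");
        }
        @Override public List<SourceRecord> poll() throws InterruptedException {
            Thread.sleep(1000); // 每秒产出一条记录
            SourceRecord record = new SourceRecord(
                    Collections.singletonMap("source", "heartbeat"),             // 源分区
                    Collections.singletonMap("ts", System.currentTimeMillis()),  // 源偏移量
                    topic, Schema.STRING_SCHEMA,
                    "ping @ " + System.currentTimeMillis());
            return Collections.singletonList(record);
        }
        @Override public void stop() { }
        @Override public String version() { return "0.0.1"; }
    }
}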
2.5 Admin API
Admin API 支持管理和检查主题、代理、ACL 和其他 Kafka 对象。
要使用 Admin API,请添加以下 Maven 依赖项:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
有关 Admin API 的更多信息,请参阅 javadoc。
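下面是一个使用 Admin API 的最小示意(假设本地代理 localhost:9092,主题名仅为演示):创建一个主题并列出集群中已有的主题。
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // 创建一个 3 分区、复制因子为 1 的主题(适合本地单代理环境)
            NewTopic topic = new NewTopic("quickstart-admin-demo", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();

            // 列出集群中现有的主题
            Set<String> names = admin.listTopics().names().get();
            names.forEach(System.out::println);
        }
    }
}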
3. 配置
Kafka使用属性文件格式的键值对进行配置。这些值可以从文件或以编程方式提供。
3.1 Broker Configs
对于ZooKeeper集群,代理必须具有以下配置:
broker.id
log.dirs
zookeeper.connect
对于KRaft集群,代理和控制器必须具有以下配置:
node.id
log.dirs
process.roles
对于 KRaft 代理,如果设置了 broker.id,它必须等于 node.id。下面将详细讨论主题级配置和默认值。
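作为示意(并非完整的生产配置,路径与地址均为假设值),两种模式下最小化的 server.properties 大致如下;KRaft 模式在实践中通常还需要 controller.quorum.voters、listeners 与 controller.listener.names:
# ZooKeeper 集群中代理的最小配置(示意)
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

# KRaft 集群中 broker,controller 组合节点的最小配置(示意)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
log.dirs=/tmp/kraft-combined-logs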
advertised.listeners
要发布到 ZooKeeper 供客户端使用的侦听器(如果与 listeners 配置属性不同)。在 IaaS 环境中,这可能需要与代理绑定的接口不同。如果未设置此值,将使用 listeners 的值。与 listeners 不同,通告 0.0.0.0 这样的元地址是无效的。
同样与 listeners 不同的是,此属性中可以有重复的端口,因此可以将一个侦听器配置为通告另一个侦听器的地址。这在使用外部负载均衡器的某些情况下非常有用。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | per-broker |
auto.create.topics.enable
启用在服务器上自动创建主题
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
auto.leader.rebalance.enable
启用自动 leader 重新平衡。后台线程会定期检查分区 leader 的分布,检查间隔可通过 leader.imbalance.check.interval.seconds 配置。如果 leader 失衡超过 leader.imbalance.per.broker.percentage,则会触发分区的首选 leader 重新平衡。
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
background.threads
用于各种后台处理任务的线程数
Type: | int |
---|---|
Default: | 10 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
broker.id
此服务器的代理id。如果未设置,将生成唯一的代理id。为了避免zookeeper生成的代理id和用户配置的代理id之间的冲突,生成的代理id从reserved.broker.max.id+1开始。
Type: | int |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
compression.type
指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“gzip”、“snappy”、“lz4”、“zstd”)。它还接受“uncompressed”,相当于不压缩;以及“producer”,表示保留生产者设置的原始压缩编解码器。
Type: | string |
---|---|
Default: | producer |
Valid Values: | [uncompressed, zstd, lz4, snappy, gzip, producer] |
Importance: | high |
Update Mode: | cluster-wide |
control.plane.listener.name
用于控制器和代理之间通信的侦听器的名称。Broker将使用control.plane.listener.name在侦听器列表中查找端点,侦听来自控制器的连接。例如,如果代理的配置为:
listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER
启动时,代理将开始使用安全协议“SSL”侦听“192.1.1.8:9094”。在控制器端,当它通过zookeeper发现代理的发布端点时,它将使用control.plane.listener.name来查找端点,它将使用该端点来建立与代理的连接。
如果代理在 zookeeper 上发布的端点为:
"endpoints" : ["INTERNAL://broker1.example.com:9092", "EXTERNAL://broker1.example.com:9093", "CONTROLLER://broker1.example.com:9094"]
且控制器的配置为:
listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER
则控制器将使用带有安全协议“SSL”的“broker1.example.com:9094”连接到代理。
如果未显式配置,默认值将为空,并且控制器连接将没有专用端点。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
controller.listener.names
控制器使用的侦听器名称的逗号分隔列表。如果在KRaft模式下运行,这是必需的。当与控制器仲裁通信时,代理将始终使用此列表中的第一个侦听器。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
controller.quorum.election.backoff.max.ms
开始新一轮选举前的最长时间(毫秒)。这用于二进制指数退避机制,有助于防止选举陷入僵局。
Type: | int |
---|---|
Default: | 1000 (1 second) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
controller.quorum.election.timeout.ms
在触发新选举之前无法从领导人那里获取信息的最长等待时间(毫秒)
Type: | int |
---|---|
Default: | 1000 (1 second) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
controller.quorum.fetch.timeout.ms
在成为候选人并触发投票者选举之前,未能从当前 leader 成功 fetch 的最长时间;以及在向其他节点询问是否存在新的 leader 任期(epoch)之前,未收到法定人数中大多数成员 fetch 请求的最长时间。
Type: | int |
---|---|
Default: | 2000 (2 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
controller.quorum.voters
以逗号分隔的“{id}@{host}:{port}”条目列表中的一组投票者的id/端点信息的映射。例如:1@localhost:9092,2@localhost:9093,3@localhost:9094
Type: | list |
---|---|
Default: | “” |
Valid Values: | non-empty list |
Importance: | high |
Update Mode: | read-only |
delete.topic.enable
启用删除主题。如果关闭此配置,则通过管理工具删除主题将无效
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
early.start.listeners
一个逗号分隔的侦听器名称列表,这些侦听器可以在授权者(authorizer)完成初始化之前启动。当授权者依赖集群本身进行引导时,这非常有用,例如 StandardAuthorizer(它将 ACL 存储在元数据日志中)。默认情况下,controller.listener.names 中包含的所有侦听器也都是 early start 侦听器。如果某个侦听器接受外部流量,则它不应出现在此列表中。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
leader.imbalance.check.interval.seconds
控制器触发分区重新平衡检查的频率
Type: | long |
---|---|
Default: | 300 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
leader.imbalance.per.broker.percentage
每个代理允许的 leader 失衡比率。如果某个代理的失衡超过该值,控制器将触发 leader 重新平衡。该值以百分比表示。
Type: | int |
---|---|
Default: | 10 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
listeners
侦听器列表:将要监听的 URI 及侦听器名称的逗号分隔列表。如果侦听器名称不是安全协议,则还必须设置 listener.security.protocol.map。
侦听器名称和端口号必须唯一。将主机名指定为 0.0.0.0 可绑定到所有接口;将主机名留空可绑定到默认接口。合法侦听器列表示例:
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
Type: | string |
---|---|
Default: | PLAINTEXT://:9092 |
Valid Values: | |
Importance: | high |
Update Mode: | per-broker |
log.dir
保存日志数据的目录(log.dirs属性的补充)
Type: | string |
---|---|
Default: | /tmp/kafka-logs |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.dirs
存储日志数据的目录的逗号分隔列表。如果未设置,则使用 log.dir 中的值。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.flush.interval.messages
在将消息刷新到磁盘之前,日志分区上累积的消息数
Type: | long |
---|---|
Default: | 9223372036854775807 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
log.flush.interval.ms
任何主题中的消息在刷新到磁盘之前保留在内存中的最长时间(毫秒)。如果未设置,则使用 log.flush.scheduler.interval.ms 中的值。
Type: | long |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.flush.offset.checkpoint.interval.ms
更新作为日志恢复点的上次刷新的持久记录的频率
Type: | int |
---|---|
Default: | 60000 (1 minute) |
Valid Values: | [0,…] |
Importance: | high |
Update Mode: | read-only |
log.flush.scheduler.interval.ms
日志刷新器检查是否需要将任何日志刷新到磁盘的频率(毫秒)
Type: | long |
---|---|
Default: | 9223372036854775807 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.flush.start.offset.checkpoint.interval.ms
更新日志开始偏移的持久记录的频率
Type: | int |
---|---|
Default: | 60000 (1 minute) |
Valid Values: | [0,…] |
Importance: | high |
Update Mode: | read-only |
log.retention.bytes
删除之前日志的最大大小
Type: | long |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.retention.hours
日志文件在删除前保留的小时数(以小时为单位),其优先级次于 log.retention.ms 和 log.retention.minutes 属性。
Type: | int |
---|---|
Default: | 168 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.retention.minutes
日志文件在删除前保留的分钟数(以分钟为单位),次于log.retention.ms属性。如果未设置,则使用log.retention.hours中的值
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
log.retention.ms
删除日志文件之前保留日志文件的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值。如果设置为-1,则不应用时间限制。
Type: | long |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.roll.hours
新日志段转出之前的最长时间(小时),仅次于log.roll.ms属性
Type: | int |
---|---|
Default: | 168 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
log.roll.jitter.hours
要从logRollTimeMillis(以小时为单位)中减去的最大抖动,从属于log.roll.jitter.ms属性
Type: | int |
---|---|
Default: | 0 |
Valid Values: | [0,…] |
Importance: | high |
Update Mode: | read-only |
log.roll.jitter.ms
要从logRollTimeMillis中减去的最大抖动(以毫秒为单位)。如果未设置,则使用log.roll.jitter.hours中的值
Type: | long |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.roll.ms
新日志段转出之前的最长时间(毫秒)。如果未设置,则使用log.roll.hours中的值
Type: | long |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
log.segment.bytes
单个日志文件的最大大小
Type: | int |
---|---|
Default: | 1073741824 (1 gibibyte) |
Valid Values: | [14,…] |
Importance: | high |
Update Mode: | cluster-wide |
log.segment.delete.delay.ms
从文件系统中删除文件之前等待的时间
Type: | long |
---|---|
Default: | 60000 (1 minute) |
Valid Values: | [0,…] |
Importance: | high |
Update Mode: | cluster-wide |
message.max.bytes
Kafka允许的最大记录批处理大小(如果启用了压缩,则在压缩之后)。如果该值增加,并且存在早于0.10.2的使用者,则使用者的提取大小也必须增加,以便他们可以提取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。这可以通过主题级别“max.message.bytes”配置按主题进行设置。
Type: | int |
---|---|
Default: | 1048588 |
Valid Values: | [0,…] |
Importance: | high |
Update Mode: | cluster-wide |
metadata.log.dir
这个配置决定了我们在KRaft模式下将集群的元数据日志放在哪里。如果没有设置,元数据日志将放在log.dirs的第一个日志目录中。
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
metadata.log.max.record.bytes.between.snapshots
这是日志中最新快照与生成新快照之前所需的高水位线之间的最大字节数。
Type: | long |
---|---|
Default: | 20971520 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
metadata.log.segment.bytes
单个元数据日志文件的最大大小
Type: | int |
---|---|
Default: | 1073741824 (1 gibibyte) |
Valid Values: | [12,…] |
Importance: | high |
Update Mode: | read-only |
metadata.log.segment.ms
转出新元数据日志文件之前的最长时间(以毫秒为单位)。
Type: | long |
---|---|
Default: | 604800000 (7 days) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
metadata.max.retention.bytes
The maximum combined size of the metadata log and snapshots before deleting old snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.
Type: | long |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
metadata.max.retention.ms
The number of milliseconds to keep a metadata log file or snapshot before deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.
Type: | long |
---|---|
Default: | 604800000 (7 days) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
min.insync.replicas
When a producer sets acks to “all” (or “-1”), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
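例如,可以用 Admin API 按上述场景创建一个复制因子为 3、min.insync.replicas=2 的主题(示意代码,主题名与地址为假设值);生产者端再配合 acks=all 即可获得该段描述的持久性保证。
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class DurableTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // 复制因子 3 + min.insync.replicas=2:配合生产者 acks=all,
            // 当同步副本不足 2 个时写入会收到 NotEnoughReplicas 异常
            NewTopic topic = new NewTopic("payments", 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}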
node.id
The node ID associated with the roles this process is playing when process.roles
is non-empty. Every node in a KRaft cluster must have a unique node.id
, this includes broker and controller nodes. This is required configuration when running in KRaft mode.
Type: | int |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
num.io.threads
The number of threads that the server uses for processing requests, which may include disk I/O
Type: | int |
---|---|
Default: | 8 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
num.network.threads
The number of threads that the server uses for receiving requests from the network and sending responses to the network
Type: | int |
---|---|
Default: | 3 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
num.recovery.threads.per.data.dir
The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | cluster-wide |
num.replica.alter.log.dirs.threads
The number of threads that can move replicas between log directories, which may include disk I/O
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
num.replica.fetchers
Number of fetcher threads used to replicate records from each source broker. The total number of fetchers on each broker is bound by num.replica.fetchers
multiplied by the number of brokers in the cluster.Increasing this value can increase the degree of I/O parallelism in the follower and leader broker at the cost of higher CPU and memory utilization.
Type: | int |
---|---|
Default: | 1 |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
offset.metadata.max.bytes
The maximum size for a metadata entry associated with an offset commit
Type: | int |
---|---|
Default: | 4096 (4 kibibytes) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
offsets.commit.required.acks
The required acks before the commit can be accepted. In general, the default (-1) should not be overridden
Type: | short |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
offsets.commit.timeout.ms
Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.
Type: | int |
---|---|
Default: | 5000 (5 seconds) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.load.buffer.size
Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large).
Type: | int |
---|---|
Default: | 5242880 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.retention.check.interval.ms
Frequency at which to check for stale offsets
Type: | long |
---|---|
Default: | 600000 (10 minutes) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.retention.minutes
After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.
Type: | int |
---|---|
Default: | 10080 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.topic.compression.codec
Compression codec for the offsets topic - compression may be used to achieve “atomic” commits
Type: | int |
---|---|
Default: | 0 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
offsets.topic.num.partitions
The number of partitions for the offset commit topic (should not change after deployment)
Type: | int |
---|---|
Default: | 50 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.topic.replication.factor
The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
Type: | short |
---|---|
Default: | 3 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
offsets.topic.segment.bytes
The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads
Type: | int |
---|---|
Default: | 104857600 (100 mebibytes) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
process.roles
The roles that this process plays: ‘broker’, ‘controller’, or ‘broker,controller’ if it is both. This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for Zookeeper clusters.
Type: | list |
---|---|
Default: | “” |
Valid Values: | [broker, controller] |
Importance: | high |
Update Mode: | read-only |
queued.max.requests
The number of queued requests allowed for data-plane, before blocking the network threads
Type: | int |
---|---|
Default: | 500 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
replica.fetch.min.bytes
Minimum bytes expected for each fetch response. If not enough bytes, wait up to replica.fetch.wait.max.ms
(broker config).
Type: | int |
---|---|
Default: | 1 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
replica.fetch.wait.max.ms
The maximum wait time for each fetcher request issued by follower replicas. This value should always be less than the replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics
Type: | int |
---|---|
Default: | 500 |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
replica.high.watermark.checkpoint.interval.ms
The frequency with which the high watermark is saved out to disk
Type: | long |
---|---|
Default: | 5000 (5 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
replica.lag.time.max.ms
If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr
Type: | long |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
replica.socket.receive.buffer.bytes
The socket receive buffer for network requests
Type: | int |
---|---|
Default: | 65536 (64 kibibytes) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
replica.socket.timeout.ms
The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms
Type: | int |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: | int |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
sasl.mechanism.controller.protocol
SASL mechanism used for communication with controllers. Default is GSSAPI.
Type: | string |
---|---|
Default: | GSSAPI |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
socket.receive.buffer.bytes
The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
Type: | int |
---|---|
Default: | 102400 (100 kibibytes) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
socket.request.max.bytes
The maximum number of bytes in a socket request
Type: | int |
---|---|
Default: | 104857600 (100 mebibytes) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
socket.send.buffer.bytes
The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
Type: | int |
---|---|
Default: | 102400 (100 kibibytes) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
transaction.max.timeout.ms
The maximum allowed timeout for transactions. If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
Type: | int |
---|---|
Default: | 900000 (15 minutes) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transaction.state.log.load.buffer.size
Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).
Type: | int |
---|---|
Default: | 5242880 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transaction.state.log.min.isr
Overridden min.insync.replicas config for the transaction topic.
Type: | int |
---|---|
Default: | 2 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transaction.state.log.num.partitions
The number of partitions for the transaction topic (should not change after deployment).
Type: | int |
---|---|
Default: | 50 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transaction.state.log.replication.factor
The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
Type: | short |
---|---|
Default: | 3 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transaction.state.log.segment.bytes
The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads
Type: | int |
---|---|
Default: | 104857600 (100 mebibytes) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
transactional.id.expiration.ms
The time in ms that the transaction coordinator will wait without receiving any transaction status updates for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired once this time has elapsed after the last write with the given producer id. Note that producer ids may expire sooner if the last write from the producer id is deleted due to the topic’s retention settings.
Type: | int |
---|---|
Default: | 604800000 (7 days) |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
unclean.leader.election.enable
Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | high |
Update Mode: | cluster-wide |
zookeeper.connect
Specifies the ZooKeeper connection string in the form hostname:port
where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3
.
The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path
you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
zookeeper.connection.timeout.ms
The max time that the client waits to establish a connection to zookeeper. If not set, the value in zookeeper.session.timeout.ms is used
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
zookeeper.max.in.flight.requests
The maximum number of unacknowledged requests the client will send to Zookeeper before blocking.
Type: | int |
---|---|
Default: | 10 |
Valid Values: | [1,…] |
Importance: | high |
Update Mode: | read-only |
zookeeper.session.timeout.ms
Zookeeper session timeout
Type: | int |
---|---|
Default: | 18000 (18 seconds) |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
zookeeper.set.acl
Set client to use secure ACLs
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | high |
Update Mode: | read-only |
broker.heartbeat.interval.ms
The length of time in milliseconds between broker heartbeats. Used when running in KRaft mode.
Type: | int |
---|---|
Default: | 2000 (2 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
broker.id.generation.enable
Enable automatic broker id generation on the server. When enabled the value configured for reserved.broker.max.id should be reviewed.
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
broker.rack
Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: RACK1
, us-east-1d
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
broker.session.timeout.ms
The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in KRaft mode.
Type: | int |
---|---|
Default: | 9000 (9 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
connections.max.idle.ms
Idle connections timeout: the server socket processor threads close the connections that idle more than this
Type: | long |
---|---|
Default: | 600000 (10 minutes) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
connections.max.reauth.ms
When explicitly set to a positive number (the default is 0, not a positive number), a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000
Type: | long |
---|---|
Default: | 0 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controlled.shutdown.enable
Enable controlled shutdown of the server
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controlled.shutdown.max.retries
Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens
Type: | int |
---|---|
Default: | 3 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controlled.shutdown.retry.backoff.ms
Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying.
Type: | long |
---|---|
Default: | 5000 (5 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controller.quorum.append.linger.ms
The duration in milliseconds that the leader will wait for writes to accumulate before flushing them to disk.
Type: | int |
---|---|
Default: | 25 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controller.quorum.request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: | int |
---|---|
Default: | 2000 (2 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
controller.socket.timeout.ms
The socket timeout for controller-to-broker channels
Type: | int |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
default.replication.factor
The default replication factors for automatically created topics
Type: | int |
---|---|
Default: | 1 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
delegation.token.expiry.time.ms
The token validity time in miliseconds before the token needs to be renewed. Default value 1 day.
Type: | long |
---|---|
Default: | 86400000 (1 day) |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
delegation.token.master.key
DEPRECATED: An alias for delegation.token.secret.key, which should be used instead of this config.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
delegation.token.max.lifetime.ms
The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days.
Type: | long |
---|---|
Default: | 604800000 (7 days) |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
delegation.token.secret.key
Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers. If the key is not set or set to empty string, brokers will disable the delegation token support.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
delete.records.purgatory.purge.interval.requests
The purge interval (in number of requests) of the delete records request purgatory
Type: | int |
---|---|
Default: | 1 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
fetch.max.bytes
The maximum number of bytes we will return for a fetch request. Must be at least 1024.
Type: | int |
---|---|
Default: | 57671680 (55 mebibytes) |
Valid Values: | [1024,…] |
Importance: | medium |
Update Mode: | read-only |
fetch.purgatory.purge.interval.requests
The purge interval (in number of requests) of the fetch request purgatory
Type: | int |
---|---|
Default: | 1000 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
group.initial.rebalance.delay.ms
The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.
Type: | int |
---|---|
Default: | 3000 (3 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
group.max.session.timeout.ms
The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.
Type: | int |
---|---|
Default: | 1800000 (30 minutes) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
group.max.size
The maximum number of consumers that a single consumer group can accommodate.
Type: | int |
---|---|
Default: | 2147483647 |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
group.min.session.timeout.ms
The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources.
Type: | int |
---|---|
Default: | 6000 (6 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
initial.broker.registration.timeout.ms
初次向控制器仲裁(controller quorum)注册时,在宣告失败并退出代理进程之前等待的毫秒数。
Type: | int |
---|---|
Default: | 60000 (1 minute) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
inter.broker.listener.name
Name of listener used for communication between brokers. If this is unset, the listener name is defined by security.inter.broker.protocol. It is an error to set this and security.inter.broker.protocol properties at the same time.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
inter.broker.protocol.version
指定将使用哪个版本的代理间协议。
通常在所有代理都升级到新版本之后才会调高此值。
一些有效值的示例有:0.8.0、0.8.1、0.8.1.1、0.8.2、0.8.2.0、0.8.2.1、0.9.0.0、0.9.0.1,查看 MetadataVersion 以获取完整列表。
Type: | string |
---|---|
Default: | 3.3-IV3 |
Valid Values: | [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2-IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2-IV0, 2.2-IV1, 2.3-IV0, 2.3-IV1, 2.4-IV0, 2.4-IV1, 2.5-IV0, 2.6-IV0, 2.7-IV0, 2.7-IV1, 2.7-IV2, 2.8-IV0, 2.8-IV1, 3.0-IV0, 3.0-IV1, 3.1-IV0, 3.2-IV0, 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3] |
Importance: | medium |
Update Mode: | read-only |
log.cleaner.backoff.ms
没有日志要清理时的睡眠时间
Type: | long |
---|---|
Default: | 15000 (15 seconds) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.dedupe.buffer.size
The total memory used for log deduplication across all cleaner threads
Type: | long |
---|---|
Default: | 134217728 |
Valid Values: | |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.delete.retention.ms
The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).
Type: | long |
---|---|
Default: | 86400000 (1 day) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.enable
Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size.
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
log.cleaner.io.buffer.load.factor
Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value will allow more log to be cleaned at once but will lead to more hash collisions
Type: | double |
---|---|
Default: | 0.9 |
Valid Values: | |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.io.buffer.size
The total memory used for log cleaner I/O buffers across all cleaner threads
Type: | int |
---|---|
Default: | 524288 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.io.max.bytes.per.second
The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average
Type: | double |
---|---|
Default: | 1.7976931348623157E308 |
Valid Values: | |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.max.compaction.lag.ms
The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.
Type: | long |
---|---|
Default: | 9223372036854775807 |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.min.cleanable.ratio
The minimum ratio of dirty log to total log for a log to eligible for cleaning. If the log.cleaner.max.compaction.lag.ms or the log.cleaner.min.compaction.lag.ms configurations are also specified, then the log compactor considers the log eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the log.cleaner.min.compaction.lag.ms duration, or (ii) if the log has had dirty (uncompacted) records for at most the log.cleaner.max.compaction.lag.ms period.
Type: | double |
---|---|
Default: | 0.5 |
Valid Values: | [0,…,1] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.min.compaction.lag.ms
The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
Type: | long |
---|---|
Default: | 0 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleaner.threads
The number of background threads to use for log cleaning
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.cleanup.policy
The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: “delete” and “compact”
Type: | list |
---|---|
Default: | delete |
Valid Values: | [compact, delete] |
Importance: | medium |
Update Mode: | cluster-wide |
log.index.interval.bytes
The interval with which we add an entry to the offset index
Type: | int |
---|---|
Default: | 4096 (4 kibibytes) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.index.size.max.bytes
The maximum size in bytes of the offset index
Type: | int |
---|---|
Default: | 10485760 (10 mebibytes) |
Valid Values: | [4,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.message.format.version
Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check MetadataVersion for more details. By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don’t understand.
Type: | string |
---|---|
Default: | 3.0-IV1 |
Valid Values: | [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2-IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2-IV0, 2.2-IV1, 2.3-IV0, 2.3-IV1, 2.4-IV0, 2.4-IV1, 2.5-IV0, 2.6-IV0, 2.7-IV0, 2.7-IV1, 2.7-IV2, 2.8-IV0, 2.8-IV1, 3.0-IV0, 3.0-IV1, 3.1-IV0, 3.2-IV0, 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3] |
Importance: | medium |
Update Mode: | read-only |
log.message.timestamp.difference.max.ms
The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling.
Type: | long |
---|---|
Default: | 9223372036854775807 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
log.message.timestamp.type
Define whether the timestamp in the message is message create time or log append time. The value should be either CreateTime
or LogAppendTime
Type: | string |
---|---|
Default: | CreateTime |
Valid Values: | [CreateTime, LogAppendTime] |
Importance: | medium |
Update Mode: | cluster-wide |
log.preallocate
Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true.
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | medium |
Update Mode: | cluster-wide |
log.retention.check.interval.ms
The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion
Type: | long |
---|---|
Default: | 300000 (5 minutes) |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
max.connection.creation.rate
The maximum connection creation rate we allow in the broker at any time. Listener-level limits may also be configured by prefixing the config name with the listener prefix, for example, listener.name.internal.max.connection.creation.rate
.Broker-wide connection rate limit should be configured based on broker capacity while listener limits should be configured based on application requirements. New connections will be throttled if either the listener or the broker limit is reached, with the exception of inter-broker listener. Connections on the inter-broker listener will be throttled only when the listener-level rate limit is reached.
Type: | int |
---|---|
Default: | 2147483647 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
max.connections
The maximum number of connections we allow in the broker at any time. This limit is applied in addition to any per-ip limits configured using max.connections.per.ip. Listener-level limits may also be configured by prefixing the config name with the listener prefix, for example, listener.name.internal.max.connections
. Broker-wide limit should be configured based on broker capacity while listener limits should be configured based on application requirements. New connections are blocked if either the listener or broker limit is reached. Connections on the inter-broker listener are permitted even if broker-wide limit is reached. The least recently used connection on another listener will be closed in this case.
Type: | int |
---|---|
Default: | 2147483647 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
max.connections.per.ip
The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides configured using max.connections.per.ip.overrides property. New connections from the ip address are dropped if the limit is reached.
Type: | int |
---|---|
Default: | 2147483647 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | cluster-wide |
max.connections.per.ip.overrides
A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is “hostName:100,127.0.0.1:200”
Type: | string |
---|---|
Default: | “” |
Valid Values: | |
Importance: | medium |
Update Mode: | cluster-wide |
max.incremental.fetch.session.cache.slots
The maximum number of incremental fetch sessions that we will maintain.
Type: | int |
---|---|
Default: | 1000 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | read-only |
num.partitions
The default number of log partitions per topic
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
password.encoder.old.secret
The old secret that was used for encoding dynamically configured passwords. This is required only when the secret is updated. If specified, all dynamically encoded passwords are decoded using this old secret and re-encoded using password.encoder.secret when broker starts up.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
password.encoder.secret
The secret used for encoding dynamically configured passwords for this broker.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
principal.builder.class
The fully qualified name of a class that implements the KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during authorization. If no principal builder is defined, the default behavior depends on the security protocol in use. For SSL authentication, the principal will be derived using the rules defined by ssl.principal.mapping.rules
applied on the distinguished name from the client certificate if one is provided; otherwise, if client authentication is not required, the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the rules defined by sasl.kerberos.principal.to.local.rules
if GSSAPI is in use, and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.
Type: | class |
---|---|
Default: | org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
producer.purgatory.purge.interval.requests
The purge interval (in number of requests) of the producer request purgatory
Type: | int |
---|---|
Default: | 1000 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
queued.max.request.bytes
The number of queued bytes allowed before no more requests are read
Type: | long |
---|---|
Default: | -1 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
replica.fetch.backoff.ms
The amount of time to sleep when fetch partition error occurs.
Type: | int |
---|---|
Default: | 1000 (1 second) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | read-only |
replica.fetch.max.bytes
The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes
(broker config) or max.message.bytes
(topic config).
Type: | int |
---|---|
Default: | 1048576 (1 mebibyte) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | read-only |
replica.fetch.response.max.bytes
Maximum bytes expected for the entire fetch response. Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes
(broker config) or max.message.bytes
(topic config).
Type: | int |
---|---|
Default: | 10485760 (10 mebibytes) |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | read-only |
replica.selector.class
The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
reserved.broker.max.id
Max number that can be used for a broker.id
Type: | int |
---|---|
Default: | 1000 |
Valid Values: | [0,…] |
Importance: | medium |
Update Mode: | read-only |
sasl.client.callback.handler.class
The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.enabled.mechanisms
The list of SASL mechanisms enabled in the Kafka server. The list may contain any mechanism for which a security provider is available. Only GSSAPI is enabled by default.
Type: | list |
---|---|
Default: | GSSAPI |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.jaas.config
JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: loginModuleClass controlFlag (optionName=optionValue)*;
. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
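For instance, a minimal sketch of a broker-side JAAS entry for SCRAM on a SASL_SSL listener (the username and password shown are placeholders, not values from this documentation):
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";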
sasl.kerberos.kinit.cmd
Kerberos kinit command path.
Type: | string |
---|---|
Default: | /usr/bin/kinit |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.kerberos.min.time.before.relogin
Login thread sleep time between refresh attempts.
Type: | long |
---|---|
Default: | 60000 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.kerberos.principal.to.local.rules
A list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}. For more details on the format please see security authorization and acls. Note that this configuration is ignored if an extension of KafkaPrincipalBuilder is provided by the principal.builder.class
configuration.
Type: | list |
---|---|
Default: | DEFAULT |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.kerberos.service.name
The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.kerberos.ticket.renew.jitter
Percentage of random jitter added to the renewal time.
Type: | double |
---|---|
Default: | 0.05 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.kerberos.ticket.renew.window.factor
Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.
Type: | double |
---|---|
Default: | 0.8 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.login.callback.handler.class
The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.login.class
The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.login.refresh.buffer.seconds
The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.
Type: | short |
---|---|
Default: | 300 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.login.refresh.min.period.seconds
The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.
Type: | short |
---|---|
Default: | 60 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.login.refresh.window.factor
Login refresh thread will sleep until the specified window factor relative to the credential’s lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER.
Type: | double |
---|---|
Default: | 0.8 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.login.refresh.window.jitter
The maximum amount of random jitter relative to the credential’s lifetime that is added to the login refresh thread’s sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.
Type: | double |
---|---|
Default: | 0.05 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.mechanism.inter.broker.protocol
SASL mechanism used for inter-broker communication. Default is GSSAPI.
Type: | string |
---|---|
Default: | GSSAPI |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
sasl.oauthbearer.jwks.endpoint.url
The OAuth/OIDC provider URL from which the provider’s JWKS (JSON Web Key Set) can be retrieved. The URL can be HTTP(S)-based or file-based. If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a “kid” header claim value that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received. If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a “kid” header value that isn’t in the JWKS file, the broker will reject the JWT and authentication will fail.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.oauthbearer.token.endpoint.url
The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer’s token endpoint URL to which requests will be made to login based on the configuration in sasl.jaas.config. If the URL is file-based, it specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.server.callback.handler.class
The fully qualified name of a SASL server callback handler class that implements the AuthenticateCallbackHandler interface. Server callback handlers must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler.
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
sasl.server.max.receive.size
The maximum receive size allowed before and during initial SASL authentication. Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow up to 512KB by default for custom SASL mechanisms. In practice, PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.
Type: | int |
---|---|
Default: | 524288 |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
security.inter.broker.protocol
Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time.
Type: | string |
---|---|
Default: | PLAINTEXT |
Valid Values: | [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] |
Importance: | medium |
Update Mode: | read-only |
socket.connection.setup.timeout.max.ms
The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.
Type: | long |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
socket.connection.setup.timeout.ms
The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.
Type: | long |
---|---|
Default: | 10000 (10 seconds) |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
socket.listen.backlog.size
The maximum number of pending connections on the socket. In Linux, you may also need to configure the somaxconn
and tcp_max_syn_backlog
kernel parameters accordingly to make the configuration take effect.
Type: | int |
---|---|
Default: | 50 |
Valid Values: | [1,…] |
Importance: | medium |
Update Mode: | read-only |
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
Type: | list |
---|---|
Default: | “” |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.client.auth
Configures kafka broker to request client authentication. The following settings are common:
ssl.client.auth=required If set to required, client authentication is required.
ssl.client.auth=requested This means client authentication is optional. Unlike required, if this option is set the client can choose not to provide authentication information about itself.
ssl.client.auth=none This means client authentication is not needed.
Type: | string |
---|---|
Default: | none |
Valid Values: | [required, requested, none] |
Importance: | medium |
Update Mode: | per-broker |
ssl.enabled.protocols
The list of protocols enabled for SSL connections. The default is ‘TLSv1.2,TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the config documentation for ssl.protocol
.
Type: | list |
---|---|
Default: | TLSv1.2,TLSv1.3 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.key.password
The password of the private key in the key store file or the PEM key specified in 'ssl.keystore.key'.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
Type: | string |
---|---|
Default: | SunX509 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keystore.certificate.chain
Certificate chain in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with a list of X.509 certificates
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keystore.key
Private key in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, key password must be specified using ‘ssl.key.password’
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if 'ssl.keystore.location' is configured. Key store password is not supported for PEM format.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.keystore.type
The file format of the key store file. This is optional for client. The values currently supported by the default 'ssl.engine.factory.class' are [JKS, PKCS12, PEM].
Type: | string |
---|---|
Default: | JKS |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.protocol
The SSL protocol used to generate the SSLContext. The default is 'TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are 'TLSv1.2' and 'TLSv1.3'. 'TLS', 'TLSv1.1', 'SSL', 'SSLv2' and 'SSLv3' may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and 'ssl.enabled.protocols', clients will downgrade to 'TLSv1.2' if the server does not support 'TLSv1.3'. If this config is set to 'TLSv1.2', clients will not use 'TLSv1.3' even if it is one of the values in ssl.enabled.protocols and the server only supports 'TLSv1.3'.
Type: | string |
---|---|
Default: | TLSv1.3 |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
Type: | string |
---|---|
Default: | PKIX |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.truststore.certificates
Trusted certificates in the format specified by 'ssl.truststore.type'. Default SSL engine factory supports only PEM format with X.509 certificates.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.truststore.location
The location of the trust store file.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.truststore.password
The password for the trust store file. If a password is not set, the configured trust store file will still be used, but integrity checking is disabled. Trust store password is not supported for PEM format.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
ssl.truststore.type
The file format of the trust store file. The values currently supported by the default 'ssl.engine.factory.class' are [JKS, PKCS12, PEM].
Type: | string |
---|---|
Default: | JKS |
Valid Values: | |
Importance: | medium |
Update Mode: | per-broker |
zookeeper.clientCnxnSocket
Typically set to org.apache.zookeeper.ClientCnxnSocketNetty when using TLS connectivity to ZooKeeper. Overrides any explicit value set via the same-named zookeeper.clientCnxnSocket system property.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.client.enable
Set client to use TLS when connecting to ZooKeeper. An explicit value overrides any value set via the zookeeper.client.secure system property (note the different name). Defaults to false if neither is set. When true, zookeeper.clientCnxnSocket must be set (typically to org.apache.zookeeper.ClientCnxnSocketNetty); other values to set may include zookeeper.ssl.cipher.suites, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.keystore.location
Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.keyStore.location system property (note the camelCase).
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.keystore.password
Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.keyStore.password system property (note the camelCase). Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to ZooKeeper will fail.
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.keystore.type
Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.keyStore.type system property (note the camelCase). The default value of null means the type will be auto-detected based on the filename extension of the keystore.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.truststore.location
Truststore location when using TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.trustStore.location system property (note the camelCase).
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.truststore.password
Truststore password when using TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.trustStore.password system property (note the camelCase).
Type: | password |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
zookeeper.ssl.truststore.type
Truststore type when using TLS connectivity to ZooKeeper. Overrides any explicit value set via the zookeeper.ssl.trustStore.type system property (note the camelCase). The default value of null means the type will be auto-detected based on the filename extension of the truststore.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | medium |
Update Mode: | read-only |
alter.config.policy.class.name
The alter configs policy class that should be used for validation. The class should implement the org.apache.kafka.server.policy.AlterConfigPolicy interface.
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
alter.log.dirs.replication.quota.window.num
The number of samples to retain in memory for alter log dirs replication quotas
Type: | int |
---|---|
Default: | 11 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
alter.log.dirs.replication.quota.window.size.seconds
The time span of each sample for alter log dirs replication quotas
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
authorizer.class.name
The fully qualified name of a class that implements the org.apache.kafka.server.authorizer.Authorizer interface, which is used by the broker for authorization.
Type: | string |
---|---|
Default: | “” |
Valid Values: | non-null string |
Importance: | low |
Update Mode: | read-only |
client.quota.callback.class
The fully qualified name of a class that implements the ClientQuotaCallback interface, which is used to determine quota limits applied to client requests. By default, the <user> and <client-id> quotas stored in ZooKeeper are applied. For any given request, the most specific quota matching the user principal of the session and the client-id of the request is applied.
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
connection.failed.authentication.delay.ms
Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. This must be configured to be less than connections.max.idle.ms to prevent connection timeout.
Type: | int |
---|---|
Default: | 100 |
Valid Values: | [0,…] |
Importance: | low |
Update Mode: | read-only |
controller.quorum.retry.backoff.ms
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
Type: | int |
---|---|
Default: | 20 |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
controller.quota.window.num
The number of samples to retain in memory for controller mutation quotas
Type: | int |
---|---|
Default: | 11 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
controller.quota.window.size.seconds
The time span of each sample for controller mutation quotas
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
create.topic.policy.class.name
The create topic policy class that should be used for validation. The class should implement the org.apache.kafka.server.policy.CreateTopicPolicy interface.
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
delegation.token.expiry.check.interval.ms
Scan interval to remove expired delegation tokens.
Type: | long |
---|---|
Default: | 3600000 (1 hour) |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
kafka.metrics.polling.interval.secs
The metrics polling interval (in seconds) which can be used in kafka.metrics.reporters implementations.
Type: | int |
---|---|
Default: | 10 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
kafka.metrics.reporters
A list of classes to use as Yammer metrics custom reporters. The reporters should implement the kafka.metrics.KafkaMetricsReporter trait. If a client wants to expose JMX operations on a custom reporter, the custom reporter needs to additionally implement an MBean trait that extends the kafka.metrics.KafkaMetricsReporterMBean trait so that the registered MBean is compliant with the standard MBean convention.
Type: | list |
---|---|
Default: | “” |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
listener.security.protocol.map
Map between listener names and security protocols. This must be defined for the same security protocol to be usable in more than one port or IP. For example, internal and external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL
As shown, key and value are separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. Different security (SSL and SASL) settings can be configured for each listener by adding a normalised prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. If the config for the listener name is not set, the config will fall back to the generic config (i.e. ssl.keystore.location). Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT is assumed if no explicit mapping is provided and no other security protocol is in use.
Type: | string |
---|---|
Default: | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL |
Valid Values: | |
Importance: | low |
Update Mode: | per-broker |
log.message.downconversion.enable
This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests. When set to false, the broker will not perform down-conversion for consumers expecting an older message format. The broker responds with an UNSUPPORTED_VERSION error for consume requests from such older clients. This configuration does not apply to any message format conversion that might be required for replication to followers.
Type: | boolean |
---|---|
Default: | true |
Valid Values: | |
Importance: | low |
Update Mode: | cluster-wide |
metadata.max.idle.interval.ms
This configuration controls how often the active controller should write no-op records to the metadata partition. If the value is 0, no-op records are not appended to the metadata partition. The default value is 500
Type: | int |
---|---|
Default: | 500 |
Valid Values: | [0,…] |
Importance: | low |
Update Mode: | read-only |
metric.reporters
A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
Type: | list |
---|---|
Default: | “” |
Valid Values: | |
Importance: | low |
Update Mode: | cluster-wide |
metrics.num.samples
The number of samples maintained to compute metrics.
Type: | int |
---|---|
Default: | 2 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
metrics.recording.level
The highest recording level for metrics.
Type: | string |
---|---|
Default: | INFO |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
metrics.sample.window.ms
The window of time a metrics sample is computed over.
Type: | long |
---|---|
Default: | 30000 (30 seconds) |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
password.encoder.cipher.algorithm
The Cipher algorithm used for encoding dynamically configured passwords.
Type: | string |
---|---|
Default: | AES/CBC/PKCS5Padding |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
password.encoder.iterations
The iteration count used for encoding dynamically configured passwords.
Type: | int |
---|---|
Default: | 4096 |
Valid Values: | [1024,…] |
Importance: | low |
Update Mode: | read-only |
password.encoder.key.length
The key length used for encoding dynamically configured passwords.
Type: | int |
---|---|
Default: | 128 |
Valid Values: | [8,…] |
Importance: | low |
Update Mode: | read-only |
password.encoder.keyfactory.algorithm
The SecretKeyFactory algorithm used for encoding dynamically configured passwords. Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
quota.window.num
The number of samples to retain in memory for client quotas
Type: | int |
---|---|
Default: | 11 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
quota.window.size.seconds
The time span of each sample for client quotas
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
replication.quota.window.num
The number of samples to retain in memory for replication quotas
Type: | int |
---|---|
Default: | 11 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
replication.quota.window.size.seconds
The time span of each sample for replication quotas
Type: | int |
---|---|
Default: | 1 |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
sasl.login.connect.timeout.ms
The (optional) value in milliseconds for the external authentication provider connection timeout. Currently applies only to OAUTHBEARER.
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.login.read.timeout.ms
The (optional) value in milliseconds for the external authentication provider read timeout. Currently applies only to OAUTHBEARER.
Type: | int |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.login.retry.backoff.max.ms
The (optional) value in milliseconds for the maximum wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.
Type: | long |
---|---|
Default: | 10000 (10 seconds) |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.login.retry.backoff.ms
The (optional) value in milliseconds for the initial wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.
Type: | long |
---|---|
Default: | 100 |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.clock.skew.seconds
The (optional) value in seconds to allow for differences between the time of the OAuth/OIDC identity provider and the broker.
Type: | int |
---|---|
Default: | 30 |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.expected.audience
The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth "aud" claim and if this value is set, the broker will match the value from the JWT's "aud" claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.
Type: | list |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.expected.issuer
The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth "iss" claim and if this value is set, the broker will match it exactly against what is in the JWT's "iss" claim. If there is no match, the broker will reject the JWT and authentication will fail.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.jwks.endpoint.refresh.ms
The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set) cache that contains the keys used to verify the signature of the JWT.
Type: | long |
---|---|
Default: | 3600000 (1 hour) |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms
The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set) from the external authentication provider. JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.
Type: | long |
---|---|
Default: | 10000 (10 seconds) |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms
The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external authentication provider. JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.
Type: | long |
---|---|
Default: | 100 |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.scope.claim.name
The OAuth claim for the scope is often named "scope", but this (optional) setting can provide a different name to use for the scope included in the JWT payload's claims if the OAuth/OIDC provider uses a different name for that claim.
Type: | string |
---|---|
Default: | scope |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
sasl.oauthbearer.sub.claim.name
The OAuth claim for the subject is often named "sub", but this (optional) setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different name for that claim.
Type: | string |
---|---|
Default: | sub |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
security.providers
A list of configurable creator classes, each returning a provider implementing security algorithms. These classes should implement the org.apache.kafka.common.security.auth.SecurityProviderCreator interface.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate the server hostname using the server certificate.
Type: | string |
---|---|
Default: | https |
Valid Values: | |
Importance: | low |
Update Mode: | per-broker |
ssl.engine.factory.class
The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory
Type: | class |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | per-broker |
ssl.principal.mapping.rules
A list of rules for mapping from the distinguished name of the client certificate to a short name. The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, the distinguished name of the X.500 certificate will be the principal. For more details on the format please see [security authorization and acls](https://kafka.apache.org/documentation/#security_authz). Note that this configuration is ignored if an extension of KafkaPrincipalBuilder is provided by the principal.builder.class configuration.
Type: | string |
---|---|
Default: | DEFAULT |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
Type: | string |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | per-broker |
transaction.abort.timed.out.transaction.cleanup.interval.ms
The interval at which to rollback transactions that have timed out
Type: | int |
---|---|
Default: | 10000 (10 seconds) |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
transaction.remove.expired.transaction.cleanup.interval.ms
The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing
Type: | int |
---|---|
Default: | 3600000 (1 hour) |
Valid Values: | [1,…] |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.cipher.suites
Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv). Overrides any explicit value set via the zookeeper.ssl.ciphersuites system property (note the single word "ciphersuites"). The default value of null means the list of enabled cipher suites is determined by the Java runtime being used.
Type: | list |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.crl.enable
Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols. Overrides any explicit value set via the zookeeper.ssl.crl system property (note the shorter name).
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.enabled.protocols
Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv). Overrides any explicit value set via the zookeeper.ssl.enabledProtocols system property (note the camelCase). The default value of null means the enabled protocol will be the value of the zookeeper.ssl.protocol configuration property.
Type: | list |
---|---|
Default: | null |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.endpoint.identification.algorithm
Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) "https" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes). An explicit value overrides any "true" or "false" value set via the zookeeper.ssl.hostnameVerification system property (note the different name and values; true implies https and false implies blank).
Type: | string |
---|---|
Default: | HTTPS |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.ocsp.enable
Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols. Overrides any explicit value set via the zookeeper.ssl.ocsp system property (note the shorter name).
Type: | boolean |
---|---|
Default: | false |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
zookeeper.ssl.protocol
Specifies the protocol to be used in ZooKeeper TLS negotiation. An explicit value overrides any value set via the same-named zookeeper.ssl.protocol system property.
Type: | string |
---|---|
Default: | TLSv1.2 |
Valid Values: | |
Importance: | low |
Update Mode: | read-only |
More details about broker configs can be found in the scala class kafka.server.KafkaConfig.
3.1.1 Updating Broker Configs
From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the Dynamic Update Mode column in Broker Configs for the update mode of each broker config.
read-only: Requires a broker restart for the update to take effect
per-broker: May be updated dynamically for each broker
cluster-wide: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.
To alter the current broker configs for broker id 0 (for example, the number of log cleaner threads):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2
To describe the current dynamic broker configs for broker id 0:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
To delete a config override and revert to the statically configured or default value for broker id 0 (for example, the number of log cleaner threads):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads
Some configs may be configured as a cluster-wide default to maintain consistent values across the whole cluster. All brokers in the cluster will process the cluster default update. For example, to update log cleaner threads on all brokers:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2
To describe the currently configured dynamic cluster-wide default configs:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe
All configs that are configurable at the cluster level may also be configured at the per-broker level (e.g. for testing). If a config value is defined at different levels, the following order of precedence is used:
- Dynamic per-broker config
- Dynamic cluster-wide default config
- Static broker config from server.properties
- Kafka default, see broker configs
Dynamic configs are stored in Kafka as cluster metadata. In ZooKeeper mode, dynamic configs are stored in ZooKeeper. In KRaft mode, dynamic configs are stored as records in the metadata log.
Updating Password Configs Dynamically (ZooKeeper only)
Password config values that are dynamically updated are encrypted before being stored in ZooKeeper. The broker config password.encoder.secret must be configured in server.properties to enable dynamic update of password configs. The secret may be different on different brokers.
The secret used for password encoding may be rotated with a rolling restart of brokers. The old secret currently used for encoding passwords in ZooKeeper must be provided in the static broker config password.encoder.old.secret and the new secret in password.encoder.secret. All dynamic password configs stored in ZooKeeper will then be re-encoded with the new secret when the broker starts up.
In Kafka 1.1.x, all dynamically updated password configs must be provided in every alter request when updating configs using kafka-configs.sh, even if the password configs are not being altered. This constraint will be removed in a future release.
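As a rough sketch, the static entries in server.properties during such a secret rotation might look like the following (both secret values are placeholders):
password.encoder.secret=new-encoder-secret
password.encoder.old.secret=previous-encoder-secret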
Updating Password Configs in ZooKeeper Before Starting Brokers
From Kafka 2.0.0 onwards, kafka-configs.sh enables dynamic broker configs to be updated using ZooKeeper before starting brokers for bootstrapping. This enables all password configs to be stored in encrypted form, avoiding the need for clear passwords in server.properties. The broker config password.encoder.secret must also be specified if any password configs are included in the alter command. Additional encryption parameters may also be specified. Password encoder configs will not be persisted in ZooKeeper. For example, to store the SSL key password for the listener INTERNAL on broker 0:
> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
The configuration listener.name.internal.ssl.key.password will be persisted in ZooKeeper in encrypted form using the provided encoder configs. The encoder secret and iterations are not persisted in ZooKeeper.
Updating SSL Keystore of an Existing Listener
Brokers may be configured with SSL keystores with short validity periods to reduce the risk of compromised certificates. Keystores may be updated dynamically without restarting the broker. The config name must be prefixed with the listener prefix listener.name.{listenerName}. so that only the keystore config of a specific listener is updated. The following configs may be updated in a single alter request at per-broker level:
ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password
If the listener is the inter-broker listener, the update is allowed only if the new keystore is trusted by the truststore configured for that listener. For other listeners, no trust validation is performed on the keystore by the broker. Certificates must be signed by the same certificate authority that signed the old certificate to avoid any client authentication failures.
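For example, a minimal sketch of rotating the keystore of a listener named INTERNAL on broker 0 (the path and password are placeholders):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.internal.ssl.keystore.location=/path/to/new.keystore.jks,listener.name.internal.ssl.keystore.password=keystore-password'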
Updating SSL Truststore of an Existing Listener
Broker truststores may be updated dynamically without restarting the broker to add or remove certificates. The updated truststore will be used to authenticate new client connections. The config name must be prefixed with the listener prefix listener.name.{listenerName}. so that only the truststore config of a specific listener is updated. The following configs may be updated in a single alter request at per-broker level:
ssl.truststore.type
ssl.truststore.location
ssl.truststore.password
If the listener is the inter-broker listener, the update is allowed only if the existing keystore for that listener is trusted by the new truststore. For other listeners, no trust validation is performed by the broker before the update. Removal of CA certificates used to sign client certificates from the new truststore can lead to client authentication failures.
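Similarly, a sketch of replacing the truststore of the INTERNAL listener on broker 0 (the path and password are placeholders):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.internal.ssl.truststore.location=/path/to/new.truststore.jks,listener.name.internal.ssl.truststore.password=truststore-password'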
Updating Default Topic Configuration
Default topic configuration options used by brokers may be updated without a broker restart. The configs are applied to topics that do not have a topic config override for the equivalent per-topic config. One or more of these configs may be overridden at the cluster-default level used by all brokers; a sketch of such an update follows the list below.
log.segment.bytes
log.roll.ms
log.roll.hours
log.roll.jitter.ms
log.roll.jitter.hours
log.index.size.max.bytes
log.flush.interval.messages
log.flush.interval.ms
log.retention.bytes
log.retention.ms
log.retention.minutes
log.retention.hours
log.index.interval.bytes
log.cleaner.delete.retention.ms
log.cleaner.min.compaction.lag.ms
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleanup.policy
log.segment.delete.delay.ms
unclean.leader.election.enable
min.insync.replicas
max.message.bytes
compression.type
log.preallocate
log.message.timestamp.type
log.message.timestamp.difference.max.ms
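For example, a minimal sketch of changing the cluster-wide default retention to three days (the value is illustrative):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.retention.ms=259200000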
From Kafka version 2.0.0, unclean leader election is automatically enabled by the controller when the config unclean.leader.election.enable is dynamically updated. In Kafka version 1.1.x, changes to unclean.leader.election.enable take effect only when a new controller is elected. In ZooKeeper mode, a controller re-election may be forced by removing the controller's ZNode using the zookeeper-shell.sh utility included in the bin directory.
> bin/zookeeper-shell.sh localhost
rmr /controller
In KRaft mode, the way to force a controller re-election is to terminate the active controller node. Since KRaft controllers do not host partitions, restarts are typically very fast.
Updating Log Cleaner Configs
Log cleaner configs may be updated dynamically at the cluster-default level used by all brokers. The changes take effect on the next iteration of log cleaning. One or more of these configs may be updated:
log.cleaner.threads
log.cleaner.io.max.bytes.per.second
log.cleaner.dedupe.buffer.size
log.cleaner.io.buffer.size
log.cleaner.io.buffer.load.factor
log.cleaner.backoff.ms
Updating Thread Configs
The size of various thread pools used by the broker may be updated dynamically at the cluster-default level used by all brokers. Updates are restricted to the range currentSize / 2 to currentSize * 2 to ensure that config updates are handled gracefully; a sketch of such an update follows the list below.
num.network.threads
num.io.threads
num.replica.fetchers
num.recovery.threads.per.data.dir
log.cleaner.threads
background.threads
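For example, a minimal sketch of doubling the number of I/O threads across the cluster, assuming the current size is 8 so the new value stays within the currentSize / 2 to currentSize * 2 range:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config num.io.threads=16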
Updating ConnectionQuota Configs
The maximum number of connections allowed for a given IP/host by the broker may be updated dynamically at the cluster-default level used by all brokers. The changes apply to new connection creations, and existing connection counts are taken into account by the new limits. A sketch of such an update follows the list below.
max.connections.per.ip
max.connections.per.ip.overrides
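For instance, a minimal sketch of capping connections per IP cluster-wide (the limit shown is illustrative):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config max.connections.per.ip=200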
Adding and Removing Listeners
Listeners may be added or removed dynamically. When a new listener is added, the security configs of the listener must be provided as listener configs with the listener prefix listener.name.{listenerName}.. If the new listener uses SASL, the JAAS configuration of the listener must be provided using the JAAS configuration property sasl.jaas.config with the listener and mechanism prefix. See JAAS configuration for Kafka brokers for details.
In Kafka version 1.1.x, the listener used by the inter-broker listener may not be updated dynamically. To update the inter-broker listener to a new listener, the new listener may first be added on all brokers without restarting them. A rolling restart is then required to update inter.broker.listener.name.
In addition to all the security configs of new listeners, the following configs may be updated dynamically at the per-broker level:
listeners
advertised.listeners
listener.security.protocol.map
The inter-broker listener must be configured using the static broker configuration inter.broker.listener.name or security.inter.broker.protocol.
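As a rough sketch, adding a second PLAINTEXT listener to broker 0 might look like the following (ports and hostname are placeholders; the square-bracket grouping is the kafka-configs.sh convention for values that contain commas, and any SSL/SASL listener would additionally need its listener.name.{listenerName}. security configs):
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config "listeners=[PLAINTEXT://:9092,PLAINTEXT://:9094],advertised.listeners=[PLAINTEXT://myhost:9092,PLAINTEXT://myhost:9094]"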
3.2 Topic Configs
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more --config options. This example creates a topic named my-topic with a custom max message size and flush rate:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Overrides can also be changed or set later using the alter configs command. This example updates the max message size for my-topic:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
To check overrides set on the topic you can do:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
To remove an override you can do:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
The following are the topic-level configurations. The server's default configuration for each property is given under the Server Default Property heading. A given server default config value only applies to a topic that does not have an explicit topic config override.
-
cleanup.policy
This config designates the retention policy to use on log segments. The “delete” policy (which is the default) will discard old segments when their retention time or size limit has been reached. The “compact” policy will enable log compaction, which retains the latest value for each key. It is also possible to specify both policies in a comma-separated list (e.g. “delete,compact”). In this case, old segments will be discarded per the retention time and size configuration, while retained segments will be compacted.
Type: list Default: delete Valid Values: [compact, delete] Server Default Property: log.cleanup.policy Importance: medium -
compression.type
Specify the final compression type for a given topic. This configuration accepts the standard compression codecs (‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’). It additionally accepts ‘uncompressed’ which is equivalent to no compression; and ‘producer’ which means retain the original compression codec set by the producer.
Type: string Default: producer Valid Values: [uncompressed, zstd, lz4, snappy, gzip, producer] Server Default Property: compression.type Importance: medium -
delete.retention.ms
The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).
Type: long Default: 86400000 (1 day) Valid Values: [0,…] Server Default Property: log.cleaner.delete.retention.ms Importance: medium -
file.delete.delay.ms
The time to wait before deleting a file from the filesystem
Type: long Default: 60000 (1 minute) Valid Values: [0,…] Server Default Property: log.segment.delete.delay.ms Importance: medium -
flush.messages
This setting allows specifying an interval at which we will force an fsync of data written to the log. For example if this was set to 1 we would fsync after every message; if it were 5 we would fsync after every five messages. In general we recommend you not set this and use replication for durability and allow the operating system’s background flush capabilities as it is more efficient. This setting can be overridden on a per-topic basis (see the per-topic configuration section).
Type: long Default: 9223372036854775807 Valid Values: [1,…] Server Default Property: log.flush.interval.messages Importance: medium -
flush.ms
This setting allows specifying a time interval at which we will force an fsync of data written to the log. For example if this was set to 1000 we would fsync after 1000 ms had passed. In general we recommend you not set this and use replication for durability and allow the operating system’s background flush capabilities as it is more efficient.
Type: long Default: 9223372036854775807 Valid Values: [0,…] Server Default Property: log.flush.interval.ms Importance: medium -
follower.replication.throttled.replicas
A list of replicas for which log replication should be throttled on the follower side. The list should describe a set of replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:… or alternatively the wildcard ‘*’ can be used to throttle all replicas for this topic.
Type: list Default: “” Valid Values: [partitionId]:[brokerId],[partitionId]:[brokerId],… Server Default Property: follower.replication.throttled.replicas Importance: medium -
index.interval.bytes
This setting controls how frequently Kafka adds an index entry to its offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don’t need to change this.
Type: int Default: 4096 (4 kibibytes) Valid Values: [0,…] Server Default Property: log.index.interval.bytes Importance: medium -
leader.replication.throttled.replicas
A list of replicas for which log replication should be throttled on the leader side. The list should describe a set of replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:… or alternatively the wildcard ‘*’ can be used to throttle all replicas for this topic.
Type: list Default: “” Valid Values: [partitionId]:[brokerId],[partitionId]:[brokerId],… Server Default Property: leader.replication.throttled.replicas Importance: medium -
max.compaction.lag.ms
The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.
Type: long Default: 9223372036854775807 Valid Values: [1,…] Server Default Property: log.cleaner.max.compaction.lag.ms Importance: medium -
max.message.bytes
The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers’ fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.
Type: int Default: 1048588 Valid Values: [0,…] Server Default Property: message.max.bytes Importance: medium -
message.format.version
[DEPRECATED] Specify the message format version the broker will use to append messages to the logs. The value of this config is always assumed to be
3.0
ifinter.broker.protocol.version
is 3.0 or higher (the actual config value is ignored). Otherwise, the value should be a valid ApiVersion. Some examples are: 0.10.0, 1.1, 2.8, 3.0. By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don’t understand.Type: string Default: 3.0-IV1 Valid Values: [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2-IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2-IV0, 2.2-IV1, 2.3-IV0, 2.3-IV1, 2.4-IV0, 2.4-IV1, 2.5-IV0, 2.6-IV0, 2.7-IV0, 2.7-IV1, 2.7-IV2, 2.8-IV0, 2.8-IV1, 3.0-IV0, 3.0-IV1, 3.1-IV0, 3.2-IV0, 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3] Server Default Property: log.message.format.version Importance: medium -
message.timestamp.difference.max.ms
The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.
Type: long Default: 9223372036854775807 Valid Values: [0,…] Server Default Property: log.message.timestamp.difference.max.ms Importance: medium -
message.timestamp.type
Define whether the timestamp in the message is message create time or log append time. The value should be either
CreateTime
orLogAppendTime
Type: string Default: CreateTime Valid Values: [CreateTime, LogAppendTime] Server Default Property: log.message.timestamp.type Importance: medium -
min.cleanable.dirty.ratio
This configuration controls how frequently the log compactor will attempt to clean the log (assuming log compaction is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log. If the max.compaction.lag.ms or the min.compaction.lag.ms configurations are also specified, then the log compactor considers the log to be eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the min.compaction.lag.ms duration, or (ii) if the log has had dirty (uncompacted) records for at most the max.compaction.lag.ms period.
Type: double Default: 0.5 Valid Values: [0,…,1] Server Default Property: log.cleaner.min.cleanable.ratio Importance: medium -
min.compaction.lag.ms
The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
Type: long Default: 0 Valid Values: [0,…] Server Default Property: log.cleaner.min.compaction.lag.ms Importance: medium -
min.insync.replicas
When a producer sets acks to “all” (or “-1”), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together,min.insync.replicas
andacks
allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, setmin.insync.replicas
to 2, and produce withacks
of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.Type: int Default: 1 Valid Values: [1,…] Server Default Property: min.insync.replicas Importance: medium -
preallocate
True if we should preallocate the file on disk when creating a new log segment.
Type: boolean Default: false Valid Values: Server Default Property: log.preallocate Importance: medium -
retention.bytes
This configuration controls the maximum size a partition (which consists of log segments) can grow to before we will discard old log segments to free up space if we are using the “delete” retention policy. By default there is no size limit only a time limit. Since this limit is enforced at the partition level, multiply it by the number of partitions to compute the topic retention in bytes.
Type: long Default: -1 Valid Values: Server Default Property: log.retention.bytes Importance: medium -
retention.ms
This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the “delete” retention policy. This represents an SLA on how soon consumers must read their data. If set to -1, no time limit is applied.
Type: long Default: 604800000 (7 days) Valid Values: [-1,…] Server Default Property: log.retention.ms Importance: medium -
segment.bytes
This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention.
Type: int Default: 1073741824 (1 gibibyte) Valid Values: [14,…] Server Default Property: log.segment.bytes Importance: medium -
segment.index.bytes
This configuration controls the size of the index that maps offsets to file positions. We preallocate this index file and shrink it only after log rolls. You generally should not need to change this setting.
Type: int Default: 10485760 (10 mebibytes) Valid Values: [4,…] Server Default Property: log.index.size.max.bytes Importance: medium -
segment.jitter.ms
The maximum random jitter subtracted from the scheduled segment roll time to avoid thundering herds of segment rolling
Type: long Default: 0 Valid Values: [0,…] Server Default Property: log.roll.jitter.ms Importance: medium -
segment.ms
This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn’t full to ensure that retention can delete or compact old data.
Type: long Default: 604800000 (7 days) Valid Values: [1,…] Server Default Property: log.roll.ms Importance: medium -
unclean.leader.election.enable
Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
Type: boolean Default: false Valid Values: Server Default Property: unclean.leader.election.enable Importance: medium -
message.downconversion.enable
This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests. When set to false, the broker will not perform down-conversion for consumers expecting an older message format. The broker responds with an UNSUPPORTED_VERSION error for consume requests from such older clients. This configuration does not apply to any message format conversion that might be required for replication to followers.
Type: boolean Default: true Valid Values: Server Default Property: log.message.downconversion.enable Importance: low
3.3 Producer Configs
Below is the configuration of the producer:
-
key.serializer
Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
Type: class Default: Valid Values: Importance: high -
value.serializer
Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
Type: class Default: Valid Values: Importance: high -
bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
Type: list Default: “” Valid Values: non-null string Importance: high -
buffer.memory
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer will block for max.block.ms after which it will throw an exception.
This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. Type: long Default: 33554432 Valid Values: [0,…] Importance: high -
compression.type
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
Type: string Default: none Valid Values: [none, gzip, snappy, lz4, zstd] Importance: high -
retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Produce requests will fail before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.
Enabling idempotence requires this config value to be greater than 0. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to greater than 1 will potentially change the ordering of records, because if two batches are sent to a single partition and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Type: int Default: 2147483647 Valid Values: [0,…,2147483647] Importance: high -
ssl.key.password
The password of the private key in the key store file or the PEM key specified in 'ssl.keystore.key'.
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.certificate.chain
Certificate chain in the format specified by 'ssl.keystore.type'. Default SSL engine factory supports only PEM format with a list of X.509 certificates
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.key
Private key in the format specified by 'ssl.keystore.type'. Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, the key password must be specified using 'ssl.key.password'
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
Type: string Default: null Valid Values: Importance: high -
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if 'ssl.keystore.location' is configured. Key store password is not supported for PEM format.
Type: password Default: null Valid Values: Importance: high -
ssl.truststore.certificates
Trusted certificates in the format specified by 'ssl.truststore.type'. Default SSL engine factory supports only PEM format with X.509 certificates.
Type: password Default: null Valid Values: Importance: high -
ssl.truststore.location
The location of the trust store file.
Type: string Default: null Valid Values: Importance: high -
ssl.truststore.password
The password for the trust store file. If a password is not set, the configured trust store file will still be used, but integrity checking is disabled. Trust store password is not supported for PEM format.
Type: password Default: null Valid Values: Importance: high -
batch.size
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
Note: this setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated for this partition, we will "linger" for the linger.ms time waiting for more records to show up. The linger.ms setting defaults to 0, which means we will immediately send out a record even if the accumulated batch size is under this batch.size setting. Type: int Default: 16384 Valid Values: [0,…] Importance: medium -
client.dns.lookup
Controls how the client uses DNS lookups. If set to use_all_dns_ips, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again (both the JVM and the OS cache DNS name lookups, however). If set to resolve_canonical_bootstrap_servers_only, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips.
Type: string Default: use_all_dns_ips Valid Values: [use_all_dns_ips, resolve_canonical_bootstrap_servers_only] Importance: medium -
client.id
An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
Type: string Default: “” Valid Values: Importance: medium -
connections.max.idle.ms
Close idle connections after the number of milliseconds specified by this config.
Type: long Default: 540000 (9 minutes) Valid Values: Importance: medium -
delivery.timeout.ms
An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures. The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.
Type: int Default: 120000 (2 minutes) Valid Values: [0,…] Importance: medium -
linger.ms
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a record, the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting; however, if we have fewer than this many bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
Type: long Default: 0 Valid Values: [0,…] Importance: medium -
max.block.ms
The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block. For send() this timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the user-supplied serializers or partitioner is not counted against this timeout). For partitionsFor() this timeout bounds the time spent waiting for metadata if it is unavailable. The transaction-related methods always block, but may time out if the transaction coordinator could not be discovered or did not respond within the timeout. Type: long Default: 60000 (1 minute) Valid Values: [0,…] Importance: medium -
max.request.size
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum uncompressed record batch size. Note that the server has its own cap on the record batch size (after compression if compression is enabled) which may be different from this.
Type: int Default: 1048576 Valid Values: [0,…] Importance: medium -
partitioner.class
A class to use to determine which partition to send to when producing records. Available options are:
- If not set, the default partitioning logic is used. This strategy tries to stick to a partition until batch.size bytes is produced to the partition. It works with the strategy:
- If no partition is specified but a key is present, choose a partition based on a hash of the key
- If no partition or key is present, choose the sticky partition that changes when batch.size bytes are produced to the partition.
org.apache.kafka.clients.producer.RoundRobinPartitioner: This partitioning strategy is that each record in a series of consecutive records will be sent to a different partition (whether or not a key is provided), until partitions run out and the process starts over again. Note: there is a known issue that causes uneven distribution when a new batch is created. See KAFKA-9965 for more detail.
Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner.
Type: class Default: null Valid Values: Importance: medium
-
partitioner.ignore.keys
When set to 'true' the producer won't use record keys to choose a partition. If 'false', the producer will choose a partition based on a hash of the key when a key is present. Note: this setting has no effect if a custom partitioner is used.
Type: boolean Default: false Valid Values: Importance: medium -
receive.buffer.bytes
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Type: int Default: 32768 (32 kibibytes) Valid Values: [-1,…] Importance: medium -
request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.
Type: int Default: 30000 (30 seconds) Valid Values: [0,…] Importance: medium -
sasl.client.callback.handler.class
The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.
Type: class Default: null Valid Values: Importance: medium
-
sasl.jaas.config
JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: loginModuleClass controlFlag (optionName=optionValue)*;. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;
Type: password Default: null Valid Values: Importance: medium -
sasl.kerberos.service.name
The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
Type: string Default: null Valid Values: Importance: medium -
sasl.login.callback.handler.class
The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler
Type: class Default: null Valid Values: Importance: medium -
sasl.login.class
The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin
Type: class Default: null Valid Values: Importance: medium -
sasl.mechanism
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.
Type: string Default: GSSAPI Valid Values: Importance: medium -
sasl.oauthbearer.jwks.endpoint.url
The OAuth/OIDC provider URL from which the provider’s JWKS (JSON Web Key Set) can be retrieved. The URL can be HTTP(S)-based or file-based. If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a “kid” header claim value that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received. If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a “kid” header value that isn’t in the JWKS file, the broker will reject the JWT and authentication will fail.
Type: string Default: null Valid Values: Importance: medium -
sasl.oauthbearer.token.endpoint.url
The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer’s token endpoint URL to which requests will be made to login based on the configuration in sasl.jaas.config. If the URL is file-based, it specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.
Type: string Default: null Valid Values: Importance: medium -
security.protocol
Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Type: string Default: PLAINTEXT Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] Importance: medium -
send.buffer.bytes
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
Type: int Default: 131072 (128 kibibytes) Valid Values: [-1,…] Importance: medium -
socket.connection.setup.timeout.max.ms
The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.
Type: long Default: 30000 (30 seconds) Valid Values: Importance: medium -
socket.connection.setup.timeout.ms
The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.
Type: long Default: 10000 (10 seconds) Valid Values: Importance: medium -
ssl.enabled.protocols
The list of protocols enabled for SSL connections. The default is ‘TLSv1.2,TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the config documentation for
ssl.protocol
.Type: list Default: TLSv1.2,TLSv1.3 Valid Values: Importance: medium -
ssl.keystore.type
The file format of the key store file. This is optional for client. The values currently supported by the default
ssl.engine.factory.class
are [JKS, PKCS12, PEM].Type: string Default: JKS Valid Values: Importance: medium -
ssl.protocol
The SSL protocol used to generate the SSLContext. The default is ‘TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are ‘TLSv1.2’ and ‘TLSv1.3’. ‘TLS’, ‘TLSv1.1’, ‘SSL’, ‘SSLv2’ and ‘SSLv3’ may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and ‘ssl.enabled.protocols’, clients will downgrade to ‘TLSv1.2’ if the server does not support ‘TLSv1.3’. If this config is set to ‘TLSv1.2’, clients will not use ‘TLSv1.3’ even if it is one of the values in ssl.enabled.protocols and the server only supports ‘TLSv1.3’.
Type: string Default: TLSv1.3 Valid Values: Importance: medium -
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
Type: string Default: null Valid Values: Importance: medium -
ssl.truststore.type
The file format of the trust store file. The values currently supported by the default
ssl.engine.factory.class
are [JKS, PKCS12, PEM].Type: string Default: JKS Valid Values: Importance: medium -
acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
acks=0
If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1
This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
acks=all
This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
Note that enabling idempotence requires this config value to be ‘all’. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
Type: string Default: all Valid Values: [all, -1, 0, 1] Importance: low -
enable.idempotence
When set to ‘true’, the producer will ensure that exactly one copy of each message is written in the stream. If ‘false’, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires
max.in.flight.requests.per.connection
to be less than or equal to 5 (with message ordering preserved for any allowable value), retries
to be greater than 0, and acks
must be ‘all’.
Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException
is thrown.Type: boolean Default: true Valid Values: Importance: low -
interceptor.classes
A list of classes to use as interceptors. Implementing the
org.apache.kafka.clients.producer.ProducerInterceptor
interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.Type: list Default: “” Valid Values: non-null string Importance: low -
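As a hedged sketch of such an interceptor (class name, package, header name, and the counter it reports are all made up for the example):

```java
package com.example;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor: stamps every outgoing record with an "origin" header
// and keeps a simple count of failed sends.
public class OriginTaggingInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicLong failures = new AtomicLong();

    @Override
    public void configure(Map<String, ?> configs) {
        // This sketch does not read any custom settings.
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before the record is serialized and a partition is assigned; mutation is allowed.
        record.headers().add("origin", "billing-service".getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            failures.incrementAndGet();
        }
    }

    @Override
    public void close() {
        System.out.println("Failed sends observed by interceptor: " + failures.get());
    }
}
```

It would be registered via interceptor.classes=com.example.OriginTaggingInterceptor.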
max.in.flight.requests.per.connection
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this configuration is set to be greater than 1 and
enable.idempotence
is set to false, there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled); if retries are disabled or if enable.idempotence
is set to true, ordering will be preserved. Additionally, enabling idempotence requires the value of this configuration to be less than or equal to 5. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.Type: int Default: 5 Valid Values: [1,…] Importance: low -
metadata.max.age.ms
The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.
Type: long Default: 300000 (5 minutes) Valid Values: [0,…] Importance: low -
metadata.max.idle.ms
Controls how long the producer will cache metadata for a topic that’s idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic’s metadata is forgotten and the next access to it will force a metadata fetch request.
Type: long Default: 300000 (5 minutes) Valid Values: [5000,…] Importance: low -
metric.reporters
A list of classes to use as metrics reporters. Implementing the
org.apache.kafka.common.metrics.MetricsReporter
interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.Type: list Default: “” Valid Values: non-null string Importance: low -
metrics.num.samples
The number of samples maintained to compute metrics.
Type: int Default: 2 Valid Values: [1,…] Importance: low -
metrics.recording.level
The highest recording level for metrics.
Type: string Default: INFO Valid Values: [INFO, DEBUG, TRACE] Importance: low -
metrics.sample.window.ms
The window of time a metrics sample is computed over.
Type: long Default: 30000 (30 seconds) Valid Values: [0,…] Importance: low -
partitioner.adaptive.partitioning.enable
When set to ‘true’, the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. If ‘false’, producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used
Type: boolean Default: true Valid Values: Importance: low -
partitioner.availability.timeout.ms
If a broker cannot process produce requests from a partition for
partitioner.availability.timeout.ms
time, the partitioner treats that partition as not available. If the value is 0, this logic is disabled. Note: this setting has no effect if a custom partitioner is used or partitioner.adaptive.partitioning.enable
is set to ‘false’Type: long Default: 0 Valid Values: [0,…] Importance: low -
reconnect.backoff.max.ms
The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
Type: long Default: 1000 (1 second) Valid Values: [0,…] Importance: low -
reconnect.backoff.ms
The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
Type: long Default: 50 Valid Values: [0,…] Importance: low -
retry.backoff.ms
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
Type: long Default: 100 Valid Values: [0,…] Importance: low -
sasl.kerberos.kinit.cmd
Kerberos kinit command path.
Type: string Default: /usr/bin/kinit Valid Values: Importance: low -
sasl.kerberos.min.time.before.relogin
Login thread sleep time between refresh attempts.
Type: long Default: 60000 Valid Values: Importance: low -
sasl.kerberos.ticket.renew.jitter
Percentage of random jitter added to the renewal time.
Type: double Default: 0.05 Valid Values: Importance: low -
sasl.kerberos.ticket.renew.window.factor
Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.
Type: double Default: 0.8 Valid Values: Importance: low -
sasl.login.connect.timeout.ms
The (optional) value in milliseconds for the external authentication provider connection timeout. Currently applies only to OAUTHBEARER.
Type: int Default: null Valid Values: Importance: low -
sasl.login.read.timeout.ms
The (optional) value in milliseconds for the external authentication provider read timeout. Currently applies only to OAUTHBEARER.
Type: int Default: null Valid Values: Importance: low -
sasl.login.refresh.buffer.seconds
The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.
Type: short Default: 300 Valid Values: [0,…,3600] Importance: low -
sasl.login.refresh.min.period.seconds
The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.
Type: short Default: 60 Valid Values: [0,…,900] Importance: low -
sasl.login.refresh.window.factor
Login refresh thread will sleep until the specified window factor relative to the credential’s lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER.
Type: double Default: 0.8 Valid Values: [0.5,…,1.0] Importance: low -
sasl.login.refresh.window.jitter
The maximum amount of random jitter relative to the credential’s lifetime that is added to the login refresh thread’s sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.
Type: double Default: 0.05 Valid Values: [0.0,…,0.25] Importance: low -
sasl.login.retry.backoff.max.ms
The (optional) value in milliseconds for the maximum wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.
Type: long Default: 10000 (10 seconds) Valid Values: Importance: low -
sasl.login.retry.backoff.ms
The (optional) value in milliseconds for the initial wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.
Type: long Default: 100 Valid Values: Importance: low -
sasl.oauthbearer.clock.skew.seconds
The (optional) value in seconds to allow for differences between the time of the OAuth/OIDC identity provider and the broker.
Type: int Default: 30 Valid Values: Importance: low -
sasl.oauthbearer.expected.audience
The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth “aud” claim and if this value is set, the broker will match the value from JWT’s “aud” claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.
Type: list Default: null Valid Values: Importance: low -
sasl.oauthbearer.expected.issuer
The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth “iss” claim and if this value is set, the broker will match it exactly against what is in the JWT’s “iss” claim. If there is no match, the broker will reject the JWT and authentication will fail.
Type: string Default: null Valid Values: Importance: low -
sasl.oauthbearer.jwks.endpoint.refresh.ms
The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set) cache that contains the keys to verify the signature of the JWT.
Type: long Default: 3600000 (1 hour) Valid Values: Importance: low -
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms
The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set) from the external authentication provider. JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.
Type: long Default: 10000 (10 seconds) Valid Values: Importance: low -
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms
The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external authentication provider. JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.
Type: long Default: 100 Valid Values: Importance: low -
sasl.oauthbearer.scope.claim.name
The OAuth claim for the scope is often named “scope”, but this (optional) setting can provide a different name to use for the scope included in the JWT payload’s claims if the OAuth/OIDC provider uses a different name for that claim.
Type: string Default: scope Valid Values: Importance: low -
sasl.oauthbearer.sub.claim.name
The OAuth claim for the subject is often named “sub”, but this (optional) setting can provide a different name to use for the subject included in the JWT payload’s claims if the OAuth/OIDC provider uses a different name for that claim.
Type: string Default: sub Valid Values: Importance: low -
security.providers
A list of configurable creator classes each returning a provider implementing security algorithms. These classes should implement the
org.apache.kafka.common.security.auth.SecurityProviderCreator
interface.Type: string Default: null Valid Values: Importance: low -
ssl.cipher.suites
A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.
Type: list Default: null Valid Values: Importance: low -
ssl.endpoint.identification.algorithm
The endpoint identification algorithm to validate server hostname using server certificate.
Type: string Default: https Valid Values: Importance: low -
ssl.engine.factory.class
The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory
Type: class Default: null Valid Values: Importance: low -
ssl.keymanager.algorithm
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
Type: string Default: SunX509 Valid Values: Importance: low -
ssl.secure.random.implementation
The SecureRandom PRNG implementation to use for SSL cryptography operations.
Type: string Default: null Valid Values: Importance: low -
ssl.trustmanager.algorithm
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.
Type: string Default: PKIX Valid Values: Importance: low -
transaction.timeout.ms
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with an
InvalidTxnTimeoutException
error.Type: int Default: 60000 (1 minute) Valid Values: Importance: low -
transactional.id
The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. If a TransactionalId is configured,
enable.idempotence
is implied. By default the TransactionalId is not configured, which means transactions cannot be used. Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor
.Type: string Default: null Valid Values: non-empty string Importance: low
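To tie several of the producer settings above together (transactional.id, the implied enable.idempotence, and acks=all), here is a minimal transactional producer sketch. The broker address, topic, keys, values, and transactional.id are placeholders, and error handling is deliberately simplified:

```java
package com.example;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Configuring a transactional.id implies enable.idempotence=true and requires acks=all.
        props.put("transactional.id", "payments-producer-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("payments", "alice", "paid $200 to bob"));
                producer.send(new ProducerRecord<>("payments", "bob", "received $200 from alice"));
                // Either both records become visible to read_committed consumers, or neither does.
                producer.commitTransaction();
            } catch (Exception e) {
                // Simplified: real code should close the producer on fatal errors
                // (e.g. ProducerFencedException) instead of aborting.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
```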
3.4 Consumer Configs
Below is the configuration for the consumer:
-
key.deserializer
Deserializer class for key that implements the
org.apache.kafka.common.serialization.Deserializer
interface.Type: class Default: Valid Values: Importance: high -
value.deserializer
Deserializer class for value that implements the
org.apache.kafka.common.serialization.Deserializer
interface.Type: class Default: Valid Values: Importance: high -
bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form
host1:port1,host2:port2,...
. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).Type: list Default: “” Valid Values: non-null string Importance: high -
fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.
Type: int Default: 1 Valid Values: [0,…] Importance: high -
group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using
subscribe(topic)
or the Kafka-based offset management strategy.Type: string Default: null Valid Values: Importance: high -
heartbeat.interval.ms
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than
session.timeout.ms
, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.Type: int Default: 3000 (3 seconds) Valid Values: Importance: high -
max.partition.fetch.bytes
The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via
message.max.bytes
(broker config) ormax.message.bytes
(topic config). See fetch.max.bytes for limiting the consumer request size.Type: int Default: 1048576 (1 mebibyte) Valid Values: [0,…] Importance: high -
session.timeout.ms
The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by
group.min.session.timeout.ms
andgroup.max.session.timeout.ms
.Type: int Default: 45000 (45 seconds) Valid Values: Importance: high -
ssl.key.password
The password of the private key in the key store file or the PEM key specified in 'ssl.keystore.key'.
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.certificate.chain
Certificate chain in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with a list of X.509 certificates
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.key
Private key in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, key password must be specified using ‘ssl.key.password’
Type: password Default: null Valid Values: Importance: high -
ssl.keystore.location
The location of the key store file. This is optional for client and can be used for two-way authentication for client.
Type: string Default: null Valid Values: Importance: high -
ssl.keystore.password
The store password for the key store file. This is optional for client and only needed if ‘ssl.keystore.location’ is configured. Key store password is not supported for PEM format.
Type: password Default: null Valid Values: Importance: high -
ssl.truststore.certificates
Trusted certificates in the format specified by ‘ssl.truststore.type’. Default SSL engine factory supports only PEM format with X.509 certificates.
Type: password Default: null Valid Values: Importance: high -
ssl.truststore.location
The location of the trust store file.
Type: string Default: null Valid Values: Importance: high -
ssl.truststore.password
The password for the trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. Trust store password is not supported for PEM format.
Type: password Default: null Valid Values: Importance: high -
allow.auto.create.topics
Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using
auto.create.topics.enable
broker configuration. This configuration must be set to false
when using brokers older than 0.11.0Type: boolean Default: true Valid Values: Importance: medium -
auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer’s group
- anything else: throw exception to the consumer.
Type: string Default: latest Valid Values: [latest, earliest, none] Importance: medium
-
client.dns.lookup
Controls how the client uses DNS lookups. If set to
use_all_dns_ips
, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again (both the JVM and the OS cache DNS name lookups, however). If set to resolve_canonical_bootstrap_servers_only
, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips
.Type: string Default: use_all_dns_ips Valid Values: [use_all_dns_ips, resolve_canonical_bootstrap_servers_only] Importance: medium -
connections.max.idle.ms
Close idle connections after the number of milliseconds specified by this config.
Type: long Default: 540000 (9 minutes) Valid Values: Importance: medium -
default.api.timeout.ms
Specifies the timeout (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a
timeout
parameter.Type: int Default: 60000 (1 minute) Valid Values: [0,…] Importance: medium -
enable.auto.commit
If true the consumer’s offset will be periodically committed in the background.
Type: boolean Default: true Valid Values: Importance: medium -
exclude.internal.topics
Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.
Type: boolean Default: true Valid Values: Importance: medium -
fetch.max.bytes
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via
message.max.bytes
(broker config) ormax.message.bytes
(topic config). Note that the consumer performs multiple fetches in parallel.Type: int Default: 52428800 (50 mebibytes) Valid Values: [0,…] Importance: medium -
group.instance.id
A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.
Type: string Default: null Valid Values: non-empty string Importance: medium -
isolation.level
Controls how to read messages written transactionally. If set to
read_committed
, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted
(the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode.
Messages will always be returned in offset order. Hence, in read_committed
mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed
consumers will not be able to read up to the high watermark when there are in flight transactions.
Further, when in read_committed
the seekToEnd method will return the LSO
Type: string Default: read_uncommitted Valid Values: [read_committed, read_uncommitted] Importance: medium
-
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null
group.instance.id
which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms
. This mirrors the behavior of a static consumer which has shutdown.Type: int Default: 300000 (5 minutes) Valid Values: [1,…] Importance: medium -
max.poll.records
The maximum number of records returned in a single call to poll(). Note, that
max.poll.records
does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll.Type: int Default: 500 Valid Values: [1,…] Importance: medium -
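As a hedged end-to-end sketch using several of the consumer settings above (enable.auto.commit, auto.offset.reset, max.poll.records), with a placeholder broker address, group id, and topic:

```java
package com.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "payments-audit");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");   // commit explicitly after processing
        props.put("auto.offset.reset", "earliest"); // start from the beginning if no committed offset exists
        props.put("max.poll.records", "100");       // cap how many records a single poll() returns

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("payments"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                // Synchronous commit of the offsets returned by the last poll().
                consumer.commitSync();
            }
        }
    }
}
```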
partition.assignment.strategy
A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are:
org.apache.kafka.clients.consumer.RangeAssignor
: Assigns partitions on a per-topic basis.org.apache.kafka.clients.consumer.RoundRobinAssignor
: Assigns partitions to consumers in a round-robin fashion.org.apache.kafka.clients.consumer.StickyAssignor
: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.org.apache.kafka.clients.consumer.CooperativeStickyAssignor
: Follows the same StickyAssignor logic, but allows for cooperative rebalancing.
The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.
Implementing theorg.apache.kafka.clients.consumer.ConsumerPartitionAssignor
interface allows you to plug in a custom assignment strategy.
Type: list Default: class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor Valid Values: non-null string Importance: medium
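As a small illustrative excerpt of a consumer properties file (assuming the first rolling bounce with both assignors listed has already happened), the final step of the cooperative upgrade described above would leave only:

```
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```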
-
receive.buffer.bytes
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Type: int Default: 65536 (64 kibibytes) Valid Values: [-1,…] Importance: medium -
request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: int Default: 30000 (30 seconds) Valid Values: [0,…] Importance: medium -
sasl.client.callback.handler.class
The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.
Type: class Default: null Valid Values: Importance: medium -
sasl.jaas.config
JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is:
loginModuleClass controlFlag (optionName=optionValue)*;
. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;Type: password Default: null Valid Values: Importance: medium -
sasl.kerberos.service.name
The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.
Type: string Default: null Valid Values: Importance: medium -
sasl.login.callback.handler.class
The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler
Type: class Default: null Valid Values: Importance: medium -
sasl.login.class
The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin
Type: class Default: null Valid Values: Importance: medium -
sasl.mechanism
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.
Type: string Default: GSSAPI Valid Values: Importance: medium -
sasl.oauthbearer.jwks.endpoint.url
The OAuth/OIDC provider URL from which the provider’s JWKS (JSON Web Key Set) can be retrieved. The URL can be HTTP(S)-based or file-based. If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a “kid” header claim value that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received. If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a “kid” header value that isn’t in the JWKS file, the broker will reject the JWT and authentication will fail.
Type: string Default: null Valid Values: Importance: medium -
sasl.oauthbearer.token.endpoint.url
The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer’s token endpoint URL to which requests will be made to login based on the configuration in sasl.jaas.config. If the URL is file-based, it specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.
Type: string Default: null Valid Values: Importance: medium -
security.protocol
Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Type: string Default: PLAINTEXT Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] Importance: medium -
send.buffer.bytes
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
Type: int Default: 131072 (128 kibibytes) Valid Values: [-1,…] Importance: medium -
socket.connection.setup.timeout.max.ms
The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.
Type: long Default: 30000 (30 seconds) Valid Values: Importance: medium -
socket.connection.setup.timeout.ms
The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.
Type: long Default: 10000 (10 seconds) Valid Values: Importance: medium -
ssl.enabled.protocols
The list of protocols enabled for SSL connections. The default is ‘TLSv1.2,TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the config documentation for
ssl.protocol
.Type: list Default: TLSv1.2,TLSv1.3 Valid Values: Importance: medium -
ssl.keystore.type
The file format of the key store file. This is optional for client. The values currently supported by the default
ssl.engine.factory.class
are [JKS, PKCS12, PEM].Type: string Default: JKS Valid Values: Importance: medium -
ssl.protocol
The SSL protocol used to generate the SSLContext. The default is ‘TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are ‘TLSv1.2’ and ‘TLSv1.3’. ‘TLS’, ‘TLSv1.1’, ‘SSL’, ‘SSLv2’ and ‘SSLv3’ may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and ‘ssl.enabled.protocols’, clients will downgrade to ‘TLSv1.2’ if the server does not support ‘TLSv1.3’. If this config is set to ‘TLSv1.2’, clients will not use ‘TLSv1.3’ even if it is one of the values in ssl.enabled.protocols and the server only supports ‘TLSv1.3’.
Type: string Default: TLSv1.3 Valid Values: Importance: medium -
ssl.provider
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
Type: string Default: null Valid Values: Importance: medium -
ssl.truststore.type
The file format of the trust store file. The values currently supported by the default
ssl.engine.factory.class
are [JKS, PKCS12, PEM].Type: string Default: JKS Valid Values: Importance: medium -
auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if
enable.auto.commit
is set to true
.Type: int Default: 5000 (5 seconds) Valid Values: [0,…] Importance: low -
check.crcs
Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.
Type: boolean Default: true Valid Values: Importance: low -
client.id
An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical