当前位置: 首页 > news >正文

kafka-3.3.1

kafka-3.3.1

1. 开始

1.1 简介

什么是事件流?

事件流是人体中枢神经系统的数字等价物。它是“永远在线”世界的技术基础,在这个世界中,企业越来越多地由软件定义和自动化,并且软件的用户更多是软件。

从技术上讲,事件流是以事件流的形式从数据库、传感器、移动设备、云服务和软件应用程序等事件源实时捕获数据的实践;持久存储这些事件流以供以后检索;实时和回顾性地操纵、处理和响应事件流;并根据需要将事件流路由到不同的目标技术。因此,事件流可确保数据的连续流动和解释,从而使正确的信息在正确的时间出现在正确的位置。

我可以使用事件流来做什么?

事件流应用于 众多行业和组织的各种用例。它的许多例子包括:

  • 实时处理支付和金融交易,例如证券交易所、银行和保险。

  • 实时跟踪和监控汽车、卡车、车队和货物,例如物流和汽车行业。

  • 持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电场。

  • 收集客户互动和订单并立即做出反应,例如在零售、酒店和旅游业以及移动应用程序中。

  • 监测住院患者并预测病情变化,确保在紧急情况下及时救治。

  • 连接、存储和提供公司不同部门生成的数据。

  • 作为数据平台、事件驱动架构和微服务的基础

Apache Kafka® 是一个事件流平台。这意味着什么?

Kafka 结合了三个关键功能,因此您可以 使用 一个久经考验的解决方案实现端到端事件流 的用例:

  1. 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据 。
  2. 只要您愿意,就可以持久可靠地存储事件 流**。**
  3. 在事件发生时或回顾性 地处理事件流。

所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以选择自行管理 Kafka 环境和使用各种供应商提供的完全托管服务。

简而言之,Kafka 是如何工作的?

Kafka 是一个分布式系统,由通过高性能TCP 网络协议进行通信的服务器客户端组成。它可以部署在本地和云环境中的裸机硬件、虚拟机和容器上。

服务器:Kafka 作为一个或多个服务器集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以事件流的形式持续导入和导出数据,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一台服务器发生故障,其他服务器将接管它们的工作以确保连续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,即使在出现网络问题或机器故障的情况下,也能以容错的方式并行、大规模地读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的 数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。

主要概念和术语

事件记录了世界上或您的企业中“发生了某事” 的事实。在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,您会以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。这是一个示例事件:

  • 活动key:“Alice”
  • 事件vlue:“向 Bob 支付了 200 美元”
  • 事件时间戳:“2020 年 6 月 25 日下午 2:06”

生产者是那些向 Kafka 发布(写入)事件的客户端应用程序,而消费者是那些订阅(读取和处理)这些事件的客户端应用程序。在 Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是实现 Kafka 著名的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如能够精确地处理一次事件。

事件被组织并持久存储在主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。示例主题名称可以是“付款”。Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。主题中的事件可以根据需要随时读取——与传统的消息系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该保留您的事件多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据完全没问题。

主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。这种数据的分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从/向许多代理读取和写入数据。当一个新事件被发布到一个主题时,它实际上被附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件将写入同一分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。

img图:这个示例主题有四个分区 P1–P4。两个不同的生产者客户端通过网络将事件写入主题的分区,彼此独立地向主题发布新事件。具有相同键的事件(在图中用它们的颜色表示)被写入相同的分区。请注意,如果合适,两个生产者都可以写入同一分区。

为了使您的数据具有容错性和高可用性,可以复制每个主题,甚至可以跨地理区域或数据中心,以便始终有多个代理拥有数据副本以防万一出现问题,您希望对经纪人进行维护,等等。一个常见的生产设置是复制因子 3,即你的数据总是有三个副本。这种复制是在主题分区级别执行的。

这本入门书应该足以进行介绍。如果您有兴趣,文档的设计部分详细解释了 Kafka 的各种概念。

Kafka APIs

除了用于管理和管理任务的命令行工具外,Kafka 还具有五个用于 Java 和 Scala 的核心 API:

1.2 用例

以下是 Apache Kafka® 的一些流行用例的描述。有关其中一些领域的概述,请参阅此博客文章。

讯息

Kafka 可以很好地替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

根据我们的经验,消息传递的使用通常吞吐量相对较低,但可能需要低端到端延迟,并且通常依赖于 Kafka 提供的强大持久性保证。

在这个领域中,Kafka 可与传统消息系统(如ActiveMQ或 RabbitMQ )相媲美。

网站活动跟踪

Kafka 的最初用例是能够将用户活动跟踪管道重建为一组实时发布-订阅提要。这意味着站点活动(页面浏览、搜索或用户可能采取的其他操作)将发布到中心主题,每个活动类型一个主题。这些提要可用于订阅一系列用例,包括实时处理、实时监控以及加载到 Hadoop 或离线数据仓库系统以进行离线处理和报告。

活动跟踪的量通常非常大,因为每个用户页面视图都会生成许多活动消息。

指标

Kafka常用于运营监控数据。这涉及汇总来自分布式应用程序的统计数据,以生成集中的运营数据提要。

许多人使用 Kafka 作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在一个中央位置(可能是文件服务器或 HDFS)进行处理。Kafka 抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低的延迟处理和更容易支持多个数据源和分布式数据消费。与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka 提供了同样出色的性能、由于复制而提供的更强的持久性保证以及更低的端到端延迟。

流处理

Kafka 的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从 Kafka 主题中使用,然后聚合、丰富或以其他方式转换为新主题以供进一步使用或后续处理。例如,用于推荐新闻文章的处理管道可能会从 RSS 提要中抓取文章内容并将其发布到“文章”主题;进一步处理可能会规范化或删除重复内容,并将清理后的文章内容发布到新主题;最后的处理阶段可能会尝试向用户推荐此内容。此类处理管道根据各个主题创建实时数据流图。从0.10.0.0开始,一个轻量级但强大的流处理库Kafka Streams 在 Apache Kafka 中可用以执行上述数据处理。除了 Kafka Streams,替代的开源流处理工具包括Apache Storm和 Apache Samza。

事件溯源

事件溯源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列。Kafka 对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。

提交日志

Kafka 可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka 中的日志压缩功能有助于支持这种用法。在这种用法中,Kafka 类似于Apache BookKeeper项目。

1.3 快速入门

第 1 步:获取 KAFKA

下载 最新的 Kafka 版本并解压:

$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
第2步:启动KAFKA环境

注意:您的本地环境必须安装 Java 8+。

Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动。要开始使用任一配置,请遵循以下部分,但不要同时遵循这两个部分。

Kafka 与 ZooKeeper

运行以下命令以按正确顺序启动所有服务:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务都成功启动,您将拥有一个正在运行并可以使用的基本 Kafka 环境。注:自带的zookeeper不适合作为集群应用

kafka与KRaft

修改配置文件 …/config/kraft/server.properties

#kafka#kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功
能)
process.roles=broker, controller 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功
能)

node.id=1

controller.quorum.voters=@192.168.88.139:9093

advertised.listeners=PLAINTEXT://192.168.88.139:9092

log.dirs=/opt/kafka_2.13-3.3.1/kafka_log

生成集群 UUID

$ bin/kafka-storage.sh random-uuid
生成的uuid

格式化日志目录

$ bin/kafka-storage.sh format -t RbHFvk0wTTKMYnSRNpMTpA -c config/kraft/server.properties

启动卡夫卡服务器

$ bin/kafka-server-start.sh config/kraft/server.properties

Kafka 服务器成功启动后,您将拥有一个正在运行并可以使用的基本 Kafka 环境。

第 3 步:创建一个主题来存储您的事件

Kafka 是一个分布式事件流平台,可让您跨多台计算机 读取、写入、存储和处理 事件(在文档中也称为记录消息)。

示例事件包括支付交易、来自手机的地理位置更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。

因此,在您编写第一个事件之前,您必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.88.134:9092

Kafka 的所有命令行工具都有额外的选项:运行kafka-topics.sh不带任何参数的命令以显示使用信息。例如,它还可以向您显示 新主题 的分区计数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1    Configs:Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
第 4 步:将一些事件写入主题

Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理将以持久和容错的方式存储事件,只要您需要——甚至永远。

运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致一个单独的事件被写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以随时停止生产者客户端Ctrl-C

第 5 步:读取事件

打开另一个终端会话并运行控制台消费者客户端以读取您刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以随时停止消费者客户端Ctrl-C

随意尝试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在您的消费者终端中。

因为事件持久存储在 Kafka 中,所以它们可以被任意多次读取,并且可以被任意多的消费者读取。您可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。

第 6 步:使用 KAFKA CONNECT 将数据导入/导出为事件流

您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及许多已经在使用这些系统的应用程序。 Kafka Connect允许您不断地将数据从外部系统提取到 Kafka 中,反之亦然。它是一个运行 连接器的可扩展工具,连接器实现了与外部系统交互的自定义逻辑。因此很容易将现有系统与 Kafka 集成。为了使这个过程更容易,有数百个这样的连接器随时可用。

在本快速入门中,我们将了解如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka 主题并将数据从 Kafka 主题导出到文件。

首先,确保添加connect-file-3.3.1.jarplugin.pathConnect worker 配置中的属性。出于本快速入门的目的,我们将使用相对路径并将连接器的包视为超级 jar,当从安装目录运行快速入门命令时,它会起作用。但是,值得注意的是,对于生产部署,使用绝对路径始终是可取的。有关如何设置此配置的详细说明, 请参阅plugin.path 。

编辑config/connect-standalone.properties文件,添加或更改plugin.path与以下匹配的配置属性,然后保存文件:

> echo "plugin.path=libs/connect-file-3.3.1.jar"

然后,首先创建一些种子数据进行测试:

> echo -e "foo\nbar" > test.txt

或者在 Windows 上:

> echo foo> test.txt
> echo bar>> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka 代理和数据的序列化格式。其余配置文件分别指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些示例配置文件包含在 Kafka 中,使用您之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成到 Kafka 主题,第二个是接收器连接器它从 Kafka 主题读取消息并在输出文件中将每条消息生成为一行。

在启动过程中,您会看到许多日志消息,包括一些表明正在实例化连接器的消息。一旦 Kafka Connect 进程启动,源连接器应该开始从主题读取行test.txt并将它们生成到主题connect-test,而接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传送:

> more test.sink.txt
foo
bar

请注意,数据存储在 Kafka 主题connect-test中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,因此我们可以将数据添加到文件中,并看到数据在管道中移动:

> echo Another line>> test.txt

您应该会在控制台使用者输出和接收器文件中看到这一行。

第 7 步:使用 KAFKA STREAMS 处理您的事件

一旦您的数据作为事件存储在 Kafka 中,您就可以使用适用于 Java/Scala 的 Kafka Streams客户端库处理数据。它允许您实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式。该库支持恰好一次处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

为了给你一个初步的体验,下面是如何实现流行的WordCount算法:

KStream<String, String> textLines = builder.stream("quickstart-events");KTable<String, Long> wordCounts = textLines.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))).groupBy((keyIgnored, word) -> word).count();wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams 演示 和应用程序开发教程演示 了 如何从头到尾编写和运行此类流应用程序。

第8步:终止KAFKA环境

现在您已经完成了快速入门,可以随意拆除 Kafka 环境,或者继续尝试。

  1. 使用 停止生产者和消费者客户Ctrl-C,如果您还没有这样做的话。
  2. 使用 停止 Kafka 代理Ctrl-C
  3. 最后,如果遵循 Kafka with ZooKeeper 部分,请使用 停止 ZooKeeper 服务器Ctrl-C

如果您还想删除本地 Kafka 环境的任何数据,包括您在此过程中创建的任何事件,请运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

祝贺 你!

阿帕奇已经成功地完成了快速入门。

为了了解更多信息,我们建议以下步骤:

  • 通读简报介绍了解卡夫卡如何在高水平上工作,它的主要概念,以及它与其他技术的比较。要更详细地了解卡夫卡,请前往文档 .
  • 浏览用例了解全球社区的其他用户如何从卡夫卡身上获得价值。
  • 加入当地卡夫卡会议小组和观看卡夫卡峰会会谈卡夫卡社区的主要会议。

1.4 生态系统

在主发行版之外有很多工具可以与Kafka集成。这个生态系统页面列出其中许多,包括流处理系统、Hadoop集成、监视和部署工具

2.APIS

Kafka包括五个核心API:

  1. 这个ProducerAPI允许应用程序向Kafka集群中的主题发送数据流。
  2. 这个ConsumerAPI允许应用程序从Kafka集群中的主题读取数据流。
  3. 这个StreamsAPI允许将数据流从输入主题转换为输出主题。
  4. 这个connectAPI允许实现连接器,这些连接器不断地从源系统或应用程序拉入Kafka,或从Kafka推送到某些接收器系统或应用程序。
  5. 这个AdminAPI允许管理和检查主题、代理和其他Kafka对象。

Kafka通过一个独立于语言的协议公开了它的所有功能,该协议有许多编程语言的客户端。然而,只有Java客户机作为Kafka主项目的一部分进行维护,其他客户机作为独立的开源项目提供。提供了一个非Java客户机列表

在这里.

2.1生产者API

producerapi允许应用程序向Kafka集群中的主题发送数据流。

中提供了演示如何使用producer的示例 javadocs公司 .

要使用producer,可以使用以下maven依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1</version>
</dependency>

2.2消费者api

消费者API允许应用程序从Kafka集群中的主题读取数据流。

中给出了如何使用消费者的示例 javadocs公司 .

要使用使用者,可以使用以下maven依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1</version>
</dependency>

2.3Streams API

这个StreamsAPI允许将数据流从输入主题转换为输出主题。

中提供了演示如何使用此库的示例 javadocs公司

有关使用Streams API的其他文档可用在这里 .

要使用Kafka流,可以使用以下maven依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>3.3.1</version>
</dependency>

使用Scala时,可以选择包含kafka-streams-scala图书馆。关于在Scala中使用Kafka Streams DSL的其他文档也可用在开发人员指南中 .

要使用Kafka Streams DSL for Scala for Scala 2.13,可以使用以下maven依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams-scala_2.13</artifactId><version>3.3.1</version>
</dependency>

2.4连接API

connectapi允许实现连接器,这些连接器不断地从源数据系统拉入Kafka或从Kafka推送到某个sink数据系统。

许多Connect用户不需要直接使用这个API,但是他们可以使用预先构建的连接器,而不需要编写任何代码。提供了有关使用Connect的其他信息在这里 .

希望实现自定义连接器的用户可以看到 java文档 .

2.5管理API

管理API支持管理和检查主题、代理、ACL和其他Kafka对象。

要使用管理API,请添加以下Maven依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1</version>
</dependency>

有关管理的更多信息,请参阅API

java文档

.

3. 配置

Kafka使用属性文件格式的键值对进行配置。这些值可以从文件或以编程方式提供。

3.1 Broker Configs

对于ZooKeeper集群,代理必须具有以下配置:

  • broker.id
  • log.dirs
  • zookeeper.connect

对于KRaft集群,代理和控制器必须具有以下配置:

  • node.id
  • log.dirs
  • process.roles

关于KRaft经纪人,如果是broker.id已设置,它必须等于node.id。下面将详细讨论主题级配置和默认值。

advertised.listeners

要发布到ZooKeeper供客户端使用的侦听器,如果不同于侦听器配置属性。在IaaS环境中,这可能需要与代理绑定的接口不同。如果未设置此值,将使用侦听器的值。与侦听器不同,通告0.0.0.0原地址是无效的。

与侦听器不同的是,此属性中可以有重复的端口,因此可以将一个侦听器配置为通告另一个侦听程序的地址。这在使用外部负载平衡器的某些情况下非常有用

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:per-broker

auto.create.topics.enable

启用在服务器上自动创建主题

Type:boolean
Default:true
Valid Values:
Importance:high
Update Mode:read-only

auto.leader.rebalance.enable

启用自动引线平衡。后台线程定期检查分区前导的分布,可通过“leader.inbalance.check.interval.seconds”进行配置。如果领导者失衡超过`leader.imbalance.per.broker。百分比”,则会触发分区的首选引线重新平衡。

Type:boolean
Default:true
Valid Values:
Importance:high
Update Mode:read-only

background.threads

用于各种后台处理任务的线程数

Type:int
Default:10
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

broker.id

此服务器的代理id。如果未设置,将生成唯一的代理id。为了避免zookeeper生成的代理id和用户配置的代理id之间的冲突,生成的代理id从reserved.broker.max.id+1开始。

Type:int
Default:-1
Valid Values:
Importance:high
Update Mode:read-only

compression.type

指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“zip”、“snappy”、“lz4”、“zstd”)。它还接受“未压缩”,这相当于没有压缩;和“生产者”,这意味着保留生产者设置的原始压缩编解码器。

Type:string
Default:producer
Valid Values:[uncompressed, zstd, lz4, snappy, gzip, producer]
Importance:high
Update Mode:cluster-wide

control.plane.listener.name

用于控制器和代理之间通信的侦听器的名称。Broker将使用control.plane.listener.name在侦听器列表中查找端点,侦听来自控制器的连接。例如,如果代理的配置为:

listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER

启动时,代理将开始使用安全协议“SSL”侦听“192.1.1.8:9094”。在控制器端,当它通过zookeeper发现代理的发布端点时,它将使用control.plane.listener.name来查找端点,它将使用该端点来建立与代理的连接。

“端点” : [“INTERNAL://broker1.example.com:9092”,“EXTERNAL://broker1.example.com:9093”,"控制器的配置为:

listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
control.plane.listener.name = CONTROLLER

则控制器将使用带有安全协议“SSL”的“broker1.example.com:9094”连接到代理。

如果未显式配置,默认值将为空,并且控制器连接将没有专用端点。

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

controller.listener.names

控制器使用的侦听器名称的逗号分隔列表。如果在KRaft模式下运行,这是必需的。当与控制器仲裁通信时,代理将始终使用此列表中的第一个侦听器。

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

controller.quorum.election.backoff.max.ms

开始新选举前的最长时间(毫秒)。这用于二元指数退避机制,有助于防止选举僵局

Type:int
Default:1000 (1 second)
Valid Values:
Importance:high
Update Mode:read-only

controller.quorum.election.timeout.ms

在触发新选举之前无法从领导人那里获取信息的最长等待时间(毫秒)

Type:int
Default:1000 (1 second)
Valid Values:
Importance:high
Update Mode:read-only

controller.quorum.fetch.timeout.ms

在成为候选人并引发选民选举之前,现任领导人没有成功提名的最长时间;在询问领导人是否有新的时代之前,最长的时间没有收到大多数法定人数的请求

Type:int
Default:2000 (2 seconds)
Valid Values:
Importance:high
Update Mode:read-only

controller.quorum.voters

以逗号分隔的“{id}@{host}:{port}”条目列表中的一组投票者的id/端点信息的映射。例如:1@localhost:9092,2@localhost:9093,3@localhost:9094

Type:list
Default:“”
Valid Values:non-empty list
Importance:high
Update Mode:read-only

delete.topic.enable

启用删除主题。如果关闭此配置,则通过管理工具删除主题将无效

Type:boolean
Default:true
Valid Values:
Importance:high
Update Mode:read-only

early.start.listeners

一个逗号分隔的侦听器名称列表,可以在授权者完成初始化之前启动。当授权者依赖集群本身进行引导时,这非常有用,就像StandardAuthorizer(它将ACL存储在元数据日志中)的情况一样。默认情况下,controller.listener中包含所有侦听器。名字也将是早期开始的听众。如果侦听器接受外部通信,则它不应出现在此列表中。

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

leader.imbalance.check.interval.seconds

控制器触发分区重新平衡检查的频率

Type:long
Default:300
Valid Values:[1,…]
Importance:high
Update Mode:read-only

leader.imbalance.per.broker.percentage

每个经纪人允许的领先者失衡比率。如果超过每个经纪人的这个值,控制器将触发领先余额。该值以百分比表示

Type:int
Default:10
Valid Values:
Importance:high
Update Mode:read-only

listeners

侦听器列表-我们将侦听的URI和侦听器名称的逗号分隔列表。如果侦听器名称不是安全协议,则listener.security.protocol。还必须设置映射。

侦听器名称和端口号必须唯一。将主机名指定为0.0.0.0以绑定到所有接口。将主机名保留为空以绑定到默认接口。合法听众列表示例:

PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093

Type:string
Default:PLAINTEXT://:9092
Valid Values:
Importance:high
Update Mode:per-broker

log.dir

保存日志数据的目录(log.dirs属性的补充)

Type:string
Default:/tmp/kafka-logs
Valid Values:
Importance:high
Update Mode:read-only

log.dirs

存储日志数据的目录的逗号分隔列表。如果未设置,则为日志中的值。使用log.dir。

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

log.flush.interval.messages

在将消息刷新到磁盘之前,日志分区上累积的消息数

Type:long
Default:9223372036854775807
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

log.flush.interval.ms

任何主题中的消息在刷新到磁盘之前保留在内存中的最长时间(毫秒)。如果未设置,则为log.flush.scheduler.interval中的值。使用ms

Type:long
Default:null
Valid Values:
Importance:high
Update Mode:cluster-wide

log.flush.offset.checkpoint.interval.ms

更新作为日志恢复点的上次刷新的持久记录的频率

The frequency with which we update the persistent record of the last flush which acts as the log recovery point

Type:int
Default:60000 (1 minute)
Valid Values:[0,…]
Importance:high
Update Mode:read-only

log.flush.scheduler.interval.ms

日志刷新器检查是否需要将任何日志刷新到磁盘的频率(毫秒)

Type:long
Default:9223372036854775807
Valid Values:
Importance:high
Update Mode:read-only

log.flush.start.offset.checkpoint.interval.ms

更新日志开始偏移的持久记录的频率

Type:int
Default:60000 (1 minute)
Valid Values:[0,…]
Importance:high
Update Mode:read-only

log.retention.bytes

删除之前日志的最大大小

Type:long
Default:-1
Valid Values:
Importance:high
Update Mode:cluster-wide

log.retention.hours

日志文件在删除前保留的小时数(以小时为单位),是log.retention.ms属性的第三级

Type:int
Default:168
Valid Values:
Importance:high
Update Mode:read-only

log.retention.minutes

日志文件在删除前保留的分钟数(以分钟为单位),次于log.retention.ms属性。如果未设置,则使用log.retention.hours中的值

Type:int
Default:null
Valid Values:
Importance:high
Update Mode:read-only

log.retention.ms

删除日志文件之前保留日志文件的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值。如果设置为-1,则不应用时间限制。

Type:long
Default:null
Valid Values:
Importance:high
Update Mode:cluster-wide

log.roll.hours

新日志段转出之前的最长时间(小时),仅次于log.roll.ms属性

Type:int
Default:168
Valid Values:[1,…]
Importance:high
Update Mode:read-only

log.roll.jitter.hours

要从logRollTimeMillis(以小时为单位)中减去的最大抖动,从属于log.roll.jitter.ms属性

Type:int
Default:0
Valid Values:[0,…]
Importance:high
Update Mode:read-only

log.roll.jitter.ms

要从logRollTimeMillis中减去的最大抖动(以毫秒为单位)。如果未设置,则使用log.roll.jitter.hours中的值

Type:long
Default:null
Valid Values:
Importance:high
Update Mode:cluster-wide

log.roll.ms

新日志段转出之前的最长时间(毫秒)。如果未设置,则使用log.roll.hours中的值

Type:long
Default:null
Valid Values:
Importance:high
Update Mode:cluster-wide

log.segment.bytes

单个日志文件的最大大小

Type:int
Default:1073741824 (1 gibibyte)
Valid Values:[14,…]
Importance:high
Update Mode:cluster-wide

log.segment.delete.delay.ms

从文件系统中删除文件之前等待的时间

Type:long
Default:60000 (1 minute)
Valid Values:[0,…]
Importance:high
Update Mode:cluster-wide

message.max.bytes

Kafka允许的最大记录批处理大小(如果启用了压缩,则在压缩之后)。如果该值增加,并且存在早于0.10.2的使用者,则使用者的提取大小也必须增加,以便他们可以提取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。这可以通过主题级别“max.message.bytes”配置按主题进行设置。

Type:int
Default:1048588
Valid Values:[0,…]
Importance:high
Update Mode:cluster-wide

metadata.log.dir

这个配置决定了我们在KRaft模式下将集群的元数据日志放在哪里。如果没有设置,元数据日志将放在log.dirs的第一个日志目录中。

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

metadata.log.max.record.bytes.between.snapshots

这是日志中最新快照与生成新快照之前所需的高水位线之间的最大字节数。

Type:long
Default:20971520
Valid Values:[1,…]
Importance:high
Update Mode:read-only

metadata.log.segment.bytes

单个元数据日志文件的最大大小

Type:int
Default:1073741824 (1 gibibyte)
Valid Values:[12,…]
Importance:high
Update Mode:read-only

metadata.log.segment.ms

转出新元数据日志文件之前的最长时间(以毫秒为单位)。

Type:long
Default:604800000 (7 days)
Valid Values:
Importance:high
Update Mode:read-only

metadata.max.retention.bytes

The maximum combined size of the metadata log and snapshots before deleting old snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.

Type:long
Default:-1
Valid Values:
Importance:high
Update Mode:read-only

metadata.max.retention.ms

The number of milliseconds to keep a metadata log file or snapshot before deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.

Type:long
Default:604800000 (7 days)
Valid Values:
Importance:high
Update Mode:read-only

min.insync.replicas

When a producer sets acks to “all” (or “-1”), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.

Type:int
Default:1
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

node.id

The node ID associated with the roles this process is playing when process.roles is non-empty. Every node in a KRaft cluster must have a unique node.id, this includes broker and controller nodes. This is required configuration when running in KRaft mode.

Type:int
Default:-1
Valid Values:
Importance:high
Update Mode:read-only

num.io.threads

The number of threads that the server uses for processing requests, which may include disk I/O

Type:int
Default:8
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

num.network.threads

The number of threads that the server uses for receiving requests from the network and sending responses to the network

Type:int
Default:3
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

num.recovery.threads.per.data.dir

The number of threads per data directory to be used for log recovery at startup and flushing at shutdown

Type:int
Default:1
Valid Values:[1,…]
Importance:high
Update Mode:cluster-wide

num.replica.alter.log.dirs.threads

The number of threads that can move replicas between log directories, which may include disk I/O

Type:int
Default:null
Valid Values:
Importance:high
Update Mode:read-only

num.replica.fetchers

Number of fetcher threads used to replicate records from each source broker. The total number of fetchers on each broker is bound by num.replica.fetchers multiplied by the number of brokers in the cluster.Increasing this value can increase the degree of I/O parallelism in the follower and leader broker at the cost of higher CPU and memory utilization.

Type:int
Default:1
Valid Values:
Importance:high
Update Mode:cluster-wide

offset.metadata.max.bytes

The maximum size for a metadata entry associated with an offset commit

Type:int
Default:4096 (4 kibibytes)
Valid Values:
Importance:high
Update Mode:read-only

offsets.commit.required.acks

The required acks before the commit can be accepted. In general, the default (-1) should not be overridden

Type:short
Default:-1
Valid Values:
Importance:high
Update Mode:read-only

offsets.commit.timeout.ms

Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.

Type:int
Default:5000 (5 seconds)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.load.buffer.size

Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large).

Type:int
Default:5242880
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.retention.check.interval.ms

Frequency at which to check for stale offsets

Type:long
Default:600000 (10 minutes)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.retention.minutes

After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.

Type:int
Default:10080
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.topic.compression.codec

Compression codec for the offsets topic - compression may be used to achieve “atomic” commits

Type:int
Default:0
Valid Values:
Importance:high
Update Mode:read-only

offsets.topic.num.partitions

The number of partitions for the offset commit topic (should not change after deployment)

Type:int
Default:50
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.topic.replication.factor

The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Type:short
Default:3
Valid Values:[1,…]
Importance:high
Update Mode:read-only

offsets.topic.segment.bytes

The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads

Type:int
Default:104857600 (100 mebibytes)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

process.roles

The roles that this process plays: ‘broker’, ‘controller’, or ‘broker,controller’ if it is both. This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for Zookeeper clusters.

Type:list
Default:“”
Valid Values:[broker, controller]
Importance:high
Update Mode:read-only

queued.max.requests

The number of queued requests allowed for data-plane, before blocking the network threads

Type:int
Default:500
Valid Values:[1,…]
Importance:high
Update Mode:read-only

replica.fetch.min.bytes

Minimum bytes expected for each fetch response. If not enough bytes, wait up to replica.fetch.wait.max.ms (broker config).

Type:int
Default:1
Valid Values:
Importance:high
Update Mode:read-only

replica.fetch.wait.max.ms

The maximum wait time for each fetcher request issued by follower replicas. This value should always be less than the replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics

Type:int
Default:500
Valid Values:
Importance:high
Update Mode:read-only

replica.high.watermark.checkpoint.interval.ms

The frequency with which the high watermark is saved out to disk

Type:long
Default:5000 (5 seconds)
Valid Values:
Importance:high
Update Mode:read-only

replica.lag.time.max.ms

If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr

Type:long
Default:30000 (30 seconds)
Valid Values:
Importance:high
Update Mode:read-only

replica.socket.receive.buffer.bytes

The socket receive buffer for network requests

Type:int
Default:65536 (64 kibibytes)
Valid Values:
Importance:high
Update Mode:read-only

replica.socket.timeout.ms

The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms

Type:int
Default:30000 (30 seconds)
Valid Values:
Importance:high
Update Mode:read-only

request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

Type:int
Default:30000 (30 seconds)
Valid Values:
Importance:high
Update Mode:read-only

sasl.mechanism.controller.protocol

SASL mechanism used for communication with controllers. Default is GSSAPI.

Type:string
Default:GSSAPI
Valid Values:
Importance:high
Update Mode:read-only

socket.receive.buffer.bytes

The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.

Type:int
Default:102400 (100 kibibytes)
Valid Values:
Importance:high
Update Mode:read-only

socket.request.max.bytes

The maximum number of bytes in a socket request

Type:int
Default:104857600 (100 mebibytes)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

socket.send.buffer.bytes

The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.

Type:int
Default:102400 (100 kibibytes)
Valid Values:
Importance:high
Update Mode:read-only

transaction.max.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

Type:int
Default:900000 (15 minutes)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transaction.state.log.load.buffer.size

Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).

Type:int
Default:5242880
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transaction.state.log.min.isr

Overridden min.insync.replicas config for the transaction topic.

Type:int
Default:2
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transaction.state.log.num.partitions

The number of partitions for the transaction topic (should not change after deployment).

Type:int
Default:50
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transaction.state.log.replication.factor

The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Type:short
Default:3
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transaction.state.log.segment.bytes

The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads

Type:int
Default:104857600 (100 mebibytes)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

transactional.id.expiration.ms

The time in ms that the transaction coordinator will wait without receiving any transaction status updates for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired once this time has elapsed after the last write with the given producer id. Note that producer ids may expire sooner if the last write from the producer id is deleted due to the topic’s retention settings.

Type:int
Default:604800000 (7 days)
Valid Values:[1,…]
Importance:high
Update Mode:read-only

unclean.leader.election.enable

Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss

Type:boolean
Default:false
Valid Values:
Importance:high
Update Mode:cluster-wide

zookeeper.connect

Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.
The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.

Type:string
Default:null
Valid Values:
Importance:high
Update Mode:read-only

zookeeper.connection.timeout.ms

The max time that the client waits to establish a connection to zookeeper. If not set, the value in zookeeper.session.timeout.ms is used

Type:int
Default:null
Valid Values:
Importance:high
Update Mode:read-only

zookeeper.max.in.flight.requests

The maximum number of unacknowledged requests the client will send to Zookeeper before blocking.

Type:int
Default:10
Valid Values:[1,…]
Importance:high
Update Mode:read-only

zookeeper.session.timeout.ms

Zookeeper session timeout

Type:int
Default:18000 (18 seconds)
Valid Values:
Importance:high
Update Mode:read-only

zookeeper.set.acl

Set client to use secure ACLs

Type:boolean
Default:false
Valid Values:
Importance:high
Update Mode:read-only

broker.heartbeat.interval.ms

The length of time in milliseconds between broker heartbeats. Used when running in KRaft mode.

Type:int
Default:2000 (2 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

broker.id.generation.enable

Enable automatic broker id generation on the server. When enabled the value configured for reserved.broker.max.id should be reviewed.

Type:boolean
Default:true
Valid Values:
Importance:medium
Update Mode:read-only

broker.rack

Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: RACK1, us-east-1d

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

broker.session.timeout.ms

The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in KRaft mode.

Type:int
Default:9000 (9 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

connections.max.idle.ms

Idle connections timeout: the server socket processor threads close the connections that idle more than this

Type:long
Default:600000 (10 minutes)
Valid Values:
Importance:medium
Update Mode:read-only

connections.max.reauth.ms

When explicitly set to a positive number (the default is 0, not a positive number), a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000

Type:long
Default:0
Valid Values:
Importance:medium
Update Mode:read-only

controlled.shutdown.enable

Enable controlled shutdown of the server

Type:boolean
Default:true
Valid Values:
Importance:medium
Update Mode:read-only

controlled.shutdown.max.retries

Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens

Type:int
Default:3
Valid Values:
Importance:medium
Update Mode:read-only

controlled.shutdown.retry.backoff.ms

Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying.

Type:long
Default:5000 (5 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

controller.quorum.append.linger.ms

The duration in milliseconds that the leader will wait for writes to accumulate before flushing them to disk.

Type:int
Default:25
Valid Values:
Importance:medium
Update Mode:read-only

controller.quorum.request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

Type:int
Default:2000 (2 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

controller.socket.timeout.ms

The socket timeout for controller-to-broker channels

Type:int
Default:30000 (30 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

default.replication.factor

The default replication factors for automatically created topics

Type:int
Default:1
Valid Values:
Importance:medium
Update Mode:read-only

delegation.token.expiry.time.ms

The token validity time in miliseconds before the token needs to be renewed. Default value 1 day.

Type:long
Default:86400000 (1 day)
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

delegation.token.master.key

DEPRECATED: An alias for delegation.token.secret.key, which should be used instead of this config.

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

delegation.token.max.lifetime.ms

The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days.

Type:long
Default:604800000 (7 days)
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

delegation.token.secret.key

Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers. If the key is not set or set to empty string, brokers will disable the delegation token support.

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

delete.records.purgatory.purge.interval.requests

The purge interval (in number of requests) of the delete records request purgatory

Type:int
Default:1
Valid Values:
Importance:medium
Update Mode:read-only

fetch.max.bytes

The maximum number of bytes we will return for a fetch request. Must be at least 1024.

Type:int
Default:57671680 (55 mebibytes)
Valid Values:[1024,…]
Importance:medium
Update Mode:read-only

fetch.purgatory.purge.interval.requests

The purge interval (in number of requests) of the fetch request purgatory

Type:int
Default:1000
Valid Values:
Importance:medium
Update Mode:read-only

group.initial.rebalance.delay.ms

The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.

Type:int
Default:3000 (3 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

group.max.session.timeout.ms

The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.

Type:int
Default:1800000 (30 minutes)
Valid Values:
Importance:medium
Update Mode:read-only

group.max.size

The maximum number of consumers that a single consumer group can accommodate.

Type:int
Default:2147483647
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

group.min.session.timeout.ms

The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources.

Type:int
Default:6000 (6 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

initial.broker.registration.timeout.ms

用于代理之间通信的侦听器的名称。如果未设置此属性,监听程序名称由security.inter.broker.protocol定义。同时设置此属性和security.inter.broker.protocol属性是错误的。

Type:int
Default:60000 (1 minute)
Valid Values:
Importance:medium
Update Mode:read-only

inter.broker.listener.name

Name of listener used for communication between brokers. If this is unset, the listener name is defined by security.inter.broker.protocol. It is an error to set this and security.inter.broker.protocol properties at the same time.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

inter.broker.protocol.version

指定将使用哪个版本的代理间协议。
这通常是在所有代理升级到新版本后发生的。
一些有效值的示例有:0.8.0、0.8.1、0.8.1.1、0.8.2、0.8.2.0、0.8.2.1、0.9.0.0、0.9.0.1检查元数据版本以获取完整列表。

Type:string
Default:3.3-IV3
Valid Values:[0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2-IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2-IV0, 2.2-IV1, 2.3-IV0, 2.3-IV1, 2.4-IV0, 2.4-IV1, 2.5-IV0, 2.6-IV0, 2.7-IV0, 2.7-IV1, 2.7-IV2, 2.8-IV0, 2.8-IV1, 3.0-IV0, 3.0-IV1, 3.1-IV0, 3.2-IV0, 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3]
Importance:medium
Update Mode:read-only

log.cleaner.backoff.ms

没有日志要清理时的睡眠时间

Type:long
Default:15000 (15 seconds)
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.cleaner.dedupe.buffer.size

The total memory used for log deduplication across all cleaner threads

Type:long
Default:134217728
Valid Values:
Importance:medium
Update Mode:cluster-wide

log.cleaner.delete.retention.ms

The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).

Type:long
Default:86400000 (1 day)
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.cleaner.enable

Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size.

Type:boolean
Default:true
Valid Values:
Importance:medium
Update Mode:read-only

log.cleaner.io.buffer.load.factor

Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value will allow more log to be cleaned at once but will lead to more hash collisions

Type:double
Default:0.9
Valid Values:
Importance:medium
Update Mode:cluster-wide

log.cleaner.io.buffer.size

The total memory used for log cleaner I/O buffers across all cleaner threads

Type:int
Default:524288
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.cleaner.io.max.bytes.per.second

The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average

Type:double
Default:1.7976931348623157E308
Valid Values:
Importance:medium
Update Mode:cluster-wide

log.cleaner.max.compaction.lag.ms

The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.

Type:long
Default:9223372036854775807
Valid Values:[1,…]
Importance:medium
Update Mode:cluster-wide

log.cleaner.min.cleanable.ratio

The minimum ratio of dirty log to total log for a log to eligible for cleaning. If the log.cleaner.max.compaction.lag.ms or the log.cleaner.min.compaction.lag.ms configurations are also specified, then the log compactor considers the log eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the log.cleaner.min.compaction.lag.ms duration, or (ii) if the log has had dirty (uncompacted) records for at most the log.cleaner.max.compaction.lag.ms period.

Type:double
Default:0.5
Valid Values:[0,…,1]
Importance:medium
Update Mode:cluster-wide

log.cleaner.min.compaction.lag.ms

The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

Type:long
Default:0
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.cleaner.threads

The number of background threads to use for log cleaning

Type:int
Default:1
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.cleanup.policy

The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: “delete” and “compact”

Type:list
Default:delete
Valid Values:[compact, delete]
Importance:medium
Update Mode:cluster-wide

log.index.interval.bytes

The interval with which we add an entry to the offset index

Type:int
Default:4096 (4 kibibytes)
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.index.size.max.bytes

The maximum size in bytes of the offset index

Type:int
Default:10485760 (10 mebibytes)
Valid Values:[4,…]
Importance:medium
Update Mode:cluster-wide

log.message.format.version

Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check MetadataVersion for more details. By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don’t understand.

Type:string
Default:3.0-IV1
Valid Values:[0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2-IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2-IV0, 2.2-IV1, 2.3-IV0, 2.3-IV1, 2.4-IV0, 2.4-IV1, 2.5-IV0, 2.6-IV0, 2.7-IV0, 2.7-IV1, 2.7-IV2, 2.8-IV0, 2.8-IV1, 3.0-IV0, 3.0-IV1, 3.1-IV0, 3.2-IV0, 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3]
Importance:medium
Update Mode:read-only

log.message.timestamp.difference.max.ms

The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling.

Type:long
Default:9223372036854775807
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

log.message.timestamp.type

Define whether the timestamp in the message is message create time or log append time. The value should be either CreateTime or LogAppendTime

Type:string
Default:CreateTime
Valid Values:[CreateTime, LogAppendTime]
Importance:medium
Update Mode:cluster-wide

log.preallocate

Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true.

Type:boolean
Default:false
Valid Values:
Importance:medium
Update Mode:cluster-wide

log.retention.check.interval.ms

The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion

Type:long
Default:300000 (5 minutes)
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

max.connection.creation.rate

The maximum connection creation rate we allow in the broker at any time. Listener-level limits may also be configured by prefixing the config name with the listener prefix, for example, listener.name.internal.max.connection.creation.rate.Broker-wide connection rate limit should be configured based on broker capacity while listener limits should be configured based on application requirements. New connections will be throttled if either the listener or the broker limit is reached, with the exception of inter-broker listener. Connections on the inter-broker listener will be throttled only when the listener-level rate limit is reached.

Type:int
Default:2147483647
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

max.connections

The maximum number of connections we allow in the broker at any time. This limit is applied in addition to any per-ip limits configured using max.connections.per.ip. Listener-level limits may also be configured by prefixing the config name with the listener prefix, for example, listener.name.internal.max.connections. Broker-wide limit should be configured based on broker capacity while listener limits should be configured based on application requirements. New connections are blocked if either the listener or broker limit is reached. Connections on the inter-broker listener are permitted even if broker-wide limit is reached. The least recently used connection on another listener will be closed in this case.

Type:int
Default:2147483647
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

max.connections.per.ip

The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides configured using max.connections.per.ip.overrides property. New connections from the ip address are dropped if the limit is reached.

Type:int
Default:2147483647
Valid Values:[0,…]
Importance:medium
Update Mode:cluster-wide

max.connections.per.ip.overrides

A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is “hostName:100,127.0.0.1:200”

Type:string
Default:“”
Valid Values:
Importance:medium
Update Mode:cluster-wide

max.incremental.fetch.session.cache.slots

The maximum number of incremental fetch sessions that we will maintain.

Type:int
Default:1000
Valid Values:[0,…]
Importance:medium
Update Mode:read-only

num.partitions

The default number of log partitions per topic

Type:int
Default:1
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

password.encoder.old.secret

The old secret that was used for encoding dynamically configured passwords. This is required only when the secret is updated. If specified, all dynamically encoded passwords are decoded using this old secret and re-encoded using password.encoder.secret when broker starts up.

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

password.encoder.secret

The secret used for encoding dynamically configured passwords for this broker.

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

principal.builder.class

The fully qualified name of a class that implements the KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during authorization. If no principal builder is defined, the default behavior depends on the security protocol in use. For SSL authentication, the principal will be derived using the rules defined by ssl.principal.mapping.rules applied on the distinguished name from the client certificate if one is provided; otherwise, if client authentication is not required, the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the rules defined by sasl.kerberos.principal.to.local.rules if GSSAPI is in use, and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.

Type:class
Default:org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
Valid Values:
Importance:medium
Update Mode:per-broker

producer.purgatory.purge.interval.requests

The purge interval (in number of requests) of the producer request purgatory

Type:int
Default:1000
Valid Values:
Importance:medium
Update Mode:read-only

queued.max.request.bytes

The number of queued bytes allowed before no more requests are read

Type:long
Default:-1
Valid Values:
Importance:medium
Update Mode:read-only

replica.fetch.backoff.ms

The amount of time to sleep when fetch partition error occurs.

Type:int
Default:1000 (1 second)
Valid Values:[0,…]
Importance:medium
Update Mode:read-only

replica.fetch.max.bytes

The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

Type:int
Default:1048576 (1 mebibyte)
Valid Values:[0,…]
Importance:medium
Update Mode:read-only

replica.fetch.response.max.bytes

Maximum bytes expected for the entire fetch response. Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

Type:int
Default:10485760 (10 mebibytes)
Valid Values:[0,…]
Importance:medium
Update Mode:read-only

replica.selector.class

The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

reserved.broker.max.id

Max number that can be used for a broker.id

Type:int
Default:1000
Valid Values:[0,…]
Importance:medium
Update Mode:read-only

sasl.client.callback.handler.class

The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.

Type:class
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.enabled.mechanisms

The list of SASL mechanisms enabled in the Kafka server. The list may contain any mechanism for which a security provider is available. Only GSSAPI is enabled by default.

Type:list
Default:GSSAPI
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.jaas.config

JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described here. The format for the value is: loginModuleClass controlFlag (optionName=optionValue)*;. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.kinit.cmd

Kerberos kinit command path.

Type:string
Default:/usr/bin/kinit
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.min.time.before.relogin

Login thread sleep time between refresh attempts.

Type:long
Default:60000
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.principal.to.local.rules

A list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}. For more details on the format please see security authorization and acls. Note that this configuration is ignored if an extension of KafkaPrincipalBuilder is provided by the principal.builder.class configuration.

Type:list
Default:DEFAULT
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.service.name

The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.ticket.renew.jitter

Percentage of random jitter added to the renewal time.

Type:double
Default:0.05
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.kerberos.ticket.renew.window.factor

Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.

Type:double
Default:0.8
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.login.callback.handler.class

The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

Type:class
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.login.class

The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin

Type:class
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.login.refresh.buffer.seconds

The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

Type:short
Default:300
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.login.refresh.min.period.seconds

The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

Type:short
Default:60
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.login.refresh.window.factor

Login refresh thread will sleep until the specified window factor relative to the credential’s lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER.

Type:double
Default:0.8
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.login.refresh.window.jitter

The maximum amount of random jitter relative to the credential’s lifetime that is added to the login refresh thread’s sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.

Type:double
Default:0.05
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.mechanism.inter.broker.protocol

SASL mechanism used for inter-broker communication. Default is GSSAPI.

Type:string
Default:GSSAPI
Valid Values:
Importance:medium
Update Mode:per-broker

sasl.oauthbearer.jwks.endpoint.url

The OAuth/OIDC provider URL from which the provider’s JWKS (JSON Web Key Set) can be retrieved. The URL can be HTTP(S)-based or file-based. If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a “kid” header claim value that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received. If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a “kid” header value that isn’t in the JWKS file, the broker will reject the JWT and authentication will fail.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.oauthbearer.token.endpoint.url

The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer’s token endpoint URL to which requests will be made to login based on the configuration in sasl.jaas.config. If the URL is file-based, it specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.server.callback.handler.class

The fully qualified name of a SASL server callback handler class that implements the AuthenticateCallbackHandler interface. Server callback handlers must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler.

Type:class
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

sasl.server.max.receive.size

The maximum receive size allowed before and during initial SASL authentication. Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow up to 512KB by default for custom SASL mechanisms. In practice, PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.

Type:int
Default:524288
Valid Values:
Importance:medium
Update Mode:read-only

security.inter.broker.protocol

Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time.

Type:string
Default:PLAINTEXT
Valid Values:[PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
Importance:medium
Update Mode:read-only

socket.connection.setup.timeout.max.ms

The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.

Type:long
Default:30000 (30 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

socket.connection.setup.timeout.ms

The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.

Type:long
Default:10000 (10 seconds)
Valid Values:
Importance:medium
Update Mode:read-only

socket.listen.backlog.size

The maximum number of pending connections on the socket. In Linux, you may also need to configure the somaxconn and tcp_max_syn_backlog kernel parameters accordingly to make the configuration take effect.

Type:int
Default:50
Valid Values:[1,…]
Importance:medium
Update Mode:read-only

ssl.cipher.suites

A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.

Type:list
Default:“”
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.client.auth

Configures kafka broker to request client authentication. The following settings are common:

  • ssl.client.auth=required If set to required, client authentication is required.
  • ssl.client.auth=requested This means client authentication is optional. Unlike required, if this option is set the client can choose not to provide authentication information about itself.
  • ssl.client.auth=none This means client authentication is not needed.
Type:string
Default:none
Valid Values:[required, requested, none]
Importance:medium
Update Mode:per-broker

ssl.enabled.protocols

The list of protocols enabled for SSL connections. The default is ‘TLSv1.2,TLSv1.3’ when running with Java 11 or newer, ‘TLSv1.2’ otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the config documentation for ssl.protocol.

Type:list
Default:TLSv1.2,TLSv1.3
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.key.password

The password of the private key in the key store file or the PEM key specified in 'ssl.keystore.key'.

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keymanager.algorithm

The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

Type:string
Default:SunX509
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keystore.certificate.chain

Certificate chain in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with a list of X.509 certificates

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keystore.key

Private key in the format specified by ‘ssl.keystore.type’. Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, key password must be specified using ‘ssl.key.password’

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way authentication for client.

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keystore.password

密钥存储文件的存储密码。这对于客户端是可选的,只有在配置了“ssl.keystore.location”时才需要。PEM格式不支持密钥存储密码。

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.keystore.type

密钥存储文件的文件格式。这对客户端是可选的。默认“ssl.engine.factory.class”当前支持的值有[JKS,PKCS12,PEM]。

Type:string
Default:JKS
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.protocol

用于生成SSLContext的SSL协议。使用Java 11或更新版本运行时默认为“TLSv1.3”,否则为“TLSv1.2”。对于大多数用例来说,这个默认值应该是合适的。最近的JVM中允许的值包括“TLSv1.2”和“TLSv1.3”;“TLS”、“TLSv1.1”、“SSL”、“SSLv2”和“SSLv3”在较旧的JVM中可能受支持,但由于已知的安全漏洞,不鼓励使用它们。使用此配置和“ssl.enabled.protocols”的默认值时,如果服务器不支持“TLSv1.3”,客户端将降级到“TLSv1.2”。如果此配置设置为“TLSv1.2”,即使“TLSv1.3”是ssl.enabled.protocols中的值之一且服务器仅支持“TLSv1.3”,客户端也不会使用“TLSv1.3”。

Type:string
Default:TLSv1.3
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.provider

用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供者。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.trustmanager.algorithm

信任管理器工厂用于SSL连接的算法。默认值是为Java虚拟机配置的信任管理器工厂算法。

Type:string
Default:PKIX
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.truststore.certificates

“ssl.truststore.type”指定格式的可信证书。默认SSL引擎工厂仅支持带有X.509证书的PEM格式。

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.truststore.location

信任存储文件的位置。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.truststore.password

信任存储文件的密码。如果未设置密码,仍将使用配置的信任存储文件,但完整性检查将被禁用。PEM格式不支持信任存储密码。

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:per-broker

ssl.truststore.type

信任存储文件的文件格式。默认“ssl.engine.factory.class”当前支持的值有[JKS,PKCS12,PEM]。

Type:string
Default:JKS
Valid Values:
Importance:medium
Update Mode:per-broker
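
上述 ssl.* 配置共同决定了代理 TLS 侦听器的密钥库、信任库与客户端认证方式。下面给出一个最小的 server.properties 草图(仅为示意,文件路径、口令与端口均为假设值),演示如何组合这些配置启用一个要求客户端认证的 SSL 侦听器:

# 假设的 SSL 侦听器示例,路径与口令需替换为实际值
listeners=SSL://:9093
ssl.keystore.location=/var/private/ssl/kafka.broker.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/private/ssl/kafka.broker.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required
# 代理间通信协议等其他相关设置从略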

zookeeper.clientCnxnSocket

使用TLS连接到ZooKeeper时,通常设置为“org.apache.zookeeper.ClientCnxnSocketNetty”。覆盖通过同名的“zookeeper.clientCnxnSocket”系统属性设置的任何显式值。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.client.enable

设置客户端在连接到ZooKeeper时使用TLS。显式值会覆盖通过“zookeeper.client.secure”系统属性设置的任何值(注意名称不同)。如果两者都未设置,则默认为false。当为true时,必须设置“zookeeper.clientCnxnSocket”(通常设置为“org.apache.zookeeper.ClientCnxnSocketNetty”);其他可以设置的值包括“zookeeper.ssl.cipher.suites”、“zookeeper.ssl.crl.enable”、“zookeeper.ssl.enabled.protocols”、“zookeeper.ssl.endpoint.identification.algorithm”、“zookeeper.ssl.keystore.location”、“zookeeper.ssl.keystore.password”、“zookeeper.ssl.keystore.type”、“zookeeper.ssl.ocsp.enable”、“zookeeper.ssl.protocol”、“zookeeper.ssl.truststore.location”、“zookeeper.ssl.truststore.password”、“zookeeper.ssl.truststore.type”。

Type:boolean
Default:false
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.keystore.location

使用客户端证书通过TLS连接到ZooKeeper时的密钥库位置。覆盖通过“zookeeper.ssl.keyStore.location”系统属性设置的任何显式值(注意camelCase)。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.keystore.password

使用客户端证书通过TLS连接到ZooKeeper时的密钥库密码。覆盖通过“zookeeper.ssl.keyStore.password”系统属性设置的任何显式值(注意camelCase)。注意,ZooKeeper不支持与密钥库密码不同的密钥密码,所以一定要将密钥库中的密钥密码设置为与密钥库密码相同;否则,连接ZooKeeper的尝试将会失败。

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.keystore.type

使用客户端证书与ZooKeeper进行TLS连接时的密钥库类型。覆盖通过“zookeeper.ssl.keyStore.type”系统属性设置的任何显式值(注意camelCase)。默认值“null”表示将根据密钥库的文件扩展名自动检测类型。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.truststore.location

使用TLS连接到ZooKeeper时的信任库位置。覆盖通过“zookeeper.ssl.trustStore.location”系统属性设置的任何显式值(注意camelCase)。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.truststore.password

使用TLS连接到ZooKeeper时的信任库密码。覆盖通过“zookeeper.ssl.trustStore.password”系统属性设置的任何显式值(注意camelCase)。

Type:password
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

zookeeper.ssl.truststore.type

使用TLS连接到ZooKeeper时的信任库类型。覆盖通过“zookeeper.ssl.trustStore.type”系统属性设置的任何显式值(注意camelCase)。默认值“null”表示将根据信任库的文件扩展名自动检测类型。

Type:string
Default:null
Valid Values:
Importance:medium
Update Mode:read-only

alter.config.policy.class.name

用于验证的alter configs策略类。该类应实现“org.apache.kafka.server.policy.AlterConfigPolicy”接口。

Type:class
Default:null
Valid Values:
Importance:low
Update Mode:read-only

alter.log.dirs.replication.quota.window.num

为更改日志目录复制配额而保留在内存中的样本数

Type:int
Default:11
Valid Values:[1,…]
Importance:low
Update Mode:read-only

alter.log.dirs.replication.quota.window.size.seconds

alter log dirs复制配额的每个样本的时间跨度

Type:int
Default:1
Valid Values:[1,…]
Importance:low
Update Mode:read-only

authorizer.class.name

实现“org.apache.kafka.server.authorizer.Authorizer”接口的类的完全限定名,代理使用该接口进行授权。

Type:string
Default:“”
Valid Values:non-null string
Importance:low
Update Mode:read-only

client.quota.callback.class

实现ClientQuotaCallback接口的类的完全限定名,该接口用于确定应用于客户端请求的配额限制。默认情况下,会应用存储在ZooKeeper中的< user >和< client-id >配额。对于任何给定的请求,将应用与会话的用户主体和请求的客户端id相匹配的最具体的配额。

Type:class
Default:null
Valid Values:
Importance:low
Update Mode:read-only

connection.failed.authentication.delay.ms

身份验证失败时的连接关闭延迟:这是身份验证失败时连接关闭将延迟的时间(以毫秒为单位)。必须将其配置为小于connections.max.idle.ms,以防止连接超时。

Type:int
Default:100
Valid Values:[0,…]
Importance:low
Update Mode:read-only

controller.quorum.retry.backoff.ms

在尝试重试给定主题分区的失败请求之前等待的时间。这避免了在某些失败场景下重复发送请求。

Type:int
Default:20
Valid Values:
Importance:low
Update Mode:read-only

controller.quota.window.num

为控制器变异配额保留在内存中的样本数

Type:int
Default:11
Valid Values:[1,…]
Importance:low
Update Mode:read-only

controller.quota.window.size.seconds

控制器突变配额的每个样本的时间跨度

Type:int
Default:1
Valid Values:[1,…]
Importance:low
Update Mode:read-only

create.topic.policy.class.name

用于验证的创建主题策略类。该类应实现“org.apache.kafka.server.policy.CreateTopicPolicy”接口。

Type:class
Default:null
Valid Values:
Importance:low
Update Mode:read-only

delegation.token.expiry.check.interval.ms

删除过期委派令牌的扫描间隔。

Type:long
Default:3600000 (1 hour)
Valid Values:[1,…]
Importance:low
Update Mode:read-only

kafka.metrics.polling.interval.secs

可以在kafka.metrics.reporters实现中使用的度量轮询间隔(秒)。

Type:int
Default:10
Valid Values:[1,…]
Importance:low
Update Mode:read-only

kafka.metrics.reporters

用作Yammer度量自定义报告器的类列表。报告器应实现“kafka.metrics.KafkaMetricsReporter”特征。如果客户端希望在自定义报告器上公开JMX操作,自定义报告器还需要实现一个扩展“kafka.metrics.KafkaMetricsReporterMBean”特征的MBean特征,以便注册的MBean符合标准MBean约定。

Type:list
Default:“”
Valid Values:
Importance:low
Update Mode:read-only

listener.security.protocol.map

侦听器名称和安全协议之间的映射。如果要让同一安全协议用于多个端口或IP,就必须定义此映射。例如,即使内部和外部流量都需要SSL,也可以将二者分开:用户可以定义名为INTERNAL和EXTERNAL的侦听器,并将此属性设置为 INTERNAL:SSL,EXTERNAL:SSL。如上所示,键和值由冒号分隔,映射条目由逗号分隔。每个侦听器名称在映射中只应出现一次。通过在配置名前添加规范化前缀(侦听器名称为小写),可以为每个侦听器配置不同的安全(SSL和SASL)设置。例如,要为INTERNAL侦听器设置不同的密钥库,可以设置名为“listener.name.internal.ssl.keystore.location”的配置。如果未设置侦听器名称级别的配置,该配置将回退到通用配置(即“ssl.keystore.location”)。注意,在KRaft模式下,如果没有提供显式映射且没有使用其他安全协议,则假定由“controller.listener.names”定义的侦听器名称默认映射到PLAINTEXT。

Type:string
Default:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
Valid Values:
Importance:low
Update Mode:per-broker
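
作为示意,下面的 server.properties 片段按上述约定将内部与外部流量分开;其中监听器名称 INTERNAL/EXTERNAL、主机名与端口均为假设值:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=INTERNAL://:9092,EXTERNAL://:9093
advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
inter.broker.listener.name=INTERNAL
# 外部侦听器的 SSL 设置使用带规范化前缀的配置名(路径为假设值)
listener.name.external.ssl.keystore.location=/var/private/ssl/external.keystore.jks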

log.message.downconversion.enable

此配置控制是否启用消息格式的下转换来满足消费请求。当设置为“false”时,broker不会为期望旧消息格式的使用者执行向下转换。对于来自此类旧客户端的消费请求,代理以“不支持的版本”错误进行响应。此配置不适用于复制到追随者可能需要的任何消息格式转换。

Type:boolean
Default:true
Valid Values:
Importance:low
Update Mode:cluster-wide

metadata.max.idle.interval.ms

此配置控制活动控制器将无操作记录写入元数据分区的频率。如果该值为0,则不会将无操作记录追加到元数据分区中。默认值为500

Type:int
Default:500
Valid Values:[0,…]
Importance:low
Update Mode:read-only

metric.reporters

用作度量报告器的类列表。实现“org.apache.kafka.common.metrics.MetricsReporter”接口可以插入在创建新度量时得到通知的类。JmxReporter总是会被包含进来以注册JMX统计信息。

Type:list
Default:“”
Valid Values:
Importance:low
Update Mode:cluster-wide

metrics.num.samples

为计算指标而维护的样本数。

Type:int
Default:2
Valid Values:[1,…]
Importance:low
Update Mode:read-only

metrics.recording.level

指标的最高记录级别。

Type:string
Default:INFO
Valid Values:
Importance:low
Update Mode:read-only

metrics.sample.window.ms

计算度量样本的时间窗口。

Type:long
Default:30000 (30 seconds)
Valid Values:[1,…]
Importance:low
Update Mode:read-only

password.encoder.cipher.algorithm

用于编码动态配置密码的密码算法。

Type:string
Default:AES/CBC/PKCS5Padding
Valid Values:
Importance:low
Update Mode:read-only

password.encoder.iterations

用于编码动态配置密码的迭代计数。

Type:int
Default:4096
Valid Values:[1024,…]
Importance:low
Update Mode:read-only

password.encoder.key.length

用于对动态配置的密码进行编码的密钥长度。

Type:int
Default:128
Valid Values:[8,…]
Importance:low
Update Mode:read-only

password.encoder.keyfactory.algorithm

用于对动态配置的密码进行编码的SecretKeyFactory算法。如果可用,默认为PBKDF2WithHmacSHA512,否则为PBKDF2WithHmacSHA1。

Type:string
Default:null
Valid Values:
Importance:low
Update Mode:read-only

quota.window.num

为客户端配额保留在内存中的样本数

Type:int
Default:11
Valid Values:[1,…]
Importance:low
Update Mode:read-only

quota.window.size.seconds

客户端配额的每个样本的时间跨度

Type:int
Default:1
Valid Values:[1,…]
Importance:low
Update Mode:read-only

replication.quota.window.num

为复制配额保留在内存中的样本数

Type:int
Default:11
Valid Values:[1,…]
Importance:low
Update Mode:read-only

replication.quota.window.size.seconds

复制配额的每个样本的时间跨度

Type:int
Default:1
Valid Values:[1,…]
Importance:low
Update Mode:read-only

sasl.login.connect.timeout.ms

外部身份验证提供程序连接超时的(可选)值,以毫秒为单位。目前仅适用于OAUTHBEARER。

Type:int
Default:null
Valid Values:
Importance:low
Update Mode:read-only

sasl.login.read.timeout.ms

外部身份验证提供程序读取超时的(可选)值,以毫秒为单位。目前仅适用于OAUTHBEARER。

Type:int
Default:null
Valid Values:
Importance:low
Update Mode:read-only

sasl.login.retry.backoff.max.ms

(可选)外部身份验证提供程序两次登录尝试之间的最大等待时间,以毫秒为单位。登录使用指数回退算法,初始等待基于sasl.login.retry.backoff.ms设置,两次尝试之间的等待时间将加倍,直到达到sasl.login.retry.backoff.max.ms设置指定的最大等待时间。目前仅适用于OAUTHBEARER。

Type:long
Default:10000 (10 seconds)
Valid Values:
Importance:low
Update Mode:read-only

sasl.login.retry.backoff.ms

(可选)两次尝试登录外部身份验证提供程序之间的初始等待时间,以毫秒为单位。登录使用指数回退算法,初始等待基于sasl.login.retry.backoff.ms设置,两次尝试之间的等待时间将加倍,直到达到sasl.login.retry.backoff.max.ms设置指定的最大等待时间。目前仅适用于OAUTHBEARER。

Type:long
Default:100
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.clock.skew.seconds

以秒为单位的(可选)值,允许OAuth/OIDC身份提供者和代理之间的时间差。

Type:int
Default:30
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.expected.audience

(可选)以逗号分隔的设置,代理用它来验证JWT是否是为预期受众之一签发的。将检查JWT的标准OAuth“aud”声明,如果设置了该值,代理将对JWT的“aud”声明进行精确匹配。如果不匹配,代理将拒绝该JWT,身份验证将失败。

Type:list
Default:null
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.expected.issuer

(可选)设置,代理用它来验证JWT是否由预期的签发者创建。将检查JWT的标准OAuth“iss”声明,如果设置了该值,代理会将其与JWT的“iss”声明进行精确匹配。如果不匹配,代理将拒绝该JWT,身份验证将失败。

Type:string
Default:null
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.jwks.endpoint.refresh.ms

代理在刷新其JWKS (JSON Web密钥集)缓存(包含用于验证JWT签名的密钥)之间等待的(可选)值(毫秒)。

Type:long
Default:3600000 (1 hour)
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms

(可选)从外部身份验证提供程序检索JWKS(JSON Web密钥集)的两次尝试之间的最大等待时间,以毫秒为单位。JWKS检索使用指数回退算法,初始等待基于sasl.oauthbearer.jwks.endpoint.retry.backoff.ms设置,两次尝试之间的等待时间将加倍,直到达到sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms设置指定的最大等待时间。

Type:long
Default:10000 (10 seconds)
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.jwks.endpoint.retry.backoff.ms

(可选)从外部身份验证提供程序检索JWKS(JSON Web密钥集)的两次尝试之间的初始等待时间,以毫秒为单位。JWKS检索使用指数回退算法,初始等待基于sasl.oauthbearer.jwks.endpoint.retry.backoff.ms设置,两次尝试之间的等待时间将加倍,直到达到sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms设置指定的最大等待时间。

Type:long
Default:100
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.scope.claim.name

作用域的OAuth声明通常被命名为“scope”,但是如果OAuth/OIDC提供者为该声明使用不同的名称,则该(可选)设置可以为JWT有效负载的声明中包括的作用域提供不同的名称。

Type:string
Default:scope
Valid Values:
Importance:low
Update Mode:read-only

sasl.oauthbearer.sub.claim.name

主题的OAuth声明通常被命名为“sub ”,但是如果OAuth/OIDC提供者为该声明使用不同的名称,则该(可选)设置可以为包括在JWT有效负载的声明中的主题提供不同的名称。

Type:string
Default:sub
Valid Values:
Importance:low
Update Mode:read-only

security.providers

可配置的创建者类列表,每个类返回一个实现安全算法的提供程序。这些类应实现“org.apache.kafka.common.security.auth.SecurityProviderCreator”接口。

Type:string
Default:null
Valid Values:
Importance:low
Update Mode:read-only

ssl.endpoint.identification.algorithm

使用服务器证书验证服务器主机名的端点识别算法。

Type:string
Default:https
Valid Values:
Importance:low
Update Mode:per-broker

ssl.engine.factory.class

提供SSLEngine对象的org.apache.kafka.common.security.auth.SslEngineFactory类型的类。默认值为org.apache.kafka.common.security.ssl.DefaultSslEngineFactory。

Type:class
Default:null
Valid Values:
Importance:low
Update Mode:per-broker

ssl.principal.mapping.rules

从客户端证书的可分辨名称映射到简称的规则列表。规则按顺序进行评估,第一个匹配主体名称的规则用于将其映射到简称,列表中后面的规则将被忽略。默认情况下,X.500证书的可分辨名称将作为主体。有关格式的更多详细信息,请参见安全授权和ACL(https://kafka.apache.org/documentation/#security_authz)。请注意,如果“principal.builder.class”配置提供了KafkaPrincipalBuilder的扩展,则会忽略此配置。

Type:string
Default:DEFAULT
Valid Values:
Importance:low
Update Mode:read-only

ssl.secure.random.implementation

用于SSL加密操作的SecureRandom PRNG实现。

Type:string
Default:null
Valid Values:
Importance:low
Update Mode:per-broker

transaction.abort.timed.out.transaction.cleanup.interval.ms

回滚已超时事务的时间间隔

Type:int
Default:10000 (10 seconds)
Valid Values:[1,…]
Importance:low
Update Mode:read-only

transaction.remove.expired.transaction.cleanup.interval.ms

删除因“transactional.id.expiration.ms”通过而过期的事务的时间间隔

Type:int
Default:3600000 (1 hour)
Valid Values:[1,…]
Importance:low
Update Mode:read-only

zookeeper.ssl.cipher.suites

指定要在ZooKeeper TLS协商(csv)中使用的已启用密码套件。覆盖通过“zookeeper.ssl.ciphersuites”系统属性设置的任何显式值(请注意单词“ciphersuites”)。缺省值“null”意味着启用的密码套件列表由正在使用的Java运行时决定。

Type:list
Default:null
Valid Values:
Importance:low
Update Mode:read-only

zookeeper.ssl.crl.enable

指定是否在ZooKeeper TLS协议中启用证书吊销列表。覆盖通过“zookeeper.ssl.crl”系统属性设置的任何显式值(注意较短的名称)。

Type:boolean
Default:false
Valid Values:
Importance:low
Update Mode:read-only

zookeeper.ssl.enabled.protocols

指定ZooKeeper TLS协商(csv)中启用的协议。覆盖通过“zookeeper.ssl.enabledProtocols”系统属性设置的任何显式值(注意camelCase)。默认值“null”意味着启用的协议将是“zookeeper.ssl.protocol”配置属性的值。

Type:list
Default:null
Valid Values:
Importance:low
Update Mode:read-only

zookeeper.ssl.endpoint.identification.algorithm

指定是否在ZooKeeper TLS协商过程中启用主机名验证:(不区分大小写的)“https”表示启用ZooKeeper主机名验证,显式空白值表示禁用(仅建议出于测试目的禁用)。显式值会覆盖通过“zookeeper.ssl.hostnameVerification”系统属性设置的任何“true”或“false”值(注意名称和取值不同:true表示https,false表示空白)。

Type:string
Default:HTTPS
Valid Values:
Importance:low
Update Mode:read-only

zookeeper.ssl.ocsp.enable

指定是否在ZooKeeper TLS协议中启用在线证书状态协议。覆盖通过“zookeeper.ssl.ocsp”系统属性设置的任何显式值(注意较短的名称)

Type:boolean
Default:false
Valid Values:
Importance:low
Update Mode:read-only

zookeeper.ssl.protocol

指定ZooKeeper TLS协商中使用的协议。显式值会覆盖通过同名的“zookeeper.ssl.protocol”系统属性设置的任何值。

Type:string
Default:TLSv1.2
Valid Values:
Importance:low
Update Mode:read-only

关于代理配置的更多细节可以在 Scala 类 kafka.server.KafkaConfig 中找到。

3.1.1更新代理配置

从 Kafka 1.1 版本开始,部分代理配置无需重启代理即可更新。各代理配置的更新模式参见代理配置中的 Dynamic Update Mode 列:

  • read-only:需要重新启动代理才能进行更新
  • per-broker:可以为每个代理动态更新
  • cluster-wide:可以作为群集范围的默认值动态更新。也可以更新为每个代理的值以进行测试。

要更改代理id 0的当前代理配置(例如,日志清理线程的数量):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

要描述代理id 0的当前动态代理配置,请执行以下操作:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe

要删除配置重写并还原为代理id 0的静态配置值或默认值(例如,日志清理器线程数):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config log.cleaner.threads

有些配置可以配置为集群范围内的默认值,以在整个集群中保持一致的值。群集中的所有代理将处理群集默认更新。例如,要更新所有代理上的日志清理器线程,请执行以下操作:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

要描述当前配置的动态群集范围的默认配置,请执行以下操作:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

在集群级别可配置的所有配置也可以在每个代理级别进行配置(例如,用于测试)。如果在不同级别定义配置值,则使用以下优先顺序:

  • 每个代理的动态配置
  • 动态群集范围的默认配置
  • 静态代理配置自server.properties
  • Kafka 默认值,参见代理配置
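
作为参考,若要查看某个代理当前生效的全部配置(包括静态与动态来源),可以使用 kafka-configs.sh 的 --describe --all 选项;下面以代理 id 0 为例(示意):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe --all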

动态配置作为集群元数据存储在Kafka中。在ZooKeeper模式下,动态配置存储在ZooKeeper中。在KRaft模式下,动态配置作为记录存储在元数据日志中。

动态更新密码配置(仅限ZooKeeper)

动态更新的密码配置值在存储到ZooKeeper之前会被加密。必须在 server.properties 中配置代理配置 password.encoder.secret,才能启用密码配置的动态更新。不同代理的 secret 可以不同。

用于密码编码的 secret 可以随着代理的滚动重启进行轮换。轮换时,必须在静态代理配置 password.encoder.old.secret 中提供 ZooKeeper 中当前用于编码密码的旧 secret,并在 password.encoder.secret 中提供新 secret。当代理启动时,存储在 ZooKeeper 中的所有动态密码配置都将使用新 secret 重新编码。

在 Kafka 1.1.x 中,使用 kafka-configs.sh 更新配置时,即使密码配置没有被更改,也必须在每个 alter 请求中提供所有动态更新的密码配置。此约束将在将来的版本中删除。

启动代理之前更新ZooKeeper中的密码配置

从 Kafka 2.0.0 开始,kafka-configs.sh 支持在启动代理进行引导之前通过 ZooKeeper 更新动态代理配置。这样所有密码配置都能以加密形式存储,避免在 server.properties 中保存明文密码。如果 alter 命令中包含任何密码配置,则还必须指定代理配置 password.encoder.secret,也可以指定其他加密参数。密码编码器配置不会持久化到 ZooKeeper 中。例如,要在代理 0 上为侦听器 INTERNAL 存储 SSL 密钥密码:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'

配置 listener.name.internal.ssl.key.password 将使用提供的编码器配置以加密形式持久化到 ZooKeeper 中,而编码器 secret 和迭代次数不会持久化。

更新现有侦听器的SSL密钥库

代理可以配置有效期较短的 SSL 密钥库,以降低证书泄露的风险。密钥库可以动态更新,而无需重新启动代理。配置名称必须以侦听器前缀 listener.name.{listenerName}. 作为前缀,这样只会更新特定侦听器的密钥库配置。以下配置可以在每个代理级别的单个 alter 请求中更新:

  • ssl.keystore.type
  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.key.password

如果侦听器是代理间侦听器,则仅当为该侦听器配置的信任库信任新密钥库时,才允许更新。对于其他侦听器,代理不对密钥库执行信任验证。证书必须由签署旧证书的同一证书颁发机构签名,以避免任何客户端身份验证失败。
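
作为示意,下面的命令草图为假设名为 EXTERNAL 的侦听器在代理 0 上动态更新密钥库;路径与密码均为假设值:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.external.ssl.keystore.location=/var/private/ssl/new.keystore.jks,listener.name.external.ssl.keystore.password=keystore-password,listener.name.external.ssl.key.password=key-password'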

更新现有侦听器的SSL信任库

代理信任库可以动态更新而无需重新启动代理,以添加或删除证书。更新后的信任库将用于验证新的客户端连接。配置名称必须以侦听器前缀 listener.name.{listenerName}. 作为前缀,这样只会更新特定侦听器的信任库配置。以下配置可以在每个代理级别的单个 alter 请求中更新:

  • ssl.truststore.type
  • ssl.truststore.location
  • ssl.truststore.password

如果侦听器是代理间侦听器,则仅当新信任库信任该侦听器的现有密钥库时,才允许更新。对于其他侦听器,代理在更新之前不执行信任验证。从新信任库中删除用于签署客户端证书的CA证书可能会导致客户端身份验证失败。

更新默认主题配置

代理使用的默认主题配置选项可以在不重新启动代理的情况下更新。这些配置应用于没有等效主题级配置覆盖的主题。以下一个或多个配置可以在所有代理使用的集群默认级别上被覆盖:

  • log.segment.bytes
  • log.roll.ms
  • log.roll.hours
  • log.roll.jitter.ms
  • log.roll.jitter.hours
  • log.index.size.max.bytes
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.retention.bytes
  • log.retention.ms
  • log.retention.minutes
  • log.retention.hours
  • log.index.interval.bytes
  • log.cleaner.delete.retention.ms
  • log.cleaner.min.compaction.lag.ms
  • log.cleaner.max.compaction.lag.ms
  • log.cleaner.min.cleanable.ratio
  • log.cleanup.policy
  • log.segment.delete.delay.ms
  • unclean.leader.election.enable
  • min.insync.replicas
  • max.message.bytes
  • compression.type
  • log.preallocate
  • log.message.timestamp.type
  • log.message.timestamp.difference.max.ms
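
例如,要在集群默认级别把 log.retention.ms 更新为 7 天(示例取值 604800000 毫秒),可以使用类似下面的命令:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.retention.ms=604800000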

从 Kafka 2.0.0 版开始,当 unclean.leader.election.enable 配置被动态更新时,控制器会自动启用不干净的领导者选举。在 Kafka 1.1.x 中,对 unclean.leader.election.enable 的更改只有在选出新控制器时才生效。在 ZooKeeper 模式下,可以通过删除控制器的 ZNode 来强制重新选举控制器,这可以使用 bin 目录中包含的 zookeeper-shell.sh 实用程序完成:

> bin/zookeeper-shell.sh localhost rmr /controller

在KRaft模式下,强制控制器重新选择的方法是终止活动控制器节点。由于KRaft控制器不托管分区,所以重启速度通常非常快。

更新日志清理器配置

日志清理器配置可以在所有代理使用的群集默认级别上动态更新。这些更改将在日志清理的下一个迭代中生效。这些配置中的一个或多个可以更新:

  • log.cleaner.threads
  • log.cleaner.io.max.bytes.per.second
  • log.cleaner.dedupe.buffer.size
  • log.cleaner.io.buffer.size
  • log.cleaner.io.buffer.load.factor
  • log.cleaner.backoff.ms

更新配置线程

代理使用的各种线程池的大小可以在所有代理使用的集群默认级别上动态更新。更新仅限于范围

currentSize / 2

 电流大小*2

以确保配置更新得到妥善处理。

  • num.network.threads
  • num.io.threads
  • num.replica.fetchers
  • num.recovery.threads.per.data.dir
  • log.cleaner.threads
  • background.threads
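
例如,假设某代理当前 num.io.threads 为 8,则可以将其动态更新为 4 到 16 之间的某个值,如下所示(示例取值):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config num.io.threads=12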

更新连接配额配置

代理对给定IP/主机允许的最大连接数可以在所有代理使用的群集默认级别上动态更新。这些更改将应用于新的连接创建,现有的连接计数将被新的限制考虑在内。

  • max.connections.per.ip
  • max.connections.per.ip.overrides

添加和删除侦听器

可以动态添加或删除侦听器。添加新侦听器时,必须提供该侦听器的安全配置,因为侦听器配置带有侦听器前缀 listener.name.{listenerName}.。如果新侦听器使用 SASL,则必须通过带有侦听器和机制前缀的 JAAS 配置属性 sasl.jaas.config 提供该侦听器的 JAAS 配置。详情请参见 Kafka 代理的 JAAS 配置。

在 Kafka 1.1.x 中,代理间侦听器使用的侦听器可能无法动态更新。要将代理间侦听器更新为新侦听器,可以先在所有代理上动态添加新侦听器而无需重启代理,然后通过滚动重启更新 inter.broker.listener.name。

除了新侦听器的所有安全配置外,以下配置可能在每个代理级别上动态更新:

  • listeners
  • advertised.listeners
  • listener.security.protocol.map
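
作为示意,下面的命令草图在代理 0 上动态添加一个新的 SSL 侦听器;主机名与端口均为假设值,包含逗号的值需要用方括号括起,新侦听器的密钥库等安全配置也需在同一请求中一并提供(路径与密码同为假设值):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config 'listeners=[PLAINTEXT://:9092,SSL://:9094],advertised.listeners=[PLAINTEXT://broker1:9092,SSL://broker1:9094],listener.security.protocol.map=[PLAINTEXT:PLAINTEXT,SSL:SSL],listener.name.ssl.ssl.keystore.location=/var/private/ssl/kafka.keystore.jks,listener.name.ssl.ssl.keystore.password=keystore-password,listener.name.ssl.ssl.key.password=key-password'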

代理间侦听器必须使用静态代理配置 inter.broker.listener.name 或 security.inter.broker.protocol 进行配置。

3.2 Topic Configs

与主题相关的配置既有服务器默认值,也有可选的每主题覆盖。如果没有为某个主题指定覆盖,则使用服务器默认配置。可以在创建主题时通过提供一个或多个 --config 选项来设置覆盖。此示例创建一个名为 my-topic 的主题,并自定义最大消息大小和刷新频率:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

之后也可以使用 alter configs 命令更改或设置覆盖。此示例更新 my-topic 的最大消息大小:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000

要检查在主题上设置的覆盖,可以执行以下操作

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

要删除覆盖,可以执行以下操作

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

以下是主题配置。服务器对此属性的默认配置在服务器默认属性标题下给出。给定的服务器默认配置值仅适用于没有显式主题配置重写的主题。

3.3 Producer Configs

以下是生产者的配置:

Type:class
Default:null
Valid Values:
Importance:medium

3.4 Consumer Configs

以下是消费者的配置:

auto.offset.reset

Type:string
Default:latest
Valid Values:[latest, earliest, none]
Importance:medium

isolation.level

Type:string
Default:read_uncommitted
Valid Values:[read_committed, read_uncommitted]
Importance:medium

partition.assignment.strategy

Type:list
Default:class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Valid Values:non-null string
Importance:medium