当前位置: 首页 > news >正文

Kafka4.0 可观测性最佳实践

Kafka4.0 介绍

Kafka4.0 的重大变革 —— KRaft 模式。Kafka4.0 最具革命性的变化,默认运行在 KRaft(Kafka Raft)模式下,彻底摒弃了对 Apache ZooKeeper 的依赖。KRaft 模式的引入,可谓是 Kafka 架构演进的一次重大飞跃。它基于 Raft 一致性算法构建共识机制,将元数据管理功能巧妙地集成到 Kafka 自身的体系之中,从而实现了对 ZooKeeper 的无缝替换。

主要优势:

  • 简化部署与运维流程:运维人员从此无需再为搭建和维护复杂的 ZooKeeper 集群耗费大量精力,大大降低了整体的运营开销。新的架构设计极大地简化了系统的复杂性,使得 Kafka 的安装、配置以及日常管理工作变得更加直观、高效,即使是新手也能轻松上手。
  • 显著增强可扩展性:在 KRaft 模式下,Kafka 集群的扩展性得到了进一步的提升。新增 Broker 节点的操作变得更加简便快捷,能够更好地适应大规模数据处理场景下,对系统资源进行动态调整的需求。无论是应对业务高峰期的数据洪峰,还是随着业务增长逐步扩展集群规模,KRaft 模式都能游刃有余。
  • 提升系统性能与稳定性:去除 ZooKeeper 这一外部依赖后,Kafka 在元数据操作的响应速度和一致性方面表现得更加出色。特别是在高并发写入和读取的场景中,系统的稳定性和可靠性得到了显著增强,有效减少了因外部组件故障而可能引发的单点问题,为企业级应用提供了更加坚实可靠的底层支撑。

观测云

观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

部署 DataKit

DataKit 是一个开源的、跨平台的数据收集和监控工具,由观测云开发并维护。它旨在帮助用户收集、处理和分析各种数据源,如日志、指标和事件,以便进行有效的监控和故障排查。DataKit 支持多种数据输入和输出格式,可以轻松集成到现有的监控系统中。

登录观测云控制台,在「集成」 - 「DataKit」选择对应安装方式,当前采用 Linux 主机部署 DataKit。

采集步骤

下载 JMX Exporter

下载地址:https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0

配置 JMX 脚本和启动参数

注意:采集 Producer、Consumer、Streams、Connect 指标需要开各自独立进程,启动各自进程时注意替换对应的 yaml 文件和对应的启动脚本,如下可参考。

KRaft Metrics

  • 创建 KRaft Metrics 配置文件 kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:# 1. Broker / Topic / Partition Metrics- pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_server_broker_topic_metrics_$1type: GAUGElabels:topic: "$2"# 2. Request / Network Metrics- pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_network_request_metrics_$1type: GAUGE# 3. Socket Server Metrics- pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)name: kafka_network_socket_server_metrics_$1type: GAUGE# 4. Log / Segment / Cleaner Metrics- pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_log_$1_$2type: GAUGE# 5. Controller (KRaft) Metrics- pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)name: kafka_controller_$1type: GAUGE# 6. Group / Coordinator Metrics- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)name: kafka_coordinator_group_metadata_manager_$1type: GAUGE# 7. KRaft Specific Metrics- pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)name: kafka_controller_$1type: GAUGE# 8. New Generation Consumer Rebalance Protocol Metrics- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)name: kafka_coordinator_group_metadata_manager_$1type: GAUGE# 9. Queue Metrics- pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)name: kafka_server_queue_$1type: GAUGE# 10. Client Metrics- pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_network_request_metrics_$1type: GAUGE# 11. Log Flush Rate and Time- pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_log_log_flush_rate_and_time_mstype: GAUGE
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \-Dcom.sun.management.jmxremote.rmi.port=9999 \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties

Producer Metrics

  • 创建 Procucer Metrics 配置文件 producer.yml
---
lowercaseOutputName: true
rules:# 新增:producer-node-metrics- pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)name: kafka_producer_node_$3labels:client_id: "$1"node_id: "$2"type: GAUGE- pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_producer_metrics_$2'labels:client_id: "$1"type: GAUGE# 抓取 Selector 全部指标(Kafka 4.0 新增)- pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'name: 'kafka_${1}_${4}'labels:client_id: '$3'type: GAUGE
  • 启动参数
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"/opt/kafka/kafka/bin/kafka-console-producer.sh \--broker-list localhost:9092 \--topic xxxx \--producer-property bootstrap.servers=localhost:9092

Consumer Metrics

  • 创建 Consumer Metrics 配置文件consumer.yml
lowercaseOutputName: true
rules:# consumer-coordinator-metrics- pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_consumer_coordinator_metrics_$2'labels:client_id: "$1"type: GAUGE- pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_consumer_metrics_$2'labels:client_id: "$1"
  • 启动参数
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"/opt/kafka/kafka/bin/kafka-console-consumer.sh \--broker-list localhost:9092 \--topic xxxx \--producer-property bootstrap.servers=localhost:9092

Streams Metrics

  • 创建 Streams Metrics 配置文件stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: truerules:# Kafka Streams 应用指标 - 移除特殊字符- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'name: kafka_streams_$2labels:client_id: "$1"# 处理包含特殊字符的属性名- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'name: kafka_streams_$2_$3labels:client_id: "$1"# Processor Node 指标- pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'name: kafka_streams_processor_$4labels:client_id: "$1"task_id: "$2"processor_node_id: "$3"# Task 指标- pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'name: kafka_streams_task_$3labels:client_id: "$1"task_id: "$2"# 线程指标- pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'name: kafka_streams_thread_$3labels:client_id: "$1"thread_id: "$2"# JVM 指标- pattern: 'java.lang<type=Memory><>(.+)'name: jvm_memory_$1- pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'name: jvm_gc_$2labels:gc: "$1"# 线程池指标- pattern: 'java.lang<type=Threading><>(.+)'name: jvm_threads_$1# 默认规则- pattern: '(.*)'
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \-Dcom.sun.management.jmxremote.rmi.port=9996 \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo

Connect Metrics

  • 创建 Connect Metrics 配置文件 connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:# 1) connect-worker-metrics(全局)- pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'name: 'kafka_connect_worker_$1'type: GAUGE# 2) connect-worker-metrics,connector=xxx- pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'name: 'kafka_connect_worker_$2'labels:connector: "$1"type: GAUGE# 3) connect-worker-rebalance-metrics- pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'name: 'kafka_connect_worker_rebalance_$1'type: GAUGE# 4) connector-task-metrics- pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'name: 'kafka_connect_task_$3'labels:connector: "$1"task_id: "$2"type: GAUGE# 5) sink-task-metrics- pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'name: 'kafka_connect_sink_task_$3'labels:connector: "$1"task_id: "$2"
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Dcom.sun.management.jmxremote.port=9995 \-Dcom.sun.management.jmxremote.rmi.port=9995 \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"# 启动 Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties

启动成功后,可通过 curl http://IP:端口号/metrics 查看获取到的监控数据。

配置 DataKit

  • 进入 datakit 安装目录下的 conf.d/prom 目录,复制 prom.conf.sample 并命名为 kafka.conf
cp prom.conf.sample kafka.conf
  • 调整 kafka.conf
[[inputs.prom]]## Exporter URLs.urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]## Collector alias.source = "kafka"## Prioritier over 'measurement_name' configuration.[[inputs.prom.measurements]]prefix = "kafka_controller_"name = "kafka_controller"[[inputs.prom.measurements]]prefix = "kafka_network_"name = "kafka_network"[[inputs.prom.measurements]]prefix = "kafka_log_"name = "kafka_log"[[inputs.prom.measurements]]prefix = "kafka_server_"name = "kafka_server"[[inputs.prom.measurements]]prefix = "kafka_connect_"name = "kafka_connect"[[inputs.prom.measurements]]prefix = "kafka_stream_"name = "kafka_stream"

重启 DataKit

执行以下命令:

datakit service -R

指标集

以下是 kafka4.0 部分指标说明,更多指标可参考 Kafka 指标详情。

kafka_server指标集
指标名描述单位
Fetch_queue_size当前活跃的 Broker 数量Count
Produce_queue_size当前活跃的 Broker 数量Count
Request_queue_size当前活跃的 Broker 数量Count
broker_topic_metrics_BytesInPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_BytesOutPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_FailedProduceRequestsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_FetchMessageConversionsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_MessagesInPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_ProduceMessageConversionsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_TotalFetchRequestsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_TotalProduceRequestsPerSec当前活跃的 Broker 数量Count
socket_server_metrics_connection_count当前活跃的 Broker 数量Count
socket_server_metrics_connection_close_total当前活跃的 Broker 数量Count
socket_server_metrics_incoming_byte_rate当前活跃的 Broker 数量Count
kafka_network 指标集
指标名描述单位
request_metrics_RequestBytes_request_AddOffsetsToTxnAddOffsetsToTxn 请求大小bytes
request_metrics_RequestBytes_request_FetchFetch 请求大小count
request_metrics_RequestBytes_request_FetchConsumerFetchConsumer 请求大小bytes
request_metrics_RequestBytes_request_FetchFollowerFetchFollower 请求大小bytes
request_metrics_TotalTimeMs_request_CreateTopicsCreateTopics 请求总时间ms
request_metrics_TotalTimeMs_request_CreatePartitionsCreatePartitions 请求总时间ms
request_metrics_RequestQueueTimeMs_request_CreatePartitionsCreatePartitions 在请求对列等待时间ms
request_metrics_RequestQueueTimeMs_request_ProduceProduce 在请求对列的等待时间ms
kafka_controller 指标集
指标名描述单位
ActiveBrokerCount当前活跃的 Broker 数量Count
ActiveControllerCount活跃控制器数量Count
GlobalPartitionCount分区数量Count
GlobalTopicCount主题数量Count
IgnoredStaticVotersbytesKong的带宽使用量,单位为字节
OfflinePartitionsCount离线分区数量Count
PreferredReplicaImbalanceCountPreferred Leader 选举条件的分区数Count
TimedOutBrokerHeartbeatCountBroker 心跳超时的次数Count
kafka_producer 指标集
指标名描述单位
producer_metrics_batch_split_rate批次分割率count/s
producer_metrics_buffer_available_bytes未使用的缓冲区内存总量bytes
producer_metrics_buffer_exhausted_rate缓冲区耗尽而丢弃的每秒平均记录发送数量count/s
producer_metrics_buffer_total_bytes缓冲区总字节大小bytes
producer_metrics_bufferpool_wait_ratio缓冲池等待比率%
producer_metrics_bufferpool_wait_time_ns_total缓冲池等待时间ms
producer_metrics_connection_close_rate关闭连接率count/s
producer_metrics_connection_count关闭连接数量count
kafka_consumer 指标集
指标名描述单位
consumer_coordinator_metrics_failed_rebalance_total再平衡失败数量count
consumer_coordinator_metrics_heartbeat_rate每秒平均心跳次数count/s
consumer_coordinator_metrics_heartbeat_response_time_max心跳响应最大时间count
consumer_coordinator_metrics_join_rateGroup 每秒加入速率count/s
consumer_coordinator_metrics_join_totalGroup 加入总数count
consumer_coordinator_metrics_last_rebalance_seconds_ago自上次重新平衡事件以来的秒数ms
consumer_coordinator_metrics_rebalance_latency_total重新平衡延迟总计ms
consumer_fetch_manager_metrics_bytes_consumed_rate每秒消耗的字节数bytes/s
consumer_fetch_manager_metrics_fetch_latency_avgFetch 请求延迟ms
consumer_metrics_connection_count连接数count
consumer_metrics_incoming_byte_rate输入字节数率bytes/s
kafka_connect 指标集
指标名描述单位
worker_connector_countConnector 数量count
worker_task_startup_attempts_total任务启动重试次数count
worker_connector_startup_attempts_total连接器尝试启动次数count
worker_task_startup_failure_total任务启动失败数量count
worker_connector_startup_failure_percentage连接失败率%
worker_rebalance_completed_rebalances_total再平衡完成总数count
worker_task_startup_failure_percentage任务启动失败占比%
worker_rebalance_time_since_last_rebalance_ms自上次重新平衡以来的时间ms
worker_task_startup_attempts_total任务尝试启动次数count
kafka_stream 指标集
指标名描述单位
stream_thread_metrics_thread_start_time线程启动时间时间戳 ms
stream_thread_metrics_task_created_total任务创建总数count
stream_state_metrics_block_cache_capacity块缓存大小bytes
stream_state_metrics_all_rate所有操作率count/s
stream_state_metrics_block_cache_usage块缓存使用率%
stream_state_metrics_bytes_read_compaction_rate字节读取压缩率bytes/s
stream_state_metrics_bytes_written_compaction_rate字节写入压缩率bytes/s
stream_state_metrics_block_cache_index_hit_ratio块缓存索引命中率%
stream_state_metrics_block_cache_data_hit_ratio块缓存数据命中率%

场景视图

登录观测云控制台,点击「场景」 -「新建仪表板」,输入 “Kafka 4”, 选择 “Kafka 4”,点击 “确定” 即可添加视图。

监控器(告警)

登录观测云控制台,点击「监控」 -「新建监控器」,输入 “kafka”, 选择对应的监控器,点击 “确定” 即可添加。

Kafka 连接过期被关闭客户端连接

Kafka 集群在处理消费者拉取请求时的延迟过高

Kafka 集群在处理生产者请求时的延迟过高

Kafka 集群 ActiveController 为0异常

Kafka 离线分区数量过高异常

总结

通过监控 Kafka,我们可以实时掌握消息吞吐、消费者滞后、Broker 健康等关键指标,提前发现副本缺失、网络拥塞或消费延迟,保障系统稳定;也能结合历史基线做容量预测与异常检测,为扩缩容、参数调优提供量化依据,让数据持续高效、可观测、可演进。

http://www.dtcms.com/a/389529.html

相关文章:

  • 深入解析 Spring AI 系列:解析函数调用
  • ​​[硬件电路-245]:电气制图软件有哪些
  • 不会索赔500万的苹果,翻车如期到来,不过已没啥影响了
  • 第十一章:AI进阶之--模块的概念与使用(一)
  • 【IoTDB】01 - IoTDB的基本使用
  • 【C++】模版语法基础:认识模版(初识篇)
  • 继承测试用例回归策略
  • 卡普空《怪物猎人》系列策略转变:PC平台成重要增长点
  • UML 顺序图 | 概念 / 组成 / 作用 / 绘制
  • 安装SSL证书后如何测试和验证其是否正确配置?
  • A股大盘数据-20250918分析
  • 容器环境变量管理在云服务器多环境部署中的配置方法
  • 算法练习-排序-选择排序
  • 岭回归(Ridge Regression)在机器学习中的应用
  • python高级编程面试题
  • 模拟ic工程师如何提升自己?
  • springboot flowable 工作流入门与实战
  • 飞算Java的在线考试系统的设计与实现——学生开发者的课程实践记录
  • Vue3 基础语法详解:从入门到实践
  • 大白话聊明白:同步刷盘、异步刷盘以及RocketMQ和RabbitMQ的刷盘策略
  • I0流学习
  • 摄影灯MCU方案开发,摄影灯单片机分析
  • Salesforce知识点: LWC 组件通信全解析
  • Lua语言程序设计3:闭包、模式匹配、日期和时间
  • Freertos系列教学(删除函数的使用)
  • DevOps平台建设 - 总体设计文档的核心架构与关键技术实践
  • 系统中间件与云虚拟化-云数据库与数据库访问中间件ORM框架-Sannic-非实验
  • DTC BluSDR™系列-满足您所有的无人机通信需求
  • 【猛犸AI科技】深度强化学习SCI/EI/CCF/中文核心一站式辅导
  • 美创科技闪耀亚洲教育装备博览会,以数据安全护航教育数字化