当前位置: 首页 > news >正文

热门搜索怎么做企业网站优化需要多少钱

热门搜索,怎么做企业网站优化需要多少钱,网络规划设计师题型,有那些app开发公司Kafka4.0 介绍 Kafka4.0 的重大变革 —— KRaft 模式。Kafka4.0 最具革命性的变化,默认运行在 KRaft(Kafka Raft)模式下,彻底摒弃了对 Apache ZooKeeper 的依赖。KRaft 模式的引入,可谓是 Kafka 架构演进的一次重大飞…

Kafka4.0 介绍

Kafka4.0 的重大变革 —— KRaft 模式。Kafka4.0 最具革命性的变化,默认运行在 KRaft(Kafka Raft)模式下,彻底摒弃了对 Apache ZooKeeper 的依赖。KRaft 模式的引入,可谓是 Kafka 架构演进的一次重大飞跃。它基于 Raft 一致性算法构建共识机制,将元数据管理功能巧妙地集成到 Kafka 自身的体系之中,从而实现了对 ZooKeeper 的无缝替换。

主要优势:

  • 简化部署与运维流程:运维人员从此无需再为搭建和维护复杂的 ZooKeeper 集群耗费大量精力,大大降低了整体的运营开销。新的架构设计极大地简化了系统的复杂性,使得 Kafka 的安装、配置以及日常管理工作变得更加直观、高效,即使是新手也能轻松上手。
  • 显著增强可扩展性:在 KRaft 模式下,Kafka 集群的扩展性得到了进一步的提升。新增 Broker 节点的操作变得更加简便快捷,能够更好地适应大规模数据处理场景下,对系统资源进行动态调整的需求。无论是应对业务高峰期的数据洪峰,还是随着业务增长逐步扩展集群规模,KRaft 模式都能游刃有余。
  • 提升系统性能与稳定性:去除 ZooKeeper 这一外部依赖后,Kafka 在元数据操作的响应速度和一致性方面表现得更加出色。特别是在高并发写入和读取的场景中,系统的稳定性和可靠性得到了显著增强,有效减少了因外部组件故障而可能引发的单点问题,为企业级应用提供了更加坚实可靠的底层支撑。

观测云

观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。

部署 DataKit

DataKit 是一个开源的、跨平台的数据收集和监控工具,由观测云开发并维护。它旨在帮助用户收集、处理和分析各种数据源,如日志、指标和事件,以便进行有效的监控和故障排查。DataKit 支持多种数据输入和输出格式,可以轻松集成到现有的监控系统中。

登录观测云控制台,在「集成」 - 「DataKit」选择对应安装方式,当前采用 Linux 主机部署 DataKit。

采集步骤

下载 JMX Exporter

下载地址:https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0

配置 JMX 脚本和启动参数

注意:采集 Producer、Consumer、Streams、Connect 指标需要开各自独立进程,启动各自进程时注意替换对应的 yaml 文件和对应的启动脚本,如下可参考。

KRaft Metrics

  • 创建 KRaft Metrics 配置文件 kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:# 1. Broker / Topic / Partition Metrics- pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_server_broker_topic_metrics_$1type: GAUGElabels:topic: "$2"# 2. Request / Network Metrics- pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_network_request_metrics_$1type: GAUGE# 3. Socket Server Metrics- pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)name: kafka_network_socket_server_metrics_$1type: GAUGE# 4. Log / Segment / Cleaner Metrics- pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_log_$1_$2type: GAUGE# 5. Controller (KRaft) Metrics- pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)name: kafka_controller_$1type: GAUGE# 6. Group / Coordinator Metrics- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)name: kafka_coordinator_group_metadata_manager_$1type: GAUGE# 7. KRaft Specific Metrics- pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)name: kafka_controller_$1type: GAUGE# 8. New Generation Consumer Rebalance Protocol Metrics- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)name: kafka_coordinator_group_metadata_manager_$1type: GAUGE# 9. Queue Metrics- pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)name: kafka_server_queue_$1type: GAUGE# 10. Client Metrics- pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_network_request_metrics_$1type: GAUGE# 11. Log Flush Rate and Time- pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)name: kafka_log_log_flush_rate_and_time_mstype: GAUGE
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \-Dcom.sun.management.jmxremote.rmi.port=9999 \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties

Producer Metrics

  • 创建 Procucer Metrics 配置文件 producer.yml
---
lowercaseOutputName: true
rules:# 新增:producer-node-metrics- pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)name: kafka_producer_node_$3labels:client_id: "$1"node_id: "$2"type: GAUGE- pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_producer_metrics_$2'labels:client_id: "$1"type: GAUGE# 抓取 Selector 全部指标(Kafka 4.0 新增)- pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'name: 'kafka_${1}_${4}'labels:client_id: '$3'type: GAUGE
  • 启动参数
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"/opt/kafka/kafka/bin/kafka-console-producer.sh \--broker-list localhost:9092 \--topic xxxx \--producer-property bootstrap.servers=localhost:9092

Consumer Metrics

  • 创建 Consumer Metrics 配置文件consumer.yml
lowercaseOutputName: true
rules:# consumer-coordinator-metrics- pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_consumer_coordinator_metrics_$2'labels:client_id: "$1"type: GAUGE- pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'name: 'kafka_consumer_metrics_$2'labels:client_id: "$1"
  • 启动参数
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"/opt/kafka/kafka/bin/kafka-console-consumer.sh \--broker-list localhost:9092 \--topic xxxx \--producer-property bootstrap.servers=localhost:9092

Streams Metrics

  • 创建 Streams Metrics 配置文件stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: truerules:# Kafka Streams 应用指标 - 移除特殊字符- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'name: kafka_streams_$2labels:client_id: "$1"# 处理包含特殊字符的属性名- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'name: kafka_streams_$2_$3labels:client_id: "$1"# Processor Node 指标- pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'name: kafka_streams_processor_$4labels:client_id: "$1"task_id: "$2"processor_node_id: "$3"# Task 指标- pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'name: kafka_streams_task_$3labels:client_id: "$1"task_id: "$2"# 线程指标- pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'name: kafka_streams_thread_$3labels:client_id: "$1"thread_id: "$2"# JVM 指标- pattern: 'java.lang<type=Memory><>(.+)'name: jvm_memory_$1- pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'name: jvm_gc_$2labels:gc: "$1"# 线程池指标- pattern: 'java.lang<type=Threading><>(.+)'name: jvm_threads_$1# 默认规则- pattern: '(.*)'
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \-Dcom.sun.management.jmxremote.rmi.port=9996 \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo

Connect Metrics

  • 创建 Connect Metrics 配置文件 connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:# 1) connect-worker-metrics(全局)- pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'name: 'kafka_connect_worker_$1'type: GAUGE# 2) connect-worker-metrics,connector=xxx- pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'name: 'kafka_connect_worker_$2'labels:connector: "$1"type: GAUGE# 3) connect-worker-rebalance-metrics- pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'name: 'kafka_connect_worker_rebalance_$1'type: GAUGE# 4) connector-task-metrics- pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'name: 'kafka_connect_task_$3'labels:connector: "$1"task_id: "$2"type: GAUGE# 5) sink-task-metrics- pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'name: 'kafka_connect_sink_task_$3'labels:connector: "$1"task_id: "$2"
  • 启动参数
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Dcom.sun.management.jmxremote.port=9995 \-Dcom.sun.management.jmxremote.rmi.port=9995 \-Djava.rmi.server.hostname=127.0.0.1"
export Environment="KAFKA_OPTS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"# 启动 Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties

启动成功后,可通过 curl http://IP:端口号/metrics 查看获取到的监控数据。

配置 DataKit

  • 进入 datakit 安装目录下的 conf.d/prom 目录,复制 prom.conf.sample 并命名为 kafka.conf
cp prom.conf.sample kafka.conf
  • 调整 kafka.conf
[[inputs.prom]]## Exporter URLs.urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]## Collector alias.source = "kafka"## Prioritier over 'measurement_name' configuration.[[inputs.prom.measurements]]prefix = "kafka_controller_"name = "kafka_controller"[[inputs.prom.measurements]]prefix = "kafka_network_"name = "kafka_network"[[inputs.prom.measurements]]prefix = "kafka_log_"name = "kafka_log"[[inputs.prom.measurements]]prefix = "kafka_server_"name = "kafka_server"[[inputs.prom.measurements]]prefix = "kafka_connect_"name = "kafka_connect"[[inputs.prom.measurements]]prefix = "kafka_stream_"name = "kafka_stream"

重启 DataKit

执行以下命令:

datakit service -R

指标集

以下是 kafka4.0 部分指标说明,更多指标可参考 Kafka 指标详情。

kafka_server指标集
指标名描述单位
Fetch_queue_size当前活跃的 Broker 数量Count
Produce_queue_size当前活跃的 Broker 数量Count
Request_queue_size当前活跃的 Broker 数量Count
broker_topic_metrics_BytesInPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_BytesOutPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_FailedProduceRequestsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_FetchMessageConversionsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_MessagesInPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_ProduceMessageConversionsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_TotalFetchRequestsPerSec当前活跃的 Broker 数量Count
broker_topic_metrics_TotalProduceRequestsPerSec当前活跃的 Broker 数量Count
socket_server_metrics_connection_count当前活跃的 Broker 数量Count
socket_server_metrics_connection_close_total当前活跃的 Broker 数量Count
socket_server_metrics_incoming_byte_rate当前活跃的 Broker 数量Count
kafka_network 指标集
指标名描述单位
request_metrics_RequestBytes_request_AddOffsetsToTxnAddOffsetsToTxn 请求大小bytes
request_metrics_RequestBytes_request_FetchFetch 请求大小count
request_metrics_RequestBytes_request_FetchConsumerFetchConsumer 请求大小bytes
request_metrics_RequestBytes_request_FetchFollowerFetchFollower 请求大小bytes
request_metrics_TotalTimeMs_request_CreateTopicsCreateTopics 请求总时间ms
request_metrics_TotalTimeMs_request_CreatePartitionsCreatePartitions 请求总时间ms
request_metrics_RequestQueueTimeMs_request_CreatePartitionsCreatePartitions 在请求对列等待时间ms
request_metrics_RequestQueueTimeMs_request_ProduceProduce 在请求对列的等待时间ms
kafka_controller 指标集
指标名描述单位
ActiveBrokerCount当前活跃的 Broker 数量Count
ActiveControllerCount活跃控制器数量Count
GlobalPartitionCount分区数量Count
GlobalTopicCount主题数量Count
IgnoredStaticVotersbytesKong的带宽使用量,单位为字节
OfflinePartitionsCount离线分区数量Count
PreferredReplicaImbalanceCountPreferred Leader 选举条件的分区数Count
TimedOutBrokerHeartbeatCountBroker 心跳超时的次数Count
kafka_producer 指标集
指标名描述单位
producer_metrics_batch_split_rate批次分割率count/s
producer_metrics_buffer_available_bytes未使用的缓冲区内存总量bytes
producer_metrics_buffer_exhausted_rate缓冲区耗尽而丢弃的每秒平均记录发送数量count/s
producer_metrics_buffer_total_bytes缓冲区总字节大小bytes
producer_metrics_bufferpool_wait_ratio缓冲池等待比率%
producer_metrics_bufferpool_wait_time_ns_total缓冲池等待时间ms
producer_metrics_connection_close_rate关闭连接率count/s
producer_metrics_connection_count关闭连接数量count
kafka_consumer 指标集
指标名描述单位
consumer_coordinator_metrics_failed_rebalance_total再平衡失败数量count
consumer_coordinator_metrics_heartbeat_rate每秒平均心跳次数count/s
consumer_coordinator_metrics_heartbeat_response_time_max心跳响应最大时间count
consumer_coordinator_metrics_join_rateGroup 每秒加入速率count/s
consumer_coordinator_metrics_join_totalGroup 加入总数count
consumer_coordinator_metrics_last_rebalance_seconds_ago自上次重新平衡事件以来的秒数ms
consumer_coordinator_metrics_rebalance_latency_total重新平衡延迟总计ms
consumer_fetch_manager_metrics_bytes_consumed_rate每秒消耗的字节数bytes/s
consumer_fetch_manager_metrics_fetch_latency_avgFetch 请求延迟ms
consumer_metrics_connection_count连接数count
consumer_metrics_incoming_byte_rate输入字节数率bytes/s
kafka_connect 指标集
指标名描述单位
worker_connector_countConnector 数量count
worker_task_startup_attempts_total任务启动重试次数count
worker_connector_startup_attempts_total连接器尝试启动次数count
worker_task_startup_failure_total任务启动失败数量count
worker_connector_startup_failure_percentage连接失败率%
worker_rebalance_completed_rebalances_total再平衡完成总数count
worker_task_startup_failure_percentage任务启动失败占比%
worker_rebalance_time_since_last_rebalance_ms自上次重新平衡以来的时间ms
worker_task_startup_attempts_total任务尝试启动次数count
kafka_stream 指标集
指标名描述单位
stream_thread_metrics_thread_start_time线程启动时间时间戳 ms
stream_thread_metrics_task_created_total任务创建总数count
stream_state_metrics_block_cache_capacity块缓存大小bytes
stream_state_metrics_all_rate所有操作率count/s
stream_state_metrics_block_cache_usage块缓存使用率%
stream_state_metrics_bytes_read_compaction_rate字节读取压缩率bytes/s
stream_state_metrics_bytes_written_compaction_rate字节写入压缩率bytes/s
stream_state_metrics_block_cache_index_hit_ratio块缓存索引命中率%
stream_state_metrics_block_cache_data_hit_ratio块缓存数据命中率%

场景视图

登录观测云控制台,点击「场景」 -「新建仪表板」,输入 “Kafka 4”, 选择 “Kafka 4”,点击 “确定” 即可添加视图。

监控器(告警)

登录观测云控制台,点击「监控」 -「新建监控器」,输入 “kafka”, 选择对应的监控器,点击 “确定” 即可添加。

Kafka 连接过期被关闭客户端连接

Kafka 集群在处理消费者拉取请求时的延迟过高

Kafka 集群在处理生产者请求时的延迟过高

Kafka 集群 ActiveController 为0异常

Kafka 离线分区数量过高异常

总结

通过监控 Kafka,我们可以实时掌握消息吞吐、消费者滞后、Broker 健康等关键指标,提前发现副本缺失、网络拥塞或消费延迟,保障系统稳定;也能结合历史基线做容量预测与异常检测,为扩缩容、参数调优提供量化依据,让数据持续高效、可观测、可演进。

http://www.dtcms.com/a/482905.html

相关文章:

  • JVM初识
  • 最小二乘问题详解4:非线性最小二乘
  • pcba方案开发|车载智能充气泵
  • c++项目篇:高并发内存池项目开发记录01
  • wordpress wp_term_taxonomy优化网站哪家好
  • 怎么选择宜昌网站建设沈阳网站建设与开发
  • SQL提数与数据分析指南
  • 新手用Godot打造2D像素风游戏
  • 框架--SpringMVC
  • 做外贸网站注意wordpress 附件下载插件
  • 微波人体传感器技术深度解析:从多普勒效应到工程落地
  • 定长内存池 思考实现过程 C++ 附源码
  • 风电场站AGC/AVC系统方案|风力发电AGC/AVC系统方案|风电场站一次调频系统方案|风力发电一次调频系统产品方案概述
  • sward,一款超级轻量且简洁的知识管理工具
  • 类似京东的购物网站开发价格课程设计代做网站推荐
  • 网站开发毕业设计开题报告手机怎么安装网站程序
  • python调用远程服务器的ollama embedding模型
  • 手机电子商务网站建设怎么做网站排名优化
  • SQL入门:行列转换实战-从基础到高级
  • 科大讯飞【免费】的开源模型实现录音转写与角色判定
  • 上海专业建设网站制作微信群推广
  • 景区网站建设原则做网站需要每年都缴费吗
  • 推广一下自己刚撸的 IDEA 插件—Bean Copy 助手
  • 线粒体靶向压电催化剂调控焦亡与胞葬作用以增强骨肉瘤免疫原性死亡
  • Vue 3 + TypeScript 开发的视频直播页面组件
  • 【开题答辩实录分享】以《智能体育训练助手的设计与实现》为例进行答辩实录分享
  • Vue + Element Plus 手动注册 v-loading 指令
  • docker elasticsearch端口映射解决端口冲突问题
  • SD:在一个 Ubuntu 系统安装 stable diffusion ComfyUI
  • 如何使用命令修改conda虚拟环境目录