当前位置: 首页 > news >正文

如何监控Kafka的Lag(消费延迟)?

监控Kafka消费延迟(Lag)主要有以下几种方法:

  1. 使用Kafka自带命令行工具
# 查看消费者组lag
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group your_group --describe

输出结果中的LAG列显示各分区的延迟消息数,CURRENT-OFFSET是当前消费位移,LOG-END-OFFSET是生产端最新位移

  1. 通过JMX指标监控
    Kafka Broker暴露的JMX指标:
  • kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)下的records-lag-max
  • kafka.consumer:type=consumer-fetch-manager-metrics,partition=([-.\w]+),topic=([-.\w]+),client-id=([-.\w]+)下的records-lag
  1. 使用Prometheus + Grafana监控
    安装Kafka Exporter:
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.6.1/kafka_exporter-1.6.1.windows-amd64.tar.gz
tar -xzf kafka_exporter-1.6.1.windows-amd64.tar.gz
cd kafka_exporter-1.6.1.windows-amd64
kafka_exporter.exe --kafka.server=localhost:9092

在Grafana中导入Kafka仪表板(如ID 7589)

  1. 编程方式获取(Java示例)
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (AdminClient admin = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets("your_group");Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();// 获取各分区最新offsetList<TopicPartition> partitions = offsets.keySet().stream().collect(Collectors.toList());Map<TopicPartition, Long> endOffsets = admin.listOffsets(partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))).all().get().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));// 计算lagoffsets.forEach((tp, offsetMeta) -> {long lag = endOffsets.get(tp) - offsetMeta.offset();System.out.printf("分区 %s Lag: %d%n", tp, lag);});
}
  1. 使用Confluent Control Center(需企业版)
    提供可视化监控界面,可以直接查看消费者组的实时Lag指标和历史趋势

建议同时监控:

  • 单个分区的最大Lag值
  • 消费者组的平均Lag
  • Lag随时间的变化趋势
  • 设置阈值告警(如Lag持续超过1000条消息时触发告警)

相关文章:

  • RT-Thread中的配置
  • MySQL同步ES的6种方案!
  • [三分钟]性能测试工具JMeter入门: 下载安装JMeter并设置中文;JMeter基本使用流程
  • 解锁科研文献检索密码:多工具协同攻略
  • 给frp设置开机自启
  • 应急响应靶场web1:知攻善防实验室
  • 分布式 ID 的技术解析与实现实践
  • 【Java EE初阶 --- 多线程(初阶)】多线程的基本内容
  • ZYNQ-UART串口中断
  • 【Java篇】内存中的桥梁:Java数组与引用的灵动操作
  • 前端封装框架依赖管理全攻略:构建轻量可维护的私有框架
  • livp文件使用python转换为heic或jpeg格式
  • k8s node cgroup 泄露如何优化?
  • 深入理解 Java 观察者模式:原理、实现与应用
  • 【开发工具】Window安装WSL及配置Vscode获得Linux开发环境
  • npm install下载插件无法更新package.json和package-lock.json文件的解决办法
  • Android组件化 -> Debug模式下,本地构建module模块的AAR和APK
  • 三极管偏置电路分析
  • 51单片机入门教程——AT24C02(I2C 总线)
  • 在PBiCGStab(Preconditioned Bi-Conjugate Gradient Stabilized)算法中处理多个右端项的block版本
  • 云南省司法厅党委书记、厅长茶忠旺主动投案,正接受审查调查
  • 山东滕州车祸致6人遇难,醉驾肇事司机已被刑事拘留
  • 胖东来关闭官网内容清空?工作人员:后台维护升级
  • 五一档7.47亿收官:《水饺皇后》领跑;男观众占比增多
  • 玉渊谭天丨是自保还是自残?八个恶果透视美国征收100%电影关税
  • 山大齐鲁医院回应护士论文现“男性确诊子宫肌瘤”:给予该护士记过处分、降级处理