当前位置: 首页 > news >正文

【Kafka基础】消费者命令行完全指南:从基础到高级消费

Kafka消费者是消息系统的关键组成部分,掌握/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh工具的使用对于调试、测试和监控都至关重要。本文将全面介绍该工具的各种用法,帮助您高效地从Kafka消费消息。

1 基础消费模式

1.1 从最新位置消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic
参数解析
  • --bootstrap-server: 指定Kafka集群地址
  • --topic: 指定消费的主题名称
特点
  • 从该消费者组最后提交的offset开始消费
  • 如果没有提交记录,则从最新消息开始

1.2 从最早位置消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --from-beginning
关键参数
  • --from-beginning: 从主题最早的消息开始消费
应用场景
  • 数据回溯
  • 新消费者组初始化

2 消息元数据展示

2.1 显示消息Key

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property print.key=true \
    --property key.separator=":"
参数说明
  • print.key=true: 显示消息key
  • key.separator: 指定key/value分隔符

2.2 显示完整元数据

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property print.key=true \
    --property print.value=true \
    --property print.partition=true \
    --property print.offset=true \
    --property print.timestamp=true \
    --property key.separator=":" \
    --property line.separator="\n"
元数据参数
  • print.partition: 显示分区号
  • print.offset: 显示消息offset
  • print.timestamp: 显示时间戳

3 精准消费控制

3.1 指定分区消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --partition 1
参数说明
  • --partition: 指定消费的分区编号

3.2 指定Offset消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --partition 0 \
    --offset 1000
参数说明
  • --offset: 指定开始消费的offset位置

3.3 消费超时设置

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --timeout-ms 10000
参数说明
  • --timeout-ms: 设置无消息时的超时时间(毫秒)

4 消费者组管理

4.1 使用消费者组

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --group mygroup
参数说明
  • --group: 指定消费者组名称
特点
  • 支持offset自动提交
  • 支持消费者组rebalance

4.2 手动控制offset提交

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --group mygroup \
    --property enable.auto.commit=false
参数说明
  • enable.auto.commit=false: 关闭自动提交

5 高级消费配置

5.1 限制消费消息数

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --max-messages 100
参数说明
  • --max-messages: 最大消费消息数

5.2 过滤消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property filter.key.regex="test.*"
参数说明
  • filter.key.regex: 按key正则过滤

5.3 消费速率控制

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property fetch.min.bytes=1024 \
    --property fetch.max.wait.ms=500
参数说明
  • fetch.min.bytes: 最小拉取字节数
  • fetch.max.wait.ms: 最大等待时间

6 常用命令扩展

6.1 消费多个主题

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --whitelist "topic1|topic2"

6.2 显示消息头信息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property print.headers=true

6.3 消费特定时间后的消息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \
    --bootstrap-server 192.168.10.33:9092 \
    --topic testtopic \
    --property offsets.storage=kafka \
    --property timestamp=1625097600000
http://www.dtcms.com/a/122613.html

相关文章:

  • 软考高级-系统架构设计师 案例题-软件架构设计
  • vue:前端预览 / chrome浏览器设置 / <iframe> 方法预览 doc、pdf / vue-pdf 预览pdf
  • 蓝桥杯 一年中的第几天(日期问题)
  • 如何运用浏览器进行各种调试?(网络、内存、控制台等调试用法)
  • 前端实战:基于Vue3与免费满血版DeepSeek实现无限滚动+懒加载+瀑布流模块及优化策略
  • Vert.x vs. Micronaut:2025年高并发Java框架选型指南
  • redisson常用加锁方式
  • 【代码模板】判断C语言中文件是否存在?错误:‘F_OK’未声明如何处理?(access;#include “unistd.h“)
  • 【智慧养猪场】-猪的行为分析视频数据集及展示(已做好分类)
  • C —— 宏
  • Redis-场景缓存+秒杀+管道+消息队列
  • 保留格式地一键翻译英文ppt
  • etf可以T+0交易吗?
  • 基础知识补充篇:什么是DAPP前端连接中的provider
  • 用网页JS实现数据添加和取出的操作,链表
  • Class 文件和类加载机制
  • 【10】数据结构的矩阵与广义表篇章
  • 聊透多线程编程-线程基础-3.C# Thread 如何从非UI线程直接更新UI元素
  • 学习MySQL的第六天
  • vue+uniapp 获取上一页直接传递的参数
  • 大数据(6)【Kettle入门指南】从零开始掌握ETL工具:基础操作与实战案例解析
  • Spring Boot 自定义配置类(包含字符串、数字、布尔、小数、集合、映射、嵌套对象)实现步骤及示例
  • PHP 表单处理详解
  • docker安装软件汇总(持续更新)
  • 2022年全国职业院校技能大赛 高职组 “大数据技术与应用” 赛项赛卷(2卷)任务书
  • (三)行为模式:12、访问者模式(Visitor Pattern)(C++示例)
  • 家居实用品:生活中的艺术,家的温馨源泉‌
  • skynet.dispatch 使用详解
  • 微信小程序中的openid的作用
  • 对比 redis keys 命令 ,下次面试说用 scan