当前位置: 首页 > news >正文

kafka 生产消息和消费消息 kafka-console-producer.sh kafka-console-consumer.sh

目录

    • kafka-console-producer.sh
      • 基本用法
      • 常用参数说明
      • 示例用法
        • 1. 简单发送消息
        • 2. 发送带键的消息
        • 3. 从文件读取消息
        • 4. 发送批量消息
        • 5. 使用自定义配置
      • 配置文件示例
      • 注意事项
    • kafka-console-consumer.sh
      • 基本用法
      • 核心参数说明
      • 常见使用场景
        • 1. 实时消费最新消息
        • 2. 消费历史所有消息
        • 3. 使用消费组
        • 4. 消费特定分区
        • 5. 格式化输出
        • 6. 消费 JSON 格式消息
        • 7. 导出消息到文件
        • 8. 消费指定数量的消息

kafka-console-producer.sh 是 Kafka 提供的一个命令行工具,用于向 Kafka 主题发送消息(即生产消息)。它允许你在终端中手动输入消息,或从文件、其他命令的输出中读取消息并发送到 Kafka。

kafka-console-producer.sh

基本用法

以下是使用 kafka-console-producer.sh 的基本命令格式:

bin/kafka-console-producer.sh --bootstrap-server <broker地址> --topic <主题名>

常用参数说明

  • --bootstrap-server: 指定 Kafka broker 的地址(例如 localhost:9092),用于建立初始连接。
  • --topic: 指定要发送消息的主题名称。
  • --property: 设置额外的生产者配置,例如:
    • parse.key=true: 启用键值对模式(需要配合 key.separator 使用)。
    • key.separator=,: 指定键和值之间的分隔符(默认为制表符 \t)。
  • --producer.config: 指定生产者配置文件的路径。
  • --request-required-acks 用于控制生产者发送消息后需要等待多少个副本确认接收,以此来平衡消息发送的可靠性和性能。这个参数会影响消息的持久性保证和发送延迟。

该参数指定了生产者在认定消息发送成功之前需要收到的确认数(acks)。主要有以下几种取值:

  • 0:生产者不等待任何副本的确认,直接认为消息发送成功。
    优点:发送延迟最低。
    缺点:如果消息在传输过程中丢失,生产者不会知道,可能导致数据丢失。
  • 1(默认值):生产者只需要等待 Leader 副本确认接收即可。
    优点:在 Leader 正常工作的情况下,可以保证消息不丢失。
    缺点:如果 Leader 接收消息后立即崩溃,而消息尚未同步到 Follower 副本,则可能导致数据丢失。
  • -1 或 all:生产者需要等待所有 ISR(In-Sync Replicas)中的副本都确认接收消息。
    优点:提供最高的消息持久性保证,只要有一个 ISR 副本存活,消息就不会丢失。
    缺点:发送延迟最高,尤其是当 ISR 中的副本数量较多时。

示例用法

1. 简单发送消息

启动生产者并手动输入消息:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

输入消息后按回车键发送,输入 Ctrl + D 退出。

2. 发送带键的消息

使用键值对模式(键和值用逗号分隔):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic \--property "parse.key=true" \--property "key.separator=,"

输入格式示例:

key1,value1
key2,value2
3. 从文件读取消息

将文件内容作为消息发送:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic < messages.txt
4. 发送批量消息

结合其他命令生成消息:

# 发送 1 到 100 的数字作为消息
seq 1 100 | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
5. 使用自定义配置

指定生产者配置文件(例如包含安全认证信息):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic \--producer.config /path/to/producer.properties

配置文件示例

producer.properties 示例内容:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="your_username" \password="your_password";

注意事项

  • 确保 Kafka 集群正常运行且可访问。
  • 如果主题不存在,需要检查 Kafka 配置中的 auto.create.topics.enable 是否为 true(默认是),或手动创建主题。
  • 在生产环境中,建议使用配置文件而非命令行参数传递敏感信息(如认证凭证)。
  • 对于大量数据的导入,考虑使用 Kafka Connect 或专用的生产者应用程序,而非控制台生产者。

kafka-console-consumer.sh

kafka-console-consumer.sh 是 Kafka 提供的命令行工具,用于从 Kafka 主题消费(读取)消息。它支持多种消费模式,适合快速测试、调试或数据导出等场景。

基本用法

以下是常用的命令格式:

bin/kafka-console-consumer.sh --bootstrap-server <broker地址> --topic <主题名> [其他参数]

核心参数说明

参数作用
--bootstrap-server指定 Kafka broker 地址(如 localhost:9092
--topic指定要消费的主题名
--group指定消费组 ID(默认自动生成临时组)
--from-beginning从主题的最早消息开始消费
--partition指定消费特定分区(需配合 --offset 使用)
--offset指定消费起始位置(如 earliestlatest、具体偏移量)
--max-messages指定消费的最大消息数量后退出
--formatter指定消息格式器(如 JSON 解析器)
--property print.key=true打印消息的键(默认只打印值)
--property key.separator指定键值分隔符(默认 \t
--property print.timestamp=true打印消息时间戳

常见使用场景

1. 实时消费最新消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
  • 效果:持续监听 test-topic 的新消息(从当前位置开始)
  • 退出:按 Ctrl+C
2. 消费历史所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
  • 效果:从主题的第一条消息开始消费,直到最新消息
3. 使用消费组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --group my-group
  • 效果:加入 my-group 消费组,实现消息负载均衡
  • 特性:消费进度会被记录,重启后从上次位置继续消费
4. 消费特定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 0 --offset earliest
  • 效果:只消费分区 0 的消息,从最早位置开始
5. 格式化输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \--property print.key=true \--property print.timestamp=true \--property key.separator=":"
  • 输出格式示例:
    [2023-10-01T12:00:00,000] key1:value1
    [2023-10-01T12:00:01,000] key2:value2
    
6. 消费 JSON 格式消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
7. 导出消息到文件
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning > messages.txt
8. 消费指定数量的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --max-messages 100
http://www.dtcms.com/a/290724.html

相关文章:

  • Python 进阶(六): Word 基本操作
  • ROS 与 Ubuntu 版本的对应关系
  • 初学者STM32—USART
  • 了解类加载器吗?类加载器的类型有哪些?
  • Java 大视界 -- 基于 Java 的大数据分布式计算在地球物理勘探数据处理与地质结构建模中的应用(356)
  • 鹏鼎控股入职测评综合能力真题SHL测评题库2025年攻略
  • postgresql16.4 配置 数据库主从
  • PyTorch 实现 CIFAR-10 图像分类:从数据预处理到模型训练与评估
  • git bash命令不够完善,想整合msys2该怎么办?
  • 02-UE5蓝图初始的三个节点作用
  • 文娱投资的逆势突破:博派资本的文化旅游综合体战略
  • 阿里云宝塔Linux面板相关操作记录
  • 照片to谷歌地球/奥维地图新增功能:导出 GeoJSON 数据
  • 高级技术【Java】【反射】【注解】【动态代理】
  • c++:父类的析构函数定义为纯虚函数注意事项
  • “专属私有云”或“行业公有云(逻辑隔离的公共云专区)”两种主流部署模式到底有什么区别?政务云不就应该是专属的私有云么?政务云是不是不能混用?
  • 网络编程基础:从 OSI 模型到 TCP/IP 协议族的全面解析
  • 【AI高性能网络解析】第三期:数据快递,海量数据跨广域高效传输技术实践
  • 计算机网络:概述层---计算机网络的组成和功能
  • harbor镜像仓库由原来的v2.11.1版本升级到v2.13.1,数据不丢失
  • Taro 生命周期相关 API 详解
  • HTML整理
  • Lists的分批次操作
  • 安卓第一个项目
  • 信息学奥赛一本通 1576:【例 2】选课 | 洛谷 P2014 [CTSC1997] 选课
  • Netty中CompositeByteBuf的使用
  • 位标志法处理多选字段在数据库中的存储方式 查询效率与扩展性之间的权衡
  • https正向代理 GoProxy
  • 苹果最新系统iOS 17的调试和适配方法 - Xcode 14.3.1 真机调试指南
  • How does Misinformation Affect Large Language ModelBehaviors and Preferences?