kafka 生产消息和消费消息 kafka-console-producer.sh kafka-console-consumer.sh
目录
- kafka-console-producer.sh
- 基本用法
- 常用参数说明
- 示例用法
- 1. 简单发送消息
- 2. 发送带键的消息
- 3. 从文件读取消息
- 4. 发送批量消息
- 5. 使用自定义配置
- 配置文件示例
- 注意事项
- kafka-console-consumer.sh
- 基本用法
- 核心参数说明
- 常见使用场景
- 1. 实时消费最新消息
- 2. 消费历史所有消息
- 3. 使用消费组
- 4. 消费特定分区
- 5. 格式化输出
- 6. 消费 JSON 格式消息
- 7. 导出消息到文件
- 8. 消费指定数量的消息
kafka-console-producer.sh
是 Kafka 提供的一个命令行工具,用于向 Kafka 主题发送消息(即生产消息)。它允许你在终端中手动输入消息,或从文件、其他命令的输出中读取消息并发送到 Kafka。
kafka-console-producer.sh
基本用法
以下是使用 kafka-console-producer.sh
的基本命令格式:
bin/kafka-console-producer.sh --bootstrap-server <broker地址> --topic <主题名>
常用参数说明
--bootstrap-server
: 指定 Kafka broker 的地址(例如localhost:9092
),用于建立初始连接。--topic
: 指定要发送消息的主题名称。--property
: 设置额外的生产者配置,例如:parse.key=true
: 启用键值对模式(需要配合key.separator
使用)。key.separator=,
: 指定键和值之间的分隔符(默认为制表符\t
)。
--producer.config
: 指定生产者配置文件的路径。--request-required-acks
用于控制生产者发送消息后需要等待多少个副本确认接收,以此来平衡消息发送的可靠性和性能。这个参数会影响消息的持久性保证和发送延迟。
该参数指定了生产者在认定消息发送成功之前需要收到的确认数(acks)。主要有以下几种取值:
- 0:生产者不等待任何副本的确认,直接认为消息发送成功。
优点:发送延迟最低。
缺点:如果消息在传输过程中丢失,生产者不会知道,可能导致数据丢失。 - 1(默认值):生产者只需要等待 Leader 副本确认接收即可。
优点:在 Leader 正常工作的情况下,可以保证消息不丢失。
缺点:如果 Leader 接收消息后立即崩溃,而消息尚未同步到 Follower 副本,则可能导致数据丢失。 - -1 或 all:生产者需要等待所有 ISR(In-Sync Replicas)中的副本都确认接收消息。
优点:提供最高的消息持久性保证,只要有一个 ISR 副本存活,消息就不会丢失。
缺点:发送延迟最高,尤其是当 ISR 中的副本数量较多时。
示例用法
1. 简单发送消息
启动生产者并手动输入消息:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
输入消息后按回车键发送,输入 Ctrl + D
退出。
2. 发送带键的消息
使用键值对模式(键和值用逗号分隔):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic \--property "parse.key=true" \--property "key.separator=,"
输入格式示例:
key1,value1
key2,value2
3. 从文件读取消息
将文件内容作为消息发送:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic < messages.txt
4. 发送批量消息
结合其他命令生成消息:
# 发送 1 到 100 的数字作为消息
seq 1 100 | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
5. 使用自定义配置
指定生产者配置文件(例如包含安全认证信息):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic \--producer.config /path/to/producer.properties
配置文件示例
producer.properties
示例内容:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="your_username" \password="your_password";
注意事项
- 确保 Kafka 集群正常运行且可访问。
- 如果主题不存在,需要检查 Kafka 配置中的
auto.create.topics.enable
是否为true
(默认是),或手动创建主题。 - 在生产环境中,建议使用配置文件而非命令行参数传递敏感信息(如认证凭证)。
- 对于大量数据的导入,考虑使用 Kafka Connect 或专用的生产者应用程序,而非控制台生产者。
kafka-console-consumer.sh
kafka-console-consumer.sh
是 Kafka 提供的命令行工具,用于从 Kafka 主题消费(读取)消息。它支持多种消费模式,适合快速测试、调试或数据导出等场景。
基本用法
以下是常用的命令格式:
bin/kafka-console-consumer.sh --bootstrap-server <broker地址> --topic <主题名> [其他参数]
核心参数说明
参数 | 作用 |
---|---|
--bootstrap-server | 指定 Kafka broker 地址(如 localhost:9092 ) |
--topic | 指定要消费的主题名 |
--group | 指定消费组 ID(默认自动生成临时组) |
--from-beginning | 从主题的最早消息开始消费 |
--partition | 指定消费特定分区(需配合 --offset 使用) |
--offset | 指定消费起始位置(如 earliest 、latest 、具体偏移量) |
--max-messages | 指定消费的最大消息数量后退出 |
--formatter | 指定消息格式器(如 JSON 解析器) |
--property print.key=true | 打印消息的键(默认只打印值) |
--property key.separator | 指定键值分隔符(默认 \t ) |
--property print.timestamp=true | 打印消息时间戳 |
常见使用场景
1. 实时消费最新消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
- 效果:持续监听
test-topic
的新消息(从当前位置开始) - 退出:按
Ctrl+C
2. 消费历史所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
- 效果:从主题的第一条消息开始消费,直到最新消息
3. 使用消费组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --group my-group
- 效果:加入
my-group
消费组,实现消息负载均衡 - 特性:消费进度会被记录,重启后从上次位置继续消费
4. 消费特定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 0 --offset earliest
- 效果:只消费分区 0 的消息,从最早位置开始
5. 格式化输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \--property print.key=true \--property print.timestamp=true \--property key.separator=":"
- 输出格式示例:
[2023-10-01T12:00:00,000] key1:value1 [2023-10-01T12:00:01,000] key2:value2
6. 消费 JSON 格式消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
7. 导出消息到文件
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning > messages.txt
8. 消费指定数量的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --max-messages 100