往kafka创建生产者和消费者,并且打数据和消费数据
1. 创建Topic(在任意节点执行)
# 进入Kafka的bin目录(替换为实际路径)
cd /opt/kafka/bin# 创建名为my_topic的Topic,副本因子3,分区数3,保留7天
./kafka-topics.sh \--create \--zookeeper node1:2181,node2:2181,node3:2181 \--replication-factor 3 \--partitions 3 \--topic my_topic \--config retention.ms=604800000
2. 验证Topic创建成功
# 查看Topic详情
./kafka-topics.sh \--describe \--zookeeper node1:2181 \--topic my_topic# 预期输出:
# Topic: my_topic PartitionCount:3 ReplicationFactor:3 Configs: retention.ms=604800000
# Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
3. 启动生产者持续发送数据
# 在后台启动生产者(每秒发送一条带时间戳的数据)
nohup bash -c ' \while true; do \echo "测试数据 $(date +\"%Y-%m-%d %H:%M:%S\")" | \./kafka-console-producer.sh \--broker-list node1:9092,node2:9092,node3:9092 \--topic my_topic; \sleep 1; \done \
' > producer.log 2>&1 &
关键参数说明:
- ZooKeeper地址:
node1:2181,node2:2181,node3:2181
(替换为实际ZooKeeper集群地址) - Broker地址:
node1:9092,node2:9092,node3:9092
(替换为实际Kafka Broker地址) - 保留时间:
retention.ms=604800000
(7天,单位毫秒) - 副本因子:3(确保数据在3个节点上各存一份)
验证生产者运行:
# 实时查看生产者日志
tail -f producer.log# 使用消费者验证数据
./kafka-console-consumer.sh \--bootstrap-server node1:9092 \--topic my_topic \--from-beginning
注意事项:
-
确保ZooKeeper和Kafka集群已启动
-
替换所有IP地址和路径为实际集群配置
-
生产环境建议配置防火墙规则开放2181/9092端口
-
可通过
jps
命令检查Kafka进程状态
消费Kafka数据。以下是正确的Shell脚本实现:
#!/bin/bash# Kafka消费者脚本
# 配置参数
BOOTSTRAP_SERVERS="192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092" # 替换为您的Kafka Broker地址
TOPIC="my_topic" # 与生产者Topic名称一致
GROUP_ID="shell-consumer-group"echo "启动Kafka消费者,消费Topic: $TOPIC"
echo "按Ctrl+C停止消费"
echo "--------------------------------"# 持续消费消息
while true; do# 使用kafka-console-consumer.sh消费消息kafka-console-consumer.sh \--bootstrap-server "$BOOTSTRAP_SERVERS" \--topic "$TOPIC" \--group "$GROUP_ID" \--from-beginning \--timeout-ms 1000 \--max-messages 10# 每批消息后暂停1秒sleep 1
done
使用说明:
-
将上述内容保存为
kafka_consumer.sh
-
添加执行权限:
bashchmod +x kafka_consumer.sh
-
运行脚本:
bash./kafka_consumer.sh
关键参数说明:
BOOTSTRAP_SERVERS
:Kafka集群地址,多个Broker用逗号分隔TOPIC
:要消费的Topic名称,需与生产者使用的Topic一致GROUP_ID
:消费者组ID,同一组内的消费者共享消费进度--from-beginning
:从最早的消息开始消费--timeout-ms
:每次拉取消息的超时时间--max-messages
:每次拉取的最大消息数量
注意事项:
- 确保Kafka安装路径正确,
kafka-console-consumer.sh
在系统PATH中 - 如果使用非标准端口,请修改
9092
为实际端口 - 生产环境建议添加日志持久化功能
- 可通过
--offset
参数指定消费起始位置
这个脚本会持续消费my_topic
中的消息,每批消费10条,然后暂停1秒,直到用户手动停止(按Ctrl+C)