当前位置: 首页 > news >正文

往kafka创建生产者和消费者,并且打数据和消费数据

1. 创建Topic(在任意节点执行)
# 进入Kafka的bin目录(替换为实际路径)
cd /opt/kafka/bin# 创建名为my_topic的Topic,副本因子3,分区数3,保留7天
./kafka-topics.sh \--create \--zookeeper node1:2181,node2:2181,node3:2181 \--replication-factor 3 \--partitions 3 \--topic my_topic \--config retention.ms=604800000
2. 验证Topic创建成功
# 查看Topic详情
./kafka-topics.sh \--describe \--zookeeper node1:2181 \--topic my_topic# 预期输出:
# Topic: my_topic   PartitionCount:3    ReplicationFactor:3    Configs: retention.ms=604800000
# Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
# Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
# Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
3. 启动生产者持续发送数据
# 在后台启动生产者(每秒发送一条带时间戳的数据)
nohup bash -c ' \while true; do \echo "测试数据 $(date +\"%Y-%m-%d %H:%M:%S\")" | \./kafka-console-producer.sh \--broker-list node1:9092,node2:9092,node3:9092 \--topic my_topic; \sleep 1; \done \
' > producer.log 2>&1 &
关键参数说明:
  • ZooKeeper地址node1:2181,node2:2181,node3:2181(替换为实际ZooKeeper集群地址)
  • Broker地址node1:9092,node2:9092,node3:9092(替换为实际Kafka Broker地址)
  • 保留时间retention.ms=604800000(7天,单位毫秒)
  • 副本因子:3(确保数据在3个节点上各存一份)
验证生产者运行:
# 实时查看生产者日志
tail -f producer.log# 使用消费者验证数据
./kafka-console-consumer.sh \--bootstrap-server node1:9092 \--topic my_topic \--from-beginning
注意事项:
  1. 确保ZooKeeper和Kafka集群已启动

  2. 替换所有IP地址和路径为实际集群配置

  3. 生产环境建议配置防火墙规则开放2181/9092端口

  4. 可通过jps命令检查Kafka进程状态

消费Kafka数据。以下是正确的Shell脚本实现:
#!/bin/bash# Kafka消费者脚本
# 配置参数
BOOTSTRAP_SERVERS="192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092"  # 替换为您的Kafka Broker地址
TOPIC="my_topic"  # 与生产者Topic名称一致
GROUP_ID="shell-consumer-group"echo "启动Kafka消费者,消费Topic: $TOPIC"
echo "按Ctrl+C停止消费"
echo "--------------------------------"# 持续消费消息
while true; do# 使用kafka-console-consumer.sh消费消息kafka-console-consumer.sh \--bootstrap-server "$BOOTSTRAP_SERVERS" \--topic "$TOPIC" \--group "$GROUP_ID" \--from-beginning \--timeout-ms 1000 \--max-messages 10# 每批消息后暂停1秒sleep 1
done

使用说明:

  1. 将上述内容保存为kafka_consumer.sh

  2. 添加执行权限:

    bashchmod +x kafka_consumer.sh
    
  3. 运行脚本:

    bash./kafka_consumer.sh
    

关键参数说明:

  • BOOTSTRAP_SERVERS:Kafka集群地址,多个Broker用逗号分隔
  • TOPIC:要消费的Topic名称,需与生产者使用的Topic一致
  • GROUP_ID:消费者组ID,同一组内的消费者共享消费进度
  • --from-beginning:从最早的消息开始消费
  • --timeout-ms:每次拉取消息的超时时间
  • --max-messages:每次拉取的最大消息数量

注意事项:

  1. 确保Kafka安装路径正确,kafka-console-consumer.sh在系统PATH中
  2. 如果使用非标准端口,请修改9092为实际端口
  3. 生产环境建议添加日志持久化功能
  4. 可通过--offset参数指定消费起始位置

这个脚本会持续消费my_topic中的消息,每批消费10条,然后暂停1秒,直到用户手动停止(按Ctrl+C)

http://www.dtcms.com/a/482271.html

相关文章:

  • linux iptables介绍
  • sqlite: 动态列类型
  • 做商品网站数据库有哪些阜阳做网站多少钱
  • 房地产开发公司网站网站推广方案200字
  • Android MVVM架构解析:现代开发的首选模式
  • 车机系统的「共享镜头」:如何实现多用户同时拍照
  • 开源链动2+1模式AI智能名片S2B2C商城小程序在竞争激烈的中低端面膜服装行业中的应用与策略
  • Java学习路线推荐!
  • 网站伪静态是什么意思个人网站设计模板素材
  • 萧山工程建设有限公司网站济南网站建设公司哪家专业
  • KingbaseES JDBC 深度实战指南(上):从驱动选型到连接管理,夯实国产数据库交互基础
  • Datawhale25年10月组队学习:math for AI+Task1简介和动机
  • Blender从入门到精通:建模、材质与动画完整实战教程
  • QT QML交互原理:信号与槽机制
  • 怎么做网站投放广告的代理商临沂市罗庄区住房和建设局网站
  • 新浪云sae免费wordpress网站wordpress文章图片本地化
  • 蜱媒病原体的宏基因组发现与机器学习预测模型构建
  • MySQL----锁
  • 《探秘 Linux 进程控制:驾驭系统运行的核心之力》
  • 客户价值体系构建咨询——南方略咨询集团
  • 做户外旅游网站微信网页版官网登录
  • 从QT软件开发到UI设计落地:兰亭妙微的全流程体验方法论
  • 开源 C++ QT QML 开发(二十)多媒体--摄像头拍照
  • Redis速通
  • 误删mysql某表数据,通过binlog2sql工具数据恢复
  • MyBatisPlus中LambdaQueryChainWrapper链式条件查询的常用示例
  • STM32与W25Q64 SPI通信全解析
  • 创办个人网站淘客怎样做网站
  • 网站流量用完了湘潭网络公司
  • Cogent DataHub vs Kepware,两大工业数据中间件的深度对比分析