Getting production and consumption offsets from Kafka and ZooKeeper


  • Notes

    • The command is a Python program compiled into a standalone executable and must be run on a CentOS 7 system.
  • Command usage

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST]
                       [-m INTERVAL_MINUTES]

Usage of argparse

optional arguments:
  -h, --help            show this help message and exit
  -k KAFKA_HOST, --kafka_host KAFKA_HOST
                        Kafka host:port
  -z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST
                        ZooKeeper host:port
  -m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES
                        Interval in minutes
  • Running the command
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
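Each block prints the current and previous offset for Kafka and for ZooKeeper, then a summary line that is just three differences. A small sketch reproducing that arithmetic (the helper name `summarize` is hypothetical and not part of the tool; it works on numbers already collected and does not contact Kafka or ZooKeeper):

```python
def summarize(topic, kafka_now, kafka_last, zoo_now, zoo_last):
    """Rebuild the summary line the script prints for one topic (hypothetical helper)."""
    produced = kafka_now - kafka_last   # new messages written to the Kafka topic
    consumed = zoo_now - zoo_last       # offsets committed by the Storm topology
    difference = produced - consumed    # > 0: backlog grew, < 0: backlog shrank
    return (f"{topic} kafka offsets num: {produced} "
            f"storm offsets num: {consumed} Actual consumption: {difference}")


# Values from the "agent" block above: nothing was produced or consumed.
print(summarize("agent", 2574552, 2574552, 2574552, 2574552))
# A made-up busier minute: 1200 messages produced, 1000 committed by Storm.
print(summarize("agent", 2575752, 2574552, 2575552, 2574552))
```

Despite its label, "Actual consumption" is the net growth of the backlog over the interval: 0 means Storm committed exactly as many offsets as were produced (in the sample run both topics were simply idle), and a positive value means the topology is falling behind.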
  • Source code
#!/usr/local/python3/bin/python3
import argparse
import json
import sys
import time

from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartition

SEPARATOR = "======================================================================================="


def get_zoo_consumer_info(zk_cli, topology):
    """Return the offset the Storm topology committed to ZooKeeper (partition 0 only)."""
    path = "/stormOffset/" + topology + "/partition_0"
    if not zk_cli.exists(path):
        print("Path " + path + " does not exist.")
        return 0
    raw_data, _stat = zk_cli.get(path)
    # The znode holds a JSON document containing an "offset" field
    return json.loads(raw_data).get("offset", 0)


def get_kafka_consumer_info(consumer, topic):
    """Return the log-end offset (latest produced offset) of partition 0 of `topic`."""
    tp = TopicPartition(topic, 0)
    return consumer.end_offsets([tp])[tp]


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Usage of argparse')
    parser.add_argument('-k', '--kafka_host', type=str, default="10.130.25.77:9092", help='Kafka host:port')
    parser.add_argument('-z', '--zookeeper_host', type=str, default="10.130.25.79:2181", help='ZooKeeper host:port')
    parser.add_argument('-m', '--Interval_minutes', type=int, default=1, help='Interval in minutes')
    args = parser.parse_args()

    # Kafka topics and the Storm topologies that consume them, paired by position
    kafka_topics = ["agent", "record"]
    zoo_topologies = ["agentTopology", "recordTopology"]
    interval_minutes = args.Interval_minutes

    try:
        zk_cli = KazooClient(hosts=args.zookeeper_host)
        zk_cli.start()  # connect once instead of on every lookup
    except Exception as e:
        sys.exit("init zookeeper conn error: " + str(e))

    try:
        kafka_server = KafkaConsumer(bootstrap_servers=args.kafka_host)
    except Exception as e:
        zk_cli.stop()
        sys.exit("init kafka conn error: " + str(e))

    # First sample of both offsets
    kafka_offset = {}
    zoo_offset = {}
    for topic, topology in zip(kafka_topics, zoo_topologies):
        kafka_offset[topic] = get_kafka_consumer_info(kafka_server, topic)
        zoo_offset[topology] = get_zoo_consumer_info(zk_cli, topology)

    print("Interval " + str(interval_minutes) + " minutes sleep")
    print(SEPARATOR)
    time.sleep(interval_minutes * 60)

    # Second sample, compared with the first
    for topic, topology in zip(kafka_topics, zoo_topologies):
        kafka_now = get_kafka_consumer_info(kafka_server, topic)
        kafka_last = kafka_offset[topic]
        produced = kafka_now - kafka_last    # messages written to Kafka during the interval
        zoo_now = get_zoo_consumer_info(zk_cli, topology)
        zoo_last = zoo_offset[topology]
        consumed = zoo_now - zoo_last        # offsets committed by Storm during the interval
        # produced - consumed: how much the backlog grew (positive) or shrank (negative)
        difference = produced - consumed
        print("kafka offsets:", topic, kafka_now, kafka_last)
        print("zookeeper offsets:", topic, zoo_now, zoo_last)
        print(topic + " kafka offsets num: " + str(produced) + " storm offsets num: " + str(consumed) + " Actual consumption: " + str(difference))
        print(SEPARATOR)

    zk_cli.stop()
    # Close the consumer connection
    kafka_server.close()
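The script compares two samples taken one interval apart. The lag at a single moment, i.e. how far the Storm topology is behind the end of the Kafka log, comes from the same two numbers within one sample. A minimal sketch, assuming the znode payload is JSON with an `offset` field (as the script's `.get("offset")` implies) and that only partition 0 is tracked; `parse_storm_offset` and `compute_lag` are hypothetical names, and whether the stored offset means "last processed" or "next to read" depends on the spout version, so the result may be off by one:

```python
import json


def parse_storm_offset(raw):
    """Extract the committed offset from a /stormOffset/<topology>/partition_0 payload.

    Assumes a JSON payload with an "offset" key; kazoo's get() returns bytes,
    which json.loads accepts directly.
    """
    return json.loads(raw).get("offset", 0)


def compute_lag(kafka_end_offsets, storm_offsets, pairs):
    """Return {topic: Kafka log-end offset - Storm committed offset}.

    pairs maps each Kafka topic to the Storm topology that consumes it,
    e.g. {"agent": "agentTopology"}.
    """
    return {
        topic: kafka_end_offsets[topic] - storm_offsets[topology]
        for topic, topology in pairs.items()
    }


pairs = {"agent": "agentTopology", "record": "recordTopology"}
kafka_end = {"agent": 2574552, "record": 89110}  # values from the sample run
storm = {
    "agentTopology": parse_storm_offset(b'{"offset": 2574552}'),
    "recordTopology": parse_storm_offset(b'{"offset": 89110}'),
}
print(compute_lag(kafka_end, storm, pairs))  # {'agent': 0, 'record': 0}
```

Keeping the parsing and the arithmetic as pure functions makes them easy to check without a running Kafka or ZooKeeper cluster.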
