当前位置: 首页 > news >正文

python --nacos相关

安装

# python: 3.7.3
pip install nacos-sdk-python-1.0.0
from nacos import NacosClient
import json
import time
from typing import Optional, Dict, List, Anyclass NacosManager:"""Nacos配置与服务管理工具类,封装了增删改查操作"""def __init__(self, server_addresses: str, namespace: str = ""):"""初始化Nacos客户端:param server_addresses: Nacos服务器地址,格式为"ip:port":param namespace: 命名空间ID,默认为空"""self.client = NacosClient(server_addresses, namespace=namespace)# ------------------------------# 配置管理相关操作# ------------------------------def create_or_update_config(self, data_id: str, group: str, content: str) -> bool:"""创建或更新配置(存在则更新,不存在则创建):param data_id: 配置ID:param group: 配置分组:param content: 配置内容:return: 操作是否成功"""try:return self.client.publish_config(data_id, group, content)except Exception as e:print(f"配置创建/更新失败: {str(e)}")return Falsedef get_config(self, data_id: str, group: str, parse_json: bool = False) -> Optional[Any]:"""获取配置内容:param data_id: 配置ID:param group: 配置分组:param parse_json: 是否自动解析JSON格式:return: 配置内容(字符串或字典),失败返回None"""try:content = self.client.get_config(data_id, group)if not content:print(f"配置不存在: data_id={data_id}, group={group}")return Noneif parse_json:return json.loads(content)return contentexcept json.JSONDecodeError:print(f"配置不是有效的JSON格式: data_id={data_id}")return Noneexcept Exception as e:print(f"获取配置失败: {str(e)}")return Nonedef delete_config(self, data_id: str, group: str) -> bool:"""删除配置:param data_id: 配置ID:param group: 配置分组:return: 操作是否成功"""try:return self.client.remove_config(data_id, group)except Exception as e:print(f"删除配置失败: {str(e)}")return False# ------------------------------# 服务管理相关操作# ------------------------------def register_service(self, service_name: str, ip: str, port: int,weight: float = 1.0, cluster_name: str = "DEFAULT") -> bool:"""注册服务实例:param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param weight: 服务权重:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.add_naming_instance(service_name=service_name,ip=ip,port=port,weight=weight,cluster_name=cluster_name,enable=True,healthy=True)except Exception as e:print(f"服务注册失败: {str(e)}")return Falsedef get_service_instances(self, service_name: str, healthy_only: bool = True) -> List[Dict]:"""获取服务实例列表:param service_name: 服务名称:param healthy_only: 是否只返回健康实例:return: 实例列表,失败返回空列表"""try:instances = self.client.list_naming_instances(service_name=service_name,healthy_only=healthy_only)return instances if instances else []except Exception as e:print(f"获取服务实例失败: {str(e)}")return []def update_service_instance(self, service_name: str, ip: str, port: int,weight: float = 1.0, cluster_name: str = "DEFAULT") -> bool:"""更新服务实例信息(主要用于更新权重):param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param weight: 新权重值:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.modify_naming_instance(service_name=service_name,ip=ip,port=port,weight=weight,cluster_name=cluster_name)except Exception as e:print(f"更新服务实例失败: {str(e)}")return Falsedef deregister_service(self, service_name: str, ip: str, port: int,cluster_name: str = "DEFAULT") -> bool:"""注销服务实例:param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.remove_naming_instance(service_name=service_name,ip=ip,port=port,cluster_name=cluster_name)except Exception as e:print(f"注销服务失败: {str(e)}")return False# 使用示例
if __name__ == "__main__":# 初始化Nacos管理器(替换为你的Nacos地址)nacos = NacosManager(server_addresses="127.0.0.1:8848")# 配置管理示例print("===== 配置管理操作 =====")config_data_id = "demo_config"config_group = "DEFAULT_GROUP"# 创建/更新配置create_ok = nacos.create_or_update_config(data_id=config_data_id,group=config_group,content='{"name": "nacos", "version": "1.0.0"}')print(f"创建配置: {'成功' if create_ok else '失败'}")time.sleep(1)  # 等待配置同步# 获取配置(自动解析JSON)config = nacos.get_config(data_id=config_data_id,group=config_group,parse_json=True)print(f"获取配置: {config}")# 删除配置delete_ok = nacos.delete_config(data_id=config_data_id,group=config_group)print(f"删除配置: {'成功' if delete_ok else '失败'}")# 服务管理示例print("\n===== 服务管理操作 =====")service_name = "demo_service"service_ip = "127.0.0.1"service_port = 8080# 注册服务register_ok = nacos.register_service(service_name=service_name,ip=service_ip,port=service_port,weight=0.8)print(f"注册服务: {'成功' if register_ok else '失败'}")time.sleep(1)  # 等待服务注册生效# 获取服务实例instances = nacos.get_service_instances(service_name=service_name)print(f"服务实例: {instances}")# 更新服务权重update_ok = nacos.update_service_instance(service_name=service_name,ip=service_ip,port=service_port,weight=0.5)print(f"更新服务权重: {'成功' if update_ok else '失败'}")# 注销服务deregister_ok = nacos.deregister_service(service_name=service_name,ip=service_ip,port=service_port)print(f"注销服务: {'成功' if deregister_ok else '失败'}")
http://www.dtcms.com/a/325598.html

相关文章:

  • MSE ZooKeeper:Flink高可用架构的企业级选择
  • 《图解技术体系》New generation CMDB resource model framework
  • 自然语言处理实战:用LSTM打造武侠小说生成器
  • 【AI论文】R-Zero:从零数据起步的自进化推理大语言模型
  • JavaScript 中如何实现大文件并行下载
  • AI(2)-神经网络(激活函数)
  • 支持小语种的在线客服系统,自动翻译双方语言,适合对接跨境海外客户
  • NY185NY190美光固态闪存NY193NY195
  • 《深度剖析前端框架中错误边界:异常处理的基石与进阶》
  • pom.xml父子模块配置
  • 深入理解Android Kotlin Flow:响应式编程的现代实践
  • 部署open-webui到本地
  • TDengine IDMP 基本功能(1.界面布局和操作)
  • 某地渣库边坡自动化监测服务项目
  • 企业高性能web服务器1
  • FPGA实现Aurora 8B10B图像视频传输,基于GTX高速收发器,提供4套工程源码和技术支持
  • 新手向:Python实现数据可视化图表生成
  • LVPECL、LVDS、LVTTL、LVCMOS四种逻辑电平标准的全面对比
  • DDoS 攻击成本测算:从带宽损耗到业务中断的量化分析
  • FPGA硬件设计1 最小芯片系统-Altera EP4CE10F17C8、Xilinx xc7a100t
  • 邬贺铨院士:AI与数字安全融合是数字化建设核心驱动力
  • 使用TextureView和MediaPlayer播放视频黑屏问题
  • 设计模式(三)——观察者模式
  • 数据结构:串、数组与广义表
  • 使用 Rust 创建 32 位 DLL 的完整指南
  • VoxCraft-生数科技推出的免费3D模型AI生成工具
  • Rust 库开发全面指南
  • Vue 项目中主从表异步保存实战:缓存导致接口不执行问题排查与解决
  • 芯盾时代 SDP 助力运营商远程接入体系全面升级
  • linux实战:基于Ubuntu的专业相机