安装
pip install nacos-sdk-python-1.0.0
from nacos import NacosClient
import json
import time
from typing import Optional, Dict, List, Anyclass NacosManager:"""Nacos配置与服务管理工具类,封装了增删改查操作"""def __init__(self, server_addresses: str, namespace: str = ""):"""初始化Nacos客户端:param server_addresses: Nacos服务器地址,格式为"ip:port":param namespace: 命名空间ID,默认为空"""self.client = NacosClient(server_addresses, namespace=namespace)def create_or_update_config(self, data_id: str, group: str, content: str) -> bool:"""创建或更新配置(存在则更新,不存在则创建):param data_id: 配置ID:param group: 配置分组:param content: 配置内容:return: 操作是否成功"""try:return self.client.publish_config(data_id, group, content)except Exception as e:print(f"配置创建/更新失败: {str(e)}")return Falsedef get_config(self, data_id: str, group: str, parse_json: bool = False) -> Optional[Any]:"""获取配置内容:param data_id: 配置ID:param group: 配置分组:param parse_json: 是否自动解析JSON格式:return: 配置内容(字符串或字典),失败返回None"""try:content = self.client.get_config(data_id, group)if not content:print(f"配置不存在: data_id={data_id}, group={group}")return Noneif parse_json:return json.loads(content)return contentexcept json.JSONDecodeError:print(f"配置不是有效的JSON格式: data_id={data_id}")return Noneexcept Exception as e:print(f"获取配置失败: {str(e)}")return Nonedef delete_config(self, data_id: str, group: str) -> bool:"""删除配置:param data_id: 配置ID:param group: 配置分组:return: 操作是否成功"""try:return self.client.remove_config(data_id, group)except Exception as e:print(f"删除配置失败: {str(e)}")return Falsedef register_service(self, service_name: str, ip: str, port: int,weight: float = 1.0, cluster_name: str = "DEFAULT") -> bool:"""注册服务实例:param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param weight: 服务权重:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.add_naming_instance(service_name=service_name,ip=ip,port=port,weight=weight,cluster_name=cluster_name,enable=True,healthy=True)except Exception as e:print(f"服务注册失败: {str(e)}")return Falsedef get_service_instances(self, service_name: str, healthy_only: bool = True) -> List[Dict]:"""获取服务实例列表:param service_name: 服务名称:param healthy_only: 是否只返回健康实例:return: 实例列表,失败返回空列表"""try:instances = self.client.list_naming_instances(service_name=service_name,healthy_only=healthy_only)return instances if instances else []except Exception as e:print(f"获取服务实例失败: {str(e)}")return []def update_service_instance(self, service_name: str, ip: str, port: int,weight: float = 1.0, cluster_name: str = "DEFAULT") -> bool:"""更新服务实例信息(主要用于更新权重):param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param weight: 新权重值:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.modify_naming_instance(service_name=service_name,ip=ip,port=port,weight=weight,cluster_name=cluster_name)except Exception as e:print(f"更新服务实例失败: {str(e)}")return Falsedef deregister_service(self, service_name: str, ip: str, port: int,cluster_name: str = "DEFAULT") -> bool:"""注销服务实例:param service_name: 服务名称:param ip: 服务IP地址:param port: 服务端口:param cluster_name: 集群名称:return: 操作是否成功"""try:return self.client.remove_naming_instance(service_name=service_name,ip=ip,port=port,cluster_name=cluster_name)except Exception as e:print(f"注销服务失败: {str(e)}")return False
if __name__ == "__main__":nacos = NacosManager(server_addresses="127.0.0.1:8848")print("===== 配置管理操作 =====")config_data_id = "demo_config"config_group = "DEFAULT_GROUP"create_ok = nacos.create_or_update_config(data_id=config_data_id,group=config_group,content='{"name": "nacos", "version": "1.0.0"}')print(f"创建配置: {'成功' if create_ok else '失败'}")time.sleep(1) config = nacos.get_config(data_id=config_data_id,group=config_group,parse_json=True)print(f"获取配置: {config}")delete_ok = nacos.delete_config(data_id=config_data_id,group=config_group)print(f"删除配置: {'成功' if delete_ok else '失败'}")print("\n===== 服务管理操作 =====")service_name = "demo_service"service_ip = "127.0.0.1"service_port = 8080register_ok = nacos.register_service(service_name=service_name,ip=service_ip,port=service_port,weight=0.8)print(f"注册服务: {'成功' if register_ok else '失败'}")time.sleep(1) instances = nacos.get_service_instances(service_name=service_name)print(f"服务实例: {instances}")update_ok = nacos.update_service_instance(service_name=service_name,ip=service_ip,port=service_port,weight=0.5)print(f"更新服务权重: {'成功' if update_ok else '失败'}")deregister_ok = nacos.deregister_service(service_name=service_name,ip=service_ip,port=service_port)print(f"注销服务: {'成功' if deregister_ok else '失败'}")