当前位置: 首页 > news >正文

RabbitMQ RPC模式Python示例

文章目录

  • 1.服务端
  • 2.客户端
  • 3.调用结果

1.服务端

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@File:      rabbitmq_server.py
@Date:      2025/6/26 10:42
@Author:    xxx
@Description:
1. RabbitMQ服务端,支持多节点命令执行
2. 作为被控节点运行,可接收定向命令并返回结果
"""import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser# 定义日志模块
logger = logging.getLogger()
# 设置全局日志级别设为最低(DEBUG)
# # 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
logger.setLevel(logging.DEBUG)# 1. 文件日志(仅输出到文件)
file_handler = logging.FileHandler('rabbitmq_server.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)  # 文件记录DEBUG及以上级别
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)# 2. 控制台日志(仅输出到控制台)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)  # 控制台仅显示INFO及以上
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"class RabbitMQServer:"""RabbitMQ RPC服务器类功能:接收并执行来自客户端的定向命令"""def __init__(self, node_name=None, mq_user="rabbitmq", mq_password="rabbitmq@123",mq_virtual_host="/", mq_host=None, mq_port=5671,mq_ca="/opt/ssl/ca_certificate.pem"):"""初始化RabbitMQ服务端:param node_name: 节点名称标识(唯一):param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径"""# 节点配置self.NODE_NAME = node_name if node_name else socket.gethostname()# 连接配置self.RABBITMQ_USER = mq_userself.RABBITMQ_UNLOCK_CODE = mq_passwordself.RABBITMQ_VIRTUAL_HOST = mq_virtual_host# 如果没有设置RabbitMQ服务器IP,则连接到配置文件中设置的IP节点self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")self.RABBITMQ_PORT = mq_portself.SSL_CA_PATH = mq_ca# 初始化连接self._setup_connection()def get_option(self, file_path, section, option):"""获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据"""parser = configparser.ConfigParser()parser.read(file_path)if not parser.has_option(section, option):return ""else:return parser.get(section, option)def _get_ssl_options(self):"""配置SSL安全连接选项"""# 创建一个 SSL/TLS 安全通信的上下文对象# 生产环境建议指定协议版本,避免使用不安全的默认值context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)# 在 SSL/TLS 通信中承担核心安全验证功能context.load_verify_locations(self.SSL_CA_PATH)# 创建 RabbitMQ 的 SSL 连接配置对象return pika.SSLOptions(context,        # ssl.SSLContext 对象,包含证书、协议版本等SSL配置# TODO 可以根据具体证书,设置该选项"localhost"     # 服务器主机名,用于证书验证(需匹配证书CN或SAN字段),主机名验证:强制检查证书中的 CN(Common Name) 或 SAN(Subject Alternative Name) 是否匹配"localhost")def _setup_connection(self):"""建立RabbitMQ连接并设置队列"""# 创建 RabbitMQ 连接所需的用户名/密码认证对,用于后续建立连接时的身份验证credentials = pika.PlainCredentials(self.RABBITMQ_USER,         # RabbitMQ 服务认证用户名(字符串类型)self.RABBITMQ_UNLOCK_CODE   # RabbitMQ 服务认证密码(字符串类型))# RabbitMQ 连接参数设置connection_params = pika.ConnectionParameters(# RabbitMQ服务器IP地址(字符串类型)# - 若未配置则自动读取配置文件中的虚拟IP# - 示例:'192.168.120.19' 或 'rabbitmq.example.com'host=self.RABBITMQ_HOST,# RabbitMQ服务端口(整数类型)# - 默认使用加密端口5671# - 非加密连接通常用5672port=self.RABBITMQ_PORT,# 虚拟主机名称(字符串类型)# - 默认'/'表示根虚拟主机# - 用于多租户隔离场景virtual_host=self.RABBITMQ_VIRTUAL_HOST,# 认证凭证对象(pika.PlainCredentials)# - 包含用户名/密码信息# - 必须与RabbitMQ配置的用户权限匹配credentials=credentials,# SSL配置对象(pika.SSLOptions)# - 包含CA证书和主机名验证配置# - 为空时建立非加密连接ssl_options=self._get_ssl_options(),# 心跳检测间隔(秒,整数类型),建议设置为300-1200秒(防止NAT超时断开)# - 600秒=10分钟检测一次连接活性# - 0表示禁用心跳(不推荐)heartbeat=600   # 需 ≥ 客户端配置)# 建立与 RabbitMQ 服务器的同步阻塞式连接# 阻塞式:所有操作(如发送/接收消息)会阻塞当前线程直到完成self.connection = pika.BlockingConnection(connection_params)# 在连接上创建 AMQP 信道self.channel = self.connection.channel()# 声明节点专用队列(绑定到该信道)# 如果队列不存在则自动创建,如果存在则直接绑定self.channel.queue_declare(queue=self.NODE_NAME,   # 队列名称(使用节点名作为唯一标识)durable=True            # 队列持久化标志,需要配合消息的delivery_mode=2使用才能完全持久化)# 控制并发数,值越大并发越高,但资源消耗和复杂度也增加# TODO 集群需要调研配置并发度self.channel.basic_qos(prefetch_count=1)# 将当前信道绑定到指定队列,开始监听消息,消费消息(通过该信道)self.channel.basic_consume(queue=self.NODE_NAME,   # 指定消费的队列名称(当前节点专属队列)on_message_callback=self._execute_command,  # 消息处理回调函数auto_ack=False          # 手动消息确认模式)def _execute_command(self, ch, method, props, body):"""执行接收到的命令并返回结果"""try:# 解析消息内容(JSON格式)message = json.loads(body.decode('utf-8'))command = message.get('command', '')    # 获取要执行的命令target = message.get('target', '')      # 获取目标节点标识logger.info(f" [x] 收到({target})命令:{command}")# 校验目标节点(防止误处理其他节点的消息)if target != self.NODE_NAME:logger.warning(f" [x] 收到非本节点({self.NODE_NAME})命令,已忽略")ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息(防止重新投递)returnlogger.info(f" [*] 执行命令 【{command}】...")try:# 通过子进程执行命令(同步阻塞)output = subprocess.check_output(command,shell=True,     # 启用shell解析stderr=subprocess.STDOUT,   # 捕获标准错误timeout=60      # 超时时间(秒))response = output.decode('utf-8')except subprocess.TimeoutExpired:# 超时异常response = "Error: Command timed out"except subprocess.CalledProcessError as e:# 命令执行失败response = f"Error: {e.output.decode('utf-8')}"except Exception as e:# 其他系统异常response = f"System Error: {str(e)}"# 返回执行结果ch.basic_publish(exchange='',    # 默认交换器routing_key=props.reply_to, # 回复到客户端指定的回调队列properties=pika.BasicProperties(correlation_id=props.correlation_id,    # 关联ID(匹配请求-响应)delivery_mode=2 # 持久化消息到磁盘,会降低性能(服务重启不丢失)),body=response.encode('utf-8')   # 响应内容)logger.info(f" [*] 命令执行完成")# 确认消息处理完成ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:# 全局异常处理logger.exception(f" [x] 消息处理异常: {str(e)}")# 否定确认(可能重新投递)ch.basic_nack(delivery_tag=method.delivery_tag)def start(self, max_retries=5, retry_delay=10):"""启动RabbitMQ服务并持续监听消息功能:管理服务生命周期,处理连接异常和重试逻辑:param max_retries: 最大重试次数,默认5次:param retry_delay: 重试间隔时间(秒),默认10秒:return:"""# 当前重试次数计数器retry_count = 0# 主服务循环(持续运行直到主动终止)while True:try:# 打印服务状态信息logger.info(f" [*] {self.NODE_NAME} 节点服务启动 (尝试 {retry_count + 1}/{max_retries})")logger.info(f" [*] 等待队列 {self.NODE_NAME} 中的请求...")# 检查并重建连接(如果不存在或已关闭)if not hasattr(self, 'connection') or self.connection.is_closed:self._setup_connection()    # 初始化AMQP连接# 开始消费消息(阻塞调用)self.channel.start_consuming()except pika.exceptions.AMQPConnectionError as e:# RabbitMQ连接异常处理retry_count += 1logger.exception(f"连接失败: {str(e)}")# 超过最大重试次数则终止服务if retry_count >= max_retries:logger.error(" [x] 达到最大重试次数,终止服务")self.close()break   # 退出循环# 未达上限则延迟重试logger.warning(f" [*] {retry_delay}秒后尝试重新连接...")time.sleep(retry_delay)except KeyboardInterrupt:# 处理用户主动终止(Ctrl + C)logger.error("\n [x] 接收到终止信号")self.close()logger.error(" [x] 服务已停止")break   # 退出循环except Exception as e:# 其他未捕获异常处理logger.exception(f"服务异常: {str(e)}")time.sleep(retry_delay) # 防止异常时CPU空转def close(self):"""安全关闭RabbitMQ连接功能:清理资源,确保连接被正确关闭:return:"""# 防御式编程:检查连接存在且未关闭if hasattr(self, 'connection') and not self.connection.is_closed:self.connection.close() # 关闭AMQP连接logger.info(" [x] 连接已安全关闭")if __name__ == '__main__':# 服务启动入口(自动获取主机名作为节点名)server = RabbitMQServer()try:server.start()except KeyboardInterrupt:logger.error("\n [x] 接收到终止信号")server.close()logger.error(" [x] 服务已停止")

2.客户端

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@File:      rabbitmq_client.py
@Date:      2025/6/26 10:43
@Author:    xxx
@Description:
1. RabbitMQ客户端类,支持向指定节点发送SSH命令
2. 作为控制端运行,可定向发送命令并接收执行结果
"""import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser# 定义日志模块
logger = logging.getLogger()
# 设置全局日志级别设为最低(DEBUG)
# # 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
logger.setLevel(logging.DEBUG)# 1. 文件日志(仅输出到文件)
file_handler = logging.FileHandler('rabbitmq_client.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)  # 文件记录DEBUG及以上级别
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)# 2. 控制台日志(仅输出到控制台)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)  # 控制台仅显示INFO及以上
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"class RabbitMQClient:"""RabbitMQ RPC客户端类功能:向指定节点发送命令并获取执行结果"""def __init__(self, mq_user="rabbitmq", mq_password="rabbitmq@123", mq_virtual_host="/",mq_host=None, mq_port=5671, mq_ca="/opt/ssl/ca_certificate.pem"):"""初始化RabbitMQ客户端:param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径"""# 连接配置self.RABBITMQ_USER = mq_userself.RABBITMQ_UNLOCK_CODE = mq_passwordself.RABBITMQ_VIRTUAL_HOST = mq_virtual_host# 如果没有设置RabbitMQ服务器IP,则连接到配置文件中设置的IP节点self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")self.RABBITMQ_PORT = mq_portself.SSL_CA_PATH = mq_ca# 响应相关变量self.response = Noneself.corr_id = None# 建立连接logger.info(" [x] 正在建立连接 ...")self._connect()logger.info(" [x] 连接建立成功")def get_option(self, file_path, section, option):"""获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据"""parser = configparser.ConfigParser()parser.read(file_path)if not parser.has_option(section, option):return ""else:return parser.get(section, option)def _connect(self):"""建立RabbitMQ连接并初始化回调队列功能:配置安全连接参数、创建通信信道、设置消息回调处理:return:"""# 创建SSL安全上下文,强制使用TLS 1.2协议(禁用不安全的老版本协议)ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)# 加载CA根证书用于验证服务端证书(防止中间人攻击)ssl_context.load_verify_locations(self.SSL_CA_PATH)# 将SSL配置封装为RabbitMQ专用的SSLOptions对象# "localhost"参数要求服务端证书必须包含该主机名(CN或SAN字段)ssl_options = pika.SSLOptions(ssl_context, "localhost")# 创建认证凭证对象(明文用户名/密码,实际传输时会通过SSL加密)credentials = pika.PlainCredentials(self.RABBITMQ_USER,         # RabbitMQ服务用户名self.RABBITMQ_UNLOCK_CODE   # RabbitMQ服务密码)# 配置连接参数connection_params = pika.ConnectionParameters(host=self.RABBITMQ_HOST,    # RabbitMQ服务器地址port=self.RABBITMQ_PORT,    # 默认使用5671加密端口virtual_host=self.RABBITMQ_VIRTUAL_HOST,    # 虚拟主机隔离环境credentials=credentials,    # 用户名密码凭证ssl_options=ssl_options,    # SSL安全配置heartbeat=60                # 1分钟心跳检测(防连接中断),更频繁的心跳检测(如AWS ELB默认60秒空闲超时))# 建立阻塞式连接(同步操作,会阻塞直到连接成功或抛出异常)self.connection = pika.BlockingConnection(connection_params)# 创建AMQP信道(单个连接可创建多个信道,减少TCP连接开销)self.channel = self.connection.channel()# 声明临时回调队列(exclusive=True表示连接关闭时自动删除队列)result = self.channel.queue_declare(queue='',       # 空队列名表示由RabbitMQ自动生成exclusive=True  # 独占队列,仅当前连接可用)# 保存自动生成的队列名称(用于接收服务端响应)self.callback_queue = result.method.queue# 绑定消息消费回调self.channel.basic_consume(queue=self.callback_queue,  # 监听回调队列on_message_callback=self._on_response,  # 响应消息处理函数# TODO 生产环境建议:auto_ack=False  改为手动ACK确保消息可靠处理auto_ack=False   # auto_ack=True 自动确认消息(不推荐生产环境使用))def _on_response(self, ch, method, props, body):"""RPC模式下的响应消息回调处理函数功能:匹配并接收服务端返回的命令执行结果处理逻辑:1.通过correlation_id匹配对应的请求2.将二进制消息体解码为字符串3.存储结果供execute_command方法获取:param ch: (pika.channel.Channel): 接收到消息的信道对象:param method: (pika.spec.Basic.Deliver): 包含投递信息(如delivery_tag):param props: (pika.spec.BasicProperties): 消息属性(含correlation_id等):param body: (bytes): 消息体内容(服务端返回的执行结果):return:"""# 校验消息关联ID(确保是本请求的响应)try:if self.corr_id == props.correlation_id:# 解码服务端返回的消息内容(UTF-8编码)self.response = body.decode('utf-8')# 注意:此处不需要手动ack,因为消息已在服务端处理时ackexcept UnicodeDecodeError as e:self.response = f"解码失败: {str(e)}"def execute_command(self, command, target_node=None, timeout=60):"""向指定RabbitMQ节点发送命令并获取执行结果(RPC模式):param command (str): 要执行的shell命令字符串(如"ls -l"):param target_node (str): 目标节点标识,对应服务端的队列名- 默认None表示发送到当前主机节点:param timeout (int): 等待响应的超时时间(秒),默认60秒:return str: 命令执行结果文本异常:TimeoutError: 超过指定时间未收到响应时抛出AMQP相关异常: 消息发送失败时抛出向指定节点执行远程命令"""# 初始化响应存储和请求IDself.response = None    # 清空之前的响应self.corr_id = str(uuid.uuid4())    # 生成唯一请求标识# 确定目标节点(如果执行命令没有指定节点,默认发送到当前主机)if not target_node:target_node = socket.gethostname()# 构建RPC请求消息体(JSON格式)(包含命令和目标节点信息)message = {"command": command,         # 要执行的命令"target": target_node,      # 目标节点标识"timestamp": time.time()    # 请求时间戳}# 发送消息到目标节点的专属队列self.channel.basic_publish(exchange='',    # 使用默认直连交换机routing_key=target_node,  # 通过队列名路由到指定节点properties=pika.BasicProperties(reply_to=self.callback_queue,   # 设置回调队列名correlation_id=self.corr_id,    # 标记请求ID),# JSON序列化消息body=json.dumps(message).encode('utf-8'))# 等待响应(带超时机制)start_time = time.time()while self.response is None:# 处理网络事件(非阻塞)self.connection.process_data_events()# 超时检查if time.time() - start_time > timeout:raise TimeoutError(f"等待节点 {target_node} 响应超时")# 避免CPU空转,此处sleep不能太长,影响命令返回时效性time.sleep(0.1)return self.response    # 返回执行结果def close(self):"""安全关闭RabbitMQ连接功能:1. 清理网络连接资源2. 自动删除临时队列(exclusive队列)3. 防止资源泄漏:return:"""if self.connection and not self.connection.is_closed:# 关闭连接(会触发信道关闭)self.connection.close()logger.warning(" [x] 连接已关闭")if __name__ == '__main__':# 使用示例client = RabbitMQClient()try:# 向不同节点发送命令nodes = ["node247", "node248", "node249"]for node in nodes:try:logger.info(f"\n向节点 {node} 执行命令: hostname")logger.info(client.execute_command(command="hostname", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")try:logger.info(f"\n向节点 {node} 执行命令: ls -l /opt/")logger.info(client.execute_command(command="ls -l /opt/", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")try:logger.info(f"\n向节点 {node} 执行命令: date")logger.info(client.execute_command(command="date", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")finally:client.close()

3.调用结果

# 服务端,节点启动
192.168.120.17 node17
192.168.120.18 node18
192.168.120.19 node19python3 rabbitmq_server.py# 客户端,节点启动
192.168.120.17 node17python3 rabbitmq_client.py

相关文章:

  • 基于[coze][dify]搭建一个智能体工作流,抓取热门视频数据,自动存入在线表格
  • 【C++特殊工具与技术】固有的不可移植的特性(3)::extern“C“
  • 《C++初阶之类和对象》【友元 + 内部类 + 匿名对象】
  • 【.net core】【sqlsugar】在where条件查询时使用原生SQL
  • 清理华为云服务器内存使用率
  • 2025-6-27-C++ 学习 模拟与高精度(7)
  • unityButton问题
  • Gitee 持续集成与交付(CI/CD)篇
  • 【VPX3U】国产嵌入式平台:RK3588J×JH930硬件架构与红外应用方案
  • 推荐一个基于C#开发的跨平台构建自动化系统!
  • 【项目开发】删除表中所有含重复字段的数据
  • JetBrains AI助手登陆Android Studio!智能编码提升Kotlin开发效能
  • 无法访问 文件或目录损坏且无法读取
  • SPL 报表开发:不依赖逻辑数仓的轻量级多数据源报表
  • Linux命令行操作基础
  • 封装nuxt3的SSR请求和CSR请求方法
  • Linux基本指令篇 —— tac指令
  • GraphQL API-1
  • AIOps与人工智能的融合:从智能运维到自适应IT生态的革命
  • Java项目:基于SSM框架实现的宠物综合服务平台管理系统【ssm+B/S架构+源码+数据库+毕业论文+开题报告】