深入解析 ZeroMQ 请求-应答模式:服务端实现与全链路测试指南 || 测试ZMQ重连重试机制
一、需求分析
由于项目需要,先需要在按键按下时通过ZMQ发出按键请求,在收到ZMQ-REP时,表明按键有效按下,若超过3s仍未收到回复,则执行重发机制,超过3次则认为故障,进行日志记录。现需要测试该功能是否正常,需要通过Python模拟实现服务端来进行测试。
二、代码分析
# ================== 服务端代码 (rep_server.py) ==================
import zmq # ZeroMQ 核心库 - 实现高效网络通信
import json # JSON 处理库 - 用于数据序列化/反序列化def run_server():# 创建 ZeroMQ 上下文 (所有 socket 的容器)context = zmq.Context()# 创建 REP (Reply) 类型 socket - 遵循请求-应答模式socket = context.socket(zmq.REP) # 连接到客户端指定的地址 (实际生产中通常用 bind(),此处用 connect() 适用于指定设备场景)socket.connect("tcp://192.168.8.136:5592")# ✅ 最佳实践:生产环境建议服务端使用 bind(),客户端使用 connect()print("服务端已启动,等待客户端连接...")# 主服务循环 (持续处理请求)while True:try:# 🛑 阻塞接收客户端请求 (recv_json 自动进行 JSON 解析)message = socket.recv_json() print(f"收到请求: {message}")# 🔍 数据验证 (确保消息结构符合预期)if validate_message(message):# 🔁 原样返回数据 (演示基本应答逻辑)response = message socket.send_json(response) print("已发送应答数据")else:# ⚠️ 格式错误处理error_msg = {"error": "Invalid message format"}socket.send_json(error_msg) except json.JSONDecodeError:# ❌ JSON 解析异常处理error_msg = {"error": "Invalid JSON format"}socket.send_json(error_msg) except Exception as e:# 🚨 全局异常捕获error_msg = {"error": str(e)}socket.send_json(error_msg) # 🔐 消息验证函数 (确保数据结构完整性)
def validate_message(msg):return isinstance(msg, dict) and \ # 必须是字典类型"id" in msg and \ # 必须包含消息ID"data" in msg and \ # 必须包含数据段isinstance(msg["data"], dict) and \ # 数据段必须是字典"buttonIndex" in msg["data"] and \ # 包含按钮索引字段"value" in msg["data"] # 包含数值字段if __name__ == "__main__":run_server() # 🚀 启动服务端
三、实验结果及分析
服务端:程序运行时收到请求控制台打印回复发送请求
客户端:发送请求及未收到回复时重发机制及其记录