websocket python 实现
目录
安装:
websocket python server
客户端:
安装:
pip install websocket-client
pip install websocket-server
websocket python server
# filename: global_websocket_server.py
import json
import threading
import time
import queue
from typing import Callable, Optional, Tuple, Any, Dict

from websocket_server import WebsocketServer  # pip install websocket-server


class GlobalWebSocketServer:
    """Process-wide singleton wrapper around ``websocket_server.WebsocketServer``.

    The server runs in one daemon thread; a second daemon thread drains an
    outgoing queue so callers can enqueue messages from any thread without
    touching sockets themselves.

    Because the class is a singleton, the ``host``/``port`` passed to the
    first construction win; later constructions return the same instance and
    ignore their arguments.
    """

    _instance = None
    _lock = threading.Lock()

    # Seconds the sender thread pauses after each processed queue item.
    SEND_INTERVAL = 1.0

    def __new__(cls, host: str = "0.0.0.0", port: int = 8765):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self, host: str = "0.0.0.0", port: int = 8765):
        # __init__ runs on every GlobalWebSocketServer(...) call; only the
        # first one may set up state.
        if getattr(self, "_initialized", False):
            return

        self.host = host
        self.port = port
        self.on_message: Optional[Callable[[Any, str], None]] = None

        self.server: Optional[WebsocketServer] = None
        self.server_thread: Optional[threading.Thread] = None

        # client id -> client record handed over by websocket_server
        self.clients: Dict[int, dict] = {}
        self.clients_lock = threading.Lock()

        # Outgoing queue of (target_client_id_or_None, message_str).
        # (None, None) is the sender thread's exit signal.
        self.send_queue: "queue.Queue[Tuple[Optional[int], Optional[str]]]" = queue.Queue()
        self.sender_thread: Optional[threading.Thread] = None

        self.running = False
        self._initialized = True

        self.last_time = time.time()
        # guideId of the most recent payload seen by enqueue_send_json.
        self.last_msg: Any = None
        print(f"[GlobalWebSocketServer] Initialized with {host}:{port}")

    # ---------- callback registration ----------
    def set_message_callback(self, callback: Callable[[Any, str], None]):
        """Register ``callback(payload, client_id_str)`` for inbound messages.

        ``payload`` is the decoded JSON value when the message parses as JSON,
        otherwise the raw message string.
        """
        self.on_message = callback
        print("[GlobalWebSocketServer] Message callback set")

    # ---------- start / stop ----------
    def start(self) -> bool:
        """Start the server and sender threads.

        Returns:
            True when the server is running (including when it already was),
            False when start-up raised.
        """
        if self.running:
            print("[GlobalWebSocketServer] Server already running")
            return True

        try:
            self.server = WebsocketServer(port=self.port, host=self.host, loglevel=0)

            self.server.set_fn_new_client(self._on_new_client)
            self.server.set_fn_client_left(self._on_client_left)
            self.server.set_fn_message_received(self._on_message_received)

            # run_forever() blocks, so it gets its own thread.
            self.server_thread = threading.Thread(
                target=self._run_server, daemon=True, name="WS_Server_Thread"
            )
            self.running = True
            self.server_thread.start()

            self.sender_thread = threading.Thread(
                target=self._send_loop, daemon=True, name="WS_Send_Thread"
            )
            self.sender_thread.start()

            print(f"[GlobalWebSocketServer] Listening on ws://{self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"[GlobalWebSocketServer] Failed to start: {e}")
            self.running = False
            return False

    def _run_server(self):
        """Thread target: run the blocking server loop until it exits."""
        server = self.server
        try:
            if server:
                server.run_forever()
        except Exception as e:
            print(f"[GlobalWebSocketServer] Server run exception: {e}")
        finally:
            # Only clear the flag if this thread's server is still the current
            # one; after restart() a newer server owns `running`.
            if self.server is server:
                self.running = False

    def stop(self):
        """Shut the server down, stop the sender thread and forget all clients."""
        self.running = False
        try:
            if self.server:
                try:
                    self.server.shutdown_gracefully()
                except Exception:
                    # Fallback for versions without shutdown_gracefully().
                    try:
                        self.server.server_close()
                    except Exception:
                        pass
                self.server = None

            # Signal the sender thread only if one is alive. An unconsumed
            # exit signal would otherwise stay queued and terminate the next
            # sender thread right after a later start().
            sender = self.sender_thread
            self.sender_thread = None
            if sender is not None and sender.is_alive():
                self.send_queue.put_nowait((None, None))
                if sender is not threading.current_thread():
                    # Bounded wait: the sender may be inside its pacing sleep.
                    sender.join(timeout=self.SEND_INTERVAL + 1.0)

            with self.clients_lock:
                self.clients.clear()

            print("[GlobalWebSocketServer] Stopped")
        except Exception as e:
            print(f"[GlobalWebSocketServer] Stop error: {e}")

    def restart(self, host: Optional[str] = None, port: Optional[int] = None) -> bool:
        """Stop, optionally change host/port, and start again.

        Returns:
            The result of ``start()``.
        """
        self.stop()
        if host:
            self.host = host
        if port:
            self.port = port
        # Short pause so the listening port can be released.
        time.sleep(0.1)
        return self.start()

    # ---------- client events ----------
    def _on_new_client(self, client, server):
        """Record a newly connected client (keyed by ``client['id']``)."""
        client_id = client['id']
        addr = client.get('address')
        with self.clients_lock:
            self.clients[client_id] = client
        print(f"[GlobalWebSocketServer] Client connected: id={client_id}, addr={addr}")

    def _on_client_left(self, client, server):
        """Forget a client that disconnected."""
        if client is None:
            # NOTE(review): the library appears to pass None for connections
            # it never registered as clients - confirm against its docs.
            return
        client_id = client['id']
        with self.clients_lock:
            self.clients.pop(client_id, None)
        print(f"[GlobalWebSocketServer] Client disconnected: id={client_id}")

    def _on_message_received(self, client, server, message):
        """Decode an inbound message and hand it to the registered callback."""
        client_id = client['id']
        addr = client.get('address')
        print(f"[GlobalWebSocketServer] Received from {addr} (id={client_id}): {message}")

        # Pass the decoded JSON value when possible, else the raw string.
        payload: Any = message
        try:
            payload = json.loads(message)
        except Exception:
            pass

        if self.on_message:
            try:
                self.on_message(payload, str(client_id))
            except Exception as e:
                print(f"[GlobalWebSocketServer] Callback error: {e}")

    # ---------- send queue and sender thread ----------
    def _send_loop(self):
        """Sender thread: deliver queued messages in FIFO order.

        Queue items are ``(target_client_id_or_None, message_str)``; a target
        of None means broadcast. ``(None, None)`` ends the loop.
        """
        while True:
            try:
                item = self.send_queue.get()
                if item is None:
                    continue
                target_id, msg = item
                if target_id is None and msg is None:
                    break  # exit signal
                self._do_send(target_id, msg)
                if self.running:
                    # Pace deliveries; skipped while shutting down so the
                    # remaining items drain quickly.
                    time.sleep(self.SEND_INTERVAL)
            except Exception as e:
                print(f"[GlobalWebSocketServer] Send loop error: {e}")
            finally:
                time.sleep(0.001)

    def _do_send(self, target_id: Optional[int], msg: str) -> bool:
        """Send ``msg`` to one client, or to all clients when ``target_id`` is None.

        A trailing newline is appended when missing.

        Returns:
            True for a broadcast or a successful targeted send; False when no
            server is running, the target is not connected, or sending raised.
        """
        # Local reference: stop() may set self.server to None concurrently.
        server = self.server
        if not server:
            print("[GlobalWebSocketServer] No server running, drop message")
            return False

        if not msg.endswith("\n"):
            msg = msg + "\n"

        try:
            if target_id is None:
                # Snapshot under the lock, send outside it, so a slow socket
                # cannot block connect/disconnect handling.
                with self.clients_lock:
                    recipients = list(self.clients.items())
                for cid, client in recipients:
                    try:
                        server.send_message(client, msg)
                    except Exception as e:
                        print(f"[GlobalWebSocketServer] Broadcast send error to {cid}: {e}")
                print(f"WebSocket Broadcasted: {msg.strip()[:16]}")
                return True

            with self.clients_lock:
                client = self.clients.get(int(target_id))
            if not client:
                print(f"[GlobalWebSocketServer] Target client {target_id} not connected")
                return False
            try:
                server.send_message(client, msg)
                print(f"[GlobalWebSocketServer] Sent to {target_id}: {msg.strip()}")
                return True
            except Exception as e:
                print(f"[GlobalWebSocketServer] Send to {target_id} error: {e}")
                return False
        except Exception as e:
            print(f"[GlobalWebSocketServer] _do_send exception: {e}")
            return False

    # ---------- public API: enqueue for sending ----------
    def enqueue_send(self, msg: str, target_client_id: Optional[int] = None):
        """Queue a text message for delivery.

        Args:
            msg: Text to send.
            target_client_id: None to broadcast; otherwise a client id
                (int, or anything ``int()`` accepts).
        """
        try:
            if target_client_id is not None:
                target_client_id = int(target_client_id)
            self.send_queue.put((target_client_id, msg))
        except Exception as e:
            print(f"[GlobalWebSocketServer] enqueue_send error: {e}")

    def enqueue_send_json(self, data: dict, target_client_id: Optional[int] = None):
        """Serialize ``data`` to JSON and queue it for delivery.

        A payload whose ``guideId`` equals that of the previous payload is
        dropped. Payloads without a ``guideId`` are never treated as
        duplicates.
        """
        guide_id = data.get('guideId')
        if guide_id is not None:
            if guide_id == self.last_msg:
                return
            self.last_msg = guide_id

        # NOTE(review): last_time is only set in __init__, so this check only
        # suppresses sends during the first 0.1 s after construction. Confirm
        # whether a real rate limit was intended before refreshing it here.
        if time.time() - self.last_time > 0.1:
            try:
                msg = json.dumps(data, ensure_ascii=False)
                self.enqueue_send(msg, target_client_id)
            except Exception as e:
                print(f"[GlobalWebSocketServer] enqueue_send_json error: {e}")

    # ---------- helpers ----------
    def is_connected(self) -> bool:
        """Return True when at least one client is connected."""
        with self.clients_lock:
            return len(self.clients) > 0

    def get_client_ids(self):
        """Return a list of the ids of all connected clients."""
        with self.clients_lock:
            return list(self.clients.keys())


# Singleton accessor
def get_global_websocket_server(host: str = "0.0.0.0", port: int = 5000) -> GlobalWebSocketServer:
    """Return the shared ``GlobalWebSocketServer`` instance.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The object produced by ``GlobalWebSocketServer(host, port)``.
    """
    return GlobalWebSocketServer(host, port)


# Created and started at import time so importing modules can use `server_g`
# directly.
server_g = get_global_websocket_server(host="0.0.0.0", port=5000)
if server_g.start():
    print("WebSocket server started.")
# ===== 使用示例 =====
if __name__ == "__main__":

    def on_msg(payload, client_id_str):
        """Log each inbound message and answer a ``ping`` with a ``pong``."""
        print(f"[CALLBACK] From {client_id_str}: {payload}")
        try:
            if isinstance(payload, dict) and payload.get("type") == "ping":
                # Heartbeat replies bypass enqueue_send_json: that helper keys
                # its duplicate suppression on "guideId", which a pong does
                # not carry.
                pong = json.dumps({"type": "pong", "ts": time.time()}, ensure_ascii=False)
                server_g.enqueue_send(pong, target_client_id=int(client_id_str))
        except Exception as e:
            print("Callback handling error:", e)

    server_g.set_message_callback(on_msg)

    try:
        while True:
            if server_g.is_connected():
                # enqueue_send_json drops a payload whose guideId equals the
                # previous one, so this fixed demo payload is delivered once.
                result = {
                    "type": "danger",
                    "message": "huichui_err",
                    "code": "1001",
                    "title": "huichui",
                    "description": "挥锤动作不对,请调整姿势",
                    "videoUrl": "",
                    "content": "手腕应该保持稳定,肘部发力",
                    "guideId": "huichui",
                    "timestamp": time.time(),
                }
                server_g.enqueue_send_json(result)
            else:
                print("No clients connected.")
            time.sleep(3)
    except KeyboardInterrupt:
        print("Shutting down.")
    finally:
        server_g.stop()
客户端:
import json
import threading
import time
import websocket


class ReconnectingWebSocketClient:
    """WebSocket client that sends JSON heartbeats and reconnects after a drop.

    Args:
        url: Server URL to connect to.
        heartbeat_interval: Seconds between ``ping`` messages. The connection
            is closed when no ``pong`` arrived for twice this long.
        reconnect_interval: Seconds to wait before reconnecting.
    """

    def __init__(self, url="ws://127.0.0.1:5000", heartbeat_interval=10, reconnect_interval=5):
        self.url = url
        self.heartbeat_interval = heartbeat_interval
        self.reconnect_interval = reconnect_interval
        self.ws = None
        self.running = False
        self.last_pong_time = time.time()
        # Single heartbeat thread shared by all (re)connections.
        self._heartbeat_thread = None

    def on_open(self, ws):
        """Reset the pong timer when a connection is established."""
        print("[Client] Connected to server")
        self.last_pong_time = time.time()

    def on_message(self, ws, message):
        """Print the message; a JSON ``{"type": "pong"}`` refreshes the pong timer."""
        print("[Client] Received:", message)
        try:
            payload = json.loads(message)
            if isinstance(payload, dict) and payload.get("type") == "pong":
                self.last_pong_time = time.time()
        except Exception:
            # Non-JSON messages are only printed.
            pass

    def on_close(self, ws, close_status_code, close_msg):
        """Report that the connection was closed."""
        print("[Client] Connection closed:", close_status_code, close_msg)

    def on_error(self, ws, error):
        """Report a connection error."""
        print("[Client] Error:", error)

    def _send_heartbeat(self):
        """Background loop: send a ping periodically and close on pong timeout."""
        while self.running:
            try:
                ws = self.ws
                if ws and ws.sock and ws.sock.connected:
                    ping_msg = {"type": "ping", "ts": time.time()}
                    ws.send(json.dumps(ping_msg))
                    print("[Client] Sent heartbeat")

                    if time.time() - self.last_pong_time > self.heartbeat_interval * 2:
                        print("[Client] Heartbeat timeout, closing connection")
                        # Closing ends run_forever(), which makes connect()
                        # reconnect.
                        ws.close()
                else:
                    print("[Client] Not connected, skip heartbeat")
            except Exception as e:
                print("[Client] Heartbeat error:", e)
            time.sleep(self.heartbeat_interval)

    def _ensure_heartbeat_thread(self):
        """Start the heartbeat thread unless one is already alive."""
        thread = self._heartbeat_thread
        if thread is not None and thread.is_alive():
            return
        thread = threading.Thread(target=self._send_heartbeat, daemon=True)
        self._heartbeat_thread = thread
        thread.start()

    def connect(self):
        """Connect and keep reconnecting until ``stop()`` is called.

        Blocks the calling thread.
        """
        self.running = True
        while self.running:
            try:
                self.ws = websocket.WebSocketApp(
                    self.url,
                    on_open=self.on_open,
                    on_message=self.on_message,
                    on_close=self.on_close,
                    on_error=self.on_error,
                )

                wst = threading.Thread(target=self.ws.run_forever, daemon=True)
                wst.start()

                # One heartbeat thread serves every reconnect; starting a new
                # one per attempt would multiply the pings.
                self._ensure_heartbeat_thread()

                # Returns once the connection has dropped.
                wst.join()
            except Exception as e:
                print("[Client] Connect exception:", e)

            if self.running:
                print(f"[Client] Reconnecting in {self.reconnect_interval} sec...")
                time.sleep(self.reconnect_interval)

    def stop(self):
        """Stop reconnecting and close the current connection, if any."""
        self.running = False
        if self.ws:
            try:
                self.ws.close()
            except Exception as e:
                print("[Client] Close error:", e)


if __name__ == "__main__":
    client = ReconnectingWebSocketClient("ws://127.0.0.1:5000")
    try:
        client.connect()
    except KeyboardInterrupt:
        print("Stopping client")
        client.stop()