用Python和Websockets库构建一个高性能、低延迟的实时消息推送服务
基础的服务端代码:
import asyncio
import websockets
import json
from datetime import datetimeclass MessageServer:def __init__(self):self.clients = set() # 存储所有连接的客户端async def register(self, websocket):"""注册新客户端"""self.clients.add(websocket)print(f"新客户端连接 当前连接数: {len(self.clients)}")async def unregister(self, websocket):"""注销客户端"""self.clients.remove(websocket)print(f"客户端断开 当前连接数: {len(self.clients)}")async def broadcast(self, message, sender_ws=None):"""广播消息给所有客户端"""if self.clients:# 过滤掉发送者自己 避免回显targets = {client for client in self.clients if client != sender_ws}if targets:await asyncio.gather(*[client.send(json.dumps(message)) for client in targets],return_exceptions=True)async def handle_client(self, websocket, path):"""处理客户端连接"""await self.register(websocket)try:async for message in websocket:data = json.loads(message)# 添加时间戳data['timestamp'] = datetime.now().isoformat()print(f"收到消息: {data}")# 广播给其他客户端await self.broadcast(data, websocket)except websockets.exceptions.ConnectionClosed:passfinally:await self.unregister(websocket)# 启动服务器
server = MessageServer()
start_server = websockets.serve(server.handle_client, "localhost", 8765)print("WebSocket服务器启动在 ws://localhost:8765")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()客户端代码
import asyncio
import websockets
import json
import threadingclass MessageClient:def __init__(self, uri):self.uri = uriself.websocket = Noneasync def connect(self):"""连接服务器"""self.websocket = await websockets.connect(self.uri)print("连接成功!")async def send_message(self, message):"""发送消息"""if self.websocket:data = {'type': 'message','content': message,'user': 'user_123' # 实际项目中应该是真实用户ID}await self.websocket.send(json.dumps(data))async def listen(self):"""监听服务器消息"""try:async for message in self.websocket:data = json.loads(message)print(f"[{data.get('timestamp', '')}] {data.get('user', '未知')}: {data.get('content', '')}")except websockets.exceptions.ConnectionClosed:print("连接已断开")def start_input_loop(self):"""处理用户输入"""loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)async def input_handler():while True:message = input()if message.lower() == 'quit':breakawait self.send_message(message)loop.run_until_complete(input_handler())# 使用示例
async def main():client = MessageClient("ws://localhost:8765")await client.connect()# 启动输入线程input_thread = threading.Thread(target=client.start_input_loop)input_thread.daemon = Trueinput_thread.start()# 监听消息await client.listen()if __name__ == "__main__":asyncio.run(main())性能优化首先是连接池管理。当连接数达到几千甚至上万时 内存占用会很大,解决办法是设置最大连接数限制:
async def handle_client(self, websocket, path):if len(self.clients) >= MAX_CONNECTIONS:await websocket.close(code=1013, reason="服务器连接数已满")return# 其他处理逻辑...高并发时如果直接广播 会造成阻塞。我现在都用Redis做消息队列了。
import redis
import asyncioclass RedisMessageServer(MessageServer):def __init__(self):super().__init__()self.redis_client = redis.Redis(host='localhost', port=6379, db=0)self.pubsub = self.redis_client.pubsub()async def start_redis_listener(self):"""监听Redis消息"""self.pubsub.subscribe('chat_messages')for message in self.pubsub.listen():if message['type'] == 'message':data = json.loads(message['data'])await self.broadcast(data)部署的话我推荐用nginx做反向代理
upstream websocket {server 127.0.0.1:8765;
}server {listen 80;location /ws {proxy_pass http://websocket;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_set_header Host $host;proxy_read_timeout 86400;}
}proxy_read_timeout要设置长一点。不然连接会被nginx强制断开。
生产环境一定要加心跳检测。我用的是每30秒发一次ping:
async def heartbeat(self, websocket):try:while True:await websocket.ping()await asyncio.sleep(30)except:await self.unregister(websocket)