当前位置: 首页 > news >正文

用Python和Websockets库构建一个高性能、低延迟的实时消息推送服务

基础的服务端代码:

import asyncio
import websockets
import json
from datetime import datetimeclass MessageServer:def __init__(self):self.clients = set()  # 存储所有连接的客户端async def register(self, websocket):"""注册新客户端"""self.clients.add(websocket)print(f"新客户端连接 当前连接数: {len(self.clients)}")async def unregister(self, websocket):"""注销客户端"""self.clients.remove(websocket)print(f"客户端断开 当前连接数: {len(self.clients)}")async def broadcast(self, message, sender_ws=None):"""广播消息给所有客户端"""if self.clients:# 过滤掉发送者自己 避免回显targets = {client for client in self.clients if client != sender_ws}if targets:await asyncio.gather(*[client.send(json.dumps(message)) for client in targets],return_exceptions=True)async def handle_client(self, websocket, path):"""处理客户端连接"""await self.register(websocket)try:async for message in websocket:data = json.loads(message)# 添加时间戳data['timestamp'] = datetime.now().isoformat()print(f"收到消息: {data}")# 广播给其他客户端await self.broadcast(data, websocket)except websockets.exceptions.ConnectionClosed:passfinally:await self.unregister(websocket)# 启动服务器
server = MessageServer()
start_server = websockets.serve(server.handle_client, "localhost", 8765)print("WebSocket服务器启动在 ws://localhost:8765")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

客户端代码

import asyncio
import websockets
import json
import threadingclass MessageClient:def __init__(self, uri):self.uri = uriself.websocket = Noneasync def connect(self):"""连接服务器"""self.websocket = await websockets.connect(self.uri)print("连接成功!")async def send_message(self, message):"""发送消息"""if self.websocket:data = {'type': 'message','content': message,'user': 'user_123'  # 实际项目中应该是真实用户ID}await self.websocket.send(json.dumps(data))async def listen(self):"""监听服务器消息"""try:async for message in self.websocket:data = json.loads(message)print(f"[{data.get('timestamp', '')}] {data.get('user', '未知')}: {data.get('content', '')}")except websockets.exceptions.ConnectionClosed:print("连接已断开")def start_input_loop(self):"""处理用户输入"""loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)async def input_handler():while True:message = input()if message.lower() == 'quit':breakawait self.send_message(message)loop.run_until_complete(input_handler())# 使用示例
async def main():client = MessageClient("ws://localhost:8765")await client.connect()# 启动输入线程input_thread = threading.Thread(target=client.start_input_loop)input_thread.daemon = Trueinput_thread.start()# 监听消息await client.listen()if __name__ == "__main__":asyncio.run(main())

性能优化首先是连接池管理。当连接数达到几千甚至上万时 内存占用会很大,解决办法是设置最大连接数限制:

async def handle_client(self, websocket, path):if len(self.clients) >= MAX_CONNECTIONS:await websocket.close(code=1013, reason="服务器连接数已满")return# 其他处理逻辑...

高并发时如果直接广播 会造成阻塞。我现在都用Redis做消息队列了。

import redis
import asyncioclass RedisMessageServer(MessageServer):def __init__(self):super().__init__()self.redis_client = redis.Redis(host='localhost', port=6379, db=0)self.pubsub = self.redis_client.pubsub()async def start_redis_listener(self):"""监听Redis消息"""self.pubsub.subscribe('chat_messages')for message in self.pubsub.listen():if message['type'] == 'message':data = json.loads(message['data'])await self.broadcast(data)

部署的话我推荐用nginx做反向代理

upstream websocket {server 127.0.0.1:8765;
}server {listen 80;location /ws {proxy_pass http://websocket;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_set_header Host $host;proxy_read_timeout 86400;}
}

 proxy_read_timeout要设置长一点。不然连接会被nginx强制断开。

生产环境一定要加心跳检测。我用的是每30秒发一次ping:

async def heartbeat(self, websocket):try:while True:await websocket.ping()await asyncio.sleep(30)except:await self.unregister(websocket)

http://www.dtcms.com/a/602136.html

相关文章:

  • 海尔网站建设水平河北廊坊seo网站建设网站优化
  • 小型深圳网站定制开发最专业的网站建设
  • 中山网站优化排名徐州祥云做网站
  • 8、hall速度控制——速度电流双闭环控制(一)
  • 网页版C语言编译器 | 在线体验C语言编程,快速编译与调试
  • 网站如何调用微博网站集群建设是
  • 「单题起答」功能解锁丨考试升级
  • Effective Python 第50条:用__set_name__给类属性加注解
  • 泉州市住房与城乡建设网站常用的网站有哪些
  • wordpress站点设置使用期限武夷山网站制作
  • python 迭代器和生成器
  • 编译型语言的两大步骤 | 深入理解编译过程与优化技术
  • (三)分支与合并 - git rebase 命令的使用
  • K8S上高可用SeaTunnel 集群部署
  • wdcp 默认网站中学网站建设方案 分校区
  • 网站营销站点有你想网页设计师个人简历参考范文
  • Windows 使用 docker 搭建 gitea
  • 多维决策系统的工程化实践:从评估框架到智能筛选引擎
  • 二十八、STM32的USART (介绍)
  • 双滦区seo整站排名seo在网站制作
  • 关于网站维护的书籍建设网站的技术性背景
  • 现代CPU性能分析与优化
  • 悬浮提词器免费版哪个好用?功能测评与实用应用场景
  • 使用OpenGL实现Gouraud材质
  • 2025年数据中心不间断电源(UPS)市场报告:趋势、挑战与投资战略全景分析
  • 网站建设开标书哪家公司
  • 【前端面试】CSS篇
  • 第六章,主从服务器
  • 16.udp_socket(三)
  • 【Git Merge branch】Git 合并提交(Merge Commit)的成因与清理:从本地修复到安全重写历史