Websocket的基本使用
1. WebSocket
WebSocket 是一种在单个TCP连接上进行全双工通信的协议,它在现代 Web 开发和网络应用中发挥着重要作用。在 WebSocket 出现之前,实现服务器与客户端实时通信主要采用轮询Polling和长轮询Long - Polling等技术。轮询是客户端定时向服务器发送请求询问是否有新数据;长轮询则是客户端发送请求后,服务器若没有新数据就保持连接,直到有新数据才响应。这些方式存在效率低、资源消耗大等问题。为了解决这些问题,WebSocket 协议应运而生,它由 HTML5 标准定义,旨在提供一种高效、实时的双向通信机制。
1.1 工作原理
Websocket协议的主流版本为Websocket 13,由RFC 6455定义,基于TCP协议实现。WebSocket 的实现原理基于其独特的协议设计和通信机制,核心在于建立持久化的全双工连接,突破传统 HTTP 请求-响应模式的限制。以下是其实现原理的详细解析:
-
握手阶段【协议升级】:
WebSocket通过HTTP协议发起握手,升级到WebSocket协议,过程如下:-
客户端请求:客户端发送
HTTP请求,携带Upgrade: websocket头,表明希望升级到WebSocket协议。GET /chat HTTP/1.1 Host: example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== # 随机16字节 Base64 编码,客户端生成的随机值,用于服务端验证 Sec-WebSocket-Version: 13 // 指定协议版本 Origin: https://example.com -
服务端响应:服务端验证请求后返回
101 Switching Protocols响应,完成协议升级。HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= # 由客户端 Key + GUID 哈希生成 -
握手完成:成功握手后,
TCP连接保持打开状态,后续通信直接使用Websocket协议帧,不在依赖Http。
-
-
数据传输阶段:一旦连接升级成功,就建立了一个全双工的
WebSocket连接。客户端和服务器可以在这个连接上随时向对方发送数据,而不需要像HTTP请求那样每次都重新建立连接。 -
关闭连接阶段:当一方想要关闭连接时,会发送一个关闭帧,另一方收到后也发送一个关闭帧进行确认,然后双方关闭连接。
1.2 特点与应用场景
Websocket一些有以下特点:
- 全双工通信:客户端和服务器可以同时、独立地向对方发送数据,这使得实时通信变得更加高效。例如,在在线聊天应用中,用户可以随时发送和接收消息。
- 低延迟:由于避免了
HTTP请求的开销,WebSocket连接建立后数据传输的延迟非常低,适合对实时性要求较高的应用,如金融交易系统、实时游戏等。 - 较少的控制开销:
WebSocket协议在连接建立后,只需要较少的控制信息来维持连接,相比于HTTP请求,减少了额外的头部信息,降低了带宽消耗。 - 跨平台兼容性:现代浏览器和大多数服务器端框架都支持
WebSocket协议,使得开发者可以方便地在不同平台上实现WebSocket通信。
基于以上特点,能够体现出Websocket协议的优势,以下是该协议的常用场景:
- 实时聊天应用:如在线客服系统、即时通讯软件等,用户可以实时发送和接收消息,实现高效的沟通。
- 实时数据更新:股票行情、体育赛事比分等需要实时更新数据的场景,服务器可以实时将最新数据推送给客户端。
- 多人在线游戏:在多人在线游戏中,玩家的操作和状态需要实时同步,
WebSocket可以保证游戏的流畅性和实时性。 - 多人协作:多个用户可以同时编辑同一个文档,服务器实时将每个用户的编辑操作同步给其他用户。
2. Python中使用Websocket
在 Python 中实现 WebSocket 协议通常借助第三方库,主流选择包括 websockets【轻量异步】、Tornado【异步框架】和 Django Channels【集成 Django 生态】。
| 库 | 特点 | 适用场景 |
|---|---|---|
websockets | 轻量级异步库,API 简洁,支持 Python 3.6+。 | 快速搭建简单 WebSocket 服务 |
Tornado | 异步网络框架,内置 WebSocket 支持,适合复杂应用。 | 高性能实时服务,如聊天服务器 |
Django Channels | 集成 Django,支持 WebSocket、HTTP/2 等协议,需搭配 ASGI 服务器。 | Django 项目中的实时功能扩展 |
2.1 websockets API
websockets 是一个用于在 Python 中实现 WebSocket 协议的异步库,适用于构建实时通信应用。安装指令为pip install websockets,该库的常用API如下:
-
服务端
API-
websockets.serve():创建WebSocket服务器。其参数如下:-
handler:处理客户端连接的异步函数,需接收websocket和path参数。 -
host/port:绑定地址和端口。 -
ping_interval/ping_timeout:心跳检测间隔和超时时间,默认禁用。 -
origins:允许的跨域来源,列表形式,如origins=["https://example.com"]。 -
ssl:SSL上下文,用于wss://安全连接。async def handler(websocket, path): pass server = websockets.serve(handler, "localhost", 8765, ping_interval=30)
-
-
WebSocketServer对象:通过serve()返回的服务器对象,通常用async with管理生命周期。async with websockets.serve(...) as server: await server.wait_closed() # 阻塞直到服务器关闭
-
-
客户端常用
API-
websockets.connect():连接到WebSocket服务器。其参数如下:-
uri:服务器地址,如ws://localhost:8765。 -
ping_interval/ping_timeout:客户端心跳配置。 -
ssl:用于HTTPS的SSL上下文。async with websockets.connect("ws://localhost:8765") as websocket: await websocket.send("Hello")
-
-
-
连接对象``WebSocketCommonProtocol
:无论是服务端还是客户端的连接,均通过websocket` 对象操作,核心方法如下。-
发送消息:
await websocket.send(message),发送文本或二进制数据。await websocket.send("Hello!") await websocket.send(json.dumps({"data": "test"}).encode()) -
接收消息:
await websocket.recv(),接收一条文本或二进制消息。// 循环接收 async for message in websocket: print(f"Received: {message}") -
关闭连接:
await websocket.close(code=1000, reason=""),主动关闭连接,支持状态码和原因,如1000【正常关闭】、1001【服务器终止】、1002【协议错误】。await websocket.close(code=1000, reason="Done") -
连接状态
websocket.open:连接是否处于打开状态。websocket.closed:连接是否已关闭。
-
-
异常处理
-
websockets.exceptions.ConnectionClosed:当连接意外时关闭抛出。try: await websocket.recv() except websockets.exceptions.ConnectionClosed as e: print(f"Connection closed: {e.code}, {e.reason}") -
InvalidHandshake:握手失败,如服务器未实现WebSocket。 -
InvalidMessage:消息格式错误。
-
-
配置
SSL/TLS【配置安全连接】-
服务端配置
import ssl ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain("server.crt", "server.key") server = websockets.serve(handler, "localhost", 8765, ssl=ssl_context) -
客户端配置
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.load_verify_locations("server.crt") async with websockets.connect("wss://localhost:8765", ssl=ssl_context) as ws: pass
-
2.2 async与await
在 Python 里,async 和 await 是异步编程的核心特性,它们基于 asyncio 库实现,能让代码在处理 I/O 密集型任务时更高效。
-
async关键字-
定义协程函数:
async用于定义协程函数。协程函数是一种特殊的函数,它不会像普通函数那样直接执行,而是返回一个协程对象。要执行协程函数,需要将其放到事件循环中。在下面例子中,greet是一个协程函数,调用它时并不会立即执行函数体中的代码,而是返回一个协程对象。import asyncio # 使用 async 定义协程函数 async def greet(): print("开始执行协程函数") await asyncio.sleep(1) # 模拟 I/O 操作 print("协程函数执行结束") return "Hello, World!" # 调用协程函数,返回协程对象 coro = greet() print(type(coro)) # <class 'coroutine'> -
异步生成器:
async还能用于定义异步生成器,异步生成器可以在异步环境中逐个生成值。在下面例子中,async_generator是一个异步生成器,async for用于迭代异步生成器中的值。import asyncio # 定义异步生成器 async def async_generator(): for i in range(3): await asyncio.sleep(1) yield i async def main(): async for num in async_generator(): print(num) asyncio.run(main()) -
await关键字-
暂停协程执行:
await只能在协程函数内部使用,它用于暂停当前协程的执行,等待一个可等待对象【另一个协程、Future对象、Task对象】完成,并返回其结果。当遇到await时,控制权会暂时交回给事件循环,事件循环可以去执行其他任务。在下面例子中,main协程函数里的await fetch_data()会暂停main协程的执行,直到fetch_data协程执行完毕,然后将fetch_data的返回值赋给result。import asyncio async def fetch_data(): print("开始获取数据") await asyncio.sleep(2) # 模拟耗时的数据获取操作 print("数据获取完成") return "Data" async def main(): result = await fetch_data() # 等待 fetch_data 协程完成 print(f"获取到的数据: {result}") # 创建事件循环并运行主协程 asyncio.run(main()) -
处理多个可等待对象:可以使用
asyncio.gather()同时运行多个协程,await会等待所有协程都完成。在下面例子中,asyncio.gather(task1(), task2())会并发运行task1和task2协程,await会等待这两个协程都完成,然后将它们的返回值以列表形式存储在results中。import asyncio async def task1(): print("Task 1 开始") await asyncio.sleep(1) print("Task 1 完成") return 1 async def task2(): print("Task 2 开始") await asyncio.sleep(2) print("Task 2 完成") return 2 async def main(): results = await asyncio.gather(task1(), task2()) print(f"所有任务的结果: {results}") asyncio.run(main())
-
-
在下面例子中,fetch 是一个协程函数,用于发送 HTTP 请求并返回响应内容。main 协程函数中,使用 asyncio.gather() 并发执行多个 fetch 协程,await 会等待所有请求都完成,然后打印每个响应内容的长度。
import asyncio
import aiohttp
# 异步函数,用于发送 HTTP 请求
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
# 主协程函数,用于并发执行多个请求
async def main():
urls = [
"http://example.com",
"http://example.org",
"http://example.net"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(len(result))
# 运行主协程
asyncio.run(main())
2.2 实时日志监控
// 服务端代码
import asyncio
import websockets
import time
connected = set()
async def log_monitor(websocket, path):
connected.add(websocket)
try:
# 模拟持续发送日志
while True:
log = f"Log at {time.ctime()}"
await websocket.send(log)
await asyncio.sleep(1)
except websockets.exceptions.ConnectionClosed:
pass
finally:
connected.remove(websocket)
async def main():
async with websockets.serve(log_monitor, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
// 客户端代码
import asyncio
import websockets
async def log_client():
async with websockets.connect("ws://localhost:8765") as websocket:
async for log in websocket:
print(f"Received log: {log}")
asyncio.run(log_client())
