OpenWebUI(5)源码学习-后端socket通信模块
目录
- 引言
- 📁 main.py
- 🔧 核心作用
- ⚙️ 主要功能详解
- 1. **WebSocket 初始化**
- 2. **用户会话池管理**
- 3. **模型使用状态追踪**
- 4. **频道(Channel)消息处理**
- 5. **用户连接与断开处理**
- 6. **聊天事件分发器**
- 📁 utils.py
- 🔧 核心作用
- ⚙️ 主要功能详解
- 1. **Redis 锁机制 —— `RedisLock`**
- 2. **Redis 字典封装 —— `RedisDict`**
- 🧩 功能模块关联图
- 🧠 典型应用场景
- ✅ 实时聊天输出
- ✅ 用户在线状态同步
- ✅ 模型使用监控
- ✅ 频道消息广播
- 🛡️ 权限与安全控制
- 📦 数据结构概览
- 🧹 定时任务:usage_pool 清理
- ✅ 总结
- 🧠 技术亮点
引言
当前目录 open-webui\backend\open_webui\socket
包含两个核心文件:
main.py
utils.py
它们共同构成了 Open WebUI 的 WebSocket 通信模块,负责实现前后端实时交互、用户状态管理、模型使用监控和聊天事件广播等功能。
📁 main.py
🔧 核心作用
提供基于 Socket.IO
的 WebSocket 服务,支持以下功能:
- 实时聊天消息传递(如流式输出)
- 用户在线状态同步
- 模型调用状态广播
- 频道(群组)消息转发
- 多实例部署下的 Redis 同步支持
⚙️ 主要功能详解
1. WebSocket 初始化
if WEBSOCKET_MANAGER == "redis":mgr = socketio.AsyncRedisManager(...)sio = socketio.AsyncServer(client_manager=mgr, ...)
else:sio = socketio.AsyncServer()
- 支持两种模式:
- 单机模式(默认内存管理)
- 分布式模式(通过 Redis 管理连接池)
2. 用户会话池管理
- 使用
SESSION_POOL
和USER_POOL
跟踪每个用户的 WebSocket 连接。 - 每个用户可有多个 session ID(sid),例如来自不同设备或浏览器标签页的连接。
3. 模型使用状态追踪
USAGE_POOL
记录当前正在被使用的模型及其对应的连接。- 定期清理过期连接并广播更新的模型使用情况。
4. 频道(Channel)消息处理
- 支持用户加入特定频道,并在频道内广播消息(如打字状态、消息更新等)。
- 使用房间机制隔离不同频道的消息流。
5. 用户连接与断开处理
[connect](file://open-webui\backend\open_webui\socket\main.py#L177-L194)
: 当用户连接时记录其身份信息。[disconnect](file://open-webui\backend\open_webui\socket\main.py#L286-L300)
: 用户断开时清理会话池数据。user-list
: 广播当前在线用户列表。
6. 聊天事件分发器
提供 [get_event_emitter(request_info)](file://open-webui\backend\open_webui\socket\main.py#L303-L370)
工具函数,用于从后端向前端推送聊天相关事件,如:
- 流式输出 (
type: message
) - 状态更新 (
type: status
) - 内容替换 (
type: replace
)
这些事件将被发送到指定用户的 WebSocket 会话中。
📁 utils.py
🔧 核心作用
为 main.py
提供底层工具支持,特别是针对 Redis 数据结构的封装和分布式锁的实现。
⚙️ 主要功能详解
1. Redis 锁机制 —— RedisLock
class RedisLock:def aquire_lock(self): ...def renew_lock(self): ...def release_lock(self): ...
- 用于确保在多节点部署环境下只有一个实例执行周期性任务(如
usage_pool
清理)。 - 基于 Redis 的原子操作实现安全加锁/解锁。
2. Redis 字典封装 —— RedisDict
class RedisDict:def __setitem__(self, key, value): ...def __getitem__(self, key): ...def items(), keys(), values()...
- 将 Redis Hash 映射为 Python 字典接口。
- 支持跨实例共享数据(如用户池、会话池、模型使用池)。
示例:
SESSION_POOL[sid] = user.model_dump()
🧩 功能模块关联图
模块 | 关联说明 |
---|---|
models.users.Users | 获取用户信息,验证登录状态 |
models.channels.Channels | 获取用户所属频道并自动加入 |
models.chats.Chats | 更新聊天状态、消息内容 |
utils.redis.get_redis_connection | 提供 Redis 连接能力 |
env.* | 控制是否启用 WebSocket、Redis 地址、超时时间等 |
🧠 典型应用场景
✅ 实时聊天输出
- 模型生成文本时,通过
sio.emit("chat-events", ...)
推送流式输出。 - 前端监听该事件以实现实时显示效果。
✅ 用户在线状态同步
- 用户连接时自动加入全局用户池。
- 断开时自动移除,并广播更新后的在线用户列表。
✅ 模型使用监控
- 当某个模型被调用时,记录其使用状态。
- 前端可订阅
[usage](file://open-webui\backend\open_webui\socket\main.py#L160-L173)
事件,实时展示“当前谁在用哪个模型”。
✅ 频道消息广播
- 用户加入频道后,系统为其创建专属房间。
- 所有频道内的消息、打字状态等事件只推送给该房间成员。
🛡️ 权限与安全控制
- 所有连接必须携带有效的 JWT Token。
- 使用
[decode_token](file://open-webui\backend\open_webui\utils\auth.py#L130-L135)
解析用户身份。 - 只允许合法用户加入自己的频道。
- 消息发送前检查目标 SID 是否属于当前用户。
📦 数据结构概览
结构 | 类型 | 描述 |
---|---|---|
SESSION_POOL | dict / RedisDict | 存储每个 sid 对应的用户信息 |
USER_POOL | dict / RedisDict | 存储每个用户对应的所有 sid |
USAGE_POOL | dict / RedisDict | 跟踪当前正在使用的模型及连接 |
clean_up_lock | RedisLock | 控制分布式环境下的定时清理任务互斥访问 |
🧹 定时任务:usage_pool 清理
async def periodic_usage_pool_cleanup():...
- 每隔
[TIMEOUT_DURATION](file://open-webui\backend\open_webui\socket\main.py#L65-L65)
秒清理过期连接。 - 自动释放 Redis 锁,防止死锁。
- 触发后广播最新的模型使用状态。
✅ 总结
socket
目录是 Open WebUI 中实现实时通信的核心组件之一。它不仅提供了基础的 WebSocket 服务,还集成了 Redis 支持、模型使用监控、频道通信、用户状态跟踪等高级功能,适用于构建高并发、多用户、多模型的 AI 对话平台。
🧠 技术亮点
特性 | 说明 |
---|---|
实时通信 | 使用 socket.io 实现双向通信 |
分布式支持 | 支持 Redis 管理连接池,适用于集群部署 |
状态同步 | 维护用户在线状态、模型使用情况 |
事件驱动 | 支持自定义事件(如 typing、message、status) |
频道消息 | 支持按频道组织用户并广播消息 |
此模块为整个系统的实时性、协作性和状态一致性提供了关键支撑,是打造互动式 AI 应用的重要基础设施。