MongoDB 集合更新后通过 Socket.IO 自动推送数据到前端 (FastAPI 实现)
MongoDB 集合更新后通过 Socket.IO 自动推送数据到前端 (FastAPI 实现)
要实现 MongoDB 集合更新后自动推送数据到前端的功能,你需要以下几个组件:
- MongoDB 变更流 (Change Streams) 监听集合变化
- FastAPI 后端处理 WebSocket 连接
- Socket.IO 实现实时通信
- 前端监听 Socket.IO 事件
后端实现 (FastAPI)
首先安装必要的依赖:
pip install fastapi uvicorn motor pymongo python-socketio
1. 创建 FastAPI 应用并设置 MongoDB 连接
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
from motor.motor_asyncio import AsyncIOMotorClient
import asyncioapp = FastAPI()# 设置 CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# MongoDB 配置
MONGO_URL = "mongodb://localhost:27017"
DB_NAME = "your_database"
COLLECTION1 = "collection1"
COLLECTION2 = "collection2"# 初始化 MongoDB 客户端
mongo_client = AsyncIOMotorClient(MONGO_URL)
db = mongo_client[DB_NAME]# 初始化 Socket.IO
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
socket_app = socketio.ASGIApp(sio, app)
2. 设置 MongoDB 变更流监听
async def watch_collection(collection_name):collection = db[collection_name]async with collection.watch() as stream:async for change in stream:print(f"Change detected in {collection_name}: {change}")# 发送变更通知给所有客户端await sio.emit(f"{collection_name}_changed", change)
3. 启动变更流监听任务
@app.on_event("startup")
async def startup_event():# 启动两个集合的监听任务asyncio.create_task(watch_collection(COLLECTION1))asyncio.create_task(watch_collection(COLLECTION2))
4. 设置 Socket.IO 事件处理
@sio.event
async def connect(sid, environ):print(f"Client connected: {sid}")@sio.event
async def disconnect(sid):print(f"Client disconnected: {sid}")
5. 运行应用
if __name__ == "__main__":import uvicornuvicorn.run(socket_app, host="0.0.0.0", port=8000)
前端实现
前端需要使用 Socket.IO 客户端库来监听后端推送的变更。
HTML/JavaScript 示例
<!DOCTYPE html>
<html>
<head><title>Real-time MongoDB Updates</title><script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
</head>
<body><h1>Real-time Updates</h1><div id="collection1-updates"></div><div id="collection2-updates"></div><script>const socket = io('http://localhost:8000');// 监听集合1的变更socket.on('collection1_changed', (data) => {console.log('Collection1 changed:', data);const div = document.getElementById('collection1-updates');div.innerHTML += `<p>${JSON.stringify(data)}</p>`;});// 监听集合2的变更socket.on('collection2_changed', (data) => {console.log('Collection2 changed:', data);const div = document.getElementById('collection2-updates');div.innerHTML += `<p>${JSON.stringify(data)}</p>`;});// 连接事件socket.on('connect', () => {console.log('Connected to server');});// 断开事件socket.on('disconnect', () => {console.log('Disconnected from server');});</script>
</body>
</html>
高级配置
1. 过滤变更事件
你可以修改 watch_collection
函数来过滤特定类型的变更:
async def watch_collection(collection_name):pipeline = [{'$match': {'$or': [{'operationType': 'insert'},{'operationType': 'update'},{'operationType': 'replace'},{'operationType': 'delete'}]}}]collection = db[collection_name]async with collection.watch(pipeline=pipeline) as stream:async for change in stream:print(f"Change detected in {collection_name}: {change}")await sio.emit(f"{collection_name}_changed", change)
2. 安全性考虑
- 添加认证中间件保护 WebSocket 连接
- 限制可以监听的集合
- 过滤敏感数据不发送到前端
3. 性能优化
- 使用批处理发送多个变更
- 添加速率限制
- 考虑使用 Redis 作为 Socket.IO 的消息队列
完整示例
完整的后端代码示例:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
from motor.motor_asyncio import AsyncIOMotorClient
import asyncioapp = FastAPI()# CORS 配置
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# MongoDB 配置
MONGO_URL = "mongodb://localhost:27017"
DB_NAME = "your_database"
COLLECTION1 = "collection1"
COLLECTION2 = "collection2"# 初始化 MongoDB
mongo_client = AsyncIOMotorClient(MONGO_URL)
db = mongo_client[DB_NAME]# 初始化 Socket.IO
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
socket_app = socketio.ASGIApp(sio, app)async def watch_collection(collection_name):"""监听 MongoDB 集合变更并广播到客户端"""pipeline = [{'$match': {'$or': [{'operationType': 'insert'},{'operationType': 'update'},{'operationType': 'replace'},{'operationType': 'delete'}]}}]collection = db[collection_name]try:async with collection.watch(pipeline=pipeline) as stream:print(f"开始监听集合 {collection_name} 的变更...")async for change in stream:print(f"变更检测于 {collection_name}: {change}")# 发送变更到特定房间或所有客户端await sio.emit(f"{collection_name}_changed", {"operation": change["operationType"],"data": change.get("fullDocument"),"id": change.get("documentKey", {}).get("_id")})except Exception as e:print(f"监听集合 {collection_name} 时出错: {e}")# 出错后重新连接await asyncio.sleep(5)asyncio.create_task(watch_collection(collection_name))@app.on_event("startup")
async def startup_event():"""应用启动时开始监听集合变更"""asyncio.create_task(watch_collection(COLLECTION1))asyncio.create_task(watch_collection(COLLECTION2))@sio.event
async def connect(sid, environ):"""客户端连接事件"""print(f"客户端已连接: {sid}")await sio.emit("message", {"data": f"欢迎! 你的ID是 {sid}"}, room=sid)@sio.event
async def disconnect(sid):"""客户端断开事件"""print(f"客户端已断开: {sid}")if __name__ == "__main__":import uvicornuvicorn.run(socket_app, host="0.0.0.0", port=8000)
这个实现提供了完整的从 MongoDB 变更到前端实时更新的流程,你可以根据具体需求进行调整和扩展。