当前位置: 首页 > news >正文

MongoDB 集合更新后通过 Socket.IO 自动推送数据到前端 (FastAPI 实现)

MongoDB 集合更新后通过 Socket.IO 自动推送数据到前端 (FastAPI 实现)

要实现 MongoDB 集合更新后自动推送数据到前端的功能,你需要以下几个组件:

  1. MongoDB 变更流 (Change Streams) 监听集合变化
  2. FastAPI 后端处理 WebSocket 连接
  3. Socket.IO 实现实时通信
  4. 前端监听 Socket.IO 事件

后端实现 (FastAPI)

首先安装必要的依赖:

pip install fastapi uvicorn motor pymongo python-socketio

1. 创建 FastAPI 应用并设置 MongoDB 连接

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
from motor.motor_asyncio import AsyncIOMotorClient
import asyncioapp = FastAPI()# 设置 CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# MongoDB 配置
MONGO_URL = "mongodb://localhost:27017"
DB_NAME = "your_database"
COLLECTION1 = "collection1"
COLLECTION2 = "collection2"# 初始化 MongoDB 客户端
mongo_client = AsyncIOMotorClient(MONGO_URL)
db = mongo_client[DB_NAME]# 初始化 Socket.IO
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
socket_app = socketio.ASGIApp(sio, app)

2. 设置 MongoDB 变更流监听

async def watch_collection(collection_name):collection = db[collection_name]async with collection.watch() as stream:async for change in stream:print(f"Change detected in {collection_name}: {change}")# 发送变更通知给所有客户端await sio.emit(f"{collection_name}_changed", change)

3. 启动变更流监听任务

@app.on_event("startup")
async def startup_event():# 启动两个集合的监听任务asyncio.create_task(watch_collection(COLLECTION1))asyncio.create_task(watch_collection(COLLECTION2))

4. 设置 Socket.IO 事件处理

@sio.event
async def connect(sid, environ):print(f"Client connected: {sid}")@sio.event
async def disconnect(sid):print(f"Client disconnected: {sid}")

5. 运行应用

if __name__ == "__main__":import uvicornuvicorn.run(socket_app, host="0.0.0.0", port=8000)

前端实现

前端需要使用 Socket.IO 客户端库来监听后端推送的变更。

HTML/JavaScript 示例

<!DOCTYPE html>
<html>
<head><title>Real-time MongoDB Updates</title><script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
</head>
<body><h1>Real-time Updates</h1><div id="collection1-updates"></div><div id="collection2-updates"></div><script>const socket = io('http://localhost:8000');// 监听集合1的变更socket.on('collection1_changed', (data) => {console.log('Collection1 changed:', data);const div = document.getElementById('collection1-updates');div.innerHTML += `<p>${JSON.stringify(data)}</p>`;});// 监听集合2的变更socket.on('collection2_changed', (data) => {console.log('Collection2 changed:', data);const div = document.getElementById('collection2-updates');div.innerHTML += `<p>${JSON.stringify(data)}</p>`;});// 连接事件socket.on('connect', () => {console.log('Connected to server');});// 断开事件socket.on('disconnect', () => {console.log('Disconnected from server');});</script>
</body>
</html>

高级配置

1. 过滤变更事件

你可以修改 watch_collection 函数来过滤特定类型的变更:

async def watch_collection(collection_name):pipeline = [{'$match': {'$or': [{'operationType': 'insert'},{'operationType': 'update'},{'operationType': 'replace'},{'operationType': 'delete'}]}}]collection = db[collection_name]async with collection.watch(pipeline=pipeline) as stream:async for change in stream:print(f"Change detected in {collection_name}: {change}")await sio.emit(f"{collection_name}_changed", change)

2. 安全性考虑

  • 添加认证中间件保护 WebSocket 连接
  • 限制可以监听的集合
  • 过滤敏感数据不发送到前端

3. 性能优化

  • 使用批处理发送多个变更
  • 添加速率限制
  • 考虑使用 Redis 作为 Socket.IO 的消息队列

完整示例

完整的后端代码示例:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
from motor.motor_asyncio import AsyncIOMotorClient
import asyncioapp = FastAPI()# CORS 配置
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# MongoDB 配置
MONGO_URL = "mongodb://localhost:27017"
DB_NAME = "your_database"
COLLECTION1 = "collection1"
COLLECTION2 = "collection2"# 初始化 MongoDB
mongo_client = AsyncIOMotorClient(MONGO_URL)
db = mongo_client[DB_NAME]# 初始化 Socket.IO
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
socket_app = socketio.ASGIApp(sio, app)async def watch_collection(collection_name):"""监听 MongoDB 集合变更并广播到客户端"""pipeline = [{'$match': {'$or': [{'operationType': 'insert'},{'operationType': 'update'},{'operationType': 'replace'},{'operationType': 'delete'}]}}]collection = db[collection_name]try:async with collection.watch(pipeline=pipeline) as stream:print(f"开始监听集合 {collection_name} 的变更...")async for change in stream:print(f"变更检测于 {collection_name}: {change}")# 发送变更到特定房间或所有客户端await sio.emit(f"{collection_name}_changed", {"operation": change["operationType"],"data": change.get("fullDocument"),"id": change.get("documentKey", {}).get("_id")})except Exception as e:print(f"监听集合 {collection_name} 时出错: {e}")# 出错后重新连接await asyncio.sleep(5)asyncio.create_task(watch_collection(collection_name))@app.on_event("startup")
async def startup_event():"""应用启动时开始监听集合变更"""asyncio.create_task(watch_collection(COLLECTION1))asyncio.create_task(watch_collection(COLLECTION2))@sio.event
async def connect(sid, environ):"""客户端连接事件"""print(f"客户端已连接: {sid}")await sio.emit("message", {"data": f"欢迎! 你的ID是 {sid}"}, room=sid)@sio.event
async def disconnect(sid):"""客户端断开事件"""print(f"客户端已断开: {sid}")if __name__ == "__main__":import uvicornuvicorn.run(socket_app, host="0.0.0.0", port=8000)

这个实现提供了完整的从 MongoDB 变更到前端实时更新的流程,你可以根据具体需求进行调整和扩展。

http://www.dtcms.com/a/462279.html

相关文章:

  • 东胜网站建设医疗网站前置审批查询
  • windows如何设置mongodb的副本集
  • 物流网站有哪些网站被做301跳转了怎么办
  • shell脚本命令删除Zookeeper提供者服务中的指定IP节点
  • 六安网站制作公司价格龙口网络
  • Node.js使用Express框架解决中文乱码问题
  • 设计模式--桥接模式:解耦抽象与实现的灵活设计
  • 做竞价的网站怎么做网站数据库备份
  • 基于FireBeetle 2 ESP32-C5的智能植物光照系统——物联网农业实践
  • 天津住房与城乡建设厅网站首页包头学做网站
  • 【Frida Android】基础篇1:基础环境配置
  • YOLOv11安卓目标检测App完整开发指南
  • 鸿蒙NEXT实战:使用公共事件实现跨进程通信
  • npm升级提示error engine not compatible with your version of node/npm: npm@11.6.2
  • 我的网站为什么打不开怎么回事啊携程做旅游的网站
  • 网站推广的表现方式网站开发需要用到哪些设备
  • 缓存大杀器-redis
  • 网站建设管理方案网站开发与app开发的区别
  • 装修公司网站制作大数据营销成功案例
  • 【STM32】I2C通信—硬件外设
  • 脚手架学习
  • 做网站好还是做淘宝好现在手机网站用什么做的
  • 建设行业网站平台的瓶颈网站网页
  • 【Linux】线程概念与控制(2)
  • vue项目发布后图标乱码解决方案
  • 成都手机网站重庆本地建站
  • UI设计(二)赛博科技修仙通讯录——东方仙盟筑基期
  • 实时数仓历史数据优化
  • 网站建设在哪能看企业网站建立流程的第一步是什么
  • 告别手动配置:用 Terraform 定义你的 RustFS 存储帝国