当前位置: 首页 > news >正文

只有通过Motor 获取 mongodb的collection,才能正常使用 async with collection.watch()监听集合变更

在使用 async with collection.watch(...) as stream: 监听集合变更时,这个 collection 对象(即 MongoDB 集合连接)应该如何正确获取?


✅ 核心答案

要让 async with collection.watch() 正常工作,collection 必须是一个 motor.motor_asyncio.AsyncIOMotorCollection 实例

这意味着你需要通过 Motor 异步驱动 的完整链路来创建它:

AsyncIOMotorClient → Database → Collection

✅ 正确获取 collection 对象的步骤

1. 安装 Motor(如果还没安装)
pip install motor
2. 创建全局异步客户端(通常在应用启动时初始化)
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import ReadPreference# MongoDB 副本集连接字符串(必须是副本集才能用 watch)
MONGO_URL = "mongodb://127.0.0.1:27017,127.0.0.1:27018/?replicaSet=rs0"# 创建异步客户端(通常只创建一次,全局复用)
client = AsyncIOMotorClient(MONGO_URL,maxPoolSize=10,minPoolSize=1,connectTimeoutMS=5000,serverSelectionTimeoutMS=5000,socketTimeoutMS=10000,
)# 获取数据库对象
db = client["your_database_name"]  # 替换为你的数据库名# 获取集合对象(这才是你要的 collection)
collection = db["upload_task_collection"]  # 替换为你的集合名

此时:

print(type(collection))
# 输出:<class 'motor.motor_asyncio.AsyncIOMotorCollection'>

✅ 这个 collection 才能用于 async with collection.watch()


✅ 封装成工具类或依赖注入(推荐做法)

class AsyncMongoHelper:def __init__(self, uri: str, db_name: str):self.client = AsyncIOMotorClient(uri)self.db = self.client[db_name]def get_collection(self, name: str):return self.db[name]# 使用示例
mongo_helper = AsyncMongoHelper(uri="mongodb://127.0.0.1:27017,127.0.0.1:27018/?replicaSet=rs0",db_name="myapp_db"
)# 获取集合对象
collection = mongo_helper.get_collection("upload_task_collection")

然后传入监听函数:

async def watch_collection(collection, collection_name):pipeline = [{"$match": {"operationType": "insert"}}]try:async with collection.watch(pipeline=pipeline) as stream:print(f"开始监听集合 {collection_name} 的变更...")async for change in stream:print(f"变更: {change}")# 处理变更...except Exception as e:print(f"监听出错: {e}")

调用:

await watch_collection(collection, "upload_task_collection")

✅ 如果你在 FastAPI 中使用

from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClientapp = FastAPI()@app.on_event("startup")
async def startup_db_client():app.mongodb_client = AsyncIOMotorClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")app.mongodb = app.mongodb_client["mydatabase"]@app.on_event("shutdown")
async def shutdown_db_client():app.mongodb_client.close()# 路由中获取 collection
@app.get("/watch")
async def start_watch():collection = app.mongodb["upload_task_collection"]await watch_collection(collection, "upload_task_collection")return {"status": "watching"}

❌ 错误示范:不要这样做

import pymongo# ❌ 错误:这是同步 PyMongo 的集合
sync_collection = pymongo.MongoClient().mydb.mycollection# ❌ 错误:这不是 AsyncIOMotorCollection,不能用 async with
async with sync_collection.watch():  # AttributeError: __aenter__...

🔁 总结:如何获取正确的 collection 对象?

步骤操作
1️⃣使用 pip install motor
2️⃣创建 AsyncIOMotorClient(uri)
3️⃣获取数据库:client['db_name']
4️⃣获取集合:db['collection_name'] → 得到 AsyncIOMotorCollection
5️⃣用它进行 async with collection.watch()

⚠️ 必要前提条件

  • ✅ MongoDB 必须运行在 副本集模式(单机不支持 watch
  • ✅ 连接字符串必须包含 ?replicaSet=rs0(名称要匹配)
  • ✅ 用户有 changeStream 权限
  • ✅ 使用 motor,不是 pymongo 同步驱动

✅ 只要你通过 Motor 获取 collection,就能正常使用 async with collection.watch()

http://www.dtcms.com/a/462393.html

相关文章:

  • 做一个网站 如何盈利网站开发一般用什么软件有哪些
  • 能够完美“适配”不同传感器的语音芯片WT2003H
  • 怎样建设网站的步骤网站建设中js控制什么
  • 陇南地网站建设黄骅市有什么好玩的地方
  • 零基础从头教学Linux(Day 46)
  • RK3588从数据集到训练到部署YoloV8
  • 网站商城建设价格做网站的一定要开80或8080端口
  • STranslate(翻译工具OCR工具) 中文绿色版
  • 算法学习 || 动态规划(买卖股票的最佳时机2)
  • 网站常用图标素材哈尔滨手机网站建设
  • 各种爬虫框架及其特点
  • 架设网站多少钱郑州汉狮做网站的大公司
  • 大厂MySQL数据库规范文档
  • 怎么做网站 高中信息技术锡盟建设工程网站
  • 公司做网站的费用会计分录营销手机软件开发定制
  • 怎么做垂直网站专业营销的网站建设公司哪家好
  • 福田市网站建设推广外包做的网站可以直接去收录吗
  • cynest下料喷漆挡板 exit图形
  • HandBrake:免费无广告,压缩、格式转换
  • 从不订购的客户-力扣
  • 大学文明校园网站建设方案怎么建网址
  • 保姆级教程-剪映多视频融合及识别文字转换方法
  • 北京建筑公司网站东莞手机网站建设怎么选
  • HTTP 头部 和 Headers 对象
  • 张掖作风建设年网站湖南seo网站多少钱
  • gps的时间精度
  • 如何在工商局网站做身份确认广东短视频推广效果好
  • 第29节:第二阶段总结 - 打造一个3D游戏原型
  • 设计的有趣的网站推荐怎样申请免费网站域名
  • --- 前后端的文件交互 ---