只有通过Motor 获取 mongodb的collection,才能正常使用 async with collection.watch()监听集合变更
在使用 async with collection.watch(...) as stream:
监听集合变更时,这个 collection
对象(即 MongoDB 集合连接)应该如何正确获取?
✅ 核心答案
要让 async with collection.watch()
正常工作,collection
必须是一个 motor.motor_asyncio.AsyncIOMotorCollection
实例。
这意味着你需要通过 Motor 异步驱动 的完整链路来创建它:
AsyncIOMotorClient → Database → Collection
✅ 正确获取 collection
对象的步骤
1. 安装 Motor(如果还没安装)
pip install motor
2. 创建全局异步客户端(通常在应用启动时初始化)
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import ReadPreference# MongoDB 副本集连接字符串(必须是副本集才能用 watch)
MONGO_URL = "mongodb://127.0.0.1:27017,127.0.0.1:27018/?replicaSet=rs0"# 创建异步客户端(通常只创建一次,全局复用)
client = AsyncIOMotorClient(MONGO_URL,maxPoolSize=10,minPoolSize=1,connectTimeoutMS=5000,serverSelectionTimeoutMS=5000,socketTimeoutMS=10000,
)# 获取数据库对象
db = client["your_database_name"] # 替换为你的数据库名# 获取集合对象(这才是你要的 collection)
collection = db["upload_task_collection"] # 替换为你的集合名
此时:
print(type(collection))
# 输出:<class 'motor.motor_asyncio.AsyncIOMotorCollection'>
✅ 这个 collection
才能用于 async with collection.watch()
。
✅ 封装成工具类或依赖注入(推荐做法)
class AsyncMongoHelper:def __init__(self, uri: str, db_name: str):self.client = AsyncIOMotorClient(uri)self.db = self.client[db_name]def get_collection(self, name: str):return self.db[name]# 使用示例
mongo_helper = AsyncMongoHelper(uri="mongodb://127.0.0.1:27017,127.0.0.1:27018/?replicaSet=rs0",db_name="myapp_db"
)# 获取集合对象
collection = mongo_helper.get_collection("upload_task_collection")
然后传入监听函数:
async def watch_collection(collection, collection_name):pipeline = [{"$match": {"operationType": "insert"}}]try:async with collection.watch(pipeline=pipeline) as stream:print(f"开始监听集合 {collection_name} 的变更...")async for change in stream:print(f"变更: {change}")# 处理变更...except Exception as e:print(f"监听出错: {e}")
调用:
await watch_collection(collection, "upload_task_collection")
✅ 如果你在 FastAPI 中使用
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClientapp = FastAPI()@app.on_event("startup")
async def startup_db_client():app.mongodb_client = AsyncIOMotorClient("mongodb://127.0.0.1:27017/?replicaSet=rs0")app.mongodb = app.mongodb_client["mydatabase"]@app.on_event("shutdown")
async def shutdown_db_client():app.mongodb_client.close()# 路由中获取 collection
@app.get("/watch")
async def start_watch():collection = app.mongodb["upload_task_collection"]await watch_collection(collection, "upload_task_collection")return {"status": "watching"}
❌ 错误示范:不要这样做
import pymongo# ❌ 错误:这是同步 PyMongo 的集合
sync_collection = pymongo.MongoClient().mydb.mycollection# ❌ 错误:这不是 AsyncIOMotorCollection,不能用 async with
async with sync_collection.watch(): # AttributeError: __aenter__...
🔁 总结:如何获取正确的 collection
对象?
步骤 | 操作 |
---|---|
1️⃣ | 使用 pip install motor |
2️⃣ | 创建 AsyncIOMotorClient(uri) |
3️⃣ | 获取数据库:client['db_name'] |
4️⃣ | 获取集合:db['collection_name'] → 得到 AsyncIOMotorCollection |
5️⃣ | 用它进行 async with collection.watch() |
⚠️ 必要前提条件
- ✅ MongoDB 必须运行在 副本集模式(单机不支持
watch
) - ✅ 连接字符串必须包含
?replicaSet=rs0
(名称要匹配) - ✅ 用户有
changeStream
权限 - ✅ 使用
motor
,不是pymongo
同步驱动
✅ 只要你通过 Motor 获取 collection
,就能正常使用 async with collection.watch()
。