pymongo库:简易方式存取数据
文档
基础使用
- 前提:开发机器已安装mongo配置环境,已启动服务。
- 安装:
python3 -m pip install pymongo
- 导入:
import pymongo
- 与MongoClient建立连接:
client = MongoClient()
from pymongo import MongoClient# 创建连接的三种方式:(任选一种即可) ## 第一种:使用默认的主机 localhost 和端口号 27017 创建连接 client = MongoClient() ## 第二种:指定主机和端口创建连接 client_1 = MongoClient("localhost", 27017) ## 第三种:使用url格式创建连接 client_2 = MongoClient("mongodb://localhost:27017/")# 有账号密码时的连接 client3 = MongoClient(f"mongodb://{username}:{password}@localhost:27017/mydatabase?authSource=admin") ## 或者使用独立参数 client4 = MongoClient(host='localhost',port=27017,username='your_username',password='your_password@123',authSource='admin', # 认证数据库 ) ## 若密码包含特殊字符,则需要额外处理 from urllib.parse import quote_plus safe_password = quote_plus("p@ssw:rd/$")
- 获取数据库:
db = client.test_database
# 获取数据库的两种方式: ## 第一种:通过 .数据库名 获取 db = client.test_database ## 第二种:如果数据库名含特殊字符,不能使用属性样式访问时,可通过 字典式 访问 db1 = client['test-database']
- 收集(数据库表):
collection = db.test_collection
# 获取数据库表的两种方式: ## 第一种:通过 .表名 获取 collection = db.test_collection ## 第二种:如果表名含特殊字符,不能使用属性样式访问时,可通过 字典式 访问 collection1 = db['test-collection']
- 文件(表数据):可以包含原生Python类型(如
datetime.datetime
实例),它们将自动转换为相应的BSON
类型。(二进制json格式)# 构造表数据 import datetime post = {"author": "Mike","text": "My first blog post!","tags": ["mongodb", "python", "pymongo"],"date": datetime.datetime.now(tz=datetime.timezone.utc), }
- 插入文档(将数据插入到数据库表中):
collection.insert_one(post)
⚠️注意:由于延迟创建,此时(在向集合和数据库中插入第一个文档时)才会创建这些集合和数据库。# 将文档插入到集合中(即将数据插入到数据库表) post_id = collection.insert_one(post).inserted_id print(post_id) # 插入数据对应的id # 查看集合(数据库表) db.list_collection_names() # ['collection']
- 获取单个文档(获取表中某个数据):
result = collection.find_one({'key':'value'})
⚠️注意:返回的文档包含 “_id
” ,在插入时自动添加。(也可以按{'_id': ObjectId(post_id)}
来查找)# 默认获取第一个数据 collection.find_one() # 根据条件查找匹配的数据 collection.find_one({'key':'value'})
- 大容量插入(批量插入多条数据):
result = collection.insert_many(collection_data_list)
new_posts = [{"author": "Mike","text": "Another post!","tags": ["bulk", "insert"],"date": datetime.datetime(2009, 11, 12, 11, 14),},{"author": "Eliot","title": "MongoDB is fun","text": "and pretty easy too!","date": datetime.datetime(2009, 11, 10, 10, 45),}, ] result = collection.insert_many(new_posts) # 参数为列表 result.inserted_ids # 可以获取插入后的id列表
- 查询多个文档:
result = collection.find({'key':'value'})
,result是在collection表内搜索key=value的结果列表。 - 计数:
- 获取所有数据条数:
collection.count_documents({})
- 获取特定条件的数据条数:
collection.count_documents({"author": "Mike"})
- 获取所有数据条数:
- 范围查询:
collection.find({"date": {"$lt": date_value}}).sort("author")
,查询date_value日期之前的帖子,按作者对结果进行排序。d = datetime.datetime(2009, 11, 12, 12) result = collection.find({"date": {"$lt": d}}).sort("author") # result是一个字典列表
- 索引:添加索引有助于加速某些查询,还可以为查询和存储文档添加附加功能。
result = collection.create_index([("user_id", pymongo.ASCENDING)], unique=True) # 将user_id设置为唯一索引 indexs = sorted(list(collection.index_information())) # 获取collection表中已创建的索引列表
PyMongo 常用方法汇总
方法 | 使用场景 | 示例代码 | 关键特性 |
---|---|---|---|
find() | 条件查询、数据检索 | db.users.find({"status": "active"}, {"name": 1, "email": 1}) | 支持投影、排序、分页链式操作 |
insert_one() | 插入单个文档 | result = db.products.insert_one({"name": "Mouse", "price": 29.99}) print(result.inserted_id) | 返回插入文档的_id |
insert_many() | 批量插入文档 | users = [{"name": "Alice"}, {"name": "Bob"}] result = db.users.insert_many(users, ordered=False) | 支持有序/无序插入,返回插入ID列表 |
update_one() | 更新单个文档 | db.users.update_one( {"_id": 123}, {"$set": {"status": "active"}} ) | 使用$set避免覆盖整个文档 |
update_many() | 批量更新文档 | result = db.orders.update_many( {"status": "pending"}, {"$inc": {"retries": 1}} ) | 返回匹配和修改的文档计数 |
delete_one() | 删除单个文档 | db.sessions.delete_one({"expire_at": {"$lt": datetime.now()}}) | 删除第一个匹配文档 |
delete_many() | 批量删除文档 | result = db.logs.delete_many({"created": {"$lt": datetime(2023,1,1)}}) print(result.deleted_count) | 返回删除的文档数量 |
count_documents() | 统计符合条件的文档数量 | active_users = db.users.count_documents( {"status": "active", "last_login": {"$gt": last_month}} ) | 替代已废弃的count()方法 |
distinct() | 获取字段的唯一值列表 | categories = db.products.distinct("category") | 比聚合$group更高效 |
aggregate() ⚠️ | 复杂数据分析、多阶段处理 | pipeline = [ {"$match": {"status": "active"}}, {"$group": {"_id": "$dept", "count": {"$sum": 1}}} ] results = db.users.aggregate(pipeline) | 支持 $match , $group , $lookup 等聚合阶段,先 $match 过滤比较高效 |
find_one() | 查找单个文档 | user = db.users.find_one({"email": "user@example.com"}) | 直接返回文档对象或None |
find_one_and_update() | 原子查找并更新文档 | task = db.tasks.find_one_and_update( {"status": "pending"}, {"$set": {"status": "processing"}}, return_document=ReturnDocument.AFTER ) | 保证操作的原子性 |
create_index() | 创建查询索引 | db.orders.create_index([("user_id", 1), ("create_time", -1)]) | 显著提升查询性能 |
bulk_write() | 高性能批量操作 | ops = [ UpdateOne({"id": 1}, {"$inc": {"views": 1}}), DeleteMany({"expired": True}) ] result = db.collection.bulk_write(ops, ordered=False) | 减少网络往返,提升吞吐量,ordered=False 无序操作更快 |
drop() | 删除整个集合 | db.temp_data.drop() | 谨慎使用,不可逆操作 |
根据使用场景选择对应的方法:
aggregate中pipeline的核心聚合操作符速查表
操作符 | 类别 | 功能描述 | 语法示例 |
---|---|---|---|
$match | 筛选阶段 | 过滤文档 (类似SQL的WHERE) | {"$match": {"status": "active"}} |
$project | 重塑阶段 | 选择/重命名字段 (类似SELECT) | {"$project": {"name": 1, "year": {"$year": "$date"}}} |
$group | 分组阶段 | 按字段分组并计算聚合值 | {"$group": {"_id": "$dept", "total": {"$sum": "$salary"}}} |
$sort | 排序阶段 | 结果排序 (1升序, -1降序) | {"$sort": {"age": -1, "name": 1}} |
$limit | 限制阶段 | 限制输出文档数量 | {"$limit": 10} |
$skip | 跳过阶段 | 跳过指定数量文档 | {"$skip": 5} |
$unwind | 数组处理 | 展开数组为多条文档 | {"$unwind": "$tags"} |
$lookup | 关联查询 | 跨集合关联查询 (类似SQL JOIN) | {"$lookup": { "from": "products", # 关联集合 "localField": "product_id", # 本地字段 "foreignField": "_id", # 关联集合字段 "as": "product_info" # 输出字段名}} |
$addFields | 字段操作 | 添加新字段 (不改变原字段) | {"$addFields": {"discount": {"$multiply": ["$price", 0.9]}}} |
$count | 统计阶段 | 返回文档总数 | {"$count": "total_users"} |
$facet | 多管道处理 | 同一输入执行多个聚合管道 | pipeline = [ # 单次查询获取多种统计结果 {"$facet": { "department_stats": [ # 部门统计管道 {"$group": {... }}], "age_distribution": [ # 年龄分布管道 {"$bucket": {... }} ], "total_count": [ # 总计数管道 {"$count": "value"} ] }}] |
在聚合管道中使用的核心表达式操作符:
类型 | 操作符 |
---|---|
算术运算 | $add , $subtract , $multiply , $divide , $mod |
比较运算 | $eq , $ne , $gt , $gte , $lt , $lte , $cmp |
逻辑运算 | $and , $or , $not , $cond (三元表达式) |
日期处理 | $year , $month , $dayOfMonth , $hour , $minute , $dateToString |
字符串处理 | $concat , $substr , $toLower , $toUpper , $trim , $split |
数组处理 | $size , $slice , $map , $filter , $in , $arrayElemAt |
数据类型 | $type , $convert , $toInt , $toString , $toDate |
条件处理 | $ifNull , $switch (多分支条件) |
聚合管道优化策略
-
管道顺序优化原则
-
性能优化技巧
- 将
$match
和$project
放在管道最前端减少数据处理量 - 在
$lookup
前使用$match
过滤关联集合数据 - 避免在
$group
中使用$push
操作大数组 - 对
$sort
和$match
使用的字段创建索引 - 使用
allowDiskUse=True
处理大数据集result = collection.aggregate(pipeline, allowDiskUse=True)
- 将