当前位置: 首页 > wzjs >正文

公众号 微网站开发网络营销与推广方案

公众号 微网站开发,网络营销与推广方案,制作灯笼的手工做法步骤,宁波seo关键词培训以下是 Python 与 PyMongo 的完整功能整理,涵盖基础操作、高级功能、性能优化及常见应用场景: 1. 安装与连接 (1) 安装 PyMongo pip install pymongo(2) 连接 MongoDB from pymongo import MongoClient# 基础连接(默认本地,端口…

以下是 Python 与 PyMongo 的完整功能整理,涵盖基础操作、高级功能、性能优化及常见应用场景:


1. 安装与连接
(1) 安装 PyMongo

pip install pymongo

(2) 连接 MongoDB

from pymongo import MongoClient# 基础连接(默认本地,端口27017)
client = MongoClient('mongodb://localhost:27017/')# 带认证的连接
client = MongoClient('mongodb://username:password@host:27017/dbname?authSource=admin'
)# 连接副本集或分片集群
client = MongoClient('mongodb://node1:27017,node2:27017/?replicaSet=rs0')

2. 数据库与集合操作
(1) 选择数据库和集合

db = client['mydatabase']    # 选择数据库(惰性创建)
collection = db['users']     # 选择集合(惰性创建)

(2) 列出数据库和集合

# 列出所有数据库(需权限)
print(client.list_database_names())# 列出数据库中的集合
print(db.list_collection_names())

(3) 删除集合或数据库

db.drop_collection('users')  # 删除集合
client.drop_database('mydatabase')  # 删除数据库

3. 文档操作(CRUD)
(1) 插入文档

# 插入单条文档
user = {'name': 'Alice', 'age': 30, 'tags': ['python', 'dev']}
result = collection.insert_one(user)
print(result.inserted_id)  # 输出插入的 ObjectId# 批量插入
users = [{'name': 'Bob'}, {'name': 'Charlie'}]
result = collection.insert_many(users)
print(result.inserted_ids)  # 输出所有插入的 ObjectId 列表

(2) 查询文档

# 查询单条文档
user = collection.find_one({'name': 'Alice'})
print(user)  # 返回字典或 None# 查询多条文档(带条件、投影和排序)
cursor = collection.find({'age': {'$gt': 25}},           # 条件:年龄大于25{'_id': 0, 'name': 1},          # 投影:仅返回 name 字段
).sort('age', pymongo.ASCENDING)    # 按年龄升序排序for doc in cursor:print(doc)# 统计数量
count = collection.count_documents({'age': {'$gt': 25}})

(3) 更新文档

# 更新单条文档
result = collection.update_one({'name': 'Alice'},{'$set': {'age': 31}, '$addToSet': {'tags': 'database'}}  # 更新操作符
)
print(result.modified_count)  # 输出影响的文档数# 批量更新
result = collection.update_many({'age': {'$lt': 30}},{'$inc': {'age': 1}}  # 年龄加1
)

(4) 删除文档

# 删除单条文档
result = collection.delete_one({'name': 'Alice'})# 批量删除
result = collection.delete_many({'age': {'$gt': 40}})

4. 高级查询与聚合
(1) 查询操作符

# 比较:$gt, $gte, $lt, $lte, $ne
collection.find({'age': {'$gt': 20}})# 逻辑:$and, $or, $not
collection.find({'$or': [{'age': 30}, {'name': 'Bob'}]})# 数组:$in, $nin, $all, $elemMatch
collection.find({'tags': {'$in': ['python', 'java']}})

(2) 聚合管道

pipeline = [{'$match': {'age': {'$gt': 25}}},          # 筛选条件{'$group': {'_id': '$city', 'count': {'$sum': 1}}},  # 按城市分组计数{'$sort': {'count': -1}},                  # 按计数降序排序{'$limit': 5}                              # 取前5条
]result = collection.aggregate(pipeline)
for doc in result:print(doc)

(3) 索引管理

# 创建索引
collection.create_index([('name', pymongo.ASCENDING)], unique=True)# 查看索引
print(collection.index_information())# 删除索引
collection.drop_index('name_1')

5. 事务与原子性操作
(1) 多文档事务(MongoDB 4.0+)

with client.start_session() as session:session.start_transaction()try:collection.update_one({'name': 'Alice'},{'$inc': {'balance': -100}},session=session)collection.update_one({'name': 'Bob'},{'$inc': {'balance': 100}},session=session)session.commit_transaction()except Exception as e:session.abort_transaction()print("事务回滚:", e)

(2) 原子操作符

# 原子更新
collection.find_one_and_update({'name': 'Alice'},{'$inc': {'counter': 1}},return_document=pymongo.ReturnDocument.AFTER  # 返回更新后的文档
)

6. 性能优化与最佳实践
(1) 查询优化
• 使用投影减少返回字段:

collection.find({}, {'_id': 0, 'name': 1})

• 覆盖查询(Covered Query):确保查询字段和投影字段在索引中。

(2) 批量操作

# 批量插入(减少网络开销)
bulk_ops = [pymongo.InsertOne({'name': f'User_{i}'}) for i in range(1000)]
collection.bulk_write(bulk_ops)

(3) 连接池管理

client = MongoClient('mongodb://localhost:27017/',maxPoolSize=100,      # 最大连接数minPoolSize=10,       # 最小空闲连接socketTimeoutMS=3000  # 超时时间
)

7. 数据建模与模式设计
(1) 内嵌文档与引用
• 内嵌文档:适合频繁访问的子数据。

user = {'name': 'Alice','address': {'city': 'New York', 'zip': '10001'}  # 内嵌文档
}

• 引用关系:适合独立实体。

# 用户引用订单
order = {'user_id': user['_id'], 'product': 'Laptop'}

(2) 分片策略
• 选择分片键:高频查询字段(如 user_id)。

• 分片命令(需在 mongos 执行):

sh.shardCollection("mydb.orders", {"user_id": 1})

8. 安全与运维
(1) 认证与权限

# 创建用户
db.command('createUser', 'admin',pwd='secret',roles=[{'role': 'readWrite', 'db': 'mydb'}]
)

(2) 备份与恢复
mongodump 备份:

mongodump --uri="mongodb://user:pass@host:27017/mydb" --out=/backup

mongorestore 恢复:

mongorestore --uri="mongodb://host:27017" /backup/mydb

(3) 监控与日志
• 查看数据库状态:

server_status = db.command('serverStatus')
print(server_status['connections']['available'])

• 启用慢查询日志:

mongod --setParameter slowMS=100 --profileLevel 2

9. 常见应用场景
(1) 日志存储与分析

# 插入日志
log_entry = {'timestamp': datetime.now(),'level': 'INFO','message': 'User login success'
}
collection.insert_one(log_entry)# 分析错误日志数量
pipeline = [{'$match': {'level': 'ERROR'}},{'$group': {'_id': '$service', 'count': {'$sum': 1}}}
]

(2) 实时排行榜

# 更新分数
collection.update_one({'user_id': 1001},{'$inc': {'score': 10}},upsert=True
)# 获取前10名
top_players = collection.find().sort('score', -1).limit(10)

10. 扩展工具与库
(1) 使用 Motor 实现异步操作

from motor.motor_asyncio import AsyncIOMotorClientasync def query_data():client = AsyncIOMotorClient('mongodb://localhost:27017')collection = client.mydb.userscursor = collection.find({'age': {'$gt': 20}})async for doc in cursor:print(doc)

(2) 使用 MongoEngine(ORM)

from mongoengine import Document, StringField, IntFieldclass User(Document):name = StringField(required=True)age = IntField()# 查询数据
users = User.objects(age__gt=25)

总结

功能PyMongo 方法/操作典型场景
基础 CRUDinsert_one, find, update_many数据增删改查
聚合分析aggregate + 管道操作复杂统计、日志分析
事务管理start_session + 事务块转账、订单处理
性能优化索引、批量操作、连接池高并发读写、大数据处理
数据建模内嵌文档、引用关系、分片电商、社交网络、IoT 数据存储

通过合理使用 PyMongo,可以高效操作 MongoDB 应对多样化的数据存储需求,结合 Redis 实现缓存加速,构建高性能应用。

以下是一个基于 pymongo 封装的 MongoDB 基础操作类,支持连接管理、CRUD、索引操作、聚合查询、分页等常用功能:

from typing import Any, Dict, List, Optional, Union
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import PyMongoError
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from pymongo.collection import Collection
from bson import ObjectIdclass MongoDBClient:"""MongoDB 基础操作类"""def __init__(self, uri: str = "mongodb://localhost:27017/",db_name: str = "mydatabase", collection_name: str = "default_collection"):"""初始化 MongoDB 客户端:param uri: MongoDB 连接 URI:param db_name: 数据库名称:param collection_name: 集合名称"""self.client = MongoClient(uri)self.db = self.client[db_name]self.collection: Collection = self.db[collection_name]# ------------------------- 基础 CRUD 操作 -------------------------def insert_one(self, document: Dict) -> Optional[ObjectId]:"""插入单条文档"""try:result: InsertOneResult = self.collection.insert_one(document)return result.inserted_idexcept PyMongoError as e:print(f"Insert one error: {e}")return Nonedef insert_many(self, documents: List[Dict]) -> Optional[List[ObjectId]]:"""批量插入文档"""try:result: InsertManyResult = self.collection.insert_many(documents)return result.inserted_idsexcept PyMongoError as e:print(f"Insert many error: {e}")return Nonedef find_one(self, query: Dict, projection: Optional[Dict] = None) -> Optional[Dict]:"""查询单条文档"""try:return self.collection.find_one(query, projection)except PyMongoError as e:print(f"Find one error: {e}")return Nonedef find(self, query: Dict, projection: Optional[Dict] = None,sort: Optional[List[tuple]] = None,limit: int = 0) -> List[Dict]:"""查询多条文档"""try:cursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)if limit > 0:cursor = cursor.limit(limit)return list(cursor)except PyMongoError as e:print(f"Find error: {e}")return []def update_one(self, query: Dict, update_data: Dict, upsert: bool = False) -> Optional[UpdateResult]:"""更新单条文档"""try:result: UpdateResult = self.collection.update_one(query, {'$set': update_data}, upsert=upsert)return resultexcept PyMongoError as e:print(f"Update one error: {e}")return Nonedef delete_one(self, query: Dict) -> Optional[DeleteResult]:"""删除单条文档"""try:result: DeleteResult = self.collection.delete_one(query)return resultexcept PyMongoError as e:print(f"Delete one error: {e}")return None# ------------------------- 高级操作 -------------------------def create_index(self, keys: List[tuple], unique: bool = False, background: bool = True) -> Optional[str]:"""创建索引"""try:index_name = self.collection.create_index(keys, unique=unique, background=background)return index_nameexcept PyMongoError as e:print(f"Create index error: {e}")return Nonedef count_documents(self, query: Dict) -> int:"""统计文档数量"""try:return self.collection.count_documents(query)except PyMongoError as e:print(f"Count documents error: {e}")return 0def aggregate(self, pipeline: List[Dict]) -> List[Dict]:"""聚合查询"""try:return list(self.collection.aggregate(pipeline))except PyMongoError as e:print(f"Aggregate error: {e}")return []def paginate(self, query: Dict, page: int = 1, per_page: int = 10,sort: Optional[List[tuple]] = None,projection: Optional[Dict] = None) -> Dict:"""分页查询"""try:total = self.count_documents(query)skip = (page - 1) * per_pagecursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)documents = cursor.skip(skip).limit(per_page)return {"total": total,"page": page,"per_page": per_page,"data": list(documents)}except PyMongoError as e:print(f"Paginate error: {e}")return {"total": 0, "page": page, "per_page": per_page, "data": []}# ------------------------- 事务支持 -------------------------def execute_transaction(self, operations: callable) -> bool:"""执行事务操作"""session = self.client.start_session()try:with session.start_transaction():operations(self.collection, session)return Trueexcept PyMongoError as e:session.abort_transaction()print(f"Transaction aborted: {e}")return False# ------------------------- 工具方法 -------------------------@staticmethoddef to_objectid(_id: Union[str, ObjectId]) -> ObjectId:"""将字符串转换为 ObjectId"""return _id if isinstance(_id, ObjectId) else ObjectId(_id)def close(self):"""关闭连接"""self.client.close()# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":# 初始化客户端mongo_client = MongoDBClient(uri="mongodb://user:pass@localhost:27017/",db_name="test_db",collection_name="users")# 插入数据user_id = mongo_client.insert_one({"name": "Alice","age": 30,"email": "alice@example.com"})print(f"Inserted ID: {user_id}")# 查询数据user = mongo_client.find_one({"name": "Alice"})print(f"Found user: {user}")# 分页查询pagination = mongo_client.paginate(query={"age": {"$gt": 20}},page=1,per_page=10,sort=[("age", ASCENDING)])print(f"Page 1 data: {pagination['data']}")# 关闭连接mongo_client.close()

核心功能说明

功能方法说明
连接管理__init__, close支持自定义 URI、数据库和集合名称
CRUD 操作insert_one, find提供单条/批量操作,支持投影和排序
索引管理create_index可创建唯一索引、后台索引
聚合查询aggregate支持完整的聚合管道操作
分页查询paginate返回分页数据和总记录数
事务支持execute_transaction封装多文档事务操作
类型转换to_objectid字符串与 ObjectId 互转

使用场景

  1. 快速开发:直接继承或实例化类,无需重复编写 CRUD 代码。
  2. Web 后端:集成到 FastAPI/Django 服务中,处理用户数据。
  3. 数据分析:通过聚合方法实现复杂统计。
  4. 定时任务:封装数据清洗、日志处理等操作。

优化建议

  1. 连接池配置:在初始化时添加 maxPoolSizeminPoolSize 参数。
  2. 日志记录:将 print 替换为 logging 模块记录错误信息。
  3. 异步支持:使用 motor 库实现异步版本(适合 FastAPI 等异步框架)。
  4. 数据校验:集成 pydantic 对输入数据进行模式验证。
  5. 缓存集成:结合 Redis 实现高频查询缓存。
http://www.dtcms.com/wzjs/554399.html

相关文章:

  • 长沙网站开网站建设方案推广
  • 自己的电脑做网站当服务器使用医疗号网站开发
  • 怎么看别人网站是什么语言做的wordpress怎么改模板
  • 手机上做网站php江西省赣州市定南县
  • 网站内容策划方案香奈儿网站建设的目标
  • 国外网站设计版式欣赏那个网站可以做微课
  • 广州网页模板建站什么软件做电影短视频网站
  • 2016网站设计欣赏怎样自己免费建设一个网站
  • 一般做企业网站需要什么google关键词指数
  • 网站怎么维护更新单位网站建设情况说明书
  • 泗阳网站定制网站建设的具体流程图
  • 做影视后期应该关注哪些网站wordpress网站网速慢
  • 网站建设sem怎么做南充市住房和城乡建设厅官方网站
  • 做网站要用写接口佛山网站建设shundeit
  • 黑色个人网站欣赏wordpress表单代码
  • 常熟网站在线阅读网站建设方案
  • 成都住房和城乡建设局网站首页如何用网络营销推广
  • 学校网站建设计划书怎么免费制作网页
  • 郑州加盟网站建设wordpress解析插件
  • 什么腾讯网站做任务能刷q币优秀seo平台
  • 商城网站怎么做推广黑色时尚橱柜网站源码
  • 东莞seo网站排名优化公司wordpress用什么主机好
  • 不花钱做网站wordpress添加友链
  • 北京网站推广排名服务电子商务网站建设及推广方案论文
  • 站长之家ip地址归属查询装饰公司营销型网站建设
  • 怎么做网站设计你建立的网站使用了那些营销方法
  • 中国建设银行网站密码是什么网站建设去哪可接单
  • 商城网站开发鞍山钟点工招聘信息
  • 小程序管理平台登陆南昌seo服务
  • 网站开发那种语言西安网站建设公司有哪些