当前位置: 首页 > news >正文

使用pymongo进行MongoDB的回收

在 PyMongo 中使用 compact 命令进行 MongoDB 碎片回收的完整操作指南如下:

一、核心执行方法

from pymongo import MongoClient
import time# 1. 连接到 MongoDB 实例
client = MongoClient("mongodb://username:password@host:27017/dbname?authSource=admin")# 2. 选择目标数据库和集合
db = client["your_database"]
collection = db["your_collection"]# 3. 执行 compact 命令
try:# 执行碎片回收(返回操作ID)result = db.command("compact", collection.name)print(f"Compact operation started. Operation ID: {result['operationTime']}")# 监控操作进度(可选)operation_id = result["operationTime"]while True:current_ops = db.command("currentOp", {"operationTime": operation_id})if not current_ops.get("inprog", []):breakprint("Compact in progress...")time.sleep(10)print("✅ Compact completed successfully!")except Exception as e:print(f"❌ Compact failed: {str(e)}")
finally:client.close()

二、关键参数配置

# 添加额外参数(副本集secondary节点需要force)
result = db.command("compact", collection.name,force=True,              # 强制在secondary节点运行compression={"type": "zlib"},  # 指定压缩算法paddingFactor=1.1,        # 预留空间因子(0-4.0)maxPaddingBytes=1024,     # 最大填充字节tieredStorage={"useRecycledSpace": True}  # Atlas专用
)

三、集群环境操作方案

1. 副本集自动滚动执行
rs_members = ["rs1/mongo1:27017","rs1/mongo2:27017","rs1/mongo3:27017"
]for member in rs_members:member_client = MongoClient(f"mongodb://user:pass@{member}/admin?replicaSet=rs1")# 检查节点类型is_primary = member_client.admin.command("isMaster").get("ismaster")# 降级主节点(每次处理前)if is_primary:member_client.admin.command("replSetStepDown", 300)  # 降级300秒try:db = member_client["your_db"]db.command("compact", "your_collection", force=True)print(f"✅ Compact completed on {member}")except Exception as e:print(f"❌ Failed on {member}: {str(e)}")finally:member_client.close()
2. 分片集群自动处理
# 通过Config Server获取分片列表
config_client = MongoClient("mongodb://config_server:27019")
shards = config_client.config.shards.find()for shard in shards:shard_name = shard["_id"]shard_host = shard["host"].split("/")[-1]  # 提取主机地址try:shard_client = MongoClient(f"mongodb://{shard_host}/admin")# 确认是分片主节点if shard_client.admin.command("isMaster").get("ismaster"):db = shard_client["your_db"]db.command("compact", "your_collection")print(f"✅ Compact on shard {shard_name} completed")else:print(f"⚠️ {shard_host} is not primary, skipped")except Exception as e:print(f"❌ Shard {shard_name} failed: {str(e)}")finally:shard_client.close()

四、操作结果验证

# 对比前后存储状态
pre_stats = collection.stats()
# ... compact 执行 ...
post_stats = collection.stats()print(f"存储优化报告:")
print(f"- 原始大小: {pre_stats['storageSize'] / 1024**2:.2f} MB")
print(f"- 优化后: {post_stats['storageSize'] / 1024**2:.2f} MB")
print(f"- 节省空间: {(pre_stats['storageSize'] - post_stats['storageSize']) / 1024**2:.2f} MB")
print(f"- 碎片率: {100 * (pre_stats['size'] / pre_stats['storageSize'] - 1):.1f}% → "f"{100 * (post_stats['size'] / post_stats['storageSize'] - 1):.1f}%")

五、安全操作注意事项

  1. 阻塞机制处理

    # 检查当前操作是否被阻塞
    if db.current_op({"command.compact": {"$exists": True}}):print("⚠️ Another compact already running")exit()# 设置超时自动中断
    client = MongoClient(connectTimeoutMS=30000, socketTimeoutMS=3600000)
    
  2. 磁盘空间保障

    # 检查磁盘空间
    disk_stats = client.admin.command("fsInfo")
    free_space = disk_stats["fsUsedSize"] - disk_stats["fsTotalSize"]
    coll_size = collection.stats()["storageSize"]if free_space < coll_size * 1.5:print(f"❌ Insufficient disk space. Need {coll_size*1.5} bytes, only {free_space} available")exit()
    
  3. Atlas 云服务专用

    # Atlas需要特殊授权
    client = MongoClient(connect_string, authMechanism="MONGODB-AWS")# 使用分层存储API
    compact_opts = {"tieredStorage": {"useRecycledSpace": True,"reclaimSpace": True}
    }
    

六、替代方案实现

无损在线重建方案

def online_recompact(db_name, coll_name):temp_name = f"{coll_name}_compact_{int(time.time())}"# 1. 创建临时集合db.command("create", temp_name)# 2. 逐步复制数据(避免大事务阻塞)source = db[coll_name]dest = db[temp_name]batch_size = 1000total_docs = source.count_documents({})for skip in range(0, total_docs, batch_size):docs = source.find().skip(skip).limit(batch_size)dest.insert_many(list(docs))# 3. 原集合原子替换source.rename(f"old_{coll_name}", dropTarget=True)dest.rename(coll_name)db[f"old_{coll_name}"].drop()

最佳实践总结

  1. 执行窗口选择

    # 获取当前时间并判断
    from datetime import datetime
    current_hour = datetime.now().hour
    if 0 <= current_hour < 5:  # 凌晨执行run_compact()
    else:print("⚠️ Operation declined: Not in maintenance window")
    
  2. 定时清理脚本框架

    import schedule
    import timedef weekly_compact():shard_cluster_compact()  # 调用前述集群函数# 每周日凌晨1点执行
    schedule.every().sunday.at("01:00").do(weekly_compact)while True:schedule.run_pending()time.sleep(60)
    
  3. 健康检查指标

    HEALTH_THRESHOLD = 0.8  # 碎片率阈值def needs_compact(collection):stats = collection.stats()fragmentation = 1 - (stats["size"] / stats["storageSize"])return fragmentation > HEALTH_THRESHOLD# 自动检测执行
    if needs_compact(collection):run_compact(collection)
    

关键提示:在MongoDB Atlas中,建议启用https://docs.atlas.mongodb.com/tiered-storage/替代手动compact。对10GB以上的集合操作时,优先采用online_recompact方案确保业务连续性。

http://www.dtcms.com/a/291065.html

相关文章:

  • JAVA高级第七章输入和输出处理(二)
  • 前缀和题目:元素和小于等于阈值的正方形的最大边长
  • PostgreSQL高可用架构Repmgr部署流程
  • 按需搭建web网站
  • 【2025】Vscode Python venv虚拟环境显示“激活终端”成功但是在终端中“并没有激活成功”,pip安装还是会安装到全局环境中的解决方法;
  • CataPro本地安装教程--No GPU--cpu only模式--网络资料整理
  • Android Navigation 组件:简化应用导航的利器
  • [硬件电路-67]:模拟器件 - 高输入阻抗、低输出阻抗本质:最小化能量的汲取,最大化能量传递
  • Dynamics 365 Contact Center是什么
  • NX636NX644美光固态闪存NX663NX665
  • MySQL笔记4
  • 行业实例-国产中望3D曲面建模如何实现电脑精准+协同设计
  • AI绘画生成东汉末年黄忠全身像提示词
  • 第二阶段-第二章—8天Python从入门到精通【itheima】-134节(SQL——DQL——分组聚合)
  • ansible批量部署zabbix客户端
  • 2024年ASOC SCI2区TOP,基于Jaya算法的粒子滤波器用于非线性模型贝叶斯更新,深度解析+性能实测
  • (十九)深入了解 AVFoundation-编辑:使用 AVMutableVideoComposition 实现视频加水印与图层合成(上)——理论篇
  • 【每日算法】专题四_前缀和
  • 算法-比较排序
  • Redis入门教程(一):基本数据类型
  • ppp实验
  • BEVformer个人理解与解读
  • 2025暑期—02卷积与滤波-边缘检测
  • 180页PPT烟草集团物流数字化架构设计咨询指南
  • 牛客网题解 | 单词识别
  • 宝塔访问lnmp项目,跳转不到项目根目录问题解决
  • Spring关于依赖注入的几种方式和Spring配置文件的标签
  • 大模型后训练——SFT实践
  • (SAM)Segment Anything论文精读(逐段解析)
  • 磁悬浮轴承振动的智能克星:自适应陷波器设计与DSP实现全解析