当前位置: 首页 > news >正文

增量同步 + 双库写入 + 时间游标更新

✅ 目标:构建一个通用、可配置的跨数据库同步框架

✅ 支持功能

  • ✅ 多数据表、可配置同步(不只 supplier)
  • ✅ 增量同步(基于修改时间)
  • ✅ 批量读取(分页 / 批量)
  • ✅ 双库事务保证(成功才更新时间)
  • ✅ 日志完善、异常捕获
  • ✅ Celery 定时调用支持

🏗️ 一、目录结构建议

project/
├── business_service/
│   ├── sync_framework/
│   │   ├── __init__.py
│   │   ├── base_sync.py        # 通用同步类
│   │   ├── sync_config.py      # 表配置
│   │   ├── sync_runner.py      # 执行入口
│   │   ├── utils.py            # 通用方法(分页游标、SQL构建)
│   │   ├── logs/
│   │   │   └── sync.log

🧩 二、同步配置文件(sync_config.py

SYNC_CONFIG = {"supplier": {"source_db": "default",  # SQL Server"target_db": "wms",      # MySQL"table_name": "wms_supplier","batch_size": 100,"update_field": "last_time","source_sql": """SELECT TOP {batch_size}a.FSUPPLIERID as supplier_id,a.FDocumentStatus as supplier_document_status,a.FForbidStatus as supplier_forbide_status,a.FNumber as supplier_number,a.FPRIMARYGROUP as supplier_group_id,a.FMASTERID as master_id,a.FUSEORGID as user_org_id,a.FMODIFYDATE as last_time,(SELECT L.FNAME FROM dbo.T_BD_SUPPLIER_L L WHERE L.FSUPPLIERID = a.FSUPPLIERID) AS supplier_name,(SELECT G.FNUMBER FROM dbo.T_BD_SUPPLIERGROUP G WHERE G.FID = a.FPRIMARYGROUP) AS supplier_group_number,(SELECT GL.FNAME FROM dbo.T_BD_SUPPLIERGROUP_L GL WHERE GL.FID = a.FPRIMARYGROUP) AS supplier_group_nameFROM dbo.T_BD_SUPPLIER aWHERE a.FDOCUMENTSTATUS='C' AND a.FFORBIDSTATUS='A' AND a.FMODIFYDATE > '{last_time}'ORDER BY a.FMODIFYDATE ASC""","insert_sql": """INSERT INTO wms_supplier (supplier_id, supplier_document_status, supplier_forbide_status, supplier_number,supplier_group_id, master_id, user_org_id, supplier_name,supplier_group_number, supplier_group_name, last_time) VALUES {values}ON DUPLICATE KEY UPDATEsupplier_document_status=VALUES(supplier_document_status),supplier_forbide_status=VALUES(supplier_forbide_status),supplier_number=VALUES(supplier_number),supplier_group_id=VALUES(supplier_group_id),master_id=VALUES(master_id),user_org_id=VALUES(user_org_id),supplier_name=VALUES(supplier_name),supplier_group_number=VALUES(supplier_group_number),supplier_group_name=VALUES(supplier_group_name),last_time=VALUES(last_time)"""}
}

🧠 三、通用同步逻辑(base_sync.py

import logging
from django.db import connections, transaction
from django.utils import timezone
from .sync_config import SYNC_CONFIG
from wms.models import TableUpdateRecordlogger = logging.getLogger(__name__)class TableUpdateManager:"""维护最后更新时间的工具类"""@staticmethoddef get_last_update(table_name):record = TableUpdateRecord.objects.filter(table_name=table_name).first()return record.last_updated_at if record else None@staticmethoddef update_time(table_name, update_time):obj, _ = TableUpdateRecord.objects.update_or_create(table_name=table_name,defaults={"last_updated_at": update_time, "modified_at": timezone.now()})return objclass DataSyncService:"""通用表同步服务"""def __init__(self, table_key: str):if table_key not in SYNC_CONFIG:raise ValueError(f"未知同步表配置: {table_key}")self.conf = SYNC_CONFIG[table_key]self.table_key = table_keydef fetch_source_data(self, last_time):"""从源数据库读取增量数据"""sql = self.conf["source_sql"].format(last_time=last_time.strftime("%Y-%m-%d %H:%M:%S"),batch_size=self.conf["batch_size"])with connections[self.conf["source_db"]].cursor() as cursor:cursor.execute(sql)rows = cursor.fetchall()cols = [col[0] for col in cursor.description]return [dict(zip(cols, r)) for r in rows]def insert_target_data(self, data):"""批量插入目标数据库"""if not data:returnwith connections[self.conf["target_db"]].cursor() as cursor:values = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",(d["supplier_id"],d["supplier_document_status"],d["supplier_forbide_status"],d["supplier_number"],d["supplier_group_id"],d["master_id"],d["user_org_id"],d["supplier_name"],d["supplier_group_number"],d["supplier_group_name"],d["last_time"],),).decode("utf-8")for d in data)sql = self.conf["insert_sql"].format(values=values)cursor.execute(sql)@transaction.atomic(using="wms")def sync(self):"""同步主函数"""try:last_time = TableUpdateManager.get_last_update(self.table_key) or timezone.datetime(2000, 1, 1)data = self.fetch_source_data(last_time)if not data:logger.info(f"[{self.table_key}] 无需更新,当前时间: {last_time}")returnself.insert_target_data(data)new_time = data[-1]["last_time"]TableUpdateManager.update_time(self.table_key, new_time)logger.info(f"[{self.table_key}] 同步成功, 条数={len(data)}, 更新时间={new_time}")except Exception as e:logger.exception(f"[{self.table_key}] 同步失败: {e}")raise

⚙️ 四、统一入口(sync_runner.py

from .base_sync import DataSyncServicedef run_sync(table_key="supplier"):service = DataSyncService(table_key)service.sync()

然后你可以在:

python manage.py shell

里执行:

from business_service.sync_framework.sync_runner import run_sync
run_sync("supplier")

🕒 五、Celery 定时任务集成

from celery import shared_task
from business_service.sync_framework.sync_runner import run_sync@shared_task
def sync_supplier_task():run_sync("supplier")

然后加到 Celery Beat 定时任务中即可。


✅ 六、可扩展性

你只需要在 sync_config.py 中增加新表:

"material": {...},
"customer": {...},

就能自动被框架支持,无需改同步逻辑。


🧩 七、事务与可靠性保证

  • @transaction.atomic(using="wms") 确保 MySQL 端写入 + 时间记录为一个原子操作;
  • SQL Server 读取只读事务;
  • 所有异常会被捕获记录日志;
  • Celery 可结合重试机制实现补偿。
http://www.dtcms.com/a/450458.html

相关文章:

  • python爬虫爬小说来做网站wordpress分类设置主题
  • 太原网站定制python django做的网站
  • 普法网站建设方案网站开发谢辞
  • wordpress上传doc文件大小昆明二级站seo整站优化排名
  • 力扣136.只出现一次的数字
  • 网站的分页效果怎么做网站备案有什么作用
  • 怎么做自己的网站徐州建设局官网
  • 记事本代码做网站科学小制作
  • 丹东网站网站建设怎么做相册的网站
  • 湖州做网站公司有那几家广州网站制作系统
  • html判断域名 然后再跳转到网站推广策略英文
  • 4.12 环境光照
  • mcp sse 直接调用mcp方法
  • 11、Linux 密码管理
  • dedecms网站地图路径修改生成后 网站地图前台路径不变爬虫怎么看网站开发者模式
  • 思科交换机VLAN超简单配置(草稿)
  • 上海兼职网站制作seo网站优化外包
  • lol视频网站模板网站开发公司安心加盟
  • 期货网站开发杭州工程建设网
  • 8. 直线方程式
  • 小红书网站开发费用电脑做服务器上传网站
  • 丘受网站谁做的网球吧电商是做什么
  • daya87—字符串—同构字符串(LeetCode-205)
  • AI编程开发系统018-基于Vue+SpringBoot的付费自习室管理系统(源码+部署说明+演示视频+源码介绍+lw)
  • 网站建设合同前期需注意哪些问题海口财务
  • 写网站的教程宝安专业网站建设
  • 黑龙江营商环境建设局网站呼和浩特市做网站公司好的
  • Redis核心通用命令深度解析
  • 官方网站的推广策划怎么做企业查询宝下载
  • python如何控制电脑鼠标