当前位置: 首页 > news >正文

django 使用绑定多个数据库实现数据的同步

文章目录

    • 1. 配置多数据库
    • 2. 定义模型
    • 3. 跨库同步任务
    • 4. 要点说明
    • 5. 进一步优化
  • 🔑 需求分解
  • 📌 1. 模型定义
  • 📌 2. 字段映射配置
  • 📌 3. 增量批量同步函数
  • 📌 4. Celery 定时任务
  • 📌 5. 特点
  • ⚡️ 建议
    • 📌 1. Redis 配置
    • 📌 2. 同步配置(多表映射)
    • 📌 3. 通用同步服务
    • 📌 4. Celery 任务
    • 📌 5. Celery 定时配置
    • 📌 6. 效果

明白了 ✅ 你的需求是:

  • 项目里有两个数据库:

    • default → SQL Server
    • wms → MySQL
  • 要把 default 中某个表的部分字段,同步到 wms 中的某个表

这类跨库同步在 Django 里完全可以做到,关键在于 多数据库配置 + 数据库路由/指定数据库操作


1. 配置多数据库

settings.py

DATABASES = {"default": {   # SQL Server"ENGINE": "mssql","NAME": "default_db","USER": "sa","PASSWORD": "your_password","HOST": "127.0.0.1","PORT": "1433","OPTIONS": {"driver": "ODBC Driver 17 for SQL Server",},},"wms": {   # MySQL"ENGINE": "django.db.backends.mysql","NAME": "wms_db","USER": "root","PASSWORD": "your_password","HOST": "127.0.0.1","PORT": "3306",},
}

2. 定义模型

假设 default 里有 Customer 表(SQL Server),
wms 里有 WmsCustomer 表(MySQL)。

# myapp/models.py
from django.db import models# default 数据库(SQL Server)
class Customer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()class Meta:managed = False  # 不要让 Django 管理(迁移),因为表已经存在db_table = "customer"app_label = "default_app"# wms 数据库(MySQL)
class WmsCustomer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()class Meta:managed = Falsedb_table = "wms_customer"app_label = "wms_app"

3. 跨库同步任务

写一个简单的管理命令(或者 Celery 任务):

# myapp/management/commands/sync_customers.py
from django.core.management.base import BaseCommand
from myapp.models import Customer, WmsCustomer
from django.db import transactionclass Command(BaseCommand):help = "同步 default.customer 到 wms.wms_customer"def handle(self, *args, **kwargs):customers = Customer.objects.using("default").all()with transaction.atomic(using="wms"):for c in customers:WmsCustomer.objects.using("wms").update_or_create(id=c.id,defaults={"name": c.name,"email": c.email,})self.stdout.write(self.style.SUCCESS("同步完成"))

运行:

python manage.py sync_customers

4. 要点说明

  • .using("xxx") 可以指定操作哪个数据库。
  • transaction.atomic(using="wms") 保证同步到 wms 时是原子性的。
  • update_or_create 用于避免重复插入,按主键 id 匹配。
  • 如果表结构不完全一致,可以只映射需要的字段。

5. 进一步优化

  • 你可以把这个同步逻辑放到 Celery 定时任务,比如每分钟跑一次。
  • 如果是大数据量,最好用 批量操作bulk_create / bulk_update)来提高效率。

要不要我帮你写一个 支持批量同步(每次 1000 条)的 Celery 定时任务版本,避免大表同步时性能问题?

👌 明白了,你的需求升级成了一个 跨库字段映射 + 增量同步 + 批量处理 + Celery 定时任务 的场景。这个在实际项目里非常常见(例如:从 ERP / WMS / CRM 系统同步数据)。我帮你设计一个通用的方案。


🔑 需求分解

  1. 多数据库default (SQL Server) → wms (MySQL)。
  2. 字段映射:源表 10 个字段,目标表只取其中 5 个。
  3. 增量同步:基于 last_modified 字段(源库记录更新时间)。
  4. 批量同步:每次批量插入/更新 1000 条。
  5. 定时任务:Celery Beat 定时触发。

📌 1. 模型定义

# myapp/models.py
from django.db import models# SQL Server 源表 (default 数据库)
class Customer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()phone = models.CharField(max_length=20)address = models.TextField()last_modified = models.DateTimeField()  # 用来做增量同步class Meta:managed = Falsedb_table = "customer"app_label = "default_app"# MySQL 目标表 (wms 数据库)
class WmsCustomer(models.Model):id = models.AutoField(primary_key=True)full_name = models.CharField(max_length=100)email = models.EmailField()contact = models.CharField(max_length=20)updated_at = models.DateTimeField()class Meta:managed = Falsedb_table = "wms_customer"app_label = "wms_app"

📌 2. 字段映射配置

可以用一个字典配置 源字段 → 目标字段

FIELD_MAPPING = {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",
}

📌 3. 增量批量同步函数

# myapp/services/sync_service.py
from django.db import transaction
from myapp.models import Customer, WmsCustomer
from django.utils.timezone import nowFIELD_MAPPING = {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",
}BATCH_SIZE = 1000def sync_customers(last_sync_time=None):"""增量批量同步"""queryset = Customer.objects.using("default").all()if last_sync_time:queryset = queryset.filter(last_modified__gt=last_sync_time)total = queryset.count()print(f"需要同步 {total} 条数据")for start in range(0, total, BATCH_SIZE):batch = queryset.order_by("last_modified")[start:start+BATCH_SIZE]objs = []for src in batch:mapped_data = {FIELD_MAPPING[src_field]: getattr(src, src_field)for src_field in FIELD_MAPPING.keys()}objs.append(WmsCustomer(**mapped_data))with transaction.atomic(using="wms"):WmsCustomer.objects.using("wms").bulk_create(objs, batch_size=BATCH_SIZE, ignore_conflicts=True)return total

📌 4. Celery 定时任务

# myapp/tasks.py
from celery import shared_task
from django.utils.timezone import now
from .services.sync_service import sync_customers
from myapp.models import TaskLog@shared_task
def sync_customers_task():try:# 假设 last_sync_time 可以存储在某个表或缓存中last_sync_time = None  # TODO: 从缓存或数据库中取上次同步时间total = sync_customers(last_sync_time=last_sync_time)# TODO: 同步成功后,更新 last_sync_timeTaskLog.objects.using("wms").create(task_name="sync_customers_task",status="success",message=f"成功同步 {total} 条数据")except Exception as e:TaskLog.objects.using("wms").create(task_name="sync_customers_task",status="failure",message=str(e))raise

celery.py 里添加定时配置:

from celery.schedules import crontabapp.conf.beat_schedule = {"sync-customers-every-5-min": {"task": "myapp.tasks.sync_customers_task","schedule": crontab(minute="*/5"),  # 每 5 分钟执行一次},
}

📌 5. 特点

  • 批量写入bulk_create,一次性提交。
  • 增量同步:根据 last_modified,只同步变化的。
  • 字段映射:通过字典配置,不需要硬编码。
  • Celery 定时:可配置为 5 秒、5 分钟或每天定时。
  • 日志记录:用 TaskLog 表存储执行状态,可在 Django Admin 查看。

⚡️ 建议

如果 数据量特别大(几十万+),可以考虑:

  1. 先在 SQL Server 里加索引(针对 last_modified)。
  2. iterator() 避免一次性加载所有数据。
  3. 如果目标库里可能有数据更新,考虑用 bulk_update 替代 bulk_create

要不要我帮你把这个 同步服务 改成支持 配置多个表的映射规则(比如一个配置文件里定义:源表 A → 目标表 B,源表 C → 目标表 D),这样就能用一套框架同步任意表了?

好主意 ✅
你想要的就是做一个 通用的跨库同步服务

  • 支持 多个表的配置化同步
  • 每个表的字段映射可以配置
  • 增量同步:基于 last_modified
  • Redis 存储每个表的 last_sync_time,避免落地数据库

我给你一个完整的方案:


📌 1. Redis 配置

settings.py

# Redis 用于保存每个表的 last_sync_time
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 1

新建一个 redis 客户端工具 utils/redis_client.py

import redis
from django.conf import settingsredis_client = redis.StrictRedis(host=settings.REDIS_HOST,port=settings.REDIS_PORT,db=settings.REDIS_DB,decode_responses=True
)

📌 2. 同步配置(多表映射)

myapp/sync_config.py

SYNC_CONFIG = {"customer": {"source_model": "myapp.Customer","target_model": "myapp.WmsCustomer","field_mapping": {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",},"batch_size": 1000,},"order": {"source_model": "myapp.Order","target_model": "myapp.WmsOrder","field_mapping": {"order_no": "order_code","amount": "total_price","last_modified": "updated_at",},"batch_size": 500,}
}

📌 3. 通用同步服务

myapp/services/sync_service.py

import importlib
from django.db import transaction
from django.utils.dateparse import parse_datetime
from utils.redis_client import redis_clientdef load_model(model_path):"""动态加载模型: e.g. 'myapp.Customer'"""app, model = model_path.split(".")return getattr(importlib.import_module(app + ".models"), model)def sync_table(table_name, config):"""同步单个表"""source_model = load_model(config["source_model"])target_model = load_model(config["target_model"])field_mapping = config["field_mapping"]batch_size = config.get("batch_size", 1000)# 从 Redis 获取上次同步时间last_sync_str = redis_client.get(f"sync:{table_name}:last_sync")last_sync_time = parse_datetime(last_sync_str) if last_sync_str else None# 增量查询queryset = source_model.objects.using("default").all()if last_sync_time:queryset = queryset.filter(last_modified__gt=last_sync_time)total = queryset.count()print(f"[{table_name}] 待同步 {total} 条数据")last_updated = Nonefor start in range(0, total, batch_size):batch = queryset.order_by("last_modified")[start:start+batch_size]objs = []for src in batch:mapped_data = {field_mapping[src_field]: getattr(src, src_field)for src_field in field_mapping.keys()}objs.append(target_model(**mapped_data))last_updated = getattr(src, "last_modified", None)with transaction.atomic(using="wms"):target_model.objects.using("wms").bulk_create(objs, batch_size=batch_size, ignore_conflicts=True)# 更新 Redis 中的 last_sync_timeif last_updated:redis_client.set(f"sync:{table_name}:last_sync", last_updated.isoformat())return total

📌 4. Celery 任务

myapp/tasks.py

from celery import shared_task
from myapp.sync_config import SYNC_CONFIG
from myapp.services.sync_service import sync_table
from myapp.models import TaskLog@shared_task
def sync_all_tables_task():for table_name, config in SYNC_CONFIG.items():try:total = sync_table(table_name, config)TaskLog.objects.using("wms").create(task_name=f"sync_{table_name}",status="success",message=f"同步 {total} 条数据")except Exception as e:TaskLog.objects.using("wms").create(task_name=f"sync_{table_name}",status="failure",message=str(e))raise

📌 5. Celery 定时配置

celery.py

from celery.schedules import crontabapp.conf.beat_schedule = {"sync-all-tables-every-5-min": {"task": "myapp.tasks.sync_all_tables_task","schedule": crontab(minute="*/5"),  # 每 5 分钟同步一次},
}

📌 6. 效果

  • 配置化sync_config.py 里定义任意表的同步规则(源表、目标表、字段映射)。
  • 增量同步:基于 last_modified,只同步有变动的数据。
  • 批量处理bulk_create 分批写入。
  • Redis 记录:每个表的 last_sync_time 都存在 Redis,不污染数据库。
  • 后台可查:执行结果记录在 TaskLog,可在 Django Admin 查看。

⚡ 这个框架可以扩展成 通用数据同步平台
要不要我帮你加上 支持删除同步(比如 SQL Server 有数据删除,MySQL 也要同步删除),这样就是“完全镜像同步”?

http://www.dtcms.com/a/406604.html

相关文章:

  • 面试复习题---Android技术专家3
  • 云手机与人工智能之间的关系
  • 做网站对电脑要求高吗荆州市住房和城乡建设厅官方网站
  • Python 高效实现 PDF 转 Word:告别手动复制粘贴
  • 9.9元奶茶项目:matlab+FPGA的cordic算法(向量模式)计算相位角
  • 广州越秀公司网站建设电子商务基础网站建设与维护单项选择题
  • 网站技术培训学校世界知名网站
  • 专业做消防工程师的正规网站做网站百度推广多少钱
  • mysql的 启动 与 客户端连接
  • 【星海出品】rabbitMQ队列处理深入研究
  • Mysql常见八股文
  • 社交类网站开发需求怎么做个人网页
  • 解析01背包
  • WitTkChart:基于Python tkinter Canvas的开源图表可视化库
  • NIST公布后量子加密标准的第五种算法HQC
  • NetCore+Web客户端实现gRPC实时推送
  • Bugku-想蹭网先解开密码
  • 房屋管理系统开发流程
  • 新浪云怎么做淘宝客网站科技与狠活是什么意思
  • Vue3 v-slot 详解与示例
  • Agno 架构介绍:高性 Multi-agent 系统框架深度解析
  • 哪里有南宁网站建设天河区建设和水务局网站
  • Cadence Allegro 电子设计 快问快答--03.OrCAD颜色在哪里设置?
  • 自己做的网站怎么上排行榜设计广告专业制作
  • 网站页面下沉的特效代码山西seo推广系统
  • S7-200 SMART GET/PUT 指令深度解析:从参数到故障排查(S7 协议的客户端 - 服务器通信)下
  • 做国外网站翻译中国小说赚钱外贸推广软件有哪些
  • 二次封装科技风大屏element-ui弹窗
  • 【JavaScript 性能优化实战】第六篇:性能监控与自动化优化
  • 沃地泰双奖加冕2025农机大奖,以创新科技赋能智慧农业