django 使用绑定多个数据库实现数据的同步
文章目录
- 1. 配置多数据库
- 2. 定义模型
- 3. 跨库同步任务
- 4. 要点说明
- 5. 进一步优化
- 🔑 需求分解
- 📌 1. 模型定义
- 📌 2. 字段映射配置
- 📌 3. 增量批量同步函数
- 📌 4. Celery 定时任务
- 📌 5. 特点
- ⚡️ 建议
- 📌 1. Redis 配置
- 📌 2. 同步配置(多表映射)
- 📌 3. 通用同步服务
- 📌 4. Celery 任务
- 📌 5. Celery 定时配置
- 📌 6. 效果
明白了 ✅ 你的需求是:
-
项目里有两个数据库:
default
→ SQL Serverwms
→ MySQL
-
要把
default
中某个表的部分字段,同步到wms
中的某个表。
这类跨库同步在 Django 里完全可以做到,关键在于 多数据库配置 + 数据库路由/指定数据库操作。
1. 配置多数据库
在 settings.py
:
DATABASES = {"default": { # SQL Server"ENGINE": "mssql","NAME": "default_db","USER": "sa","PASSWORD": "your_password","HOST": "127.0.0.1","PORT": "1433","OPTIONS": {"driver": "ODBC Driver 17 for SQL Server",},},"wms": { # MySQL"ENGINE": "django.db.backends.mysql","NAME": "wms_db","USER": "root","PASSWORD": "your_password","HOST": "127.0.0.1","PORT": "3306",},
}
2. 定义模型
假设 default
里有 Customer
表(SQL Server),
wms
里有 WmsCustomer
表(MySQL)。
# myapp/models.py
from django.db import models# default 数据库(SQL Server)
class Customer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()class Meta:managed = False # 不要让 Django 管理(迁移),因为表已经存在db_table = "customer"app_label = "default_app"# wms 数据库(MySQL)
class WmsCustomer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()class Meta:managed = Falsedb_table = "wms_customer"app_label = "wms_app"
3. 跨库同步任务
写一个简单的管理命令(或者 Celery 任务):
# myapp/management/commands/sync_customers.py
from django.core.management.base import BaseCommand
from myapp.models import Customer, WmsCustomer
from django.db import transactionclass Command(BaseCommand):help = "同步 default.customer 到 wms.wms_customer"def handle(self, *args, **kwargs):customers = Customer.objects.using("default").all()with transaction.atomic(using="wms"):for c in customers:WmsCustomer.objects.using("wms").update_or_create(id=c.id,defaults={"name": c.name,"email": c.email,})self.stdout.write(self.style.SUCCESS("同步完成"))
运行:
python manage.py sync_customers
4. 要点说明
.using("xxx")
可以指定操作哪个数据库。transaction.atomic(using="wms")
保证同步到wms
时是原子性的。update_or_create
用于避免重复插入,按主键id
匹配。- 如果表结构不完全一致,可以只映射需要的字段。
5. 进一步优化
- 你可以把这个同步逻辑放到 Celery 定时任务,比如每分钟跑一次。
- 如果是大数据量,最好用 批量操作(
bulk_create
/bulk_update
)来提高效率。
要不要我帮你写一个 支持批量同步(每次 1000 条)的 Celery 定时任务版本,避免大表同步时性能问题?
👌 明白了,你的需求升级成了一个 跨库字段映射 + 增量同步 + 批量处理 + Celery 定时任务 的场景。这个在实际项目里非常常见(例如:从 ERP / WMS / CRM 系统同步数据)。我帮你设计一个通用的方案。
🔑 需求分解
- 多数据库:
default
(SQL Server) →wms
(MySQL)。 - 字段映射:源表 10 个字段,目标表只取其中 5 个。
- 增量同步:基于
last_modified
字段(源库记录更新时间)。 - 批量同步:每次批量插入/更新 1000 条。
- 定时任务:Celery Beat 定时触发。
📌 1. 模型定义
# myapp/models.py
from django.db import models# SQL Server 源表 (default 数据库)
class Customer(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=100)email = models.EmailField()phone = models.CharField(max_length=20)address = models.TextField()last_modified = models.DateTimeField() # 用来做增量同步class Meta:managed = Falsedb_table = "customer"app_label = "default_app"# MySQL 目标表 (wms 数据库)
class WmsCustomer(models.Model):id = models.AutoField(primary_key=True)full_name = models.CharField(max_length=100)email = models.EmailField()contact = models.CharField(max_length=20)updated_at = models.DateTimeField()class Meta:managed = Falsedb_table = "wms_customer"app_label = "wms_app"
📌 2. 字段映射配置
可以用一个字典配置 源字段 → 目标字段:
FIELD_MAPPING = {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",
}
📌 3. 增量批量同步函数
# myapp/services/sync_service.py
from django.db import transaction
from myapp.models import Customer, WmsCustomer
from django.utils.timezone import nowFIELD_MAPPING = {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",
}BATCH_SIZE = 1000def sync_customers(last_sync_time=None):"""增量批量同步"""queryset = Customer.objects.using("default").all()if last_sync_time:queryset = queryset.filter(last_modified__gt=last_sync_time)total = queryset.count()print(f"需要同步 {total} 条数据")for start in range(0, total, BATCH_SIZE):batch = queryset.order_by("last_modified")[start:start+BATCH_SIZE]objs = []for src in batch:mapped_data = {FIELD_MAPPING[src_field]: getattr(src, src_field)for src_field in FIELD_MAPPING.keys()}objs.append(WmsCustomer(**mapped_data))with transaction.atomic(using="wms"):WmsCustomer.objects.using("wms").bulk_create(objs, batch_size=BATCH_SIZE, ignore_conflicts=True)return total
📌 4. Celery 定时任务
# myapp/tasks.py
from celery import shared_task
from django.utils.timezone import now
from .services.sync_service import sync_customers
from myapp.models import TaskLog@shared_task
def sync_customers_task():try:# 假设 last_sync_time 可以存储在某个表或缓存中last_sync_time = None # TODO: 从缓存或数据库中取上次同步时间total = sync_customers(last_sync_time=last_sync_time)# TODO: 同步成功后,更新 last_sync_timeTaskLog.objects.using("wms").create(task_name="sync_customers_task",status="success",message=f"成功同步 {total} 条数据")except Exception as e:TaskLog.objects.using("wms").create(task_name="sync_customers_task",status="failure",message=str(e))raise
在 celery.py
里添加定时配置:
from celery.schedules import crontabapp.conf.beat_schedule = {"sync-customers-every-5-min": {"task": "myapp.tasks.sync_customers_task","schedule": crontab(minute="*/5"), # 每 5 分钟执行一次},
}
📌 5. 特点
- 批量写入:
bulk_create
,一次性提交。 - 增量同步:根据
last_modified
,只同步变化的。 - 字段映射:通过字典配置,不需要硬编码。
- Celery 定时:可配置为 5 秒、5 分钟或每天定时。
- 日志记录:用
TaskLog
表存储执行状态,可在 Django Admin 查看。
⚡️ 建议
如果 数据量特别大(几十万+),可以考虑:
- 先在 SQL Server 里加索引(针对
last_modified
)。 - 用
iterator()
避免一次性加载所有数据。 - 如果目标库里可能有数据更新,考虑用
bulk_update
替代bulk_create
。
要不要我帮你把这个 同步服务 改成支持 配置多个表的映射规则(比如一个配置文件里定义:源表 A → 目标表 B,源表 C → 目标表 D),这样就能用一套框架同步任意表了?
好主意 ✅
你想要的就是做一个 通用的跨库同步服务:
- 支持 多个表的配置化同步
- 每个表的字段映射可以配置
- 增量同步:基于
last_modified
- Redis 存储每个表的 last_sync_time,避免落地数据库
我给你一个完整的方案:
📌 1. Redis 配置
在 settings.py
:
# Redis 用于保存每个表的 last_sync_time
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 1
新建一个 redis 客户端工具 utils/redis_client.py
:
import redis
from django.conf import settingsredis_client = redis.StrictRedis(host=settings.REDIS_HOST,port=settings.REDIS_PORT,db=settings.REDIS_DB,decode_responses=True
)
📌 2. 同步配置(多表映射)
在 myapp/sync_config.py
:
SYNC_CONFIG = {"customer": {"source_model": "myapp.Customer","target_model": "myapp.WmsCustomer","field_mapping": {"name": "full_name","email": "email","phone": "contact","last_modified": "updated_at",},"batch_size": 1000,},"order": {"source_model": "myapp.Order","target_model": "myapp.WmsOrder","field_mapping": {"order_no": "order_code","amount": "total_price","last_modified": "updated_at",},"batch_size": 500,}
}
📌 3. 通用同步服务
在 myapp/services/sync_service.py
:
import importlib
from django.db import transaction
from django.utils.dateparse import parse_datetime
from utils.redis_client import redis_clientdef load_model(model_path):"""动态加载模型: e.g. 'myapp.Customer'"""app, model = model_path.split(".")return getattr(importlib.import_module(app + ".models"), model)def sync_table(table_name, config):"""同步单个表"""source_model = load_model(config["source_model"])target_model = load_model(config["target_model"])field_mapping = config["field_mapping"]batch_size = config.get("batch_size", 1000)# 从 Redis 获取上次同步时间last_sync_str = redis_client.get(f"sync:{table_name}:last_sync")last_sync_time = parse_datetime(last_sync_str) if last_sync_str else None# 增量查询queryset = source_model.objects.using("default").all()if last_sync_time:queryset = queryset.filter(last_modified__gt=last_sync_time)total = queryset.count()print(f"[{table_name}] 待同步 {total} 条数据")last_updated = Nonefor start in range(0, total, batch_size):batch = queryset.order_by("last_modified")[start:start+batch_size]objs = []for src in batch:mapped_data = {field_mapping[src_field]: getattr(src, src_field)for src_field in field_mapping.keys()}objs.append(target_model(**mapped_data))last_updated = getattr(src, "last_modified", None)with transaction.atomic(using="wms"):target_model.objects.using("wms").bulk_create(objs, batch_size=batch_size, ignore_conflicts=True)# 更新 Redis 中的 last_sync_timeif last_updated:redis_client.set(f"sync:{table_name}:last_sync", last_updated.isoformat())return total
📌 4. Celery 任务
在 myapp/tasks.py
:
from celery import shared_task
from myapp.sync_config import SYNC_CONFIG
from myapp.services.sync_service import sync_table
from myapp.models import TaskLog@shared_task
def sync_all_tables_task():for table_name, config in SYNC_CONFIG.items():try:total = sync_table(table_name, config)TaskLog.objects.using("wms").create(task_name=f"sync_{table_name}",status="success",message=f"同步 {total} 条数据")except Exception as e:TaskLog.objects.using("wms").create(task_name=f"sync_{table_name}",status="failure",message=str(e))raise
📌 5. Celery 定时配置
在 celery.py
:
from celery.schedules import crontabapp.conf.beat_schedule = {"sync-all-tables-every-5-min": {"task": "myapp.tasks.sync_all_tables_task","schedule": crontab(minute="*/5"), # 每 5 分钟同步一次},
}
📌 6. 效果
- 配置化:
sync_config.py
里定义任意表的同步规则(源表、目标表、字段映射)。 - 增量同步:基于
last_modified
,只同步有变动的数据。 - 批量处理:
bulk_create
分批写入。 - Redis 记录:每个表的
last_sync_time
都存在 Redis,不污染数据库。 - 后台可查:执行结果记录在
TaskLog
,可在 Django Admin 查看。
⚡ 这个框架可以扩展成 通用数据同步平台。
要不要我帮你加上 支持删除同步(比如 SQL Server 有数据删除,MySQL 也要同步删除),这样就是“完全镜像同步”?