从外部平台通过接口获取数据,初始全量同步 和 后续带版本管理的增量同步
✅ 总体思路分为三步:
第一步:第一次全量同步
全量拉取数据写入表(如 order_model 和 order_detail_model)。
每条记录加上 version 字段(默认为 1)。
记录首次同步时间(SyncSettingModel 记录)。
第二步:增量同步(每日)
每次增量拉取数据后,对比是否有变化:
场景 | 动作 |
---|---|
订单为新订单(本地不存在) | 插入新记录,version=1 |
订单存在,但内容发生变化 | 插入新一条记录,version+=1 (历史保留) |
订单存在,且内容完全一样 | 跳过,不处理 |
第三步:查询时只查最新版本
添加字段:version 和 is_latest
每次有新版本时,将旧版本 is_latest=False,新记录 is_latest=True
查询业务数据时只加 is_latest=True 作为过滤条件
✅ 数据表设计建议
以 OrderModel 为例,加上版本控制字段:
class OrderModel(models.Model):order_id = models.CharField(primary_key=True) # 格式:ECCANG123456order_code = models.CharField()...version = models.IntegerField(default=1)is_latest = models.BooleanField(default=True)update_time = models.DateTimeField() # 数据实际更新时间sync_time = models.DateTimeField() # 我们同步时间
✅ 增量同步代码逻辑(伪代码)
# 假设 fetched_data 是 ECCANG 返回的一条订单数据
fetched_order_id = ECCANG + fetched_data["order_id"]
fetched_json = json.dumps(fetched_data, ensure_ascii=False)# 查找本地是否存在
latest_local = OrderModel.objects.filter(order_id=fetched_order_id, is_latest=True).first()if latest_local is None:# 新订单:version=1,is_latest=Trueinsert_new_order(fetched_data, version=1, is_latest=True)
else:# 比较内容是否相同if is_same(fetched_data, latest_local):pass # 不变,跳过else:# 标记旧版本为非最新latest_local.is_latest = Falselatest_local.save(update_fields=["is_latest"])# 插入新版本new_version = latest_local.version + 1insert_new_order(fetched_data, version=new_version, is_latest=True)
is_same() 可以使用关键字段进行哈希比较,或者字段级比较。
✅ 插入方法封装建议
def insert_new_order(data, version, is_latest):order = OrderModel(order_id=ECCANG + data["order_id"],order_code=data["order_code"],...version=version,is_latest=is_latest,update_time=convert_tr_to_non_native_datetime3(data.get("update_date")),sync_time=get_non_native_now())order.save()
✅ 查询数据时的注意点
所有业务查询都要限制:
OrderModel.objects.filter(is_latest=True)