使用FastAPI+Sqlalchemy从一个数据库向另一个数据库更新数据(sql语句版)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# 配置数据库连接(示例为PostgreSQL->MySQL)
SRC_DB_URL = 'postgresql://user:pass@source_host:5432/source_db'
DST_DB_URL = 'mysql+pymysql://user:pass@dest_host:3306/dest_db'
# 创建引擎和会话
src_engine = create_engine(SRC_DB_URL)
dst_engine = create_engine(DST_DB_URL)
SrcSession = sessionmaker(bind=src_engine)
DstSession = sessionmaker(bind=dst_engine)
def migrate_with_raw_sql():
with SrcSession() as src_session, DstSession() as dst_session:
# 从源数据库查询数据(使用原生SQL)
query = text("SELECT id, name, age FROM users WHERE updated_at > :last_update")
src_data = src_session.execute(query, {"last_update": "2025-01-01"}).fetchall()
# 构建批量更新SQL语句
update_sql = text("""
UPDATE users
SET name = :name, age = :age
WHERE id = :id
""")
# 执行批量更新
for row in src_data:
dst_session.execute(
update_sql,
{"id": row.id, "name": row.name, "age": row.age}
)
dst_session.commit()
print(f"通过原生SQL语句完成 {len(src_data)} 条记录更新")
if __name__ == '__main__':
migrate_with_raw_sql()