当前位置: 首页 > news >正文

SqlServer整库迁移至Oracle


import pandas as pd
from sqlalchemy import create_engine, text
import cx_Oracle
from sqlalchemy.exc import DatabaseError
import traceback

# SQL Server 配置
sql_server_conn_str = 'mssql+pyodbc://用户名:密码@数据库地址:端口/库名?driver=ODBC+Driver+11+for+SQL+Server'
sql_server_engine = create_engine(sql_server_conn_str)

# Oracle 配置
oracle_conn_str = 'oracle+cx_oracle://用户名:密码@数据库地址:端口/库名'
oracle_engine = create_engine(oracle_conn_str,
                              connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"})

def get_sqlserver_columns(table_name):
    """获取SQL Server表的列定义"""
    with sql_server_engine.connect() as conn:
        columns = conn.execute(text(f"""
            SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = '{table_name}'
        """)).fetchall()
    return columns

def get_numeric_columns(table_name):
    """获取需要转换为整数的列(NUMBER类型)"""
    with sql_server_engine.connect() as conn:
        result = conn.execute(text(f"""
            SELECT COLUMN_NAME 
            FROM INFORMATION_SCHEMA.COLUMNS 
            WHERE TABLE_NAME = '{table_name}'
            AND DATA_TYPE IN ('int', 'smallint', 'tinyint', 'bigint')
        """))
        return [row.COLUMN_NAME for row in result]

def map_oracle_type(sqlserver_type, max_length):
    """SQL Server到Oracle类型映射"""
    sqlserver_type = sqlserver_type.lower()
    if max_length and max_length < 1:
        max_length = None
    type_map = {
        'varchar': lambda: 'CLOB' if max_length is None else f'VARCHAR2({min(max_length, 4000)})',
        'nvarchar': lambda: 'NCLOB' if max_length is None else f'NVARCHAR2({min(max_length, 2000)})',
        'text': 'CLOB',
        'char': lambda: f'CHAR({max_length})' if max_length else 'CHAR(1)',
        'nchar': lambda: f'NCHAR({max_length})' if max_length else 'NCHAR(1)',
        'int': 'NUMBER(10)',
        'bigint': 'NUMBER(19)',
        'smallint': 'NUMBER(5)',
        'tinyint': 'NUMBER(3)',
        'decimal': 'NUMBER',
        'numeric': 'NUMBER',
        'float': 'BINARY_DOUBLE',
        'real': 'BINARY_FLOAT',
        'datetime': 'TIMESTAMP(6)',
        'datetime2': 'TIMESTAMP(6)',
        'date': 'DATE',
        'time': 'TIMESTAMP(6)',
        'bit': 'NUMBER(1)'
    }
    if sqlserver_type in type_map:
        return type_map[sqlserver_type]() if callable(type_map[sqlserver_type]) else type_map[sqlserver_type]
    return 'CLOB'

def migrate_table(table_name):
    try:
        oracle_table_name = table_name.upper()
        columns_info = get_sqlserver_columns(table_name)
        numeric_cols = get_numeric_columns(table_name)  # 获取需要转换的列

        # 构建Oracle表结构
        columns_with_types = [
            f'"{col.COLUMN_NAME}" {map_oracle_type(col.DATA_TYPE, col.CHARACTER_MAXIMUM_LENGTH)}'
            for col in columns_info
        ]

        with oracle_engine.connect() as conn:
            if conn.execute(text("SELECT 1 FROM user_tables WHERE table_name = :name"),
                            {'name': oracle_table_name}).scalar():
                print(f"删除旧表 {oracle_table_name}...")
                conn.execute(text(f'DROP TABLE "{oracle_table_name}" PURGE'))
                conn.commit()

            create_sql = f'CREATE TABLE "{oracle_table_name}" ({", ".join(columns_with_types)})'
            print(f"\n[DEBUG] 建表SQL:\n{create_sql}")
            conn.execute(text(create_sql))
            conn.commit()

            chunks = pd.read_sql_table(
                table_name,
                sql_server_engine,
                chunksize=10000
            )

            for chunk_idx, chunk_df in enumerate(chunks):
                # 空数据直接跳过
                if len(chunk_df) == 0:
                    print(f"跳过空数据批次:{chunk_idx + 1}")
                    continue

                # 空值处理
                chunk_df = chunk_df.where(pd.notnull(chunk_df), None)

                # 动态转换数值列
                for col in numeric_cols:
                    if col in chunk_df.columns:
                        chunk_df[col] = chunk_df[col].fillna(0).astype('int64')

                # 日期类型转换
                datetime_cols = [col for col in chunk_df.columns
                               if pd.api.types.is_datetime64_any_dtype(chunk_df[col])]
                for col in datetime_cols:
                    chunk_df[col] = chunk_df[col].dt.tz_localize(None)

                # 构建插入SQL
                columns = ', '.join([f'"{col}"' for col in chunk_df.columns])
                placeholders = ', '.join([f':{col}' for col in chunk_df.columns])
                insert_sql = text(f"""
                    INSERT INTO "{oracle_table_name}" ({columns}) 
                    VALUES ({placeholders})
                """)

                try:
                    conn.execute(insert_sql, chunk_df.to_dict(orient='records'))
                    conn.commit()
                    print(f"批次 {chunk_idx + 1}: 成功插入 {len(chunk_df)} 行")
                except DatabaseError as e:
                    conn.rollback()
                    print(f"插入失败: {str(e)}")
                    print("问题数据样例:", chunk_df.iloc[0].to_dict())
                    return

    except Exception as e:
        print(f"严重错误: {str(e)}")
        traceback.print_exc()

def migrate_database():
    with sql_server_engine.connect() as conn:
        tables = conn.execute(text("""
            SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES 
            WHERE TABLE_TYPE='BASE TABLE'
        """)).fetchall()

    exclude_tables = ['表名']  # 要过滤的表

    for table in tables:
        table_name = table[0]

        # 跳过特定表(不区分大小写)
        if table_name.upper() in [t.upper() for t in exclude_tables]:
            print(f"\n{'=' * 30} 跳过表 {table_name} {'=' * 30}")
            continue

        print(f"\n{'=' * 30} 迁移表 {table_name} {'=' * 30}")
        migrate_table(table_name)

if __name__ == '__main__':
    # migrate_table('表名')  # 单表测试
    migrate_database()  # 全库迁移
    print("\n迁移完成")


上面代码主要是解决整库迁移过程中相关表的创建(备注:不同数据库之间数据类型的映射转换),还有读取原始数据的类型转换 和 分批插入优化。

注意事项:

Oracle

1.在pycharm搜索不到cx_Oracle的库,通过cmd的方式进入python安装环境的目录Scripts下,然后pip install cx_Oracle 进行导入。

2.配置 Oracle Instant Client

1).根据Oracle的版本和操作系统进行相应下载  

官方地址:Oracle Instant Client Downloads

2).配置环境变量

  • Windows:将解压路径添加到系统 Path 变量。

SqlServer

1.根据SqlServer的版本进行驱动下载安装

官方地址:Download ODBC Driver for SQL Server - ODBC Driver for SQL Server | Microsoft Learn

http://www.dtcms.com/a/106661.html

相关文章:

  • 鹧鸪云光伏仿真软件场外设计功能:构建系统级工程闭环
  • time.sleep(10)和 async 区别
  • 通信算法之251: 时频图谱spectrogram(如短时傅里叶变换STFT)
  • 数据结构——队列的实现
  • LeetCode算法题(Go语言实现)_26
  • LLM大模型之精度问题(FP16,FP32,BF16)详解与实践
  • Mapreduce的使用
  • 深入理解归并排序:分治艺术的经典实践
  • 【AI产品分享】面向图片的原始位置翻译功能
  • Redisson中BitMap位图的基本操作
  • CORS与OPTIONS请求
  • 蓝桥杯 游戏 6251 单调队列
  • .NET 创建MCP使用大模型对话
  • 【计网速通】计算机网络核心知识点与高频考点——数据链路层(二)
  • kafka消息可靠性传输语义
  • 大语言模型开发框架——LangChain
  • 使用PyTorch实现LeNet-5并在Fashion-MNIST数据集上训练
  • 【Linux】内核驱动学习笔记(二)
  • 基于Spring AI与Ollama构建本地DeepSeek对话机器人
  • 数据库分库分表中间件及对比
  • ensp 网络模拟器 思科华为基于VLANIF的公司网络搭建
  • 2025.4.2总结
  • Go语言GC:三色标记法工程启示|Go语言进阶(3)
  • K-means算法
  • 从零搭建微服务项目Pro(第7-1章——分布式雪花算法)
  • cmake(11):list 选项 排序 SORT,定义宏 add_definitions,cmake 里预定义的 8 个宏
  • Git 命令大全:通俗易懂的指南
  • 基于大模型预测风湿性心脏病二尖瓣病变的多维度诊疗研究报告
  • 内网隔离环境下Java实现图片预览的三大解决方案
  • 【Django开发】前后端分离django美多商城项目第15篇:商品搜索,1. Haystack介绍和安装配置【附代码文档】