PySpark 与 Pandas 的较量:Databricks 中 SQL Server 到 Snowflake 的数据迁移之旅
文章目录
- 1. 场景设定:数据之旅
- 1.2. 步骤 1:建立连接
- 1.3. 步骤 2:检索表
- 1.3 步骤 3:生成 Snowflake 表结构
- 1.4 步骤 4:将数据从 SQL Server 复制到 Snowflake
- 1.5 步骤 5:关闭连接
- 1.5 步骤 5:关闭连接
- 2 结论
将大量数据从一个平台传输到另一个平台是现代数据工程中一项基本技能。随着 Snowflake 等云数据库的兴起,许多组织正在寻求高效的方式,将数据从 SQL Server 等传统系统导入。本文将详细介绍如何在 Databricks 环境中使用 Python 和 PySpark 将数据从 SQL Server 实例导入 Snowflake。在此过程中,我们将探讨模板和变量如何使我们的代码更灵活和可重用。
1. 场景设定:数据之旅
假设你的任务是将数据从本地 SQL Server 数据库 AxProduction 迁移到云数据仓库 Snowflake。你选择的工具是 Python、PySpark 和 Databricks。目标是:
- 连接到 SQL Server。
- 检索元数据和表结构。
- 在 Snowflake 中创建匹配的表。
- 将数据从 SQL Server 传输到 Snowflake。
此过程将通过一系列步骤完成,利用 Python 库和 API,例如用于 SQL Server 连接的 pyodbc、用于 Snowflake 的 snowflake-connector-python,以及用于数据提取的 pandas。
1.2. 步骤 1:建立连接
我们首先需要能够连接到 SQL Server 和 Snowflake。我们将连接参数设置为模板变量,以便以后可以轻松修改。
import pyodbc
import snowflake.connector
from tqdm import tqdm
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas# Define connection parameters as variables for flexibility
sql_server_params = { 'driver': '{ODBC Driver 17 for SQL Server}', 'server': 'your_server,your_port', 'database': 'your_database', 'uid': 'your_user', 'pwd': 'your_password', 'encrypt': 'yes', 'trust_cert': 'yes'
}
sf_params = { 'user': 'your_user', 'account': 'your_account', 'authenticator': 'externalbrowser', 'database': 'your_database', 'schema': 'your_schema'
}
# SQL Server Connection
sql_server_conn = pyodbc.connect( f"DRIVER={sql_server_params['driver']};" f"SERVER={sql_server_params['server']};" f"DATABASE={sql_server_params['database']};" f"UID={sql_server_params['uid']};" f"PWD={sql_server_params['pwd']};" f"ENCRYPT={sql_server_params['encrypt']};" f"TrustServerCertificate={sql_server_params['trust_cert']};"
)
sql_cursor = sql_server_conn.cursor()# Snowflake Connection
sf_conn = snowflake.connector.connect( user=sf_params['user'], account=sf_params['account'], authenticator=sf_params['authenticator'], database=sf_params['database'], schema=sf_params['schema']
)
sf_cursor = sf_conn.cursor()
通过使用这些变量,我们使代码更易于维护且不易出错,因为如果任何凭据或配置发生更改,我们可以在一处更新参数。
1.3. 步骤 2:检索表
为了了解 SQL Server 数据库的结构,我们将从 INFORMATION_SCHEMA 中检索所有表名。这相当于查看地图以了解我们需要去哪里。
tables_query = """ SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'
"""
sql_cursor.execute(tables_query)
tables = [row[0] for row in sql_cursor.fetchall()]
一旦我们有了表的列表,我们就可以开始传输每个表的结构和数据。但首先,我们需要了解数据的结构。
1.3 步骤 3:生成 Snowflake 表结构
对于 SQL Server 中的每个表,我们将动态生成一个用于 Snowflake 的 CREATE TABLE 语句,将每列映射到其适当的类型。这需要从 SQL Server 检索 schema 并将其转换为 Snowflake 兼容的数据类型。
for table in tqdm(tables, desc="Processing Tables"): schema_query = f""" SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}' """ sql_cursor.execute(schema_query) columns = sql_cursor.fetchall()create_table_query = f"CREATE OR REPLACE TABLE AX_{table} (" column_definitions = [] for col in columns: col_name, col_type, col_length = col if col_type in ['varchar', 'nvarchar', 'char', 'text']: col_def = f'"{col_name}" STRING' elif col_type in ['int', 'bigint', 'smallint', 'tinyint']: col_def = f'"{col_name}" INTEGER' elif col_type in ['decimal', 'numeric', 'money', 'smallmoney']: col_def = f'"{col_name}" FLOAT' elif col_type in ['datetime', 'smalldatetime', 'date', 'time']: col_def = f'"{col_name}" TIMESTAMP' else: col_def = f'"{col_name}" STRING' column_definitions.append(col_def)create_table_query += ', '.join(column_definitions) + ')' sf_cursor.execute(create_table_query)
在这里,我们处理来自 SQL Server 的不同列类型,并将它们映射到 Snowflake 中兼容的类型。这确保了当数据导入 Snowflake 时,它保持完整并被高效查询。
1.4 步骤 4:将数据从 SQL Server 复制到 Snowflake
一旦我们定义了 Snowflake 表,我们就可以开始传输数据。为了保持效率,我们限制了每次插入操作的行数。我们使用 Pandas 从 SQL Server 读取数据,然后使用 Snowflake 连接器中的 write_pandas
函数将数据插入 Snowflake。
for table in tables: df = pd.read_sql(f"SELECT * FROM {table}", sql_server_conn) success, nchunks, nrows, _ = write_pandas(sf_conn, df, f"AX_{table}") if success: print(f"Successfully wrote {nrows} rows to AX_{table}.") else: print(f"Failed to write to AX_{table}.")
**write_pandas**
函数特别有用,因为它能高效地处理数据批量加载到 Snowflake。通过将数据限制为较小的块,我们可以避免遇到性能瓶颈或内存问题,尤其是在具有分布式处理的 Databricks 环境中。
1.5 步骤 5:关闭连接
数据成功传输后,通过关闭 SQL Server 和 Snowflake 连接进行清理非常重要。
# Close Connections
sql_cursor.close()
sql_server_conn.close()
sf_cursor.close()
sf_conn.close()
1.5 步骤 5:关闭连接
最后,数据传输完成后,我们关闭数据库连接以清理资源。
# Close Connections
sql_cursor.close()
sql_server_conn.close()
sf_cursor.close()
sf_conn.close()
2 结论
在本指南中,我们详细介绍了在 Databricks 环境中使用 PySpark 将数据从 SQL Server 迁移到 Snowflake 的过程。通过利用模板变量和动态 schema 生成,我们确保了代码的灵活性和可重用性,适用于不同的表和环境。
这种方法的关键优势:
- 可伸缩性:PySpark 实现了分布式处理,使其非常适合大型数据集。
- 灵活性:使用模板变量,我们使代码易于调整以适应新环境或凭据。
- 效率:Snowflake Spark 连接器高效处理批量数据加载,减少了数据导入所需的时间和资源。
此解决方案非常适合数据工程师或任何需要将大量数据从 SQL Server 等本地系统迁移到 Snowflake 等云数据平台的人,以最小的摩擦和最高的效率完成。
在整个过程中,我们使用了模板变量来使代码灵活,确保我们可以轻松地在不同数据库或环境之间切换。这种方法在 Databricks 这样的云生态系统中特别有用,我们可能需要在不改变底层逻辑的情况下修改配置。无论你直接使用 Python、PySpark 还是 SQL,关键是了解适合工作的最佳工具——在这种情况下,Python、Pandas 和 Snowflake 连接器完美地协同工作,实现高效的数据传输。
从 SQL Server 到 Snowflake 的这段旅程不仅仅是一项技术任务;它关乎确保数据流畅、安全地传输,而凭借 Databricks、Python 和 Snowflake 的强大功能,可能性是无限的。
本篇文章从 SQL Server 到 Snowflake:在 Databricks 中的 Python 和 PySpark 冒险适合希望了解数据迁移的读者。文章的技术亮点在于使用 Python 和 PySpark 实现高效的数据迁移,利用模板变量增强代码灵活性。方法适用场景包括从传统 SQL Server 迁移到现代云数据仓库 Snowflake,适合需要处理大规模数据的工程师。实际案例展示了如何通过动态生成表结构和使用分布式计算来优化数据传输,确保数据完整性和查询效率。