当前位置: 首页 > news >正文

PySpark 与 Pandas 的较量:Databricks 中 SQL Server 到 Snowflake 的数据迁移之旅


文章目录

  • 1. 场景设定:数据之旅
    • 1.2. 步骤 1:建立连接
    • 1.3. 步骤 2:检索表
    • 1.3 步骤 3:生成 Snowflake 表结构
    • 1.4 步骤 4:将数据从 SQL Server 复制到 Snowflake
    • 1.5 步骤 5:关闭连接
    • 1.5 步骤 5:关闭连接
  • 2 结论


将大量数据从一个平台传输到另一个平台是现代数据工程中一项基本技能。随着 Snowflake 等云数据库的兴起,许多组织正在寻求高效的方式,将数据从 SQL Server 等传统系统导入。本文将详细介绍如何在 Databricks 环境中使用 Python 和 PySpark 将数据从 SQL Server 实例导入 Snowflake。在此过程中,我们将探讨模板和变量如何使我们的代码更灵活和可重用。

1. 场景设定:数据之旅

假设你的任务是将数据从本地 SQL Server 数据库 AxProduction 迁移到云数据仓库 Snowflake。你选择的工具是 Python、PySpark 和 Databricks。目标是:

  1. 连接到 SQL Server。
  2. 检索元数据和表结构。
  3. 在 Snowflake 中创建匹配的表。
  4. 将数据从 SQL Server 传输到 Snowflake。

此过程将通过一系列步骤完成,利用 Python 库和 API,例如用于 SQL Server 连接的 pyodbc、用于 Snowflake 的 snowflake-connector-python,以及用于数据提取的 pandas

1.2. 步骤 1:建立连接

我们首先需要能够连接到 SQL Server 和 Snowflake。我们将连接参数设置为模板变量,以便以后可以轻松修改。

import pyodbc  
import snowflake.connector  
from tqdm import tqdm  
import pandas as pd  
from snowflake.connector.pandas_tools import write_pandas# Define connection parameters as variables for flexibility  
sql_server_params = {  'driver': '{ODBC Driver 17 for SQL Server}',  'server': 'your_server,your_port',  'database': 'your_database',  'uid': 'your_user',  'pwd': 'your_password',  'encrypt': 'yes',  'trust_cert': 'yes'  
}
sf_params = {  'user': 'your_user',  'account': 'your_account',  'authenticator': 'externalbrowser',  'database': 'your_database',  'schema': 'your_schema'  
}
# SQL Server Connection  
sql_server_conn = pyodbc.connect(  f"DRIVER={sql_server_params['driver']};"  f"SERVER={sql_server_params['server']};"  f"DATABASE={sql_server_params['database']};"  f"UID={sql_server_params['uid']};"  f"PWD={sql_server_params['pwd']};"  f"ENCRYPT={sql_server_params['encrypt']};"  f"TrustServerCertificate={sql_server_params['trust_cert']};"  
)  
sql_cursor = sql_server_conn.cursor()# Snowflake Connection  
sf_conn = snowflake.connector.connect(  user=sf_params['user'],  account=sf_params['account'],  authenticator=sf_params['authenticator'],  database=sf_params['database'],  schema=sf_params['schema']  
)  
sf_cursor = sf_conn.cursor()

通过使用这些变量,我们使代码更易于维护且不易出错,因为如果任何凭据或配置发生更改,我们可以在一处更新参数。

1.3. 步骤 2:检索表

为了了解 SQL Server 数据库的结构,我们将从 INFORMATION_SCHEMA 中检索所有表名。这相当于查看地图以了解我们需要去哪里。

tables_query = """  SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES   WHERE TABLE_TYPE = 'BASE TABLE'  
"""  
sql_cursor.execute(tables_query)  
tables = [row[0] for row in sql_cursor.fetchall()]

一旦我们有了表的列表,我们就可以开始传输每个表的结构和数据。但首先,我们需要了解数据的结构。

1.3 步骤 3:生成 Snowflake 表结构

对于 SQL Server 中的每个表,我们将动态生成一个用于 Snowflake 的 CREATE TABLE 语句,将每列映射到其适当的类型。这需要从 SQL Server 检索 schema 并将其转换为 Snowflake 兼容的数据类型。

for table in tqdm(tables, desc="Processing Tables"):  schema_query = f"""  SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH   FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'  """  sql_cursor.execute(schema_query)  columns = sql_cursor.fetchall()create_table_query = f"CREATE OR REPLACE TABLE AX_{table} ("  column_definitions = []  for col in columns:  col_name, col_type, col_length = col  if col_type in ['varchar', 'nvarchar', 'char', 'text']:  col_def = f'"{col_name}" STRING'  elif col_type in ['int', 'bigint', 'smallint', 'tinyint']:  col_def = f'"{col_name}" INTEGER'  elif col_type in ['decimal', 'numeric', 'money', 'smallmoney']:  col_def = f'"{col_name}" FLOAT'  elif col_type in ['datetime', 'smalldatetime', 'date', 'time']:  col_def = f'"{col_name}" TIMESTAMP'  else:  col_def = f'"{col_name}" STRING'  column_definitions.append(col_def)create_table_query += ', '.join(column_definitions) + ')'  sf_cursor.execute(create_table_query)

在这里,我们处理来自 SQL Server 的不同列类型,并将它们映射到 Snowflake 中兼容的类型。这确保了当数据导入 Snowflake 时,它保持完整并被高效查询。

1.4 步骤 4:将数据从 SQL Server 复制到 Snowflake

一旦我们定义了 Snowflake 表,我们就可以开始传输数据。为了保持效率,我们限制了每次插入操作的行数。我们使用 Pandas 从 SQL Server 读取数据,然后使用 Snowflake 连接器中的 write_pandas 函数将数据插入 Snowflake。

for table in tables:  df = pd.read_sql(f"SELECT * FROM {table}", sql_server_conn)  success, nchunks, nrows, _ = write_pandas(sf_conn, df, f"AX_{table}")  if success:  print(f"Successfully wrote {nrows} rows to AX_{table}.")  else:  print(f"Failed to write to AX_{table}.")

**write_pandas** 函数特别有用,因为它能高效地处理数据批量加载到 Snowflake。通过将数据限制为较小的块,我们可以避免遇到性能瓶颈或内存问题,尤其是在具有分布式处理的 Databricks 环境中。

1.5 步骤 5:关闭连接

数据成功传输后,通过关闭 SQL Server 和 Snowflake 连接进行清理非常重要。

# Close Connections  
sql_cursor.close()  
sql_server_conn.close()  
sf_cursor.close()  
sf_conn.close()

1.5 步骤 5:关闭连接

最后,数据传输完成后,我们关闭数据库连接以清理资源。

# Close Connections  
sql_cursor.close()  
sql_server_conn.close()  
sf_cursor.close()  
sf_conn.close()

2 结论

在本指南中,我们详细介绍了在 Databricks 环境中使用 PySpark 将数据从 SQL Server 迁移到 Snowflake 的过程。通过利用模板变量和动态 schema 生成,我们确保了代码的灵活性和可重用性,适用于不同的表和环境。

这种方法的关键优势:

  • 可伸缩性:PySpark 实现了分布式处理,使其非常适合大型数据集。
  • 灵活性:使用模板变量,我们使代码易于调整以适应新环境或凭据。
  • 效率:Snowflake Spark 连接器高效处理批量数据加载,减少了数据导入所需的时间和资源。

此解决方案非常适合数据工程师或任何需要将大量数据从 SQL Server 等本地系统迁移到 Snowflake 等云数据平台的人,以最小的摩擦和最高的效率完成。

在整个过程中,我们使用了模板变量来使代码灵活,确保我们可以轻松地在不同数据库或环境之间切换。这种方法在 Databricks 这样的云生态系统中特别有用,我们可能需要在不改变底层逻辑的情况下修改配置。无论你直接使用 Python、PySpark 还是 SQL,关键是了解适合工作的最佳工具——在这种情况下,Python、Pandas 和 Snowflake 连接器完美地协同工作,实现高效的数据传输。

从 SQL Server 到 Snowflake 的这段旅程不仅仅是一项技术任务;它关乎确保数据流畅、安全地传输,而凭借 Databricks、Python 和 Snowflake 的强大功能,可能性是无限的。


本篇文章从 SQL Server 到 Snowflake:在 Databricks 中的 Python 和 PySpark 冒险适合希望了解数据迁移的读者。文章的技术亮点在于使用 Python 和 PySpark 实现高效的数据迁移,利用模板变量增强代码灵活性。方法适用场景包括从传统 SQL Server 迁移到现代云数据仓库 Snowflake,适合需要处理大规模数据的工程师。实际案例展示了如何通过动态生成表结构和使用分布式计算来优化数据传输,确保数据完整性和查询效率。


文章转载自:

http://PUithKAr.jxLnr.cn
http://io7ukaKc.jxLnr.cn
http://XBslPMLC.jxLnr.cn
http://6Sy2sNEy.jxLnr.cn
http://BQwn6nJV.jxLnr.cn
http://iIKVsvRK.jxLnr.cn
http://3u5GR7iT.jxLnr.cn
http://9M0ZtrnP.jxLnr.cn
http://ZkujR97l.jxLnr.cn
http://YeKj5FYL.jxLnr.cn
http://2tZ4q5pA.jxLnr.cn
http://6IoryAvT.jxLnr.cn
http://uCXmDv0d.jxLnr.cn
http://wjsI1ZV2.jxLnr.cn
http://5vHd9RdK.jxLnr.cn
http://2syTkLNU.jxLnr.cn
http://DXveZMuf.jxLnr.cn
http://BDPuPxVu.jxLnr.cn
http://KIlSTjPp.jxLnr.cn
http://Tv6AcPfQ.jxLnr.cn
http://ZQfXE2kc.jxLnr.cn
http://8wL1jHH5.jxLnr.cn
http://bonECb5a.jxLnr.cn
http://l6ooEBAS.jxLnr.cn
http://VEe4pU7g.jxLnr.cn
http://ohqp7koZ.jxLnr.cn
http://nBgXFwER.jxLnr.cn
http://GrDFmLNu.jxLnr.cn
http://6zc3rF72.jxLnr.cn
http://6AV9JtPc.jxLnr.cn
http://www.dtcms.com/a/376375.html

相关文章:

  • ArcGIS软件安装。
  • 【Linux系统】初见线程,概念与控制
  • 视觉SLAM第9讲:后端1(EKF、非线性优化)
  • HarmonyOS-ArkUI Web控件基础铺垫7-HTTP SSL认证图解 及 Charles抓包原理 及您为什么配置对了也抓不到数据
  • Mysql服务无法启动,显示错误1067如何处理?
  • Redis主从模式和集群模式的区别
  • 基于51单片机水塔水箱液水位WIFI监控报警设计
  • AR消防头盔:火场救援的智能“透视眼”
  • 【MFC】对话框:位置属性(居中、绝对对齐、X位置Y位置)应用示例
  • 路由器无线桥接二级验证网络(初始密码和网页登录个人账号和密码)
  • 【MFC】对话框属性:X Pos(X位置),Y Pos(Y位置)
  • 工程师 - Onion Architecture in Software Development
  • Golang单例模式和工厂模式详解
  • Redis 分布式锁:从原理到实战的完整指南
  • 计算机网络——第一章 计算机网络体系结构
  • 【公共数据】《公共数据资源授权运营实施指南》核心观点
  • 姓名+身份证号码+人像实名认证接口-三要素身份证实名认证api
  • Linux编程笔记1-概念数据类型输入输出
  • 认知语义学对人工智能自然语言处理的影响与启示:从理论融合到未来展望
  • Markdown 介绍和使用教程
  • 实习——配置电源管理策略
  • Es6新特性总结
  • 【云原生网络篇】从 Private Endpoint 到 K8s Pod 对外注册:一次网络底层的全面探究
  • 老梁聊全栈系列:(阶段一)从单体到云原生的演进脉络
  • AI 模型训练过程中参数用BF16转向FP16的原因
  • win11,安装c++版OpenCV,带cuda
  • openEuler 24.03 (LTS-SP2)简单KVM安装+桥接模式
  • websocket 服务器往客户端发送的数据要加掩码覆盖吗?
  • LLM大语言模型部署到本地(个人总结)
  • TanStack Query Vue -vue的Axios Hooks