当前位置: 首页 > news >正文

Python实现Amazon Redshift数据库元数据提取类

安装psycopg2-binary、redshift-connector和pandas库。

pip install psycopg2-binary redshift-connector pandas

创建redshift_utility.py

import redshift_connector
import pandas as pddef connect_to_redshift(host, port, user, password, dbname):"""Connect to Amazon Redshift using the provided parameters.Parameters:- host: Redshift server address.- port: Redshift server port.- user: Redshift database username.- password: Redshift database password.- dbname: Name of the Redshift database to connect to.Returns:- conn: A Redshift database connection object."""conn = redshift_connector.connect(host=host, port=port, user=user, password=password, database=dbname)# Create a cursor using the connection objectcursor = conn.cursor()return conn, cursordef query_redshift_to_pandas(cursor, query):"""Query Amazon Redshift using the provided connection object and query statement.Parameters:- cursor: A Redshift database cursor object.- query: The SQL query statement to execute.Returns:- df: The result of the query as a Pandas Dataframe."""# Execute the querycursor.execute(query)# Fetch all results into a list of tuplesresults = cursor.fetchall()# Convert the list of tuples to a Pandas Dataframedf = pd.DataFrame(results, columns=[desc[0] for desc in cursor.description])return dfdef print_dataframe(df):"""Print Pandas DataFrame to the console.Parameters:- df: Pandas DataFrame to be printed."""print(df.to_string(index=False, sep='\t'))def get_table_definition(cursor, schema_name, table_name):"""获取Amazon Redshift表中定义语句。Parameters:- cursor: A Redshift database cursor object.- schema_name: The name of the schema where the table is located.- table_name: The name of the table to retrieve the definition for.Returns:- The table definition as a string."""# 执行SHOW TABLE语句来获取表结构信息cursor.execute(f"SHOW TABLE {schema_name}.{table_name}")table_info = cursor.fetchall()# 将表结构信息转换为字符串格式并返回table_definition = "\n".join([f"{column[0]}: {column[1]}" for column in table_info])return table_definitiondef get_view_definition(cursor, schema_name, view_name):"""获取Amazon Redshift中视图的定义语句。Parameters:- cursor: A Redshift database cursor object.- schema_name: The name of the schema where the view is located.- view_name: The name of the view to retrieve the definition for.Returns:- The view definition as a string."""# 执行SHOW VIEW语句来获取视图定义信息cursor.execute(f"SHOW VIEW {schema_name}.{view_name}")view_info = cursor.fetchall()# 将视图定义信息转换为字符串格式并返回view_definition = "\n".join([f"{column[0]}: {column[1]}" for column in view_info])return view_definitiondef get_procedure_definition(cursor, schema_name, procedure_name):"""获取Amazon Redshift中存储过程的定义语句。Parameters:- cursor: A Redshift database cursor object.- schema_name: The name of the schema where the procedure is located.- procedure_name: The name of the procedure to retrieve the definition for.Returns:- The procedure definition as a string."""# 执行SHOW PROCEDURE语句来获取存储过程定义信息cursor.execute(f"SHOW PROCEDURE {schema_name}.{procedure_name}")procedure_info = cursor.fetchall()# 将存储过程定义信息转换为字符串格式并返回procedure_definition = "\n".join([f"{column[0]}: {column[1]}" for column in procedure_info])return procedure_definitiondef get_user_function_definition(cursor, schema_name, function_name):"""获取Amazon Redshift中用户定义函数的定义语句。Parameters:- cursor: A Redshift database cursor object.- schema_name: The name of the schema where the function is located.- function_name: The name of the function to retrieve the definition for.Returns:- The function definition as a string."""# 执行SHOW FUNCTION语句来获取用户定义函数定义信息cursor.execute(f"SHOW FUNCTION {schema_name}.{function_name}")function_info = cursor.fetchall()# 将用户定义函数定义信息转换为字符串格式并返回function_definition = "\n".join([f"{column[0]}: {column[1]}" for column in function_info])return function_definitiondef get_trigger_definition(cursor, schema_name, trigger_name):"""获取Amazon Redshift中触发器的定义语句。Parameters:- cursor: A Redshift database cursor object.- schema_name: The name of the schema where the trigger is located.- trigger_name: The name of the trigger to retrieve the definition for.Returns:- The trigger definition as a string."""# 执行SHOW TRIGGER语句来获取触发器定义信息cursor.execute(f"SHOW TRIGGER {schema_name}.{trigger_name}")trigger_info = cursor.fetchall()# 将触发器定义信息转换为字符串格式并返回trigger_definition = "\n".join([f"{column[0]}: {column[1]}" for column in trigger_info])return trigger_definitiondef get_single_value(conn, query):"""获取查询结果集中的单个值Parameters:- conn: A Redshift database connection object.- query (str): 要执行的查询语句。返回:int: 结果集中的行数。"""df = pd.read_sql(query, conn)return df.shape[0]def execute_sql_command(cursor, sql_command):"""执行SQL命令语句并返回影响的行数。Parameters:- cursor: A Redshift database cursor object.- sql_command: The SQL command to execute.Returns:- The number of rows affected by the command, or -1 if the execution fails."""try:# 执行SQL命令语句cursor.execute(sql_command)# 获取影响的行数affected_rows = cursor.rowcount# 返回影响的行数或-1(如果执行失败)return affected_rowsexcept Exception as e:print(f"Error executing SQL command: {e}")return -1def get_table_definitions(cursor, schema_name):query = f"""select table_name AS TableName
from information_schema.tables
where table_schema not in ('information_schema', 'pg_catalog')and table_type = 'BASE TABLE'
where table_schema = '{schema_name}';"""result_df = query_redshift_to_pandas(cursor, query)table_definitions = []for _,row in result_df.iterrows():table_name = row['TableName']table_definition = get_table_definition(cursor, schema_name, table_name)table_definitions.append(table_definition)return table_definitionsdef get_view_definitions(cursor, schema_name):query = f"""select table_name AS ViewName
from information_schema.tables
where table_schema not in ('information_schema', 'pg_catalog')and table_type = 'VIEW'
where table_schema = '{schema_name}';"""result_df = query_redshift_to_pandas(cursor, query)view_definitions = []for _,row in result_df.iterrows():view_name = row['ViewName']view_definition = get_view_definition(cursor, schema_name, view_name)view_definitions.append(view_definition)return view_definitions

测试代码:

# 定义连接参数
host = 'your_redshift_host'
port = 5439  # Amazon Redshift的默认端口是5439
user = 'your_username'
password = 'your_password'
dbname = 'your_database_name'query = "SELECT * FROM your_table"  # 你的查询语句。
conn, cursor = connect_to_redshift(host, port, user, password, dbname)
# 使用函数查询并获取结果集为Pandas Dataframe。
result_df = query_redshift_to_pandas(cursor, query)# 显示前5行数据。
print(result_df.head())# 调用函数并打印数据表到控制台
print_dataframe(result_df)schema_name = "your_schema"  # 表所在的schema名称。
table_name = "your_table"  # 表名。# 调用函数获取表定义并打印。
table_definition = get_table_definition(cursor, schema_name, table_name)
print(table_definition)view_name = "your_view"  # 视图名# 调用函数获取视图定义并打印。
view_definition = get_view_definition(cursor, schema_name, view_name)
print(view_definition)procedure_name = "your_procedure"  # 存储过程名。# 调用函数获取存储过程定义并打印。
procedure_definition = get_procedure_definition(cursor, schema_name, procedure_name)
print(procedure_definition)function_name = "your_function"  # 用户定义函数名。# 调用函数获取用户定义函数定义并打印。
function_definition = get_user_function_definition(cursor, schema_name, function_name)
print(function_definition)trigger_name = "your_trigger"  # 触发器名。# 调用函数获取触发器定义并打印。
trigger_definition = get_trigger_definition(cursor, schema_name, trigger_name)
print(trigger_definition)# 执行查询
query = "SELECT COUNT(*) FROM your_table"
row_count = get_single_value(conn, query)print(f"结果集中的行数: {row_count}")sql = "YOUR SQL COMMAND HERE"  # 你的SQL命令语句。# 调用函数执行SQL命令并获取影响的行数。
affected_rows = execute_sql_command(cursor, sql)
print(f"Number of rows affected by the command: {affected_rows}")# 获取所有表的定义语句
definitions = get_table_definitions(cursor, schema_name)  # 例如 'public'
for definition in definitions:print(definition)# 获取视图的定义语句
definitions = get_view_definitions(cursor, schema_name)  # 例如 'public'
for definition in definitions:print(definition)query = f"""select table_name AS TableName
from information_schema.tables
where table_schema not in ('information_schema', 'pg_catalog')and table_type = 'BASE TABLE'
where table_schema = '{schema_name}';"""result_df = query_redshift_to_pandas(cursor, query)for index,row in result_df.iterrows():table_name = row['TableName']print(table_name)table_definition = get_table_definition(cursor, schema_name, table_name)print(table_definition)query = f"""select table_name AS ViewName
from information_schema.tables
where table_schema not in ('information_schema', 'pg_catalog')and table_type = 'VIEW'
where table_schema = '{schema_name}';"""result_df = query_redshift_to_pandas(cursor, query)for index,row in result_df.iterrows():view_name = row['ViewName']print(view_name)view_definition = get_view_definition(cursor, schema_name, view_name)print(view_definition)query = f"""select proname AS FunctionName from pg_proc_info where prokind='f';"""result_df = query_redshift_to_pandas(cursor, cursor, query)for index,row in result_df.iterrows():function_name = row['FunctionName']print(function_name)function_definition = get_user_function_definition(cursor, schema_name, view_name)print(function_definition)query = f"""select proname AS ProcedureName from pg_proc_info where prokind='p';"""result_df = query_redshift_to_pandas(cursor, cursor, query)for index,row in result_df.iterrows():function_name = row['ProcedureName']print(function_name)function_definition = get_procedure_definition(cursor, schema_name, view_name)print(function_definition)# 关闭连接
cursor.close()
conn.close()
http://www.dtcms.com/a/327439.html

相关文章:

  • 分布式事务Seata使用不当引发的全局锁问题
  • 解锁Java线程池:从原理到实战的深度剖析
  • 无人机三维路径规划
  • 前端基础知识NodeJS系列 - 06( Node 中的 Stream 的理解?应用场景?)
  • 如何实现PostgreSQL的高可用性,包括主流的复制方案、负载均衡方法以及故障转移流程?
  • TensorBoard的使用 小土堆pytorch记录
  • 数据类型 list
  • 小白挑战一周上架元服务——ArkUI04
  • 前端最新Vue2+Vue3基础入门到实战项目全套教程,自学前端vue就选黑马程序员,一套全通关!笔记
  • Java面试宝典:G1垃圾收集器上
  • 超详细!VMware12 安装win7操作系统
  • react+vite来优化下每次使用hook函数都要引入的情况
  • Neo4j Cypher
  • 哪个视频播放器好用?视频播放器PotPlayer下载安装与调教教程
  • QGraphicsAnchorLayout测试pyside6和C++
  • 微内核与插件化设计思想及其在前端项目中的应用
  • 怎么写好汉语言文学专业的论文?
  • TongSearch3.0.6.0安装和使用指引(by lqw)
  • Day 38: Dataset类和DataLoader类
  • 三点估算法(Three-Point Estimation)
  • OpenHarmony介绍
  • 知识篇 | Oracle Active Data Guard(ADG)同步机制再学习
  • TCP服务器网络编程设计流程详解
  • 车规级霍尔开关芯片SC25891 | 为汽车安全带扣筑起高可靠性安全防线
  • FileLink:为企业跨网文件传输筑牢安全与效率基石
  • Go 语言中的结构体、切片与映射:构建高效数据模型的基石
  • apache+虚拟主机
  • windows git安装步骤
  • 深入剖析 React 合成事件:透过 onClick 看本质
  • Flutter UI Kits by Olayemi Garuba:免费开源的高质量UI组件库