用 Python 实现 Amazon Redshift 数据库元数据提取工具函数
安装 psycopg2-binary、redshift-connector 和 pandas 库（下面的代码实际只用到 redshift-connector 和 pandas，psycopg2-binary 可按需安装）。
pip install psycopg2-binary redshift-connector pandas
创建 redshift_utility.py 文件，内容如下：
import redshift_connector
import pandas as pd


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled. Names containing spaces, dots,
    quotes or other special characters therefore reach the server intact,
    instead of breaking or altering the statement.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _qualified_name(schema_name, object_name):
    """Return the quoted ``"schema"."object"`` form of a relation name."""
    return f"{_quote_ident(schema_name)}.{_quote_ident(object_name)}"


def _rows_to_text(rows):
    """Join result rows into one string.

    Each row becomes one line, and the columns of a row are joined with ': '.
    ``SHOW TABLE`` / ``SHOW VIEW`` return the DDL as a single column, so the
    code must not assume a second column exists. Joining whatever columns are
    present handles one- and two-column results alike.
    """
    return "\n".join(": ".join(str(value) for value in row) for row in rows or [])


def connect_to_redshift(host, port, user, password, dbname):
    """Connect to Amazon Redshift and open a cursor on the connection.

    Parameters:
    - host: Redshift server address.
    - port: Redshift server port.
    - user: Redshift database username.
    - password: Redshift database password.
    - dbname: Name of the Redshift database to connect to.

    Returns:
    - (conn, cursor): the connection object and a cursor created from it.
      The caller is responsible for closing both.
    """
    conn = redshift_connector.connect(
        host=host, port=port, user=user, password=password, database=dbname
    )
    cursor = conn.cursor()
    return conn, cursor


def query_redshift_to_pandas(cursor, query, params=None):
    """Run *query* on *cursor* and return its result as a pandas DataFrame.

    Parameters:
    - cursor: A Redshift database cursor object.
    - query: The SQL query statement to execute.
    - params: Optional sequence of bound parameters for the query's
      placeholders. When omitted, the query is executed as-is.

    Returns:
    - df: The result rows as a DataFrame whose columns are named after the
      result columns. An empty DataFrame is returned if the statement
      produced no result set.
    """
    if params is None:
        cursor.execute(query)
    else:
        cursor.execute(query, params)
    # DB-API cursors report description=None for statements without a result set.
    if cursor.description is None:
        return pd.DataFrame()
    columns = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()
    return pd.DataFrame(list(rows or []), columns=columns)


def print_dataframe(df):
    """Print a pandas DataFrame to the console without its index.

    Parameters:
    - df: Pandas DataFrame to be printed.
    """
    # DataFrame.to_string has no `sep` parameter (passing one raises TypeError);
    # its default space-aligned layout is used instead.
    print(df.to_string(index=False))


def get_table_definition(cursor, schema_name, table_name):
    """Return the definition of a Redshift table, as produced by ``SHOW TABLE``.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The name of the schema where the table is located.
    - table_name: The name of the table to retrieve the definition for.

    Returns:
    - The table definition as a string.
    """
    cursor.execute(f"SHOW TABLE {_qualified_name(schema_name, table_name)}")
    return _rows_to_text(cursor.fetchall())


def get_view_definition(cursor, schema_name, view_name):
    """Return the definition of a Redshift view, as produced by ``SHOW VIEW``.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The name of the schema where the view is located.
    - view_name: The name of the view to retrieve the definition for.

    Returns:
    - The view definition as a string.
    """
    cursor.execute(f"SHOW VIEW {_qualified_name(schema_name, view_name)}")
    return _rows_to_text(cursor.fetchall())


def get_procedure_definition(cursor, schema_name, procedure_name):
    """Return the definition of a stored procedure, via ``SHOW PROCEDURE``.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The name of the schema where the procedure is located.
    - procedure_name: The name of the procedure to retrieve the definition for.
      It may carry an argument-type list, e.g. "my_proc(int)".

    Returns:
    - The procedure definition as a string.
    """
    # The procedure name is left unquoted because it may include an argument-type
    # list, which quoting would break; only pass trusted names here.
    cursor.execute(f"SHOW PROCEDURE {_quote_ident(schema_name)}.{procedure_name}")
    return _rows_to_text(cursor.fetchall())


def get_user_function_definition(cursor, schema_name, function_name):
    """Return the definition of a user-defined function, via ``SHOW FUNCTION``.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The name of the schema where the function is located.
    - function_name: The name of the function to retrieve the definition for.

    Returns:
    - The function definition as a string.
    """
    # NOTE(review): confirm the target Redshift version accepts SHOW FUNCTION.
    # The function name is left unquoted, matching get_procedure_definition.
    cursor.execute(f"SHOW FUNCTION {_quote_ident(schema_name)}.{function_name}")
    return _rows_to_text(cursor.fetchall())


def get_trigger_definition(cursor, schema_name, trigger_name):
    """Return the definition of a trigger, via ``SHOW TRIGGER``.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The name of the schema where the trigger is located.
    - trigger_name: The name of the trigger to retrieve the definition for.

    Returns:
    - The trigger definition as a string.
    """
    # NOTE: Amazon Redshift lists triggers among its unsupported PostgreSQL
    # features, so the server is expected to reject this statement. The function
    # is kept for interface compatibility.
    cursor.execute(f"SHOW TRIGGER {_quote_ident(schema_name)}.{trigger_name}")
    return _rows_to_text(cursor.fetchall())


def get_single_value(conn, query):
    """Run *query* and return the first column of its first row.

    Intended for scalar queries such as ``SELECT COUNT(*) FROM t``.

    Parameters:
    - conn: A Redshift database connection object.
    - query (str): The query to execute.

    Returns:
    - The first value of the first result row, or None if there are no rows.
    """
    # Use a short-lived cursor directly rather than pd.read_sql: the value is
    # what is wanted, not the number of result rows (always 1 for COUNT(*)).
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        row = cursor.fetchone()
    finally:
        cursor.close()
    return None if row is None else row[0]


def execute_sql_command(cursor, sql_command):
    """Execute an SQL command and return the number of affected rows.

    Nothing is committed here; whether changes persist depends on the
    connection's transaction/autocommit handling, which is up to the caller.

    Parameters:
    - cursor: A Redshift database cursor object.
    - sql_command: The SQL command to execute.

    Returns:
    - ``cursor.rowcount`` on success, or -1 if the execution raised.
      Per the DB-API, rowcount is also -1 when the count is not available,
      so -1 alone does not prove failure.
    """
    try:
        cursor.execute(sql_command)
        return cursor.rowcount
    except Exception as e:
        # Deliberate best-effort contract: report the error and signal with -1.
        print(f"Error executing SQL command: {e}")
        return -1


def _list_relation_names(cursor, schema_name, table_type):
    """Return the names of relations of type *table_type* in *schema_name*.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: Schema whose relations are listed.
    - table_type: An ``information_schema.tables.table_type`` value, such as
      'BASE TABLE' or 'VIEW'.

    Returns:
    - A list of relation names, sorted alphabetically.
    """
    # One WHERE clause with bound parameters (not string formatting). The
    # column is read by position, because unquoted aliases such as TableName
    # come back folded to lower case.
    cursor.execute(
        "select table_name "
        "from information_schema.tables "
        "where table_schema not in ('information_schema', 'pg_catalog') "
        "and table_schema = %s and table_type = %s "
        "order by table_name",
        (schema_name, table_type),
    )
    return [row[0] for row in cursor.fetchall()]


def get_table_definitions(cursor, schema_name):
    """Return the definitions of all base tables in *schema_name*.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The schema to scan, e.g. 'public'.

    Returns:
    - A list of table definition strings, one per table.
    """
    # The names are materialized before any SHOW TABLE runs, because the same
    # cursor is reused for those statements.
    table_names = _list_relation_names(cursor, schema_name, "BASE TABLE")
    return [get_table_definition(cursor, schema_name, name) for name in table_names]


def get_view_definitions(cursor, schema_name):
    """Return the definitions of all views in *schema_name*.

    Parameters:
    - cursor: A Redshift database cursor object.
    - schema_name: The schema to scan, e.g. 'public'.

    Returns:
    - A list of view definition strings, one per view.
    """
    view_names = _list_relation_names(cursor, schema_name, "VIEW")
    return [get_view_definition(cursor, schema_name, name) for name in view_names]
测试代码（运行前请将下面的占位符替换为实际的连接参数和对象名）：
# Demo script: exercise the Redshift helper functions against a live cluster.
# Replace every placeholder value below before running.

# Connection parameters.
host = 'your_redshift_host'
port = 5439  # Amazon Redshift's default port is 5439.
user = 'your_username'
password = 'your_password'
dbname = 'your_database_name'

query = "SELECT * FROM your_table"  # Query whose result is loaded into a DataFrame.
schema_name = "your_schema"  # Schema that holds the objects below, e.g. 'public'.
table_name = "your_table"  # Table name.
view_name = "your_view"  # View name.
procedure_name = "your_procedure"  # Stored procedure name.
function_name = "your_function"  # User-defined function name.
trigger_name = "your_trigger"  # Trigger name.
sql = "YOUR SQL COMMAND HERE"  # SQL command to execute.


def _print_or_report(fetch, *args):
    """Print fetch(*args), or print the error if the call raises.

    One failing or unsupported SHOW statement (e.g. SHOW TRIGGER) should
    not abort the rest of the demo.
    """
    try:
        print(fetch(*args))
    except Exception as e:  # Demo boundary: report and continue.
        print(f"Error: {e}")


conn, cursor = connect_to_redshift(host, port, user, password, dbname)
# Autocommit: a successful SQL command is committed immediately, and a failed
# statement does not leave an aborted transaction behind that would make every
# following statement fail.
conn.autocommit = True
try:
    # Run the query and get the result set as a pandas DataFrame.
    result_df = query_redshift_to_pandas(cursor, query)
    print(result_df.head())  # First 5 rows.
    print_dataframe(result_df)  # The whole result, printed to the console.

    # Definitions of individual objects.
    _print_or_report(get_table_definition, cursor, schema_name, table_name)
    _print_or_report(get_view_definition, cursor, schema_name, view_name)
    _print_or_report(get_procedure_definition, cursor, schema_name, procedure_name)
    _print_or_report(get_user_function_definition, cursor, schema_name, function_name)
    _print_or_report(get_trigger_definition, cursor, schema_name, trigger_name)

    # Scalar query.
    query = "SELECT COUNT(*) FROM your_table"
    row_count = get_single_value(conn, query)
    print(f"结果集中的行数: {row_count}")

    # Execute an SQL command and report the affected row count.
    affected_rows = execute_sql_command(cursor, sql)
    print(f"Number of rows affected by the command: {affected_rows}")

    # Definitions of every table and every view in the schema.
    for definition in get_table_definitions(cursor, schema_name):
        print(definition)
    for definition in get_view_definitions(cursor, schema_name):
        print(definition)

    # The same tables and views again, printing each name before its definition.
    # fetchall() materializes the names before the loop body reuses the cursor.
    for table_type, getter in (("BASE TABLE", get_table_definition),
                               ("VIEW", get_view_definition)):
        cursor.execute(
            "select table_name from information_schema.tables "
            "where table_schema not in ('information_schema', 'pg_catalog') "
            "and table_schema = %s and table_type = %s",
            (schema_name, table_type),
        )
        for (relation_name,) in cursor.fetchall():
            print(relation_name)
            _print_or_report(getter, cursor, schema_name, relation_name)

    # User-defined functions ('f') and stored procedures ('p') in the schema.
    # NOTE(review): assumes pg_proc_info exposes pg_proc's pronamespace column; verify.
    for prokind, getter in (("f", get_user_function_definition),
                            ("p", get_procedure_definition)):
        cursor.execute(
            "select distinct p.proname "
            "from pg_proc_info p join pg_namespace n on n.oid = p.pronamespace "
            "where n.nspname = %s and p.prokind = %s",
            (schema_name, prokind),
        )
        for (routine_name,) in cursor.fetchall():
            print(routine_name)
            _print_or_report(getter, cursor, schema_name, routine_name)
finally:
    # Close the cursor and the connection even if a step above raised.
    cursor.close()
    conn.close()