Python psycopg2 教程
psycopg2 是 Python 中最流行的 PostgreSQL 数据库适配器。本教程将介绍如何使用 psycopg2 连接和操作 PostgreSQL 数据库。
安装
pip install psycopg2-binary
或者安装从源码编译的版本(需要本地有 C 编译器和 libpq 开发文件;官方建议生产环境使用此方式):
pip install psycopg2
基本连接
import psycopg2
from psycopg2 import OperationalError


def create_connection():
    """Open a connection to the tutorial's PostgreSQL database.

    Returns:
        The new psycopg2 connection, or ``None`` when psycopg2 raises
        ``OperationalError`` while connecting (the error is printed).
    """
    settings = {
        "host": "localhost",
        "database": "mydatabase",
        "user": "myuser",
        "password": "mypassword",
        "port": "5432",
    }
    try:
        db = psycopg2.connect(**settings)
    except OperationalError as err:
        print(f"连接错误: {err}")
        return None
    print("连接成功!")
    return db


# Use the connection
conn = create_connection()
if conn:
    conn.close()
执行查询
创建表
def create_table(connection):
    """Create the ``users`` table if it does not already exist.

    The statement is committed on success. On any error the message is
    printed and the transaction is rolled back; nothing is raised.

    Args:
        connection: An open psycopg2 connection.
    """
    create_table_query = '''
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        age INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    '''
    try:
        # The context manager closes the cursor even when execute() fails.
        with connection.cursor() as cursor:
            cursor.execute(create_table_query)
        connection.commit()
        print("表创建成功!")
    except Exception as e:
        print(f"错误: {e}")
        connection.rollback()


conn = create_connection()
if conn:
    try:
        create_table(conn)
    finally:
        # Close the connection even if something unexpected escapes.
        conn.close()
插入数据
def insert_user(connection, name, email, age):
    """Insert one row into ``users`` and return its generated id.

    Values are passed as query parameters, never interpolated into the SQL.

    Args:
        connection: An open psycopg2 connection.
        name: Value for the ``name`` column.
        email: Value for the ``email`` column.
        age: Value for the ``age`` column.

    Returns:
        The ``id`` returned by ``RETURNING id``, or ``None`` if the insert
        failed (the error is printed and the transaction rolled back).
    """
    insert_query = """
    INSERT INTO users (name, email, age)
    VALUES (%s, %s, %s)
    RETURNING id;
    """
    try:
        # The context manager closes the cursor even when the insert fails.
        with connection.cursor() as cursor:
            cursor.execute(insert_query, (name, email, age))
            user_id = cursor.fetchone()[0]
        connection.commit()
        print(f"用户插入成功,ID: {user_id}")
        return user_id
    except Exception as e:
        print(f"插入错误: {e}")
        connection.rollback()
        return None


conn = create_connection()
if conn:
    try:
        user_id = insert_user(conn, "张三", "zhangsan@example.com", 25)
    finally:
        conn.close()
查询数据
def get_all_users(connection):
    """Fetch and print every row of the ``users`` table.

    Args:
        connection: An open psycopg2 connection.

    Returns:
        The list of rows from ``fetchall()``, or an empty list if the query
        failed (the error is printed).
    """
    select_query = "SELECT * FROM users;"
    try:
        # The context manager closes the cursor even when the query fails.
        with connection.cursor() as cursor:
            cursor.execute(select_query)
            users = cursor.fetchall()
        print("所有用户:")
        for user in users:
            print(user)
        return users
    except Exception as e:
        print(f"查询错误: {e}")
        # Clear the failed transaction so the connection can be reused.
        connection.rollback()
        return []


def get_user_by_id(connection, user_id):
    """Fetch and print the ``users`` row whose id equals ``user_id``.

    Args:
        connection: An open psycopg2 connection.
        user_id: Value compared against the ``id`` column.

    Returns:
        The row from ``fetchone()``, or ``None`` when no row matched or the
        query failed (the error is printed).
    """
    select_query = "SELECT * FROM users WHERE id = %s;"
    try:
        with connection.cursor() as cursor:
            cursor.execute(select_query, (user_id,))
            user = cursor.fetchone()
        if user:
            print(f"找到用户: {user}")
        else:
            print("用户未找到")
        return user
    except Exception as e:
        print(f"查询错误: {e}")
        # Clear the failed transaction so the connection can be reused.
        connection.rollback()
        return None


conn = create_connection()
if conn:
    try:
        get_all_users(conn)
        get_user_by_id(conn, 1)
    finally:
        conn.close()
更新数据
def update_user_age(connection, user_id, new_age):
    """Set the ``age`` column of the user with the given id.

    Prints whether a row was updated. On any error the message is printed
    and the transaction is rolled back; nothing is raised.

    Args:
        connection: An open psycopg2 connection.
        user_id: Value compared against the ``id`` column.
        new_age: New value for the ``age`` column.
    """
    update_query = "UPDATE users SET age = %s WHERE id = %s;"
    try:
        # The context manager closes the cursor even when the update fails.
        with connection.cursor() as cursor:
            cursor.execute(update_query, (new_age, user_id))
            # Read the count while the cursor is still open.
            updated = cursor.rowcount
        connection.commit()
        if updated > 0:
            print(f"用户 {user_id} 年龄更新为 {new_age}")
        else:
            print("用户未找到")
    except Exception as e:
        print(f"更新错误: {e}")
        connection.rollback()


conn = create_connection()
if conn:
    try:
        update_user_age(conn, 1, 26)
    finally:
        conn.close()
删除数据
def delete_user(connection, user_id):
    """Delete the user with the given id from ``users``.

    Prints whether a row was deleted. On any error the message is printed
    and the transaction is rolled back; nothing is raised.

    Args:
        connection: An open psycopg2 connection.
        user_id: Value compared against the ``id`` column.
    """
    delete_query = "DELETE FROM users WHERE id = %s;"
    try:
        # The context manager closes the cursor even when the delete fails.
        with connection.cursor() as cursor:
            cursor.execute(delete_query, (user_id,))
            # Read the count while the cursor is still open.
            deleted = cursor.rowcount
        connection.commit()
        if deleted > 0:
            print(f"用户 {user_id} 已删除")
        else:
            print("用户未找到")
    except Exception as e:
        print(f"删除错误: {e}")
        connection.rollback()


conn = create_connection()
if conn:
    try:
        delete_user(conn, 1)
    finally:
        conn.close()
事务管理
def transfer_money(connection, from_account, to_account, amount):
    """Move ``amount`` between two rows of ``accounts`` in one transaction.

    Both balance updates are committed together. Any failure (non-positive
    amount, unknown account, insufficient balance, database error) rolls the
    whole transaction back and prints the reason; nothing is raised.

    Args:
        connection: An open psycopg2 connection.
        from_account: ``id`` of the account to debit.
        to_account: ``id`` of the account to credit.
        amount: Amount to move; must be greater than zero.
    """
    try:
        # A zero or negative amount would turn the debit into a credit.
        if amount <= 0:
            raise ValueError("转账金额必须大于零")

        # The context manager closes the cursor on every exit path.
        with connection.cursor() as cursor:
            # Check the sender's balance.
            # NOTE(review): under concurrent transfers this read-then-update
            # can race; consider SELECT ... FOR UPDATE if that matters.
            cursor.execute(
                "SELECT balance FROM accounts WHERE id = %s",
                (from_account,),
            )
            row = cursor.fetchone()
            if row is None:
                # fetchone() returns None when no row matched.
                raise LookupError("转出账户不存在")
            if row[0] < amount:
                raise Exception("余额不足")

            # Debit the sender.
            cursor.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account),
            )

            # Credit the receiver.
            cursor.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account),
            )
            if cursor.rowcount == 0:
                # No row was credited: abort instead of losing the money.
                raise LookupError("收款账户不存在")

        connection.commit()
        print("转账成功!")
    except Exception as e:
        connection.rollback()
        print(f"转账失败: {e}")
参数化查询
def search_users(connection, name_filter=None, min_age=None):
    """Search ``users`` with optional filters, using a parameterized query.

    Only SQL fragments containing ``%s`` placeholders are appended to the
    query text; the filter values themselves travel in ``params``.

    Args:
        connection: An open psycopg2 connection.
        name_filter: If non-empty, keep rows whose ``name`` contains this
            text (case-insensitive ``ILIKE``).
        min_age: If not ``None``, keep rows with ``age >= min_age``. An
            explicit ``0`` is honoured rather than treated as "no filter".

    Returns:
        The list of matching rows, or an empty list if the query failed
        (the error is printed).
    """
    query = "SELECT * FROM users WHERE 1=1"
    params = []

    if name_filter:
        query += " AND name ILIKE %s"
        params.append(f"%{name_filter}%")

    # Compare against None so that min_age=0 still applies the filter.
    if min_age is not None:
        query += " AND age >= %s"
        params.append(min_age)

    try:
        # The context manager closes the cursor even when the query fails.
        with connection.cursor() as cursor:
            cursor.execute(query, params)
            users = cursor.fetchall()
        print(f"找到 {len(users)} 个用户:")
        for user in users:
            print(user)
        return users
    except Exception as e:
        print(f"搜索错误: {e}")
        # Clear the failed transaction so the connection can be reused.
        connection.rollback()
        return []


conn = create_connection()
if conn:
    try:
        search_users(conn, name_filter="张", min_age=20)
    finally:
        conn.close()
使用连接池
from psycopg2 import pool

# Create the connection pool
connection_pool = psycopg2.pool.SimpleConnectionPool(
    1, 10,  # minimum and maximum number of connections
    host="localhost",
    database="mydatabase",
    user="myuser",
    password="mypassword",
    port="5432"
)


def get_user_with_pool(user_id):
    """Look up one user by id using a connection borrowed from the pool.

    Args:
        user_id: Value compared against the ``id`` column.

    Returns:
        The row from ``fetchone()``, or ``None`` when no row matched or an
        error occurred (the error is printed).
    """
    connection = None
    try:
        # Borrow a connection from the pool.
        connection = connection_pool.getconn()
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cursor.fetchone()
    except Exception as e:
        print(f"错误: {e}")
        return None
    finally:
        # Always hand the connection back, even after a failure; otherwise
        # each failed call would permanently use up one pool slot.
        if connection is not None:
            connection_pool.putconn(connection)


# Query through the pool
user = get_user_with_pool(1)
print(user)

# Close every pooled connection when the program ends
connection_pool.closeall()
错误处理
import psycopg2
from psycopg2 import Error, OperationalError


def safe_database_operation():
    """Demonstrate layered error handling around a failing query.

    Connects, runs a SELECT against ``nonexistent_table`` and prints a
    message chosen by the exception class that was raised. The connection,
    if one was opened, is closed in ``finally``.
    """
    connect_kwargs = {
        "host": "localhost",
        "database": "mydatabase",
        "user": "myuser",
        "password": "mypassword",
    }
    connection = None
    try:
        connection = psycopg2.connect(**connect_kwargs)
        cur = connection.cursor()
        # Run some statements; this one is expected to raise an error.
        cur.execute("SELECT * FROM nonexistent_table")
        connection.commit()
    except OperationalError as op_err:
        print(f"连接错误: {op_err}")
    except Error as db_err:
        print(f"数据库错误: {db_err}")
        if connection is not None:
            connection.rollback()
    except Exception as other_err:
        print(f"其他错误: {other_err}")
    finally:
        if connection is not None:
            connection.close()
            print("连接已关闭")


safe_database_operation()
完整示例
import psycopg2
from psycopg2 import Error


class DatabaseManager:
    """Small wrapper around a single psycopg2 connection.

    Usable as a context manager: entering calls ``connect()`` and leaving
    calls ``disconnect()``.
    """

    def __init__(self, host, database, user, password, port="5432"):
        """Store the connection parameters; no connection is opened yet."""
        self.connection_params = {
            "host": host,
            "database": database,
            "user": user,
            "password": password,
            "port": port
        }
        self.connection = None

    def connect(self):
        """Open the connection.

        Returns:
            ``True`` on success, ``False`` if psycopg2 raised ``Error``
            (the error is printed).
        """
        try:
            self.connection = psycopg2.connect(**self.connection_params)
        except Error as e:
            print(f"连接错误: {e}")
            return False
        print("数据库连接成功")
        return True

    def disconnect(self):
        """Close the connection if one is open."""
        if self.connection:
            self.connection.close()
            # Forget the closed connection so later calls do not reuse it.
            self.connection = None
            print("数据库连接已关闭")

    def execute_query(self, query, params=None):
        """Execute one statement and return its result.

        Args:
            query: SQL text, optionally with ``%s`` placeholders.
            params: Values for the placeholders, or ``None``.

        Returns:
            The list of rows when the statement produced a result set
            (``SELECT`` or e.g. ``INSERT ... RETURNING``), otherwise the
            cursor's ``rowcount``. ``None`` if there is no open connection
            or psycopg2 raised ``Error`` (the error is printed and the
            transaction rolled back).
        """
        if self.connection is None:
            # connect() failed or was never called.
            print("查询错误: 数据库未连接")
            return None
        try:
            # The context manager closes the cursor on every exit path.
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                # description is None when the statement returned no rows;
                # checking it (rather than the SQL text) lets RETURNING
                # clauses hand their rows back to the caller.
                if cursor.description is not None:
                    result = cursor.fetchall()
                else:
                    result = cursor.rowcount
            # Statements other than SELECT are committed.
            if not query.strip().upper().startswith('SELECT'):
                self.connection.commit()
            return result
        except Error as e:
            print(f"查询错误: {e}")
            self.connection.rollback()
            return None

    def create_user_table(self):
        """Create the ``users`` table if missing; returns ``execute_query``'s result."""
        query = '''
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            age INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        '''
        return self.execute_query(query)

    def add_user(self, name, email, age):
        """Insert a user and print the generated id.

        Returns:
            The rows produced by ``RETURNING id`` (a list holding one
            ``(id,)`` row), or ``None`` if the insert failed.
        """
        query = """
        INSERT INTO users (name, email, age)
        VALUES (%s, %s, %s)
        RETURNING id;
        """
        result = self.execute_query(query, (name, email, age))
        if result:
            print(f"用户添加成功,ID: {result[0][0]}")
        return result

    def get_all_users(self):
        """Return every row of ``users``, or ``None`` if the query failed."""
        query = "SELECT * FROM users;"
        return self.execute_query(query)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()


# Usage example
if __name__ == "__main__":
    with DatabaseManager(
        host="localhost",
        database="mydatabase",
        user="myuser",
        password="mypassword"
    ) as db:
        # Create the table
        db.create_user_table()

        # Add users
        db.add_user("李四", "lisi@example.com", 30)
        db.add_user("王五", "wangwu@example.com", 25)

        # Query all users
        users = db.get_all_users()
        print("所有用户:")
        # get_all_users() returns None on failure; iterate nothing then.
        for user in users or []:
            print(user)
最佳实践
始终使用参数化查询来防止 SQL 注入攻击
正确处理事务,确保数据一致性
使用连接池提高性能
及时关闭连接和游标
使用上下文管理器自动管理资源
实现适当的错误处理