当前位置: 首页 > news >正文

Python psycopg2 教程

psycopg2 是 Python 中最流行的 PostgreSQL 数据库适配器。本教程将介绍如何使用 psycopg2 连接和操作 PostgreSQL 数据库。


安装

pip install psycopg2-binary

或者安装完整版本(需要编译):

pip install psycopg2

基本连接

import psycopg2
from psycopg2 import OperationalErrordef create_connection():try:connection = psycopg2.connect(host="localhost",database="mydatabase",user="myuser",password="mypassword",port="5432")print("连接成功!")return connectionexcept OperationalError as e:print(f"连接错误: {e}")return None# 使用连接
conn = create_connection()
if conn:conn.close()

执行查询

创建表

def create_table(connection):create_table_query = '''CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY,name VARCHAR(100) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,age INTEGER,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);'''try:cursor = connection.cursor()cursor.execute(create_table_query)connection.commit()print("表创建成功!")except Exception as e:print(f"错误: {e}")connection.rollback()conn = create_connection()
if conn:create_table(conn)conn.close()

插入数据

def insert_user(connection, name, email, age):insert_query = """INSERT INTO users (name, email, age) VALUES (%s, %s, %s)RETURNING id;"""try:cursor = connection.cursor()cursor.execute(insert_query, (name, email, age))user_id = cursor.fetchone()[0]connection.commit()print(f"用户插入成功,ID: {user_id}")return user_idexcept Exception as e:print(f"插入错误: {e}")connection.rollback()return Noneconn = create_connection()
if conn:user_id = insert_user(conn, "张三", "zhangsan@example.com", 25)conn.close()

查询数据

def get_all_users(connection):select_query = "SELECT * FROM users;"try:cursor = connection.cursor()cursor.execute(select_query)users = cursor.fetchall()print("所有用户:")for user in users:print(user)return usersexcept Exception as e:print(f"查询错误: {e}")return []def get_user_by_id(connection, user_id):select_query = "SELECT * FROM users WHERE id = %s;"try:cursor = connection.cursor()cursor.execute(select_query, (user_id,))user = cursor.fetchone()if user:print(f"找到用户: {user}")else:print("用户未找到")return userexcept Exception as e:print(f"查询错误: {e}")return Noneconn = create_connection()
if conn:get_all_users(conn)get_user_by_id(conn, 1)conn.close()

更新数据

def update_user_age(connection, user_id, new_age):update_query = "UPDATE users SET age = %s WHERE id = %s;"try:cursor = connection.cursor()cursor.execute(update_query, (new_age, user_id))connection.commit()if cursor.rowcount > 0:print(f"用户 {user_id} 年龄更新为 {new_age}")else:print("用户未找到")except Exception as e:print(f"更新错误: {e}")connection.rollback()conn = create_connection()
if conn:update_user_age(conn, 1, 26)conn.close()

删除数据

def delete_user(connection, user_id):delete_query = "DELETE FROM users WHERE id = %s;"try:cursor = connection.cursor()cursor.execute(delete_query, (user_id,))connection.commit()if cursor.rowcount > 0:print(f"用户 {user_id} 已删除")else:print("用户未找到")except Exception as e:print(f"删除错误: {e}")connection.rollback()conn = create_connection()
if conn:delete_user(conn, 1)conn.close()

事务管理

def transfer_money(connection, from_account, to_account, amount):"""转账操作的例子,展示事务的使用"""try:cursor = connection.cursor()# 检查发送方余额cursor.execute("SELECT balance FROM accounts WHERE id = %s", (from_account,))from_balance = cursor.fetchone()[0]if from_balance < amount:raise Exception("余额不足")# 扣款cursor.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",(amount, from_account))# 存款cursor.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",(amount, to_account))connection.commit()print("转账成功!")except Exception as e:connection.rollback()print(f"转账失败: {e}")

参数化查询

def search_users(connection, name_filter=None, min_age=None):"""使用参数化查询进行条件搜索"""query = "SELECT * FROM users WHERE 1=1"params = []if name_filter:query += " AND name ILIKE %s"params.append(f"%{name_filter}%")if min_age:query += " AND age >= %s"params.append(min_age)try:cursor = connection.cursor()cursor.execute(query, params)users = cursor.fetchall()print(f"找到 {len(users)} 个用户:")for user in users:print(user)return usersexcept Exception as e:print(f"搜索错误: {e}")return []conn = create_connection()
if conn:search_users(conn, name_filter="张", min_age=20)conn.close()

使用连接池

from psycopg2 import pool# 创建连接池
connection_pool = psycopg2.pool.SimpleConnectionPool(1, 10,  # 最小连接数,最大连接数host="localhost",database="mydatabase",user="myuser",password="mypassword",port="5432"
)def get_user_with_pool(user_id):try:# 从连接池获取连接connection = connection_pool.getconn()cursor = connection.cursor()cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))user = cursor.fetchone()# 将连接返回连接池connection_pool.putconn(connection)return userexcept Exception as e:print(f"错误: {e}")return None# 使用连接池查询
user = get_user_with_pool(1)
print(user)# 程序结束时关闭连接池
connection_pool.closeall()

错误处理

import psycopg2
from psycopg2 import Error, OperationalErrordef safe_database_operation():connection = Nonetry:connection = psycopg2.connect(host="localhost",database="mydatabase",user="myuser",password="mypassword")cursor = connection.cursor()# 执行一些操作cursor.execute("SELECT * FROM nonexistent_table")  # 这会引发错误connection.commit()except OperationalError as e:print(f"连接错误: {e}")except Error as e:print(f"数据库错误: {e}")if connection:connection.rollback()except Exception as e:print(f"其他错误: {e}")finally:if connection:connection.close()print("连接已关闭")safe_database_operation()

完整示例

import psycopg2
from psycopg2 import Errorclass DatabaseManager:def __init__(self, host, database, user, password, port="5432"):self.connection_params = {"host": host,"database": database,"user": user,"password": password,"port": port}self.connection = Nonedef connect(self):try:self.connection = psycopg2.connect(**self.connection_params)print("数据库连接成功")return Trueexcept Error as e:print(f"连接错误: {e}")return Falsedef disconnect(self):if self.connection:self.connection.close()print("数据库连接已关闭")def execute_query(self, query, params=None):try:cursor = self.connection.cursor()cursor.execute(query, params)if query.strip().upper().startswith('SELECT'):result = cursor.fetchall()else:self.connection.commit()result = cursor.rowcountcursor.close()return resultexcept Error as e:print(f"查询错误: {e}")self.connection.rollback()return Nonedef create_user_table(self):query = '''CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY,name VARCHAR(100) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,age INTEGER,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);'''return self.execute_query(query)def add_user(self, name, email, age):query = """INSERT INTO users (name, email, age) VALUES (%s, %s, %s)RETURNING id;"""result = self.execute_query(query, (name, email, age))if result:print(f"用户添加成功,ID: {result[0][0]}")return resultdef get_all_users(self):query = "SELECT * FROM users;"return self.execute_query(query)def __enter__(self):self.connect()return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.disconnect()# 使用示例
if __name__ == "__main__":with DatabaseManager(host="localhost",database="mydatabase",user="myuser",password="mypassword") as db:# 创建表db.create_user_table()# 添加用户db.add_user("李四", "lisi@example.com", 30)db.add_user("王五", "wangwu@example.com", 25)# 查询所有用户users = db.get_all_users()print("所有用户:")for user in users:print(user)

最佳实践

  1. 始终使用参数化查询来防止 SQL 注入攻击

  2. 正确处理事务,确保数据一致性

  3. 使用连接池提高性能

  4. 及时关闭连接和游标

  5. 使用上下文管理器自动管理资源

  6. 实现适当的错误处理

http://www.dtcms.com/a/478100.html

相关文章:

  • 5CEBA2U15I7N 阿尔特拉 Altera Cyclone V FPGA
  • 辉县市工程建设网站建设网站做宣传的免费渠道有那种
  • 2025年10月13日总结
  • perl-Test-Simple-1.302195-5.fc39.noarch.rpm 怎么安装?Fedora 39 安装步骤讲解
  • 图像处理之浓度(AI 调研)
  • 问答网站建设怎么提问郑州网站建设网络推广
  • T:堆的基本介绍
  • Spide - Personal Blog Magazine WordPress Theme Download
  • 使用江科大串口发送函数发送freertos的vTaskList出现跑飞
  • 关于做书的网站中国人做跨电商有什么网站
  • asp网站上传到服务器上之后一打开就是download嘟嘟嘟在线观看播放免费
  • 网站域名审核时间兰州做网站一咨询兰州做网站公司
  • Transformer实战(22)——使用FLAIR进行语义相似性评估
  • Kubernetes:初始化集群(导入Rancher2)
  • 通用:JVM垃圾回收机制
  • Shell脚本技巧:去除文件中字符串两端空白
  • python内置模块-re模块介绍使用
  • JavaWeb后端实战(事务文件上传[本地上传与阿里云OSS上传])
  • USB通讯学习
  • 成都哪里可以做网站涿州网站建设天峰
  • 最新MPAS跨尺度、可变分辨率模式实践技术应用及典型案例分析
  • DSP EDMA3使用
  • 做网站在哪里租服务器家用电脑做网站服务器
  • 第四篇《通信的“世界语“:为什么网络需要HTTP、FTP、DNS等协议?》
  • Helm 与 Ansible 深度对比解析文档
  • 网站域名使用费多少集客crm
  • 阜新市建设小学网站wordpress php 5.2.17
  • 2025年--Lc182--sql(排序和分组)--Java版
  • 生产环境下,前端项目为什么要部署
  • 免费图片素材网站推荐怎样建立一个免费的网站