异步协程中基于 aiomysql 的异步数据库操作
本文介绍如何在 异步协程 中使用异步数据库操作,结合 aiomysql
库和异步连接池实现轻量级的异步数据库封装,适用于 asyncio
协程和异步编程。
1. 项目背景
在 Python 的异步编程中,aiomysql
是一款非常流行的异步 MySQL 数据库驱动。通过结合 asyncio
库和 aiomysql
,可以高效地处理并发数据库操作。
在大规模并发系统中,数据库操作的效率往往成为瓶颈。为了优化数据库性能和实现高并发,本教程将通过以下内容进行演示:
- 使用异步连接池
- 通过异步协程执行数据库操作
- 处理基本的数据库增删改查(CRUD)操作
2. 安装依赖
首先,确保你已安装必要的库。使用以下命令来安装:
pip install aiomysql
3. 基于 aiomysql
封装异步连接池
我们将封装一个异步数据库连接池,供后续操作调用。以下是 AsyncConnection
类的实现,它使用 aiomysql
创建连接池,并提供查询、插入、更新等操作。
代码实现
import asyncio
import aiomysql
import logging
import tracebackclass AsyncConnection:"""基于 aiomysql 的异步数据库连接封装"""def __init__(self, host, database, user=None, password=None,port=3306, connect_timeout=10, charset="utf8mb4",sql_mode="TRADITIONAL"):self.host = hostself.database = databaseself.user = userself.password = passwordself.port = portself.connect_timeout = connect_timeoutself.charset = charsetself.sql_mode = sql_modeself._pool = Noneasync def connect(self):"""建立连接池"""self._pool = await aiomysql.create_pool(host=self.host,port=self.port,user=self.user,password=self.password,db=self.database,charset=self.charset,autocommit=True,connect_timeout=self.connect_timeout,sql_mode=self.sql_mode,cursorclass=aiomysql.DictCursor)async def close(self):"""关闭连接池"""if self._pool:self._pool.close()await self._pool.wait_closed()async def query(self, sql, *args):"""执行查询,返回多行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)result = await cur.fetchall()return resultasync def get(self, sql, *args):"""执行查询,返回一行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)return await cur.fetchone()async def execute(self, sql, *args):"""执行写操作,返回最后插入的ID"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:try:await cur.execute(sql, args)return cur.lastrowidexcept aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("忽略重复插入: %s", e)else:logging.error("SQL执行失败: %s\nSQL: %s", e, sql)raise## =============== 高级封装 ===================async def table_has(self, table_name, field, value):"""检查表中是否有对应字段值"""sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"return await self.get(sql, value)async def table_insert(self, table_name, item: dict):"""插入一条数据"""fields = list(item.keys())values = list(item.values())placeholders = ','.join(['%s'] * len(fields))field_list = ','.join(fields)sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"try:return await self.execute(sql, *values)except aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("重复插入被跳过")else:logging.error("插入数据出错: %s\n数据: %s", e, item)raiseasync def table_update(self, table_name, updates: dict, field_where: str, value_where):"""更新一条记录"""set_clause = ', '.join([f"{k}=%s" for k in updates.keys()])values = list(updates.values())values.append(value_where)sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"await self.execute(sql, *values)
4. 多协程并发执行数据库操作
接下来,我们使用 asyncio.gather
来并发执行多个异步数据库操作,每个协程模拟一个用户执行数据库增删改查(CRUD)操作。
完整的多协程测试代码
import asyncio
import logging
from ezpymysql_async import AsyncConnection# 日志配置
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")# 初始化连接
db = AsyncConnection(host='localhost',database='test_db',user='root',password='5201314',port=3306
)async def run_user_task(user_id):name = f"User-{user_id}"age = 20 + user_idtry:# 1. 插入数据await db.table_insert("test_table", {"name": name, "age": age})logging.info(f"[{name}] 插入成功")# 2. 查询所有all_data = await db.query("SELECT * FROM test_table")logging.info(f"[{name}] 当前所有数据行数: {len(all_data)}")# 3. 查询单条one = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 查询单条: {one}")# 4. 更新await db.table_update("test_table", {"age": age + 10}, "name", name)logging.info(f"[{name}] 年龄更新完成")# 5. 再查询updated = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 更新后: {updated}")# 6. 检查存在exists = await db.table_has("test_table", "name", name)logging.info(f"[{name}] 是否存在: {exists is not None}")# 7. 删除await db.execute("DELETE FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 删除成功")# 8. 再插入并测试 lastrowidlast_id = await db.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", f"{name}_new", age)logging.info(f"[{name}] 插入新数据成功, ID: {last_id}")except Exception as e:logging.error(f"[{name}] 出现异常: {e}")async def main():# 建立连接池await db.connect()# 创建多个协程任务tasks = [run_user_task(i) for i in range(100)]# 并发执行任务await asyncio.gather(*tasks)# 关闭连接池await db.close()logging.info("所有协程任务已完成")if __name__ == "__main__":asyncio.run(main())
5. 结果与总结
通过上述的异步数据库封装和多协程测试,我们能够:
- 高效地进行数据库操作,避免了阻塞。
- 实现了多线程并发数据库操作,提升了数据库的访问性能。
- 灵活地使用 Python 的
asyncio
协程处理大量并发请求。
输出示例
通过本文,你可以将这种异步数据库操作封装到任何异步应用中,无论是 Web 开发还是爬虫等领域,都能有效提高数据库操作的性能。
如果你对本篇教程有任何疑问或建议,欢迎留言讨论!