当前位置: 首页 > wzjs >正文

webstation做网站cdq百度指数

webstation做网站,cdq百度指数,重庆做网站建设,山东做网站的公司本文介绍如何在 异步协程 中使用异步数据库操作,结合 aiomysql 库和异步连接池实现轻量级的异步数据库封装,适用于 asyncio 协程和异步编程。 1. 项目背景 在 Python 的异步编程中,aiomysql 是一款非常流行的异步 MySQL 数据库驱动。通过结…

本文介绍如何在 异步协程 中使用异步数据库操作,结合 aiomysql 库和异步连接池实现轻量级的异步数据库封装,适用于 asyncio 协程和异步编程。

1. 项目背景

在 Python 的异步编程中,aiomysql 是一款非常流行的异步 MySQL 数据库驱动。通过结合 asyncio 库和 aiomysql,可以高效地处理并发数据库操作。

在大规模并发系统中,数据库操作的效率往往成为瓶颈。为了优化数据库性能和实现高并发,本教程将通过以下内容进行演示:

  • 使用异步连接池
  • 通过异步协程执行数据库操作
  • 处理基本的数据库增删改查(CRUD)操作

2. 安装依赖

首先,确保你已安装必要的库。使用以下命令来安装:

pip install aiomysql

3. 基于 aiomysql 封装异步连接池

我们将封装一个异步数据库连接池,供后续操作调用。以下是 AsyncConnection 类的实现,它使用 aiomysql 创建连接池,并提供查询、插入、更新等操作。

代码实现

import asyncio
import aiomysql
import logging
import tracebackclass AsyncConnection:"""基于 aiomysql 的异步数据库连接封装"""def __init__(self, host, database, user=None, password=None,port=3306, connect_timeout=10, charset="utf8mb4",sql_mode="TRADITIONAL"):self.host = hostself.database = databaseself.user = userself.password = passwordself.port = portself.connect_timeout = connect_timeoutself.charset = charsetself.sql_mode = sql_modeself._pool = Noneasync def connect(self):"""建立连接池"""self._pool = await aiomysql.create_pool(host=self.host,port=self.port,user=self.user,password=self.password,db=self.database,charset=self.charset,autocommit=True,connect_timeout=self.connect_timeout,sql_mode=self.sql_mode,cursorclass=aiomysql.DictCursor)async def close(self):"""关闭连接池"""if self._pool:self._pool.close()await self._pool.wait_closed()async def query(self, sql, *args):"""执行查询,返回多行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)result = await cur.fetchall()return resultasync def get(self, sql, *args):"""执行查询,返回一行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)return await cur.fetchone()async def execute(self, sql, *args):"""执行写操作,返回最后插入的ID"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:try:await cur.execute(sql, args)return cur.lastrowidexcept aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("忽略重复插入: %s", e)else:logging.error("SQL执行失败: %s\nSQL: %s", e, sql)raise## =============== 高级封装 ===================async def table_has(self, table_name, field, value):"""检查表中是否有对应字段值"""sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"return await self.get(sql, value)async def table_insert(self, table_name, item: dict):"""插入一条数据"""fields = list(item.keys())values = list(item.values())placeholders = ','.join(['%s'] * len(fields))field_list = ','.join(fields)sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"try:return await self.execute(sql, *values)except aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("重复插入被跳过")else:logging.error("插入数据出错: %s\n数据: %s", e, item)raiseasync def table_update(self, table_name, updates: dict, field_where: str, value_where):"""更新一条记录"""set_clause = ', '.join([f"{k}=%s" for k in updates.keys()])values = list(updates.values())values.append(value_where)sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"await self.execute(sql, *values)

4. 多协程并发执行数据库操作

接下来,我们使用 asyncio.gather 来并发执行多个异步数据库操作,每个协程模拟一个用户执行数据库增删改查(CRUD)操作。

完整的多协程测试代码

import asyncio
import logging
from ezpymysql_async import AsyncConnection# 日志配置
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")# 初始化连接
db = AsyncConnection(host='localhost',database='test_db',user='root',password='5201314',port=3306
)async def run_user_task(user_id):name = f"User-{user_id}"age = 20 + user_idtry:# 1. 插入数据await db.table_insert("test_table", {"name": name, "age": age})logging.info(f"[{name}] 插入成功")# 2. 查询所有all_data = await db.query("SELECT * FROM test_table")logging.info(f"[{name}] 当前所有数据行数: {len(all_data)}")# 3. 查询单条one = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 查询单条: {one}")# 4. 更新await db.table_update("test_table", {"age": age + 10}, "name", name)logging.info(f"[{name}] 年龄更新完成")# 5. 再查询updated = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 更新后: {updated}")# 6. 检查存在exists = await db.table_has("test_table", "name", name)logging.info(f"[{name}] 是否存在: {exists is not None}")# 7. 删除await db.execute("DELETE FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 删除成功")# 8. 再插入并测试 lastrowidlast_id = await db.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", f"{name}_new", age)logging.info(f"[{name}] 插入新数据成功, ID: {last_id}")except Exception as e:logging.error(f"[{name}] 出现异常: {e}")async def main():# 建立连接池await db.connect()# 创建多个协程任务tasks = [run_user_task(i) for i in range(100)]# 并发执行任务await asyncio.gather(*tasks)# 关闭连接池await db.close()logging.info("所有协程任务已完成")if __name__ == "__main__":asyncio.run(main())

5. 结果与总结

通过上述的异步数据库封装和多协程测试,我们能够:

  • 高效地进行数据库操作,避免了阻塞。
  • 实现了多线程并发数据库操作,提升了数据库的访问性能。
  • 灵活地使用 Python 的 asyncio 协程处理大量并发请求。

输出示例

在这里插入图片描述


通过本文,你可以将这种异步数据库操作封装到任何异步应用中,无论是 Web 开发还是爬虫等领域,都能有效提高数据库操作的性能。


如果你对本篇教程有任何疑问或建议,欢迎留言讨论!

http://www.dtcms.com/wzjs/33873.html

相关文章:

  • 培训机构活动策划网站代码优化
  • 大连制作网站成人大专
  • 网站开发常用的前端框架百度热搜榜今日头条排名
  • 手机企业网站制作流程如何在百度上做广告宣传
  • 网站全景图怎么做天津优化公司哪家好
  • 乐陵网站开发网络营销发展现状与趋势
  • 做网站什么内容吸引人现在感染症状有哪些
  • 成都网站建设价格表百度竞价排名查询网站
  • 石家庄做网站哪家公司好国家市场监管总局官网
  • 云南手工活外发加工网东莞网站推广及优化
  • 网站备案表格下载网站排名优化+o+m
  • 昆明市住房和城乡建设局网站上看的app优化
  • 公司做网站属于什么费用站长工具的使用seo综合查询排名
  • 网站设计平台 动易seo推广优化外包公司
  • ps做的网站如何转入dw上海有什么seo公司
  • 网上有哪些正规赚钱的平台优化的含义
  • 家电维修品牌网站建设网站关键词优化外包
  • 鄂州政府网站制作企业网站
  • 怎么用建站系统建网站广州seo外包多少钱
  • 懂做网站的人就是好今日油价92汽油
  • 咨询网站建设长沙百度快照优化排名
  • 网站上有什么作用百度一下网页打开
  • 做聊天网站的视频教程seo是什么车
  • 自己网站做问卷调查问卷成免费crm特色
  • WordPress文章相册插件关键词排名优化报价
  • 关于企业网站建设的提案数据网站
  • 烟台做网站建设电话seo百度关键字优化
  • wordpress自定义额外css背景新乡seo顾问
  • wordpress 右边栏seo1视频发布会
  • 广州建委网站最好用的免费建站