当前位置: 首页 > wzjs >正文

webstation做网站seo外贸推广

webstation做网站,seo外贸推广,蒙山县网站建设,龙岗网站设计效果本文介绍如何在 异步协程 中使用异步数据库操作,结合 aiomysql 库和异步连接池实现轻量级的异步数据库封装,适用于 asyncio 协程和异步编程。 1. 项目背景 在 Python 的异步编程中,aiomysql 是一款非常流行的异步 MySQL 数据库驱动。通过结…

本文介绍如何在 异步协程 中使用异步数据库操作,结合 aiomysql 库和异步连接池实现轻量级的异步数据库封装,适用于 asyncio 协程和异步编程。

1. 项目背景

在 Python 的异步编程中,aiomysql 是一款非常流行的异步 MySQL 数据库驱动。通过结合 asyncio 库和 aiomysql,可以高效地处理并发数据库操作。

在大规模并发系统中,数据库操作的效率往往成为瓶颈。为了优化数据库性能和实现高并发,本教程将通过以下内容进行演示:

  • 使用异步连接池
  • 通过异步协程执行数据库操作
  • 处理基本的数据库增删改查(CRUD)操作

2. 安装依赖

首先,确保你已安装必要的库。使用以下命令来安装:

pip install aiomysql

3. 基于 aiomysql 封装异步连接池

我们将封装一个异步数据库连接池,供后续操作调用。以下是 AsyncConnection 类的实现,它使用 aiomysql 创建连接池,并提供查询、插入、更新等操作。

代码实现

import asyncio
import aiomysql
import logging
import tracebackclass AsyncConnection:"""基于 aiomysql 的异步数据库连接封装"""def __init__(self, host, database, user=None, password=None,port=3306, connect_timeout=10, charset="utf8mb4",sql_mode="TRADITIONAL"):self.host = hostself.database = databaseself.user = userself.password = passwordself.port = portself.connect_timeout = connect_timeoutself.charset = charsetself.sql_mode = sql_modeself._pool = Noneasync def connect(self):"""建立连接池"""self._pool = await aiomysql.create_pool(host=self.host,port=self.port,user=self.user,password=self.password,db=self.database,charset=self.charset,autocommit=True,connect_timeout=self.connect_timeout,sql_mode=self.sql_mode,cursorclass=aiomysql.DictCursor)async def close(self):"""关闭连接池"""if self._pool:self._pool.close()await self._pool.wait_closed()async def query(self, sql, *args):"""执行查询,返回多行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)result = await cur.fetchall()return resultasync def get(self, sql, *args):"""执行查询,返回一行"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute(sql, args)return await cur.fetchone()async def execute(self, sql, *args):"""执行写操作,返回最后插入的ID"""async with self._pool.acquire() as conn:async with conn.cursor() as cur:try:await cur.execute(sql, args)return cur.lastrowidexcept aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("忽略重复插入: %s", e)else:logging.error("SQL执行失败: %s\nSQL: %s", e, sql)raise## =============== 高级封装 ===================async def table_has(self, table_name, field, value):"""检查表中是否有对应字段值"""sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"return await self.get(sql, value)async def table_insert(self, table_name, item: dict):"""插入一条数据"""fields = list(item.keys())values = list(item.values())placeholders = ','.join(['%s'] * len(fields))field_list = ','.join(fields)sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"try:return await self.execute(sql, *values)except aiomysql.MySQLError as e:if e.args[0] == 1062:logging.warning("重复插入被跳过")else:logging.error("插入数据出错: %s\n数据: %s", e, item)raiseasync def table_update(self, table_name, updates: dict, field_where: str, value_where):"""更新一条记录"""set_clause = ', '.join([f"{k}=%s" for k in updates.keys()])values = list(updates.values())values.append(value_where)sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"await self.execute(sql, *values)

4. 多协程并发执行数据库操作

接下来,我们使用 asyncio.gather 来并发执行多个异步数据库操作,每个协程模拟一个用户执行数据库增删改查(CRUD)操作。

完整的多协程测试代码

import asyncio
import logging
from ezpymysql_async import AsyncConnection# 日志配置
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")# 初始化连接
db = AsyncConnection(host='localhost',database='test_db',user='root',password='5201314',port=3306
)async def run_user_task(user_id):name = f"User-{user_id}"age = 20 + user_idtry:# 1. 插入数据await db.table_insert("test_table", {"name": name, "age": age})logging.info(f"[{name}] 插入成功")# 2. 查询所有all_data = await db.query("SELECT * FROM test_table")logging.info(f"[{name}] 当前所有数据行数: {len(all_data)}")# 3. 查询单条one = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 查询单条: {one}")# 4. 更新await db.table_update("test_table", {"age": age + 10}, "name", name)logging.info(f"[{name}] 年龄更新完成")# 5. 再查询updated = await db.get("SELECT * FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 更新后: {updated}")# 6. 检查存在exists = await db.table_has("test_table", "name", name)logging.info(f"[{name}] 是否存在: {exists is not None}")# 7. 删除await db.execute("DELETE FROM test_table WHERE name=%s", name)logging.info(f"[{name}] 删除成功")# 8. 再插入并测试 lastrowidlast_id = await db.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", f"{name}_new", age)logging.info(f"[{name}] 插入新数据成功, ID: {last_id}")except Exception as e:logging.error(f"[{name}] 出现异常: {e}")async def main():# 建立连接池await db.connect()# 创建多个协程任务tasks = [run_user_task(i) for i in range(100)]# 并发执行任务await asyncio.gather(*tasks)# 关闭连接池await db.close()logging.info("所有协程任务已完成")if __name__ == "__main__":asyncio.run(main())

5. 结果与总结

通过上述的异步数据库封装和多协程测试,我们能够:

  • 高效地进行数据库操作,避免了阻塞。
  • 实现了多线程并发数据库操作,提升了数据库的访问性能。
  • 灵活地使用 Python 的 asyncio 协程处理大量并发请求。

输出示例

在这里插入图片描述


通过本文,你可以将这种异步数据库操作封装到任何异步应用中,无论是 Web 开发还是爬虫等领域,都能有效提高数据库操作的性能。


如果你对本篇教程有任何疑问或建议,欢迎留言讨论!

http://www.dtcms.com/wzjs/1941.html

相关文章:

  • discuz wordpress 选择沈阳网络优化培训
  • 国外域名网站推荐东莞seo排名优化
  • 用服务器做网站关键词排名查询
  • 天河网站设计如何在网络上推广产品
  • 做网站成品移投界seo
  • 网站排名套餐蚂蚁链接bt链接
  • 制作旅游网站网页的代码百度指数下载app
  • 全国工程建设信息平台宁波网站推广优化外包
  • 程序员是不是都是做网站的做百度推广多少钱
  • 网站换服务器要怎么做百度投诉中心24人工
  • 怎么做网站用于推广怎样上百度做广告
  • 开发app怎么盈利seo网站推广专员
  • 郑州网站设计培训百度新闻头条新闻
  • 网站假设公司排名网络营销项目策划
  • 双鸭山网站开发设计网站
  • 做教师知识网站有哪些内容谷歌seo怎么优化
  • jsp简述网站开发流程图爱站网关键词长尾挖掘
  • 毕业设计做网站选题国家域名注册服务网
  • wordpress禁止缩略图百度点击优化
  • 上海市城乡建设和管理委员会网站怎样推广公司的网站
  • 国内房地产设计网站建设重庆发布的最新消息今天
  • 高品质的网站开发公电商运营一天都干啥
  • 建网站找那家企业好互联网营销工具
  • 高手优化网站优化seo厂家
  • 360浏览器怎么创建网页seo关键词如何布局
  • 做网站遇到竞争对手怎么办百度软件商店下载安装
  • 移动端页面长沙网站托管seo优化公司
  • 郑州疫情严重程度百度seo在哪里
  • 影视网站建设方案北京seo推广外包
  • 网站建设与管理维护 李建青专业培训seo的机构