当前位置: 首页 > news >正文

python asyncio的各种用法与代码示例

下面是一份“由浅入深”的 asyncio 实战手册。先解释核心概念,再给出可直接运行的、循序渐进的示例代码。全部示例都只依赖标准库(除少数标注处),适配 Python 3.10+(如使用 TaskGroupasyncio.timeout() 的段落需要 3.11+)。


1. 基础概念速览

  • async def:定义协程函数(coroutine function)。
  • await:在协程中等待另一个可等待对象(协程、asyncio.Taskasyncio.Future 等)。
  • 事件循环(Event Loop):调度协程、I/O 事件的核心;用 asyncio.run(main()) 启动。
  • 并发 ≠ 并行:asyncio 是单线程协作式并发,靠 I/O 等待时让出控制权。
  • 任务(Task):把协程“提交给”事件循环让其并发执行:asyncio.create_task(coro())

2. 最小可运行示例:async/await + asyncio.run

# demo_basic.py
import asyncioasync def io_job(name, delay):print(f"[{name}] start")await asyncio.sleep(delay)  # 模拟I/O等待print(f"[{name}] done after {delay}s")return name, delayasync def main():r1 = await io_job("A", 1)r2 = await io_job("B", 2)  # 串行等待,总耗时约3秒print("results:", r1, r2)if __name__ == "__main__":asyncio.run(main())

3. 并发执行:create_taskgatheras_completed

# demo_concurrency.py
import asyncio, randomasync def fetch(i):d = random.uniform(0.5, 2.0)await asyncio.sleep(d)return f"task-{i}", round(d, 2)async def main():# 3.1 create_task + awaitt1 = asyncio.create_task(fetch(1))t2 = asyncio.create_task(fetch(2))r1 = await t1r2 = await t2print("create_task results:", r1, r2)# 3.2 gather(顺序返回结果;遇异常默认会立刻抛出)results = await asyncio.gather(*(fetch(i) for i in range(3, 8)))print("gather results:", results)# 3.3 as_completed(谁先完成先拿谁)tasks = [asyncio.create_task(fetch(i)) for i in range(8, 13)]for fut in asyncio.as_completed(tasks):print("as_completed:", await fut)if __name__ == "__main__":asyncio.run(main())

4. 超时控制与取消:wait_forasyncio.timeoutTask.cancel

# demo_timeout_cancel.py
import asyncioasync def slow():try:await asyncio.sleep(5)return "ok"except asyncio.CancelledError:print("slow() got cancelled!")raiseasync def main():# 4.1 wait_for(3.8+)try:res = await asyncio.wait_for(slow(), timeout=2)print("result:", res)except asyncio.TimeoutError:print("wait_for timeout!")# 4.2 asyncio.timeout 上下文(3.11+)try:async with asyncio.timeout(2):  # 注:需 Python 3.11+await slow()except TimeoutError:print("context timeout!")# 4.3 手动取消t = asyncio.create_task(slow())await asyncio.sleep(1)t.cancel()try:await texcept asyncio.CancelledError:print("task cancelled confirmed")if __name__ == "__main__":asyncio.run(main())

小贴士:被取消的任务应正确处理 CancelledError,并在需要时做清理(finally)。


5. 限流与同步原语:SemaphoreLockEventCondition

# demo_sync_primitives.py
import asyncio, randomsem = asyncio.Semaphore(3)  # 同时最多3个并发
lock = asyncio.Lock()
evt = asyncio.Event()async def worker(i):async with sem:  # 限制并发await asyncio.sleep(random.uniform(0.2, 1.0))async with lock:  # 保护共享输出(示意)print(f"worker {i} done")async def notifier():await asyncio.sleep(1)evt.set()  # 广播事件async def waiter():print("waiting for event...")await evt.wait()print("event received!")async def main():tasks = [asyncio.create_task(worker(i)) for i in range(10)]tasks += [asyncio.create_task(notifier()), asyncio.create_task(waiter())]await asyncio.gather(*tasks)if __name__ == "__main__":asyncio.run(main())

Condition 适合更复杂的“等待某条件成立”的场景,用法与 threading.Condition 类似(只是换成 async with / await)。


6. 生产者-消费者:asyncio.Queue

# demo_queue.py
import asyncio, randomasync def producer(q: asyncio.Queue):for i in range(10):await asyncio.sleep(random.uniform(0.1, 0.4))await q.put((i, f"data-{i}"))print(f"produced {i}")await q.put(None)  # 结束哨兵async def consumer(q: asyncio.Queue):while True:item = await q.get()if item is None:q.task_done()breaki, data = itemawait asyncio.sleep(0.3)print(f"consumed {i} -> {data}")q.task_done()async def main():q = asyncio.Queue(maxsize=5)prod = asyncio.create_task(producer(q))cons = asyncio.create_task(consumer(q))await asyncio.gather(prod)await q.join()        # 等全部消费完成await cons            # 等消费者退出if __name__ == "__main__":asyncio.run(main())

7. 任务编组(Python 3.11+):asyncio.TaskGroup

# demo_taskgroup.py
import asyncio, randomasync def job(n):await asyncio.sleep(random.uniform(0.2, 1.0))if n == 3:raise RuntimeError("boom at 3")return nasync def main():try:async with asyncio.TaskGroup() as tg:tasks = [tg.create_task(job(i)) for i in range(5)]# 出错会自动取消其余任务并向外传播异常except* RuntimeError as eg:  # PEP 654(ExceptionGroup)print("caught:", eg)if __name__ == "__main__":asyncio.run(main())

对比 gatherTaskGroup 在结构化并发上更可靠,失败会自动收拢与传播。


8. 与线程/同步代码协作:to_threadrun_in_executor

# demo_thread_bridge.py
import asyncio, time, concurrent.futuresdef blocking_io(n):time.sleep(n)return f"blocking {n}s"async def main():# 8.1 Python 3.9+ 推荐:to_threadr1 = await asyncio.to_thread(blocking_io, 1)print("to_thread:", r1)# 8.2 传统:run_in_executorloop = asyncio.get_running_loop()with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:futs = [loop.run_in_executor(pool, blocking_io, i) for i in (1, 2, 1)]for r in await asyncio.gather(*futs):print("executor:", r)if __name__ == "__main__":asyncio.run(main())

原则:CPU 密集就放线程/进程池;I/O 密集await 原生异步接口。


9. TCP/UDP 网络编程(内置 Streams / Protocols)

9.1 TCP Echo(Server & Client,基于 Streams)

# demo_tcp_echo.py
import asyncioasync def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):addr = writer.get_extra_info('peername')print(f"client connected: {addr}")try:while data := await reader.readline():msg = data.decode().rstrip()print(f"recv: {msg}")writer.write((msg + "\n").encode())await writer.drain()except asyncio.CancelledError:raisefinally:writer.close()await writer.wait_closed()print("client closed", addr)async def run_server():server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)print(f"Serving on {addrs}")async with server:await server.serve_forever()async def run_client():reader, writer = await asyncio.open_connection("127.0.0.1", 8888)for i in range(3):writer.write(f"hello {i}\n".encode())await writer.drain()echo = await reader.readline()print("echo:", echo.decode().rstrip())writer.close()await writer.wait_closed()async def main():server_task = asyncio.create_task(run_server())await asyncio.sleep(0.2)await run_client()server_task.cancel()with contextlib.suppress(asyncio.CancelledError):await server_taskif __name__ == "__main__":import contextlibasyncio.run(main())

9.2 UDP(Datagram)

# demo_udp.py
import asyncioclass EchoServer(asyncio.DatagramProtocol):def datagram_received(self, data, addr):print("server recv:", data, "from", addr)self.transport.sendto(data, addr)async def main():loop = asyncio.get_running_loop()transport, _ = await loop.create_datagram_endpoint(lambda: EchoServer(), local_addr=("127.0.0.1", 9999))# clienton_resp = loop.create_future()class Client(asyncio.DatagramProtocol):def datagram_received(self, data, addr):print("client recv:", data)on_resp.set_result(None)ctransport, _ = await loop.create_datagram_endpoint(lambda: Client(), remote_addr=("127.0.0.1", 9999))ctransport.sendto(b"hello-udp")await on_resptransport.close()ctransport.close()if __name__ == "__main__":asyncio.run(main())

10. 子进程(异步等待):asyncio.create_subprocess_exec

# demo_subprocess.py
import asyncio, sysasync def main():# 跨平台示例:调用 python -c 'print("hi")'proc = await asyncio.create_subprocess_exec(sys.executable, "-c", 'print("hi from child")',stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)out, err = await proc.communicate()print("stdout:", out.decode().strip(), "| code:", proc.returncode)if __name__ == "__main__":asyncio.run(main())

11. 异步上下文管理器与迭代器:async withasync for

# demo_async_with_for.py
import asyncio
from contextlib import asynccontextmanager@asynccontextmanager
async def open_resource():print("acquire resource")await asyncio.sleep(0.2)try:yield "RESOURCE"finally:await asyncio.sleep(0.2)print("release resource")class AsyncCounter:def __init__(self, n): self.n=n; self.i=0def __aiter__(self): return selfasync def __anext__(self):if self.i >= self.n:raise StopAsyncIterationawait asyncio.sleep(0.1)self.i += 1return self.iasync def main():async with open_resource() as r:print("using:", r)async for x in AsyncCounter(5):print("got:", x)if __name__ == "__main__":asyncio.run(main())

12. 超实用模式集

12.1 背压与批处理

# demo_backpressure.py
import asyncio, randomasync def producer(q):for i in range(30):await q.put(i)             # maxsize 限制可形成背压await asyncio.sleep(0.05)await q.put(None)async def consumer(q):batch = []while True:item = await q.get()if item is None:if batch:print("flush batch:", batch)q.task_done()breakbatch.append(item)if len(batch) >= 8:# 模拟批量处理await asyncio.sleep(random.uniform(0.1, 0.3))print("process batch:", batch)batch.clear()q.task_done()async def main():q = asyncio.Queue(maxsize=10)await asyncio.gather(producer(q), consumer(q))await q.join()if __name__ == "__main__":asyncio.run(main())

12.2 幂等重试 + 指数退避

# demo_retry.py
import asyncio, randomasync def fragile_call():await asyncio.sleep(0.1)if random.random() < 0.7:raise RuntimeError("transient")return "ok"async def retry(coro_func, attempts=5, base=0.2):for n in range(attempts):try:return await coro_func()except Exception as e:if n == attempts - 1:raiseawait asyncio.sleep(base * (2 ** n))  # 退避raise RuntimeError("unreachable")async def main():try:r = await retry(fragile_call)print("result:", r)except Exception as e:print("failed:", e)if __name__ == "__main__":asyncio.run(main())

13. 常见坑与最佳实践

  1. 入口统一用 asyncio.run(main()),不要混用早期 API(如手动获取 loop、run_until_complete),除非有特殊需求。
  2. 避免阻塞调用(如 time.sleep()、重 CPU 任务)直接出现在协程里;改用 await asyncio.sleep()asyncio.to_thread()/进程池。
  3. 正确处理取消:在 try/except/finally 中传播 CancelledError,避免吞掉取消导致任务“僵尸化”。
  4. 使用限流:对外部服务/磁盘/网络做 Semaphore、队列背压,保护系统。
  5. 结构化并发:优先考虑 TaskGroup(3.11+)来让“任务生命周期”明确,异常聚合安全。
  6. 日志与超时:给关键 I/O 加 timeout,并使用 asyncio.create_task() 后保存句柄,便于监控和取消。

14. 进阶延伸(可选)

  • 文件异步:标准库没有真正异步文件 I/O;可选三方库 aiofiles(仅示意):

    # pip install aiofiles
    import aiofiles, asyncio
    async def read_file(p):async with aiofiles.open(p, "r", encoding="utf-8") as f:return await f.read()
    
  • HTTP 客户端/服务端:aiohttphttpx[http2] 等第三方库更贴近真实网络场景。

  • 与 GUI/Qt 的集成:可通过 qasync 把 Qt 事件循环与 asyncio 融合(适合你在 QML/PySide6 下需要异步网络/IO 的情况)。

http://www.dtcms.com/a/521805.html

相关文章:

  • 深圳网站营销型建设免费网络电话呼叫系统
  • Linux-基础IO(1)
  • 如何上传网站网站开发价格明细
  • 深圳英文网站制作定西建设厅网站
  • 面向边缘AI视觉系统的低成本硬件方案
  • 医疗网站建设市场网站维护中是怎么回事
  • 网站开发费如何入账课程培训网站建设
  • dw做的网站怎么放到服务器上网站设计应遵循的原则
  • 南宁工程建设网站有哪些网站建设中模板
  • php网站开发用什么工具wordpress 创建分类
  • xml的网站地图织梦制作网站建设7
  • 乌兰察布市建设工程造价网站网站注册域名
  • 北京网站备案要求吗运营实力 网站建设
  • 41.渗透-Kali Linux-工具-Xhydra(爆破攻击)
  • 公众号视频网站开发外国教程网站有哪些
  • seo网站页面f布局免费的网页设计代码模板
  • 制作商城版网站开发手机百度网页版登录入口
  • 搭建网站硬件要求外贸建站 知乎
  • 网站标准字体企业网站开发项目策划书基本框架
  • 从 0 到 1 团队落地仓颉语言:一份可复制的工程化改造与度量驱动实践!
  • 国外域名建站WordPress5分钟建站
  • [Java数据结构与算法] 哈希表(Hash Table)
  • 嘉兴模板建站代理网站广告模板代码
  • 济南网站建设认可搜点网络能容桂做pc端网站
  • 百度手机模板网站软文推广渠道
  • 国外特效网站汕头网站建设制作公司
  • 如何解决 pip install 安装报错 invalid command ‘bdist_wheel’(缺少 wheel)问题
  • 个人网站如何搭建南山做网站公司哪家值得合作
  • 小九源码-springboot097-java付费自习室管理系统
  • 厦门大型服装商城网站建设哪个做企业网站