Various uses of Python asyncio, with code examples
Below is a hands-on asyncio guide that works its way up from the basics. It first explains the core concepts, then provides runnable, progressively more advanced example code. All examples depend only on the standard library (except where noted) and target Python 3.10+ (the parts that use TaskGroup or asyncio.timeout() require 3.11+).
1. Core concepts at a glance
- async def: defines a coroutine function. await: waits, inside a coroutine, for another awaitable (a coroutine, an asyncio.Task, an asyncio.Future, and so on).
- Event loop: the core that schedules coroutines and I/O events; start it with asyncio.run(main()).
- Concurrency ≠ parallelism: asyncio is single-threaded, cooperative concurrency; coroutines yield control while waiting on I/O.
- Task: hands a coroutine over to the event loop so it runs concurrently: asyncio.create_task(coro()).
2. Minimal runnable example: async/await + asyncio.run
# demo_basic.py
import asyncio

async def io_job(name, delay):
    print(f"[{name}] start")
    await asyncio.sleep(delay)  # simulate waiting on I/O
    print(f"[{name}] done after {delay}s")
    return name, delay

async def main():
    r1 = await io_job("A", 1)
    r2 = await io_job("B", 2)  # awaited serially, total runtime ~3 s
    print("results:", r1, r2)

if __name__ == "__main__":
    asyncio.run(main())
3. Running things concurrently: create_task, gather, as_completed
# demo_concurrency.py
import asyncio, random

async def fetch(i):
    d = random.uniform(0.5, 2.0)
    await asyncio.sleep(d)
    return f"task-{i}", round(d, 2)

async def main():
    # 3.1 create_task + await
    t1 = asyncio.create_task(fetch(1))
    t2 = asyncio.create_task(fetch(2))
    r1 = await t1
    r2 = await t2
    print("create_task results:", r1, r2)

    # 3.2 gather (results come back in submission order; by default the first exception propagates immediately)
    results = await asyncio.gather(*(fetch(i) for i in range(3, 8)))
    print("gather results:", results)

    # 3.3 as_completed (yields whichever finishes first)
    tasks = [asyncio.create_task(fetch(i)) for i in range(8, 13)]
    for fut in asyncio.as_completed(tasks):
        print("as_completed:", await fut)

if __name__ == "__main__":
    asyncio.run(main())
4. Timeouts and cancellation: wait_for, asyncio.timeout, Task.cancel
# demo_timeout_cancel.py
import asyncio

async def slow():
    try:
        await asyncio.sleep(5)
        return "ok"
    except asyncio.CancelledError:
        print("slow() got cancelled!")
        raise

async def main():
    # 4.1 wait_for
    try:
        res = await asyncio.wait_for(slow(), timeout=2)
        print("result:", res)
    except asyncio.TimeoutError:
        print("wait_for timeout!")

    # 4.2 asyncio.timeout context manager (3.11+)
    try:
        async with asyncio.timeout(2):  # note: requires Python 3.11+
            await slow()
    except TimeoutError:
        print("context timeout!")

    # 4.3 manual cancellation
    t = asyncio.create_task(slow())
    await asyncio.sleep(1)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("task cancelled confirmed")

if __name__ == "__main__":
    asyncio.run(main())
Tip: a cancelled task should handle CancelledError correctly and do any necessary cleanup in a finally block.
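For example, a minimal sketch (the worker name and the printed "resource" are purely illustrative) of a task that re-raises CancelledError and still runs its cleanup in finally:

import asyncio

async def worker():
    print("acquire resource")       # e.g. open a connection (illustrative)
    try:
        await asyncio.sleep(10)     # long-running work
    except asyncio.CancelledError:
        print("cancelled, cleaning up")
        raise                       # re-raise so the caller sees the cancellation
    finally:
        print("release resource")   # runs on success and on cancellation

async def main():
    t = asyncio.create_task(worker())
    await asyncio.sleep(0.5)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pass

asyncio.run(main())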
5. Throttling and synchronization primitives: Semaphore, Lock, Event, Condition
# demo_sync_primitives.py
import asyncio, random

sem = asyncio.Semaphore(3)   # at most 3 workers at the same time
lock = asyncio.Lock()
evt = asyncio.Event()

async def worker(i):
    async with sem:          # limit concurrency
        await asyncio.sleep(random.uniform(0.2, 1.0))
        async with lock:     # protect shared output (for illustration)
            print(f"worker {i} done")

async def notifier():
    await asyncio.sleep(1)
    evt.set()                # broadcast the event

async def waiter():
    print("waiting for event...")
    await evt.wait()
    print("event received!")

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(10)]
    tasks += [asyncio.create_task(notifier()), asyncio.create_task(waiter())]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
Condition suits more complex "wait until some condition holds" scenarios; its usage mirrors threading.Condition (just with async with/await).
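A minimal Condition sketch under that pattern (the counter and the threshold of 3 are illustrative): one coroutine waits until a shared counter reaches the threshold, another increments it and notifies the waiters.

import asyncio

cond = asyncio.Condition()
count = 0

async def consumer():
    async with cond:
        await cond.wait_for(lambda: count >= 3)  # lock released while waiting, re-acquired on wake-up
        print("condition met, count =", count)

async def producer():
    global count
    for _ in range(3):
        await asyncio.sleep(0.2)
        async with cond:
            count += 1
            cond.notify_all()   # wake waiters so they re-check the predicate

async def main():
    await asyncio.gather(consumer(), producer())

asyncio.run(main())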
6. Producer-consumer: asyncio.Queue
# demo_queue.py
import asyncio, random

async def producer(q: asyncio.Queue):
    for i in range(10):
        await asyncio.sleep(random.uniform(0.1, 0.4))
        await q.put((i, f"data-{i}"))
        print(f"produced {i}")
    await q.put(None)  # sentinel marking the end

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        i, data = item
        await asyncio.sleep(0.3)
        print(f"consumed {i} -> {data}")
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=5)
    prod = asyncio.create_task(producer(q))
    cons = asyncio.create_task(consumer(q))
    await prod       # wait for the producer to finish
    await q.join()   # wait until everything has been consumed
    await cons       # wait for the consumer to exit

if __name__ == "__main__":
    asyncio.run(main())
7. Task groups (Python 3.11+): asyncio.TaskGroup
# demo_taskgroup.py
import asyncio, random

async def job(n):
    await asyncio.sleep(random.uniform(0.2, 1.0))
    if n == 3:
        raise RuntimeError("boom at 3")
    return n

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(job(i)) for i in range(5)]
            # on failure the remaining tasks are cancelled automatically
            # and the exception propagates out of the async with block
    except* RuntimeError as eg:  # PEP 654 (ExceptionGroup)
        print("caught:", eg)

if __name__ == "__main__":
    asyncio.run(main())
Compared with gather: TaskGroup is the more reliable choice for structured concurrency; on failure it automatically cancels the remaining tasks and propagates the exceptions.
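For contrast, a small sketch of the same kind of jobs under gather: by default the first exception propagates to the caller while the remaining coroutines keep running, whereas with return_exceptions=True the exceptions are simply collected into the result list (job() here mirrors the example above).

import asyncio, random

async def job(n):
    await asyncio.sleep(random.uniform(0.2, 1.0))
    if n == 3:
        raise RuntimeError("boom at 3")
    return n

async def main():
    # collect results and exceptions together instead of failing fast
    results = await asyncio.gather(*(job(i) for i in range(5)), return_exceptions=True)
    for i, r in enumerate(results):
        print(i, "->", r)   # index 3 prints the RuntimeError instance

asyncio.run(main())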
8. Interoperating with threads and blocking code: to_thread, run_in_executor
# demo_thread_bridge.py
import asyncio, time, concurrent.futures

def blocking_io(n):
    time.sleep(n)
    return f"blocking {n}s"

async def main():
    # 8.1 Python 3.9+, preferred: to_thread
    r1 = await asyncio.to_thread(blocking_io, 1)
    print("to_thread:", r1)

    # 8.2 the classic way: run_in_executor
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        futs = [loop.run_in_executor(pool, blocking_io, i) for i in (1, 2, 1)]
        for r in await asyncio.gather(*futs):
            print("executor:", r)

if __name__ == "__main__":
    asyncio.run(main())
Rule of thumb: offload CPU-bound work to a thread or process pool; for I/O-bound work, await native async interfaces.
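A minimal sketch of the process-pool variant for CPU-bound work (cpu_heavy is an illustrative function); loop.run_in_executor accepts a ProcessPoolExecutor the same way it accepts a thread pool:

import asyncio, concurrent.futures

def cpu_heavy(n):
    # purely CPU-bound work; threads would not help because of the GIL
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futs = [loop.run_in_executor(pool, cpu_heavy, n) for n in (10**6, 2 * 10**6)]
        for r in await asyncio.gather(*futs):
            print("cpu result:", r)

if __name__ == "__main__":
    # the __main__ guard matters here: process pools re-import the module under spawn
    asyncio.run(main())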
9. TCP/UDP networking (built-in Streams / Protocols)
9.1 TCP echo (server & client, based on Streams)
# demo_tcp_echo.py
import asyncio
import contextlib

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info('peername')
    print(f"client connected: {addr}")
    try:
        while data := await reader.readline():
            msg = data.decode().rstrip()
            print(f"recv: {msg}")
            writer.write((msg + "\n").encode())
            await writer.drain()
    except asyncio.CancelledError:
        raise
    finally:
        writer.close()
        await writer.wait_closed()
        print("client closed", addr)

async def run_server():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
    addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    print(f"Serving on {addrs}")
    async with server:
        await server.serve_forever()

async def run_client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
    for i in range(3):
        writer.write(f"hello {i}\n".encode())
        await writer.drain()
        echo = await reader.readline()
        print("echo:", echo.decode().rstrip())
    writer.close()
    await writer.wait_closed()

async def main():
    server_task = asyncio.create_task(run_server())
    await asyncio.sleep(0.2)   # give the server a moment to start
    await run_client()
    server_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await server_task

if __name__ == "__main__":
    asyncio.run(main())
9.2 UDP (Datagram)
# demo_udp.py
import asyncio

class EchoServer(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport   # the base class does not store the transport for us

    def datagram_received(self, data, addr):
        print("server recv:", data, "from", addr)
        self.transport.sendto(data, addr)

async def main():
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: EchoServer(), local_addr=("127.0.0.1", 9999))

    # client
    on_resp = loop.create_future()

    class Client(asyncio.DatagramProtocol):
        def datagram_received(self, data, addr):
            print("client recv:", data)
            on_resp.set_result(None)

    ctransport, _ = await loop.create_datagram_endpoint(
        lambda: Client(), remote_addr=("127.0.0.1", 9999))
    ctransport.sendto(b"hello-udp")
    await on_resp
    transport.close()
    ctransport.close()

if __name__ == "__main__":
    asyncio.run(main())
10. Subprocesses (awaited asynchronously): asyncio.create_subprocess_exec
# demo_subprocess.py
import asyncio, sys

async def main():
    # cross-platform example: run python -c 'print("hi from child")'
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", 'print("hi from child")',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    out, err = await proc.communicate()
    print("stdout:", out.decode().strip(), "| code:", proc.returncode)

if __name__ == "__main__":
    asyncio.run(main())
11. Async context managers and iterators: async with, async for
# demo_async_with_for.py
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_resource():
    print("acquire resource")
    await asyncio.sleep(0.2)
    try:
        yield "RESOURCE"
    finally:
        await asyncio.sleep(0.2)
        print("release resource")

class AsyncCounter:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.i += 1
        return self.i

async def main():
    async with open_resource() as r:
        print("using:", r)
    async for x in AsyncCounter(5):
        print("got:", x)

if __name__ == "__main__":
    asyncio.run(main())
12. Highly practical patterns
12.1 Backpressure and batching
# demo_backpressure.py
import asyncio, random

async def producer(q):
    for i in range(30):
        await q.put(i)   # a bounded maxsize creates backpressure
        await asyncio.sleep(0.05)
    await q.put(None)

async def consumer(q):
    batch = []
    while True:
        item = await q.get()
        if item is None:
            if batch:
                print("flush batch:", batch)
            q.task_done()
            break
        batch.append(item)
        if len(batch) >= 8:
            # simulate processing a whole batch at once
            await asyncio.sleep(random.uniform(0.1, 0.3))
            print("process batch:", batch)
            batch.clear()
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=10)
    await asyncio.gather(producer(q), consumer(q))
    await q.join()

if __name__ == "__main__":
    asyncio.run(main())
12.2 Idempotent retries + exponential backoff
# demo_retry.py
import asyncio, random

async def fragile_call():
    await asyncio.sleep(0.1)
    if random.random() < 0.7:
        raise RuntimeError("transient")
    return "ok"

async def retry(coro_func, attempts=5, base=0.2):
    for n in range(attempts):
        try:
            return await coro_func()
        except Exception:
            if n == attempts - 1:
                raise
            await asyncio.sleep(base * (2 ** n))  # exponential backoff
    raise RuntimeError("unreachable")

async def main():
    try:
        r = await retry(fragile_call)
        print("result:", r)
    except Exception as e:
        print("failed:", e)

if __name__ == "__main__":
    asyncio.run(main())
13. Common pitfalls and best practices
- Use asyncio.run(main()) as the single entry point; don't mix in the older APIs (manually fetching the loop, run_until_complete) unless you have a specific reason.
- Keep blocking calls (time.sleep(), heavy CPU work) out of coroutines; use await asyncio.sleep(), asyncio.to_thread(), or a process pool instead.
- Handle cancellation correctly: propagate CancelledError through try/except/finally; swallowing it turns tasks into zombies.
- Throttle: protect external services, disks, and the network with a Semaphore or queue backpressure.
- Structured concurrency: prefer TaskGroup (3.11+) so task lifetimes are explicit and exceptions are aggregated safely.
- Logging and timeouts: put a timeout on critical I/O, and keep a reference to every task created with asyncio.create_task() so it can be monitored and cancelled (see the sketch after this list).
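A minimal sketch of that "keep a reference" pattern (background_tasks and spawn are illustrative names); without a strong reference, a task created by create_task can be garbage-collected before it finishes:

import asyncio

background_tasks = set()   # strong references keep the tasks alive

def spawn(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference once done
    return task

async def heartbeat():
    while True:
        await asyncio.sleep(1)
        print("tick")

async def main():
    t = spawn(heartbeat())
    await asyncio.sleep(3.5)
    t.cancel()   # the saved handle also lets us cancel or inspect the task

asyncio.run(main())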
14. Going further (optional)
- Async file I/O: the standard library has no truly asynchronous file I/O; an optional third-party library is aiofiles (sketch only):

  # pip install aiofiles
  import aiofiles, asyncio

  async def read_file(p):
      async with aiofiles.open(p, "r", encoding="utf-8") as f:
          return await f.read()

- HTTP client/server: third-party libraries such as aiohttp and httpx[http2] are closer to real-world networking scenarios (see the sketch after this list).
- GUI/Qt integration: qasync can merge the Qt event loop with asyncio (useful if you need async networking/IO under QML/PySide6).
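As an illustration of the HTTP-client bullet, a minimal httpx sketch (assumes httpx is installed; https://example.com is only a placeholder URL) that issues a few requests concurrently over one shared AsyncClient:

# pip install httpx
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> int:
    resp = await client.get(url)
    return resp.status_code

async def main():
    async with httpx.AsyncClient(timeout=10.0) as client:
        urls = ["https://example.com"] * 3          # placeholder URLs
        codes = await asyncio.gather(*(fetch(client, u) for u in urls))
        print("status codes:", codes)

asyncio.run(main())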
