asyncio.Lock 的使用
asyncio.Lock
是 asyncio 中用于协程间同步的互斥锁,防止多个协程同时访问共享资源。
1. 基本锁示例
import asyncioasync def worker(lock, name, delay):async with lock: # 自动获取和释放锁print(f"{name} 获取了锁")await asyncio.sleep(delay) # 模拟工作print(f"{name} 释放了锁")async def main():lock = asyncio.Lock()# 创建多个并发任务tasks = [worker(lock, "Worker 1", 2),worker(lock, "Worker 2", 1),worker(lock, "Worker 3", 1.5)]await asyncio.gather(*tasks)asyncio.run(main())
2. 手动获取和释放锁 (不推荐)
async def worker_manual(lock, name, delay):await lock.acquire() # 手动获取锁try:print(f"{name} 获取了锁")await asyncio.sleep(delay)print(f"{name} 释放了锁")finally:lock.release() # 确保锁被释放async def main():lock = asyncio.Lock()await asyncio.gather(worker_manual(lock, "Worker A", 1),worker_manual(lock, "Worker B", 1))
实际应用场景
1. 保护共享数据
import asyncioclass BankAccount:def __init__(self, balance):self.balance = balanceself.lock = asyncio.Lock()async def transfer(self, amount, to_account):async with self.lock:# 确保余额检查、扣款、存款是原子操作if self.balance >= amount:await asyncio.sleep(0.1) # 模拟网络延迟self.balance -= amountto_account.balance += amountprint(f"转账 {amount} 成功")return Trueelse:print(f"转账 {amount} 失败,余额不足")return Falseasync def main():account_a = BankAccount(1000)account_b = BankAccount(500)# 并发转账,可能产生竞争条件tasks = [account_a.transfer(200, account_b),account_a.transfer(300, account_b),account_a.transfer(400, account_b),account_a.transfer(500, account_b)]await asyncio.gather(*tasks)print(f"账户A余额: {account_a.balance}")print(f"账户B余额: {account_b.balance}")asyncio.run(main())
2. 限制资源访问
import asyncioclass ResourcePool:def __init__(self, resources):self.resources = resourcesself.lock = asyncio.Lock()async def acquire_resource(self, user):async with self.lock:if self.resources:resource = self.resources.pop()print(f"{user} 获取了资源 {resource}")return resourceelse:print(f"{user} 无法获取资源,池已空")return Noneasync def release_resource(self, resource, user):async with self.lock:self.resources.append(resource)print(f"{user} 释放了资源 {resource}")async def user_task(pool, user_name):resource = await pool.acquire_resource(user_name)if resource:await asyncio.sleep(1) # 使用资源await pool.release_resource(resource, user_name)async def main():pool = ResourcePool(['A', 'B', 'C'])tasks = [user_task(pool, f"User-{i}") for i in range(5)]await asyncio.gather(*tasks)asyncio.run(main())
高级用法
1. 带超时的锁获取
async def worker_with_timeout(lock, name, timeout):try:# 尝试在指定时间内获取锁async with asyncio.timeout(timeout):async with lock:print(f"{name} 成功获取锁")await asyncio.sleep(2)print(f"{name} 完成工作")except asyncio.TimeoutError:print(f"{name} 获取锁超时")async def main():lock = asyncio.Lock()# 第一个任务持有锁 2 秒task1 = asyncio.create_task(worker_with_timeout(lock, "Worker1", 5))# 第二个任务等待锁超时(只有 1 秒)await asyncio.sleep(0.1) # 确保第一个任务先获取锁task2 = asyncio.create_task(worker_with_timeout(lock, "Worker2", 1))await asyncio.gather(task1, task2, return_exceptions=True)asyncio.run(main())
2. 嵌套锁(可重入锁)
import asyncioasync def nested_operations(lock, name):async with lock:print(f"{name} - 外层操作开始")# 在同一个协程中再次获取同一个锁async with lock: # 这会成功,因为 asyncio.Lock 是可重入的print(f"{name} - 内层操作")await asyncio.sleep(1)print(f"{name} - 外层操作结束")async def main():lock = asyncio.Lock()await asyncio.gather(nested_operations(lock, "Task1"),nested_operations(lock, "Task2"))asyncio.run(main())