Python多线程、锁、多进程、异步编程
多线程
import threading
import time


def task(name, delay):
    """Print a start message, sleep, then print a completion message.

    Args:
        name: Label interpolated into both printed messages.
        delay: Number of seconds passed to ``time.sleep``.
    """
    print(f"线程 {name} 开始执行")
    time.sleep(delay)
    print(f"线程 {name} 执行完成")


if __name__ == '__main__':
    # Create the threads.
    thread1 = threading.Thread(target=task, args=("A", 2))
    thread2 = threading.Thread(target=task, args=("B", 1))

    # Start the threads.
    thread1.start()
    thread2.start()

    # Wait for both threads to finish before printing the summary.
    thread1.join()
    thread2.join()

    print("所有线程执行完毕")
#线程池
from concurrent.futures import ThreadPoolExecutor
import time


def task(name, delay=1):
    """Simulate a unit of work and return a completion message.

    Args:
        name: Label interpolated into the printed and returned messages.
        delay: Seconds to sleep to simulate work (defaults to 1).

    Returns:
        The string ``"任务 {name} 完成"``.
    """
    print(f"任务 {name} 开始")
    time.sleep(delay)
    return f"任务 {name} 完成"


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit several tasks to the pool.
        futures = [executor.submit(task, i) for i in range(5)]
        # Collect the results in submission order; result() blocks until
        # the corresponding task has finished.
        for future in futures:
            print(future.result())
锁机制
import threading

counter = 0
lock = threading.Lock()


def increment(iterations=100000):
    """Add 1 to the module-level ``counter`` ``iterations`` times.

    Each increment is performed while holding ``lock`` so that concurrent
    callers do not lose updates.

    Args:
        iterations: How many times to increment (defaults to 100000).
    """
    global counter
    for _ in range(iterations):
        with lock:  # the lock is acquired and released automatically
            counter += 1


if __name__ == '__main__':
    threads = []
    for _ in range(5):
        t = threading.Thread(target=increment)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"最终计数器值: {counter}")  # expected: 500000
生产者-消费者模型
import threading
import queue
import random
import time

# Shared bounded queue and the condition used to coordinate both sides.
q = queue.Queue(maxsize=5)
condition = threading.Condition()


def producer(count=None, max_delay=1.0):
    """Put random integers into the shared queue.

    Args:
        count: Number of items to produce; ``None`` (the default) means
            run forever.
        max_delay: Upper bound, in seconds, of the random pause after each
            item (defaults to 1.0).
    """
    produced = 0
    while count is None or produced < count:
        with condition:
            # Re-check the predicate in a loop: wait() may return while the
            # queue is still full, so a single `if` is not sufficient.
            while q.full():
                print("队列已满,生产者等待")
                condition.wait()
            item = random.randint(1, 100)
            q.put(item)
            print(f"生产者生产了: {item}")
            condition.notify()
        produced += 1
        # Sleep outside the `with` block so the lock is not held while idle.
        time.sleep(random.random() * max_delay)


def consumer(count=None, max_delay=1.0):
    """Take items from the shared queue and print them.

    Args:
        count: Number of items to consume; ``None`` (the default) means
            run forever.
        max_delay: Upper bound, in seconds, of the random pause after each
            item (defaults to 1.0).
    """
    consumed = 0
    while count is None or consumed < count:
        with condition:
            # Re-check the predicate in a loop: wait() may return while the
            # queue is still empty, so a single `if` is not sufficient.
            while q.empty():
                print("队列为空,消费者等待")
                condition.wait()
            item = q.get()
            print(f"消费者消费了: {item}")
            condition.notify()
        consumed += 1
        # Sleep outside the `with` block so the lock is not held while idle.
        time.sleep(random.random() * max_delay)


if __name__ == '__main__':
    # Start the producer and the consumer.
    prod = threading.Thread(target=producer)
    cons = threading.Thread(target=consumer)

    prod.start()
    cons.start()
多进程编程
from multiprocessing import Process
import os
import time  # required by task(); this snippet did not import it itself


def task(name, delay=2):
    """Print the current process ID with a label, then sleep.

    Args:
        name: Label interpolated into the printed message.
        delay: Seconds to sleep to simulate work (defaults to 2).
    """
    print(f"子进程 {name} (PID: {os.getpid()}) 执行")
    time.sleep(delay)


if __name__ == '__main__':
    processes = []
    for i in range(3):
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()

    # Wait for every child process to exit.
    for p in processes:
        p.join()

    print("所有子进程执行完毕")
#进程池
from multiprocessing import Pool
import time


def square(x, delay=1):
    """Return ``x * x`` after a simulated delay.

    Args:
        x: Value to square.
        delay: Seconds to sleep to simulate work (defaults to 1).

    Returns:
        The product ``x * x``.
    """
    print(f"计算 {x} 的平方")
    time.sleep(delay)
    return x * x


if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Synchronous: map() blocks until all results are available.
        result = pool.map(square, range(10))
        print(result)
        # Asynchronous: map_async() returns immediately; get() waits.
        async_result = pool.map_async(square, range(10))
        print(async_result.get())
#进程间通信
from multiprocessing import Process, Pipe


def sender(conn):
    """Send a fixed greeting over ``conn`` and close the connection.

    Args:
        conn: Connection object providing ``send`` and ``close``.
    """
    conn.send("Hello from sender")
    conn.close()


def receiver(conn):
    """Receive one message from ``conn``, print it, and close the connection.

    Args:
        conn: Connection object providing ``recv`` and ``close``.
    """
    msg = conn.recv()
    print(f"接收到的消息: {msg}")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
异步
协程(Coroutine): 异步执行的任务单元
事件循环(Event Loop): 调度和执行协程的核心
Future: 表示异步操作的最终结果
Task: Future的子类,用于包装和管理协程的执行
import asyncio


async def main():  # coroutine function definition
    """Print 'Hello', wait one second without blocking the loop, print 'World'."""
    print('Hello')
    await asyncio.sleep(1)  # asynchronous wait
    print('World')


if __name__ == '__main__':
    asyncio.run(main())
创建和运行协程
async def say_after(delay, message):
    """Wait ``delay`` seconds asynchronously, then print ``message``.

    Args:
        delay: Seconds passed to ``asyncio.sleep``.
        message: Value to print after the wait.
    """
    await asyncio.sleep(delay)
    print(message)


async def main():
    """Run ``say_after`` twice sequentially, then twice concurrently."""
    # Sequential execution: the second call starts after the first ends.
    await say_after(1, 'hello')
    await say_after(2, 'world')

    # Concurrent execution: both tasks are scheduled before either is awaited.
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    await task1
    await task2


if __name__ == '__main__':
    asyncio.run(main())
并发运行多个协程
async def fetch_data(task_id, delay):
    """Simulate an asynchronous fetch and return a result label.

    Args:
        task_id: Identifier interpolated into the messages and the result.
        delay: Seconds passed to ``asyncio.sleep``.

    Returns:
        The string ``"result-{task_id}"``.
    """
    print(f'Task {task_id} started')
    await asyncio.sleep(delay)
    print(f'Task {task_id} completed')
    return f'result-{task_id}'


async def main():
    """Run three ``fetch_data`` coroutines concurrently and print the results."""
    # gather() runs the coroutines concurrently and returns their results
    # in the order the awaitables were passed in.
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 1),
        fetch_data(3, 3),
    )
    print(f'All results: {results}')


if __name__ == '__main__':
    asyncio.run(main())
异步上下文管理器
class AsyncResource:
    """Asynchronous context manager that simulates acquiring a resource.

    Args:
        delay: Seconds to wait on entry and again on exit (defaults to 0.5).
    """

    def __init__(self, delay=0.5):
        self.delay = delay

    async def __aenter__(self):
        """Print a message, wait ``delay`` seconds, and return ``self``."""
        print('Acquiring resource')
        await asyncio.sleep(self.delay)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Print a message and wait ``delay`` seconds.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        print('Releasing resource')
        await asyncio.sleep(self.delay)


async def use_resource():
    """Enter an ``AsyncResource``, simulate one second of work, then exit."""
    async with AsyncResource() as resource:
        print('Using resource')
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(use_resource())
异步迭代器
class AsyncCounter:
    """Asynchronous iterator yielding 1, 2, ..., ``stop``.

    Args:
        stop: Last value to yield; nothing is yielded when ``stop <= 0``.
        delay: Seconds to wait before each yielded value (defaults to 0.5).
    """

    def __init__(self, stop, delay=0.5):
        self.current = 0
        self.stop = stop
        self.delay = delay

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next number, or raise ``StopAsyncIteration`` at the end."""
        # Check exhaustion first so iteration ends without an extra wait.
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(self.delay)
        self.current += 1
        return self.current


async def main():
    """Print the numbers produced by ``AsyncCounter(5)``."""
    async for number in AsyncCounter(5):
        print(number)


if __name__ == '__main__':
    asyncio.run(main())
异步HTTP请求
import aiohttp


async def fetch_url(session, url):
    """Issue a GET request for ``url`` and return the response body as text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            (an ``aiohttp.ClientSession`` in ``main``).
        url: Address to request.

    Returns:
        The awaited result of ``response.text()``.
    """
    async with session.get(url) as response:
        return await response.text()


async def main():
    """Fetch several URLs concurrently and print the size of each response."""
    urls = [
        'https://www.python.org',
        'https://www.google.com',
        'https://www.github.com',
    ]
    # A single session is shared by all requests.
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, content in zip(urls, results):
            # content is decoded text, so len() counts characters, not bytes.
            print(f"{url} -> {len(content)} chars")


if __name__ == '__main__':
    asyncio.run(main())
异步数据库访问
import asyncpg


async def fetch_data():
    """Connect to the database, run one parameterized query, print the rows.

    The connection is closed in a ``finally`` clause, so it is released
    even if the query raises.
    """
    # NOTE(review): the credentials below are tutorial placeholders; load
    # real ones from configuration rather than hard-coding them.
    conn = await asyncpg.connect(
        user='user',
        password='pass',
        database='db',
        host='localhost',
    )
    try:
        # Run the query; $1 is bound to the positional argument 1.
        result = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
        print(result)
    finally:
        await conn.close()


if __name__ == '__main__':
    asyncio.run(fetch_data())