Python 并发编程全面指南(多线程 多进程 进程池 线程池 协程和异步编程) 队列
Python 并发编程全面指南
多线程、多进程、进程池、线程池、协程与异步区别
多线程(Multithreading)
特点: 多线程是在同一进程中创建多个线程,这些线程共享同一进程的内存空间。每个线程是独立执行的,但它们共享相同的资源,如变量和文件句柄。
适用场景: 适用于 I/O 密集型任务,因为线程可以在等待 I/O 操作时释放 GIL(全局解释器锁)。
缺点: 受 GIL 限制,不适用于 CPU 密集型任务。线程之间需要进行同步和协调,可能涉及到锁和其他同步机制。
多进程(Multiprocessing)
特点: 多进程是在同一计算机上创建多个独立的进程,每个进程都有自己的内存空间。进程之间通过进程间通信(IPC)来进行数据交换。
适用场景: 适用于 CPU 密集型任务,因为每个进程有自己的 GIL,不受全局解释器锁的限制。
缺点: 进程之间的通信复杂,开销较大。不如多线程节省资源,因为每个进程都需要独立的内存空间。
进程池(Multiprocessing Pool)
特点: 进程池是一种使用固定数量的进程来执行任务的机制。每个进程独立执行任务,任务分配给不同的进程并行执行。
适用场景: 适用于需要并行执行多个相似任务的情况,例如批量处理数据。
优点: 可以有效利用多核处理器,不受 GIL 限制。相对于手动创建和管理进程,进程池提供了更高层次的抽象,更容易使用。
线程池(Multithreading Pool)
特点: 线程池是一种使用固定数量的线程来执行任务的机制。每个线程独立执行任务,任务分配给不同的线程并行执行。
适用场景: 适用于需要并行执行多个相似任务的情况,例如批量处理数据。但由于 GIL 的存在,不适用于 CPU 密集型任务。
优点: 相对于手动创建和管理线程,线程池提供了更高层次的抽象,更容易使用。可以在一定程度上缓解 GIL 的影响。
虽然线程池和多线程都涉及到使用多个线程来实现并发,但它们是不同的概念,有一些关键的区别。
多线程:多线程是一种编程模型,指的是在同一进程中创建多个线程,这些线程共享同一进程的内存空间。多线程的目标是通过并发执行来提高程序的性能,但在某些情况下,由于全局解释器锁(GIL)的存在,Python 中的多线程并不能充分利用多核处理器。
线程池:线程池是一种并发编程的机制,它是对多线程的一种组织和管理方式。线程池在应用程序启动时创建一组预先初始化的线程,并将它们放入一个池中,这些线程可以等待执行任务。与手动创建和管理线程相比,线程池提供了更高层次的抽象,使得可以重复使用这些线程来执行多个任务,而不需要频繁地创建和销毁线程,从而降低了线程创建和销毁的开销。
虽然线程池涉及到使用多线程,但线程池更侧重于组织和管理线程的方式,以提高并发执行的效率。多线程是一种更一般的概念,指的是在同一进程中创建多个线程。
协程(Coroutine)
定义: 协程是一种轻量级的并发编程结构,用于在单一线程中实现并发。协程允许在程序执行的特定点挂起和恢复执行,而无需阻塞整个线程。在 Python 中,协程是通过使用 async 和 await 关键字定义的。
特点: 协程允许在执行过程中主动挂起自己,并在稍后的时间点恢复执行。这使得可以在一个线程中交替执行多个协程,提高程序的效率和响应性。
示例:
async def my_coroutine():print("Start Coroutine")await asyncio.sleep(2) # 挂起协程print("Coroutine resumed")
异步编程(Asynchronous Programming)
定义: 异步编程是一种编程模型,用于处理可能引起等待的操作而不阻塞整个程序的执行。异步编程通常与协程一起使用,通过非阻塞的方式处理 I/O 操作,例如读写文件、网络通信等。
特点: 在异步编程中,任务在等待潜在的等待操作时不会阻塞,而是切换到执行其他任务。这样可以提高程序的效率,充分利用等待时的空闲时间。
示例:
import asyncioasync def main():await asyncio.gather(my_coroutine(), my_other_coroutine())
总的来说,协程是一种并发编程的结构,而异步编程是一种处理可能引起等待的操作的编程模型。在异步编程中,协程通常被用作处理异步任务的工具,通过使用异步事件循环(如 asyncio)来协调和执行这些协程。通过使用协程和异步编程,可以实现高效的并发处理,特别适用于处理大量的 I/O 操作。
总体而言,选择多线程、多进程、进程池或线程池取决于具体的应用场景和任务类型。多线程适用于 I/O 密集型任务,多进程适用于 CPU 密集型任务,而进程池和线程池则提供了更高层次的抽象,使得并发编程更加方便。
协程是异步编程的一种实现方式。异步编程是一种广义的概念,而协程是异步编程的一种具体技术。
1. 多线程 (Threading)
多线程适合 I/O 密集型任务,如网络请求、文件读写等。
基本使用
import threading
import timedef print_numbers():for i in range(1, 6):time.sleep(1)print(f"Number: {i}")def print_letters():for letter in ['A', 'B', 'C', 'D', 'E']:time.sleep(1.5)print(f"Letter: {letter}")# 创建线程
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)# 启动线程
t1.start()
t2.start()# 等待线程完成
t1.join()
t2.join()print("所有线程执行完毕")
线程同步与锁
import threading
import timeclass BankAccount:def __init__(self):self.balance = 1000self.lock = threading.Lock()def deposit(self, amount):with self.lock: # 使用锁确保线程安全current_balance = self.balancetime.sleep(0.1) # 模拟处理时间self.balance = current_balance - amountprint(f"取出¥ {amount}, 余额¥: {self.balance}")def perform_transactions(account):for _ in range(5):account.deposit(10)account = BankAccount()
threads = []for _ in range(3):t = threading.Thread(target=perform_transactions, args=(account,))threads.append(t)t.start()for t in threads:t.join()print(f"最终余额¥: {account.balance}")
2. 多进程 (Multiprocessing)
多进程适合 CPU 密集型任务,如图像处理、计算等。
基本使用
import multiprocessing
import time
import osdef calculate_square(numbers):for n in numbers:time.sleep(0.2)print(f"进程 {os.getpid()} 计算: {n} * {n} = {n*n}")def calculate_cube(numbers):for n in numbers:time.sleep(0.2)print(f"进程 {os.getpid()} 计算: {n} * {n} * {n} = {n*n*n}")if __name__ == "__main__":numbers = [2, 3, 4, 5, 6]# 创建进程p1 = multiprocessing.Process(target=calculate_square, args=(numbers,))p2 = multiprocessing.Process(target=calculate_cube, args=(numbers,))# 启动进程p1.start()p2.start()# 等待进程完成p1.join()p2.join()print("所有进程执行完毕")
进程间通信
import multiprocessingdef producer(queue):for i in range(5):item = f"项目 {i}"queue.put(item)print(f"生产: {item}")queue.put(None) # 结束信号def consumer(queue):while True:item = queue.get()if item is None:breakprint(f"消费: {item}")if __name__ == "__main__":queue = multiprocessing.Queue()p1 = multiprocessing.Process(target=producer, args=(queue,))p2 = multiprocessing.Process(target=consumer, args=(queue,))p1.start()p2.start()p1.join()p2.join()print("生产消费完成")
3. 线程池和进程池
使用池可以方便地管理多个线程或进程。
线程池
from concurrent.futures import ThreadPoolExecutor
import time
import randomdef task(name):sleep_time = random.uniform(0.5, 2.0)time.sleep(sleep_time)return f"任务 {name} 完成,耗时 {sleep_time:.2f} 秒"def use_thread_pool():with ThreadPoolExecutor(max_workers=3) as executor:# 提交任务# futures = [executor.submit(task, i) for i in range(10)]# 获取结果# for future in futures:# result = future.result()# print(result)futures = executor.map(task, range(10))for future in futures:print(future)'''两种方法的主要区别如下:submit方法:为每个任务返回一个Future对象可以使用as_completed()按任务完成顺序获取结果更灵活,可以提交不同类型的任务函数和参数组合可以单独处理每个任务的结果和异常map方法:类似于内置的map()函数,但并发执行返回结果的顺序与输入参数的顺序一致,而不是按完成顺序适用于对同一函数应用不同的参数当任何任务抛出异常时,会立即抛出异常并停止处'''if __name__ == "__main__":use_thread_pool()
进程池
from concurrent.futures import ProcessPoolExecutor
import math
import timedef is_prime(n):if n < 2:return Falsefor i in range(2, int(math.sqrt(n)) + 1):if n % i == 0:return Falsereturn Truedef find_primes_in_range(start, end):primes = []for num in range(start, end):if is_prime(num):primes.append(num)return primesdef use_process_pool():ranges = [(1, 1000), (1000, 2000), (2000, 3000), (3000, 4000)]with ProcessPoolExecutor() as executor:# 使用map方法results = executor.map(find_primes_in_range, [r[0] for r in ranges], [r[1] for r in ranges])primes = []for result in results:primes.extend(result)print(f"找到 {len(primes)} 个质数")print(f"前10个质数: {sorted(primes)[:10]}")if __name__ == "__main__":start_time = time.time()use_process_pool()end_time = time.time()print(f"耗时: {end_time - start_time:.2f} 秒")
4. 协程和异步编程 (Asyncio)
协程适合高并发的 I/O 密集型任务。
基本使用
import asyncio
import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"开始时间: {time.strftime('%X')}")# 顺序执行await say_after(1, '你好')await say_after(2, '世界')print(f"结束时间: {time.strftime('%X')}")async def main_concurrent():print(f"开始时间: {time.strftime('%X')}")# 并发执行task1 = asyncio.create_task(say_after(1, '你好'))task2 = asyncio.create_task(say_after(2, '世界'))await task1await task2print(f"结束时间: {time.strftime('%X')}")# 运行顺序执行的例子
print("顺序执行:")
asyncio.run(main())print("\n并发执行:")
asyncio.run(main_concurrent())
异步网络请求
import aiohttp
import asyncio
import timeasync def fetch_url(session, url):async with session.get(url) as response:return await response.text()async def main():urls = ['http://httpbin.org/delay/1','http://httpbin.org/delay/2','http://httpbin.org/delay/3','http://httpbin.org/delay/1','http://httpbin.org/delay/2']async with aiohttp.ClientSession() as session:tasks = [fetch_url(session, url) for url in urls]results = await asyncio.gather(*tasks)for i, result in enumerate(results):print(f"URL {i+1} 响应长度: {len(result)}")if __name__ == "__main__":start_time = time.time()asyncio.run(main())end_time = time.time()print(f"总耗时: {end_time - start_time:.2f} 秒")
5. 队列 (Queue)
队列是并发编程中重要的同步机制。
线程安全队列
import queue
import threading
import time
import randomdef producer(q, id):for i in range(5):item = f"生产者 {id} - 项目 {i}"q.put(item)print(f"生产: {item}")time.sleep(random.uniform(0.1, 0.5))def consumer(q, id):while True:try:item = q.get(timeout=3)print(f"消费者 {id} 消费: {item}")time.sleep(random.uniform(0.2, 0.7))q.task_done()except queue.Empty:print(f"消费者 {id} 超时,退出")breakdef use_queue():q = queue.Queue(maxsize=10)# 创建生产者和消费者producers = []consumers = []for i in range(3):p = threading.Thread(target=producer, args=(q, i))producers.append(p)for i in range(2):c = threading.Thread(target=consumer, args=(q, i))consumers.append(c)# 启动所有线程for p in producers:p.start()for c in consumers:c.start()# 等待生产者完成for p in producers:p.join()# 等待队列清空q.join()# 通知消费者退出for _ in consumers:q.put(None)for c in consumers:c.join()print("所有任务完成")if __name__ == "__main__":use_queue()
优先级队列
import queue
import threading
import timedef worker(q, id):while True:priority, task = q.get()if task is None:q.task_done()breakprint(f"工人 {id} 处理任务: {task} (优先级: {priority})")time.sleep(0.5)q.task_done()def use_priority_queue():q = queue.PriorityQueue()# 添加任务(优先级,任务)tasks = [(3, "低优先级任务1"),(1, "高优先级任务1"),(2, "中优先级任务1"),(1, "高优先级任务2"),(3, "低优先级任务2"),(2, "中优先级任务2")]for task in tasks:q.put(task)# 创建工人线程workers = []for i in range(2):w = threading.Thread(target=worker, args=(q, i))workers.append(w)w.start()# 等待所有任务完成q.join()# 通知工人退出for _ in workers:q.put((0, None))for w in workers:w.join()print("所有任务处理完成")if __name__ == "__main__":use_priority_queue()
6. 综合案例:Web 爬虫
使用多种并发技术实现一个简单的网页爬虫。
import asyncio
import aiohttp
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
import time
from bs4 import BeautifulSoupclass WebCrawler:def __init__(self, max_concurrent=10):self.visited = set()self.to_visit = set()self.max_concurrent = max_concurrentself.session = Noneasync def fetch(self, url):try:async with self.session.get(url, timeout=5) as response:if response.status == 200:return await response.text()else:print(f"错误状态码 {response.status} 对于 {url}")return Noneexcept Exception as e:print(f"获取 {url} 时出错: {e}")return Nonedef parse_links(self, html, base_url):if not html:return []soup = BeautifulSoup(html, 'html.parser')links = []for a_tag in soup.find_all('a', href=True):href = a_tag['href']full_url = urljoin(base_url, href)# 只处理同域的链接if urlparse(full_url).netloc == urlparse(base_url).netloc:links.append(full_url)return linksasync def process_page(self, url):if url in self.visited:returnprint(f"处理: {url}")self.visited.add(url)html = await self.fetch(url)if not html:return# 使用线程池解析HTML(避免阻塞事件循环)with ThreadPoolExecutor() as executor:loop = asyncio.get_event_loop()links = await loop.run_in_executor(executor, self.parse_links, html, url)for link in links:if link not in self.visited and link not in self.to_visit:self.to_visit.add(link)async def crawl(self, start_url, max_pages=20):self.to_visit.add(start_url)self.session = aiohttp.ClientSession()try:while self.to_visit and len(self.visited) < max_pages:# 获取一批URL进行处理batch = list(self.to_visit)[:self.max_concurrent]self.to_visit -= set(batch)# 并发处理URLtasks = [self.process_page(url) for url in batch]await asyncio.gather(*tasks)finally:await self.session.close()print(f"爬取完成! 总共访问了 {len(self.visited)} 个页面")async def main():crawler = WebCrawler(max_concurrent=5)start_time = time.time()await crawler.crawl('http://books.toscrape.com/', max_pages=30)end_time = time.time()print(f"总耗时: {end_time - start_time:.2f} 秒")if __name__ == "__main__":asyncio.run(main())
最后提醒:
- 多线程:适合I/O密集型任务,但受GIL限制,不适合CPU密集型任务
- 多进程:适合CPU密集型任务,可以充分利用多核CPU
- 线程池/进程池:简化并发任务管理,控制并发数量
- 协程/异步编程:适合高并发I/O操作,资源消耗小,性能高
- 队列:线程/进程间通信的重要机制,确保数据安全传递
选择哪种并发方式取决于你的具体需求:
- I/O密集型:优先考虑异步编程或多线程
- CPU密集型:使用多进程
- 需要简单管理:使用线程池/进程池
- 需要任务间通信:使用队列