Python 第二十六节 多线程应用详细介绍及使用注意事项
前言
本文详细介绍了Python中的多线程编程方法。主要内容包括:多线程基础概念(线程与进程的区别、线程类型)、threading模块的使用方法(创建线程、继承Thread类、线程池),以及线程同步机制(Lock、Semaphore、Event、Condition)。通过代码示例展示了如何实现线程创建、线程池管理和线程同步,解决并发编程中的资源共享问题。文章为Python开发者提供了全面的多线程编程指南,帮助实现高效的并发程序。
1. 多线程基础概念
1.1 什么是多线程
多线程允许在单个程序中同时运行多个线程,每个线程可以执行不同的任务,共享相同的内存空间。
1.2 线程 vs 进程
进程:独立的执行环境,有自己独立的内存空间
线程:轻量级进程,共享相同的内存空间,创建和切换成本更低
线程可以分为
内核线程:由操作系统内核创建和撤销
用户线程:不需要内核支持而在用户程序中实现的线程。
python3 线程中常用的两个模块为
_thread(低级线程接口,即 Python 2 中的 thread 模块,不推荐直接使用)
threading(推荐使用)
threading库中常用的方法:
threading.current_thread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的列表。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.active_count(): 返回正在运行的线程数量,与 len(threading.enumerate()) 有相同的结果。
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
1、创建Thread类的实例。
2、target:线程将要执行的目标函数。
3、args:目标函数的参数,以元组形式传递。
4、kwargs:目标函数的关键字参数,以字典形式传递。
5、daemon:指定线程是否为守护线程。
2. Python 多线程实现方法
2.1 使用 threading 模块
基本线程创建
import threading
import time


def print_numbers():
    """Print the integers 0 through 4, sleeping one second before each."""
    current = 0
    while current < 5:
        time.sleep(1)
        print(f"Number: {current}")
        current += 1


def print_letters():
    """Print the letters A through E, sleeping one second before each."""
    for letter in "ABCDE":
        time.sleep(1)
        print(f"Letter: {letter}")


# Create one thread per task.
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)

# Start both threads first, then wait for each of them to finish.
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

print("所有线程执行完毕")
继承 Thread 类
import threading
import time  # required by MyThread.run; the snippet previously relied on an earlier import


class MyThread(threading.Thread):
    """Thread subclass that prints five numbered ticks with a fixed pause.

    Args:
        name: Thread name, used in every printed message.
        delay: Seconds to sleep before each tick.
    """

    def __init__(self, name, delay):
        # Hand the name to Thread itself instead of assigning it afterwards.
        super().__init__(name=name)
        self.delay = delay

    def run(self):
        """Thread body: announce start, print ticks 0-4, announce completion."""
        print(f"线程 {self.name} 开始执行")
        for i in range(5):
            time.sleep(self.delay)
            print(f"线程 {self.name}: {i}")
        print(f"线程 {self.name} 执行完毕")


# Use the custom thread class
# Two workers with different pacing: Thread-2 sleeps half as long per tick.
t1 = MyThread("Thread-1", 1)
t2 = MyThread("Thread-2", 0.5)

# Start both before joining either, so they run concurrently.
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
2.2 线程池的使用
from concurrent.futures import ThreadPoolExecutor
import time


def task(name, duration):
    """Simulate a unit of work that takes ``duration`` seconds.

    Prints a start and a completion message around the sleep and returns
    a result string that embeds ``name``.
    """
    print(f"任务 {name} 开始")
    time.sleep(duration)
    print(f"任务 {name} 完成")
    outcome = f"任务 {name} 结果"
    return outcome


# Use the thread pool
# Submit three independent tasks, then collect their results in submission order.
with ThreadPoolExecutor(max_workers=3) as executor:
    submissions = (("A", 2), ("B", 1), ("C", 3))
    pending = [
        executor.submit(task, label, seconds) for label, seconds in submissions
    ]
    results = [future.result() for future in pending]
    print(f"所有任务结果: {results}")

# Same idea with Executor.map, which yields results in input order.
with ThreadPoolExecutor(max_workers=2) as executor:
    tasks = [("任务1", 2), ("任务2", 1), ("任务3", 3)]
    names, durations = zip(*tasks)
    results = list(executor.map(task, names, durations))
    print(results)
3. 线程同步机制
3.1 锁 (Lock)
import threading


class BankAccount:
    """Account whose balance is only modified while holding ``self.lock``."""

    def __init__(self, initial_balance=0):
        self.lock = threading.Lock()
        self.balance = initial_balance

    def deposit(self, amount):
        """Add ``amount`` to the balance and print the before/after values."""
        with self.lock:  # acquired on entry, released on exit
            before = self.balance
            self.balance += amount
            print(f"存款: {amount}, 旧余额: {before}, 新余额: {self.balance}")

    def withdraw(self, amount):
        """Take ``amount`` out if the balance covers it.

        Returns:
            True when the withdrawal happened, False when funds were
            insufficient (the balance is left untouched in that case).
        """
        with self.lock:
            before = self.balance
            if before >= amount:
                self.balance -= amount
                print(f"取款: {amount}, 旧余额: {before}, 新余额: {self.balance}")
                return True
            return False


# Exercise the thread-safe bank account
account = BankAccount(1000)


def customer_operations(customer_id):
    """Run three deposit(100)/withdraw(50) rounds against the shared account.

    ``customer_id`` is accepted for the thread arguments but not used.
    """
    rounds_left = 3
    while rounds_left:
        account.deposit(100)
        account.withdraw(50)
        rounds_left -= 1


# Five customers operate on the same account concurrently.
threads = [
    threading.Thread(target=customer_operations, args=(i,)) for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最终余额: {account.balance}")
3.2 信号量 (Semaphore)
import threading
import time
import random# 限制同时访问资源的线程数量
semaphore = threading.Semaphore(3)  # at most three threads hold it at once


def access_resource(thread_id):
    """Wait for a semaphore slot, hold it for 1-3 seconds, then release it."""
    print(f"线程 {thread_id} 等待访问资源")
    semaphore.acquire()
    try:
        print(f"线程 {thread_id} 获得访问权限")
        hold_seconds = random.uniform(1, 3)
        time.sleep(hold_seconds)
        print(f"线程 {thread_id} 释放资源")
    finally:
        semaphore.release()


# Ten threads compete for the three available slots.
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
3.3 事件 (Event)
import threading
import time# 事件用于线程间通信
event = threading.Event()


def waiter():
    """Block until the shared event is set, then report and continue."""
    print("等待者: 等待事件发生")
    event.wait()  # returns once another thread calls event.set()
    print("等待者: 事件已发生,继续执行")


def setter():
    """Simulate three seconds of work, then set the shared event."""
    print("设置者: 正在处理任务")
    time.sleep(3)
    print("设置者: 设置事件")
    event.set()  # wakes every thread currently blocked in event.wait()


t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)

for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
3.4 条件变量 (Condition)
import threading
import time
import collections


class BoundedBuffer:
    """FIFO buffer with a capacity limit, coordinated by one Condition.

    Producers wait while the buffer is full; consumers wait while it is
    empty. Every successful operation notifies all waiters.
    """

    def __init__(self, capacity):
        self.condition = threading.Condition()
        self.buffer = collections.deque()
        self.capacity = capacity

    def produce(self, item):
        """Append ``item``, waiting first for as long as the buffer is full."""
        with self.condition:
            # Re-check after every wake-up: another producer may have
            # refilled the slot in the meantime.
            while not len(self.buffer) < self.capacity:
                print("缓冲区已满,生产者等待")
                self.condition.wait()
            self.buffer.append(item)
            print(f"生产: {item}, 缓冲区大小: {len(self.buffer)}")
            self.condition.notify_all()  # wake consumers

    def consume(self):
        """Remove and return the oldest item, waiting while the buffer is empty."""
        with self.condition:
            while not self.buffer:
                print("缓冲区为空,消费者等待")
                self.condition.wait()
            oldest = self.buffer.popleft()
            print(f"消费: {oldest}, 缓冲区大小: {len(self.buffer)}")
            self.condition.notify_all()  # wake producers
            return oldest


# Producer-consumer example
buffer = BoundedBuffer(5)


def producer():
    """Produce the values 0-9, pausing half a second after each one."""
    for value in range(10):
        buffer.produce(value)
        time.sleep(0.5)


def consumer():
    """Consume ten items, pausing one second after each one."""
    remaining = 10
    while remaining > 0:
        buffer.consume()
        time.sleep(1)
        remaining -= 1


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
4. 实际应用案例
4.1 网页爬虫
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
import time


class WebCrawler:
    """Demo crawler that visits pages in batches on a thread pool.

    Network access is simulated: each "fetch" sleeps for ``delay`` seconds
    and then invents three child links under the fetched URL.

    Args:
        max_workers: Size of the thread pool used by :meth:`crawl`.
        delay: Seconds each simulated fetch sleeps (default 1).
    """

    def __init__(self, max_workers=5, delay=1):
        self.visited = set()
        self.lock = threading.Lock()  # guards every access to self.visited
        self.max_workers = max_workers
        self.delay = delay

    def crawl_page(self, url):
        """Mark ``url`` as visited and return the links discovered on it.

        Returns an empty list when the URL was already visited or when the
        simulated fetch raises.
        """
        # Check-and-add must be atomic so two workers never claim one URL.
        with self.lock:
            if url in self.visited:
                return []
            self.visited.add(url)

        try:
            print(f"爬取: {url}")
            # Simulated network request; a real crawler would call
            # requests.get(url, timeout=5) here.
            time.sleep(self.delay)
            candidates = [f"{url}/page{i}" for i in range(3)]
            # Read the shared set under the same lock that protects writes.
            with self.lock:
                return [link for link in candidates if link not in self.visited]
        except Exception as e:
            print(f"爬取 {url} 时出错: {e}")
            return []

    def crawl(self, start_urls, max_pages=20):
        """Crawl outward from ``start_urls``, visiting at most ``max_pages`` pages.

        Each batch is capped at the remaining page budget, so the limit is
        a hard one rather than being checked only between whole batches.
        """
        pending = set(start_urls)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            while pending:
                with self.lock:
                    budget = max_pages - len(self.visited)
                if budget <= 0:
                    break

                # Submit only as many URLs as the budget still allows; the
                # rest stay in ``pending`` for a later round.
                batch = [pending.pop() for _ in range(min(budget, len(pending)))]
                futures = [executor.submit(self.crawl_page, url) for url in batch]

                # Collect newly discovered URLs.
                for future in futures:
                    try:
                        pending.update(future.result())
                    except Exception as e:
                        print(f"任务执行出错: {e}")

        print(f"爬取完成,总共访问了 {len(self.visited)} 个页面")


# Use the crawler
# Seed the crawl with two sites and stop after ten pages.
start_urls = ["http://example.com", "http://example.org"]
crawler = WebCrawler(max_workers=3)
crawler.crawl(start_urls, max_pages=10)
4.2 数据处理管道
import threading
import queue
import time
import random


class DataProcessingPipeline:
    """Three-stage pipeline: one producer, N processors, one collector.

    Stages communicate through two queues. ``None`` is the end-of-stream
    sentinel on both queues; every dequeued entry, sentinels included, is
    acknowledged with ``task_done()`` so that ``Queue.join()`` can return.

    Args:
        num_processors: Number of processor threads (default 2).
        produce_delay: Seconds the producer sleeps after each item
            (default 0.1).
    """

    def __init__(self, num_processors=2, produce_delay=0.1):
        self.raw_data_queue = queue.Queue()
        self.processed_data_queue = queue.Queue()
        self.results = []
        self.lock = threading.Lock()  # guards self.results
        self.num_processors = num_processors
        self.produce_delay = produce_delay

    def data_producer(self, num_items):
        """Generate ``num_items`` raw records, then one sentinel per processor."""
        for i in range(num_items):
            data = {
                'id': i,
                'value': random.randint(1, 100),
                'timestamp': time.time()
            }
            self.raw_data_queue.put(data)
            print(f"生产数据: {data}")
            time.sleep(self.produce_delay)

        # Each processor stops after its first sentinel, so exactly one
        # sentinel per processor is needed.
        for _ in range(self.num_processors):
            self.raw_data_queue.put(None)

    def data_processor(self, worker_id):
        """Consume raw records until a sentinel arrives, doubling each value."""
        while True:
            data = self.raw_data_queue.get()
            try:
                if data is None:
                    # Do not re-queue the sentinel: an unacknowledged entry
                    # would keep raw_data_queue.join() blocked forever.
                    break

                # Simulated processing step.
                processed_data = {
                    'id': data['id'],
                    'processed_value': data['value'] * 2,
                    'worker': worker_id,
                    'processed_at': time.time()
                }
                self.processed_data_queue.put(processed_data)
                print(f"处理器 {worker_id} 处理数据: {processed_data}")
            finally:
                # Acknowledge every dequeued entry, sentinel or not.
                self.raw_data_queue.task_done()

    def result_collector(self):
        """Append processed records to ``self.results`` until a sentinel arrives."""
        count = 0
        while True:
            result = self.processed_data_queue.get()
            try:
                if result is None:
                    # There is a single collector, so the sentinel is simply
                    # consumed; leaving one behind would end a later run early.
                    break
                with self.lock:
                    self.results.append(result)
                count += 1
                print(f"收集器收到结果 {count}: {result}")
            finally:
                self.processed_data_queue.task_done()

    def run_pipeline(self, num_items=10):
        """Run all stages to completion and return the collected results.

        Results accumulate in ``self.results`` across calls.
        """
        # Create the threads.
        producer_thread = threading.Thread(
            target=self.data_producer, args=(num_items,)
        )
        processor_threads = [
            threading.Thread(target=self.data_processor, args=(i,))
            for i in range(self.num_processors)
        ]
        collector_thread = threading.Thread(target=self.result_collector)

        # Start the threads.
        producer_thread.start()
        for thread in processor_threads:
            thread.start()
        collector_thread.start()

        # Wait for the producer, then for every raw entry to be acknowledged.
        producer_thread.join()
        self.raw_data_queue.join()

        # Processors enqueue their output before acknowledging the input, so
        # all results are already queued ahead of this sentinel.
        self.processed_data_queue.put(None)

        # Wait for the remaining threads.
        for thread in processor_threads:
            thread.join()
        collector_thread.join()

        print(f"管道处理完成,共处理 {len(self.results)} 条数据")
        return self.results


# Run the data processing pipeline
# Build a pipeline with default settings and push ten items through it.
pipeline = DataProcessingPipeline()
results = pipeline.run_pipeline(num_items=10)
5. GIL(全局解释器锁)的影响
5.1 GIL 的限制
import threading
import time


def cpu_intensive_task(n):
    """Return 0 + 1 + ... + (n - 1), computed with a pure-Python loop.

    The explicit loop is the point of the demo: it keeps the interpreter
    busy executing bytecode.
    """
    total = 0
    for value in range(n):
        total = total + value
    return total


# CPU-bound work - multithreading may not speed it up
def test_cpu_intensive():
    """Time two CPU-bound runs sequentially, then on two threads, and print both.

    Uses ``time.perf_counter()`` because it is monotonic and high-resolution;
    ``time.time()`` can jump when the system clock is adjusted.
    """
    # Single-threaded: run the two tasks back to back.
    start_time = time.perf_counter()
    cpu_intensive_task(10**7)
    cpu_intensive_task(10**7)
    single_thread_time = time.perf_counter() - start_time
    print(f"单线程执行时间: {single_thread_time:.2f}秒")

    # Multi-threaded: run the same two tasks on separate threads.
    start_time = time.perf_counter()
    thread1 = threading.Thread(target=cpu_intensive_task, args=(10**7,))
    thread2 = threading.Thread(target=cpu_intensive_task, args=(10**7,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    multi_thread_time = time.perf_counter() - start_time
    print(f"多线程执行时间: {multi_thread_time:.2f}秒")
    print(f"加速比: {single_thread_time/multi_thread_time:.2f}")


test_cpu_intensive()
5.2 I/O 密集型任务的优势
import threading
import time


def io_intensive_task(task_id, duration):
    """Simulate an I/O wait of ``duration`` seconds and return a result string."""
    print(f"任务 {task_id} 开始")
    time.sleep(duration)  # stands in for a blocking I/O operation
    print(f"任务 {task_id} 完成")
    return f"任务 {task_id} 结果"


def test_io_intensive():
    """Time five one-second I/O tasks sequentially, then on five threads.

    Uses ``time.perf_counter()`` because it is monotonic and high-resolution;
    ``time.time()`` can jump when the system clock is adjusted.
    """
    # Single-threaded: the waits add up.
    start_time = time.perf_counter()
    for i in range(5):
        io_intensive_task(i, 1)
    single_thread_time = time.perf_counter() - start_time
    print(f"单线程执行时间: {single_thread_time:.2f}秒")

    # Multi-threaded: the waits overlap.
    start_time = time.perf_counter()
    threads = []
    for i in range(5):
        thread = threading.Thread(target=io_intensive_task, args=(i, 1))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    multi_thread_time = time.perf_counter() - start_time
    print(f"多线程执行时间: {multi_thread_time:.2f}秒")
    print(f"加速比: {single_thread_time/multi_thread_time:.2f}")


test_io_intensive()
6. 注意事项和最佳实践
6.1 常见问题及解决方案
死锁预防
import threading
import time# 错误的做法 - 可能导致死锁
def transfer_money_wrong(account1, account2, amount):
    """Move ``amount`` from account1 to account2 (deliberate anti-example).

    The locks are taken in argument order, so two simultaneous transfers in
    opposite directions can each hold one lock and wait on the other.

    Returns:
        True if the transfer happened, False if account1 lacked the funds.
    """
    with account1.lock, account2.lock:
        sufficient = account1.balance >= amount
        if not sufficient:
            return False
        account1.balance -= amount
        account2.balance += amount
        return True


# Correct approach - acquire the locks in a fixed order
def _apply_transfer(source, target, amount):
    """Move ``amount`` between accounts; the caller must already hold the locks."""
    if source.balance >= amount:
        source.balance -= amount
        target.balance += amount
        return True
    return False


def transfer_money_right(account1, account2, amount):
    """Move ``amount`` from account1 to account2 without risking deadlock.

    The two locks are always acquired in the same global order (by ``id``),
    whichever direction the transfer goes. When both arguments share one
    lock (for example a self-transfer) it is acquired only once, because
    ``threading.Lock`` is not reentrant.

    Returns:
        True if the transfer happened, False if account1 lacked the funds.
    """
    # Sort by id so every caller agrees on the acquisition order.
    first, second = sorted([account1.lock, account2.lock], key=id)
    if first is second:
        with first:
            return _apply_transfer(account1, account2, amount)
    with first, second:
        return _apply_transfer(account1, account2, amount)
线程局部数据
import threading# 线程局部存储
thread_local = threading.local()


def show_thread_data():
    """Print the calling thread's ``thread_local.value``, or that none is set."""
    try:
        value = thread_local.value
    except AttributeError:
        print(f"线程 {threading.current_thread().name} 没有设置值")
    else:
        print(f"线程 {threading.current_thread().name} 的值: {value}")


def worker(value):
    """Store ``value`` in this thread's local storage, then display it."""
    thread_local.value = value
    show_thread_data()


# Each worker thread stores its own value in thread_local.
threads = [
    threading.Thread(target=worker, args=(i,), name=f"Worker-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
6.2 最佳实践总结
合理使用锁:
- 只在必要时使用锁
- 锁的粒度要适当
- 避免嵌套锁,防止死锁
使用线程安全的数据结构:
import queue
from collections import defaultdict
import threading# 线程安全的队列
safe_queue = queue.Queue()

# A structure that is not thread-safe by itself, paired with the lock that
# callers are expected to hold while touching it.
dict_lock = threading.Lock()
shared_dict = defaultdict(list)
正确处理异常:
def safe_worker():
    """Template for a thread target that reports exceptions instead of dying silently."""
    try:
        # Work code goes here.
        pass
    except Exception as exc:
        # Log the failure or take recovery action here.
        print(f"线程异常: {exc}")
合理设置线程数量:
- I/O密集型:可以设置较多线程
- CPU密集型:由于GIL,多线程可能不会带来性能提升
使用线程池:
避免频繁创建销毁线程
更好地管理资源
7. 总结
Python多线程主要适用于I/O密集型任务,如网络请求、文件操作等。由于GIL的存在,对于CPU密集型任务,多线程可能不会带来性能提升,此时应考虑使用多进程。
关键要点:
- 使用 threading 模块进行线程管理
- 合理使用同步原语(Lock、Semaphore、Event、Condition)
- 对于并发任务,优先考虑 ThreadPoolExecutor
- 注意线程安全和死锁预防
- 理解GIL的影响,选择合适的并发方案
