【Python】多线程详解:从基础概念到实战应用
文章目录
- 一、概念
- 什么是多线程
- 什么时候使用多线程
- Python 的多线程
- 二、Python 线程
- 使用threading模块创建线程
- 方法1:创建Thread实例
- 方法2:继承Thread类
- 线程的基本方法
- 三、线程同步
- 锁(Lock)
- 可重入锁(RLock)
- 条件变量(Condition)
- 信号量(Semaphore)
- 事件(Event)
- 队列
- 五、GIL(全局解释器锁)
- 什么是GIL
- GIL的影响
- 如何绕过GIL的限制
- 五、线程池
- 六、线程局部数据
- 七、线程的注意事项
- 八、实际应用示例
- 多线程下载器
- 多线程爬虫
- 九、调试多线程程序
- 十、总结
一、概念
先简单介绍一下多线程的基本概念:
什么是多线程
多线程(Multithreading)是指在一个进程中同时运行多个线程的技术。每个线程可以看作是一个独立的执行流,它们共享同一进程的内存空间和资源,但拥有各自的执行栈和程序计数器。
什么时候使用多线程
-
I/O密集型任务:如网络请求、文件读写、数据库查询等任务,这些操作往往会涉及到等待外部设备响应。使用多线程可以在等待响应的过程中,执行其他任务,从而提高程序的效率。
-
提高程序响应性:在图形用户界面(GUI)应用程序中,如果不使用多线程,长时间的计算任务会导致界面冻结。通过多线程,可以将计算任务放到后台执行,确保界面响应用户的操作。
-
并行执行多个独立任务:当一个程序中需要同时执行多个任务时,使用多线程可以将这些任务并行化,减少执行时间。
Python 的多线程
Python中的多线程是通过threading
模块实现的。由于Python的Global Interpreter Lock (GIL)
会限制同一时刻只有一个线程可以执行Python字节码,因此在CPU密集型任务中,Python的多线程效果不如多进程。
对于I/O密集型任务,Python的多线程可以提供很好的并行效果。
二、Python 线程
Python提供了两个主要模块来实现多线程:
threading
模块(推荐使用)_thread
模块(底层模块,不推荐直接使用)
下面是一些具体的代码实例,代码均在pycharm运行;
使用threading模块创建线程
方法1:创建Thread实例
接下来编写练习代码:通过多线程并发执行任务,模拟了三个线程同时工作并等待其执行完成。
import threading
import timedef worker(num):print(f"线程{num}开始")time.sleep(2) # 模拟耗时操作print(f"线程{num}结束")# 创建线程
threads = []
for i in range(3):t = threading.Thread(target=worker, args=(i,))threads.append(t)t.start()# 等待所有线程完成
for t in threads:t.join()print("所有线程执行完毕")
方法2:继承Thread类
基于刚刚的代码,创建一个类继承 threading.Thread
,并初始化后实现内部的run函数,后创建刚刚的自定义类进行启动。
import threadingclass MyThread(threading.Thread):def __init__(self, num):threading.Thread.__init__(self)self.num = numdef run(self):print(f"线程{self.num}开始")# 执行任务...print(f"线程{self.num}结束")# 使用自定义线程类
threads = []
for i in range(3):t = MyThread(i)threads.append(t)t.start()for t in threads:t.join()
线程的基本方法
start()
-
定义:
start()
方法用于启动线程,触发线程的run()
方法。 -
参数:无
-
说明:该方法会将线程状态设置为“活跃”,并且会执行
run()
方法。thread.start()
run()
-
定义:
run()
方法是线程执行的具体内容。我们可以重写此方法来定义线程的任务。 -
参数:无
-
说明:如果没有重写此方法,默认会执行
target
函数。def run(self):pass
join(timeout=None)
-
定义:
join()
方法用于阻塞当前线程,直到目标线程执行完毕。可以设置超时时间。 -
参数:
timeout
:可选,超时秒数,默认为None
,即不设置超时。
-
说明:若
timeout
参数指定时间内目标线程未完成,当前线程仍会继续。thread.join(timeout=None)
is_alive()
-
定义:
is_alive()
方法用于检查线程是否仍在执行中。 -
参数:无
-
返回值:
True
如果线程处于活动状态,否则返回False
。thread.is_alive()
name
-
定义:
name
是线程的名称属性,可以设置或获取线程的名称。 -
参数:无(读取)或可设置为一个字符串(写入)
-
返回值:线程名称字符串
thread.name = "MyThread"
ident
-
定义:
ident
是线程的唯一标识符。 -
参数:无
-
返回值:线程的标识符(通常为一个整数)
thread.ident
daemon
-
定义:
daemon
是线程是否为守护线程的属性。设置为True
时,主线程退出时,守护线程会立即终止。 -
参数:布尔值 (
True
或False
) -
说明:默认情况下为
False
,表示非守护线程。thread.daemon = True
三、线程同步
由于线程共享内存空间,当多个线程同时访问共享资源时,可能会导致数据不一致的问题。Python提供了多种同步机制:
锁(Lock)
在多线程编程中,多个线程可能会同时访问共享资源,这会导致数据竞争和不一致的问题。为了避免这种情况,可以使用 锁(Lock)来确保在同一时刻只有一个线程能够访问共享资源。
Lock
是一种同步原语,允许线程获取和释放资源。当一个线程获取锁时,其他线程必须等待直到锁被释放。这样可以保证对共享资源的访问是互斥的。
import threadingshared_resource = 0 # 共享资源
lock = threading.Lock()# 自增函数
def incr():global shared_resourcefor _ in range(10):lock.acquire() # 获取锁shared_resource += 1lock.release() # 释放锁threads = []
for i in range(5):t = threading.Thread(target=incr())threads.append(t)t.start()for t in threads:t.join()# 打印结果为50
print(f"最终结果: {shared_resource}")
可重入锁(RLock)
可重入锁(RLock)是一种特殊类型的锁,它允许同一个线程多次获取同一个锁,而不会导致死锁。在传统的锁(Lock)中,线程如果在已经持有锁的情况下再次请求该锁,会导致死锁。
在 RLock 中,允许同一个线程多次获取同一个锁,前提是每次获取锁后都需要相应地释放锁。
lock = threading.RLock()def func1():with lock:func2()def func2():with lock: # 不会死锁print("Inside func2")func1()
条件变量(Condition)
条件变量(Condition) 是用于线程间同步和通信的一种机制。它允许一个线程等待某个条件的发生,直到另一个线程通知它可以继续执行。
condition
通常用于生产者-消费者问题,其中一个线程(生产者)生成数据并通知另一个线程(消费者)开始处理数据。
import threadingshared_resource = [] # 共享资源def consumer(cond):with cond:cond.wait()print("消费者准备消费...")for i in range(3):print(f"消费者消费了{i}")shared_resource.remove(i)def producer(cond):with cond:print("生产者准备生产...")for i in range(3):print(f"生产者生产了{i}")shared_resource.append(i)cond.notify() # 通知消费者condition = threading.Condition()
c = threading.Thread(name="consumer", target=consumer, args=(condition,))
p = threading.Thread(name="producer", target=producer, args=(condition,))c.start()
p.start()
信号量(Semaphore)
信号量(Semaphore) 是一种用于控制访问共享资源的线程同步机制。信号量通过维护一个计数器来控制同时能够访问特定资源的线程数量。
信号量通常用于限制线程对某些资源的并发访问,比如限制同时访问数据库连接池的线程数。
import threadingsemaphore = threading.Semaphore(3) # 最多允许3个线程同时访问def access_resource(thread_id):print(f"线程{thread_id}尝试获取资源")with semaphore:print(f"线程{thread_id}获取到资源")time.sleep(2)print(f"线程{thread_id}释放资源")threads = []
for i in range(5):t = threading.Thread(target=access_resource, args=(i,))threads.append(t)t.start()for t in threads:t.join()
下面是输出结果,根据结果也可以看出,最多只有三个线程同时获得资源。
线程0尝试获取资源
线程0获取到资源
线程1尝试获取资源
线程1获取到资源
线程2尝试获取资源
线程2获取到资源
线程3尝试获取资源
线程4尝试获取资源
线程0释放资源线程3获取到资源线程1释放资源线程4获取到资源线程2释放资源
线程3释放资源
线程4释放资源
事件(Event)
事件(Event) 是 Python 中线程间通信的一种简单机制。事件对象允许一个线程通过等待某个事件的发生来阻塞自己,直到其他线程触发该事件。
事件主要用于协调多个线程的执行顺序,常常用在生产者-消费者模型、线程同步等场景中。
import threadingevent = threading.Event()def waiter():print("等待者等待事件")event.wait() # 阻塞直到事件被设置print("等待者收到事件")def setter():print("设置者准备设置事件")time.sleep(2)event.set() # 设置事件print("事件已设置")t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)t1.start()
t2.start()t1.join()
t2.join()
队列
可以使用 queue.Queue 来实现线程间的通信和数据交换。
import threading
import queue
import timedef producer(q):for i in range(5):print(f"生产者生产了产品{i}")q.put(i)time.sleep(1)q.put(None) # 结束信号def consumer(q):while True:item = q.get()if item is None:breakprint(f"消费者消费了产品{item}")q.task_done()q = queue.Queue()
prod = threading.Thread(target=producer, args=(q,))
cons = threading.Thread(target=consumer, args=(q,))prod.start()
cons.start()prod.join()
cons.join()
print("生产消费结束")
五、GIL(全局解释器锁)
什么是GIL
GIL(Global Interpreter Lock)是Python解释器中的一个机制,它确保任何时候只有一个线程在执行Python字节码。这意味着即使在多核CPU上,Python的多线程也无法实现真正的并行执行。
GIL的影响
- 优点:简化了CPython的实现,使对象模型(包括关键的内建类型如字典)隐式地避免了并发访问的竞争条件
- 缺点:限制了多线程程序的并行性能,特别是在CPU密集型任务中
如何绕过GIL的限制
- 使用多进程:
multiprocessing
模块可以绕过GIL,因为每个进程有自己的Python解释器和内存空间 - 使用C扩展:将计算密集型部分用C/C++实现
- 使用其他Python实现:如Jython或IronPython没有GIL
- 使用异步编程:对于I/O密集型任务,可以使用
asyncio
五、线程池
Python提供了concurrent.futures
模块来方便地管理线程池:
from concurrent.futures import ThreadPoolExecutor
import timedef task(name):print(f"任务{name}开始")time.sleep(2)print(f"任务{name}结束")return f"结果{name}"# 创建线程池(最多3个线程)
with ThreadPoolExecutor(max_workers=3) as executor:# 提交任务futures = [executor.submit(task, i) for i in range(5)]# 获取结果for future in futures:print(future.result())print("所有任务完成")
六、线程局部数据
有时需要让每个线程拥有自己的数据副本
通过threading.local()
实现线程局部存储,每个线程都拥有自己独立的数据,不会与其他线程共享。
import threading# 创建线程局部数据
local_data = threading.local()def show_data():print(f"{threading.current_thread().name}: {local_data.value}")def thread_func(value):local_data.value = valueshow_data()threads = []
for i in range(3):t = threading.Thread(target=thread_func, args=(i,), name=f"Thread-{i}")threads.append(t)t.start()for t in threads:t.join()
七、线程的注意事项
- 避免死锁:确保获取锁的顺序一致,避免循环等待
- 不要过度使用线程:线程创建和切换有开销,太多线程反而会降低性能
- 区分I/O密集型和CPU密集型任务:
- I/O密集型:多线程可以显著提高性能
- CPU密集型:多线程可能不会提高性能(由于GIL),考虑使用多进程
- 线程安全:注意共享资源的访问,使用适当的同步机制
- 守护线程:设置
daemon=True
可以让线程在主线程结束时自动退出
八、实际应用示例
多线程下载器
实现多线程的url下载器:并发地下载多个文件,并且在文件已存在时跳过下载,避免重复工作。通过 requests
来处理HTTP请求,使用 threading
来实现多线程并发下载,增强下载效率
import threading
import requests
import os
import timedef download_file(url, filename):print(f"开始下载 {filename}")response = requests.get(url, stream=True)with open(filename, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):if chunk:f.write(chunk)print(f"完成下载 {filename}")def main():urls = [("https://example.com/file1.zip", "file1.zip"),("https://example.com/file2.zip", "file2.zip"),("https://example.com/file3.zip", "file3.zip"),]threads = []for url, filename in urls:if os.path.exists(filename):print(f"{filename} 已存在,跳过下载")continuet = threading.Thread(target=download_file, args=(url, filename))threads.append(t)t.start()for t in threads:t.join()print("所有下载任务完成")if __name__ == "__main__":start = time.time()main()print(f"总耗时: {time.time() - start:.2f}秒")
多线程爬虫
根据多线程实现一个简单的多线程爬虫程序,使用 requests
和 BeautifulSoup
进行网页抓取与解析,结合 threading
模块进行并发抓取,提高了爬取效率。通过队列管理待抓取的URL,保证每个URL只被抓取一次。
import threading
import requests
from bs4 import BeautifulSoup
import queueclass Crawler:def __init__(self, start_url, max_threads=5):self.visited = set()self.queue = queue.Queue()self.queue.put(start_url)self.max_threads = max_threadsself.lock = threading.Lock()def crawl(self):while True:try:url = self.queue.get(timeout=1)except queue.Empty:breakif url in self.visited:self.queue.task_done()continueprint(f"抓取: {url}")try:response = requests.get(url)soup = BeautifulSoup(response.text, 'html.parser')# 处理页面内容...print(f"标题: {soup.title.string if soup.title else '无标题'}")# 找到新链接for link in soup.find_all('a'):href = link.get('href')if href and href.startswith('http'):with self.lock:if href not in self.visited:self.queue.put(href)with self.lock:self.visited.add(url)except Exception as e:print(f"抓取 {url} 出错: {e}")self.queue.task_done()def start(self):threads = []for _ in range(self.max_threads):t = threading.Thread(target=self.crawl)t.start()threads.append(t)self.queue.join()for _ in range(self.max_threads):self.queue.put(None)for t in threads:t.join()print(f"抓取完成,共访问了 {len(self.visited)} 个页面")if __name__ == "__main__":crawler = Crawler("https://example.com")crawler.start()
九、调试多线程程序
调试多线程程序比单线程更复杂,以下是一些技巧:
-
使用日志:记录线程ID和时间戳
import logging logging.basicConfig(level=logging.INFO,format='%(asctime)s [%(threadName)s] %(message)s' )
-
使用
threading.enumerate()
:查看所有活动线程for thread in threading.enumerate():print(thread.name, thread.is_alive())
-
使用调试器:如pdb,但要注意线程切换
-
简化问题:先减少线程数量,重现问题
十、总结
Python的多线程编程虽然受到GIL的限制,但在I/O密集型任务中仍然非常有用。通过合理使用线程同步机制和线程间通信,可以构建高效的多线程应用程序。对于CPU密集型任务,应考虑使用多进程或其他解决方案。
记住多线程编程的关键原则:
- 识别真正的并行需求
- 合理设计线程间的交互
- 谨慎处理共享资源
- 选择适当的同步机制
- 注意线程安全和死锁问题