Python 线程池
Python 线程池
flyfish
线程池的概念
线程池是一种多线程处理形式,它预先创建了一定数量的线程,这些线程会被保存在一个线程池中。当有新的任务提交时,线程池会从池中取出一个空闲的线程来执行该任务;若池中没有空闲线程,任务会被暂时放入任务队列等待,直到有线程空闲。使用线程池可以避免频繁创建和销毁线程带来的开销,提高程序的性能和资源利用率。
常用函数
1. submit(fn, *args, **kwargs)
- 功能:将可调用对象
fn
及其参数*args
和**kwargs
提交给线程池执行,返回一个Future
对象。通过这个Future
对象可以获取任务的执行结果、状态等信息。 - 示例:
from concurrent.futures import ThreadPoolExecutor
def add(a, b):
return a + b
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(add, 2, 3)
result = future.result()
print(result)
2. map(func, *iterables, timeout=None, chunksize=1)
- 功能:类似于内置的
map
函数,将可迭代对象iterables
中的每个元素依次作为参数传递给函数func
进行处理,返回一个迭代器。可以设置超时时间timeout
和分块大小chunksize
。 - 示例:
from concurrent.futures import ThreadPoolExecutor
def square(x):
return x * x
numbers = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
3. shutdown(wait=True, cancel_futures=False)
- 功能:关闭线程池,阻止新任务的提交。
wait
参数为True
时,会等待所有已提交的任务完成;cancel_futures
参数为True
时,会尝试取消所有未开始的任务。 - 示例:
from concurrent.futures import ThreadPoolExecutor
def task():
print("执行任务")
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(task)
executor.shutdown(wait=True)
4. wait(fs, timeout=None, return_when=ALL_COMPLETED)
- 功能:等待指定的
Future
对象集合fs
中的任务完成。timeout
为可选的超时时间,return_when
可以取ALL_COMPLETED
(所有任务完成)、FIRST_COMPLETED
(第一个任务完成)、FIRST_EXCEPTION
(第一个任务抛出异常)。 - 示例:
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time
def task(n):
time.sleep(n)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(task, 1), executor.submit(task, 2)]
done, not_done = wait(futures, return_when=ALL_COMPLETED)
for future in done:
print(future.result())
5. as_completed(fs, timeout=None)
- 功能:返回一个迭代器,当
Future
对象集合fs
中的任务完成时,会按完成顺序产生对应的Future
对象。可以设置超时时间timeout
。 - 示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(n):
time.sleep(n)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(task, 2), executor.submit(task, 1)]
for future in as_completed(futures):
print(future.result())
6. Future
对象的方法
result(timeout=None)
:获取任务的执行结果。如果任务还未完成,会阻塞直到任务完成或超时(timeout
为可选的超时时间)。exception(timeout=None)
:获取任务抛出的异常。如果任务没有抛出异常,返回None
;如果任务还未完成,会阻塞直到任务完成或超时。add_done_callback(fn)
:为任务添加一个回调函数fn
,当任务完成时会自动调用该回调函数,并将Future
对象作为参数传递给它。cancel()
:尝试取消任务。如果任务还未开始,会成功取消;如果任务已经开始或完成,取消失败。返回True
表示取消成功,False
表示取消失败。cancelled()
:判断任务是否已被取消。返回True
表示已取消,False
表示未取消。running()
:判断任务是否正在执行。返回True
表示正在执行,False
表示未执行或已完成。done()
:判断任务是否已完成(包括正常完成、抛出异常或被取消)。返回True
表示已完成,False
表示未完成。
示例
1. example1_single_task.py
from concurrent.futures import ThreadPoolExecutor
def square(x):
return x * x
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(square, 5)
result = future.result()
print(result)
此代码的主要功能是向线程池提交单个任务,并获取该任务的执行结果。具体步骤为:
- 定义一个
square
函数,该函数用于计算输入值的平方。 - 创建一个仅包含 1 个工作线程的线程池。
- 利用
submit
方法将square
函数和参数5
提交给线程池,获取一个Future
对象。 - 调用
Future
对象的result
方法获取任务的执行结果并打印。
2. example2_multiple_tasks.py
from concurrent.futures import ThreadPoolExecutor
def multiply(x, y):
return x * y
tasks = [(2, 3), (4, 5), (6, 7)]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(multiply, *task) for task in tasks]
for future in futures:
print(future.result())
该代码的功能是向线程池提交多个任务,并获取每个任务的执行结果。详细步骤如下:
- 定义一个
multiply
函数,用于计算两个输入值的乘积。 - 准备一个包含多个任务参数元组的列表
tasks
。 - 创建一个包含 3 个工作线程的线程池。
- 使用列表推导式将每个任务参数元组提交给线程池,得到一个
Future
对象列表。 - 遍历
Future
对象列表,调用result
方法获取每个任务的执行结果并打印。
3. example3_map_function.py
from concurrent.futures import ThreadPoolExecutor
def add_one(x):
return x + 1
numbers = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(add_one, numbers)
for result in results:
print(result)
此代码使用线程池的 map
方法批量处理任务。具体操作如下:
- 定义一个
add_one
函数,用于将输入值加 1。 - 准备一个包含多个数值的列表
numbers
。 - 创建一个包含 3 个工作线程的线程池。
- 使用
map
方法将add_one
函数应用到numbers
列表的每个元素上,得到一个结果迭代器。 - 遍历结果迭代器,打印每个任务的执行结果。
4. example4_wait_for_all.py
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time
def task(n):
time.sleep(n)
return n
tasks = [1, 2, 3]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, n) for n in tasks]
wait(futures, return_when=ALL_COMPLETED)
for future in futures:
print(future.result())
该代码的功能是等待线程池中的所有任务完成,并获取它们的执行结果。步骤如下:
- 定义一个
task
函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。 - 准备一个包含多个任务参数的列表
tasks
。 - 创建一个包含 3 个工作线程的线程池。
- 使用列表推导式将每个任务参数提交给线程池,得到一个
Future
对象列表。 - 调用
wait
函数,传入Future
对象列表和ALL_COMPLETED
参数,等待所有任务完成。 - 遍历
Future
对象列表,调用result
方法获取每个任务的执行结果并打印。
5. example5_first_completed.py
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time
def task(n):
time.sleep(n)
return n
tasks = [3, 1, 2]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, n) for n in tasks]
done, _ = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
print(future.result())
此代码的主要功能是获取线程池中第一个完成的任务的结果。具体实现步骤为:
- 定义一个
task
函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。 - 准备一个包含多个任务参数的列表
tasks
。 - 创建一个包含 3 个工作线程的线程池。
- 使用列表推导式将每个任务参数提交给线程池,得到一个
Future
对象列表。 - 调用
wait
函数,传入Future
对象列表和FIRST_COMPLETED
参数,等待第一个任务完成。 - 遍历已完成的
Future
对象列表,调用result
方法获取任务的执行结果并打印。
6. example6_callback_function.py
from concurrent.futures import ThreadPoolExecutor
def task(x):
return x * 2
def callback(future):
print(f"任务结果: {future.result()}")
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(task, 3)
future.add_done_callback(callback)
该代码为线程池中的任务添加了回调函数。具体步骤如下:
- 定义一个
task
函数,用于将输入值乘以 2。 - 定义一个
callback
函数,用于打印任务的执行结果。 - 创建一个包含 1 个工作线程的线程池。
- 使用
submit
方法将task
函数和参数3
提交给线程池,获取一个Future
对象。 - 调用
Future
对象的add_done_callback
方法,为任务添加回调函数。当任务完成时,会自动调用回调函数。
7. example7_exception_handling.py
from concurrent.futures import ThreadPoolExecutor
def task(x):
if x == 2:
raise ValueError("输入值不能为 2")
return x * 2
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, n) for n in [1, 2, 3]]
for future in futures:
try:
print(future.result())
except ValueError as e:
print(f"捕获到异常: {e}")
此代码的功能是处理线程池任务中抛出的异常。具体操作如下:
- 定义一个
task
函数,当输入值为 2 时,抛出ValueError
异常,否则将输入值乘以 2。 - 创建一个包含 3 个工作线程的线程池。
- 使用列表推导式将不同的任务参数提交给线程池,得到一个
Future
对象列表。 - 遍历
Future
对象列表,使用try-except
语句捕获并处理可能抛出的异常,若没有异常则打印任务的执行结果。
8. example8_timeout_handling.py
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return n
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(task, 3)
try:
result = future.result(timeout=2)
print(result)
except TimeoutError:
print("任务超时")
该代码用于设置线程池任务的超时时间。具体步骤如下:
- 定义一个
task
函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。 - 创建一个包含 1 个工作线程的线程池。
- 使用
submit
方法将task
函数和参数3
提交给线程池,获取一个Future
对象。 - 调用
Future
对象的result
方法,并设置超时时间为 2 秒。若任务在 2 秒内未完成,会抛出TimeoutError
异常,捕获该异常并打印提示信息。
9. example9_task_cancellation.py
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return n
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(task, 3)
cancelled = future.cancel()
if cancelled:
print("任务已取消")
else:
print("任务无法取消")
此代码的主要功能是尝试取消线程池中的任务。具体实现步骤为:
- 定义一个
task
函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。 - 创建一个包含 1 个工作线程的线程池。
- 使用
submit
方法将task
函数和参数3
提交给线程池,获取一个Future
对象。 - 调用
Future
对象的cancel
方法尝试取消任务,根据返回结果判断任务是否成功取消,并打印相应的提示信息。
10. example10_iterate_results.py
from concurrent.futures import ThreadPoolExecutor
def task(x):
return x * 3
numbers = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
for result in executor.map(task, numbers):
print(result)
该代码使用线程池的 map
方法批量处理任务,并迭代获取任务的执行结果。具体步骤如下:
- 定义一个
task
函数,用于将输入值乘以 3。 - 准备一个包含多个数值的列表
numbers
。 - 创建一个包含 3 个工作线程的线程池。
- 使用
map
方法将task
函数应用到numbers
列表的每个元素上,得到一个结果迭代器。 - 遍历结果迭代器,打印每个任务的执行结果。