当前位置: 首页 > news >正文

Python 线程池

Python 线程池

flyfish

线程池的概念

线程池是一种多线程处理形式,它预先创建了一定数量的线程,这些线程会被保存在一个线程池中。当有新的任务提交时,线程池会从池中取出一个空闲的线程来执行该任务;若池中没有空闲线程,任务会被暂时放入任务队列等待,直到有线程空闲。使用线程池可以避免频繁创建和销毁线程带来的开销,提高程序的性能和资源利用率。

常用函数

1. submit(fn, *args, **kwargs)

  • 功能:将可调用对象 fn 及其参数 *args**kwargs 提交给线程池执行,返回一个 Future 对象。通过这个 Future 对象可以获取任务的执行结果、状态等信息。
  • 示例
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(add, 2, 3)
    result = future.result()
    print(result)

2. map(func, *iterables, timeout=None, chunksize=1)

  • 功能:类似于内置的 map 函数,将可迭代对象 iterables 中的每个元素依次作为参数传递给函数 func 进行处理,返回一个迭代器。可以设置超时时间 timeout 和分块大小 chunksize
  • 示例
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

numbers = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(square, numbers)
    for result in results:
        print(result)

3. shutdown(wait=True, cancel_futures=False)

  • 功能:关闭线程池,阻止新任务的提交。wait 参数为 True 时,会等待所有已提交的任务完成;cancel_futures 参数为 True 时,会尝试取消所有未开始的任务。
  • 示例
from concurrent.futures import ThreadPoolExecutor

def task():
    print("执行任务")

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(task)
executor.shutdown(wait=True)

4. wait(fs, timeout=None, return_when=ALL_COMPLETED)

  • 功能:等待指定的 Future 对象集合 fs 中的任务完成。timeout 为可选的超时时间,return_when 可以取 ALL_COMPLETED(所有任务完成)、FIRST_COMPLETED(第一个任务完成)、FIRST_EXCEPTION(第一个任务抛出异常)。
  • 示例
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(task, 1), executor.submit(task, 2)]
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    for future in done:
        print(future.result())

5. as_completed(fs, timeout=None)

  • 功能:返回一个迭代器,当 Future 对象集合 fs 中的任务完成时,会按完成顺序产生对应的 Future 对象。可以设置超时时间 timeout
  • 示例
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(task, 2), executor.submit(task, 1)]
    for future in as_completed(futures):
        print(future.result())

6. Future 对象的方法

  • result(timeout=None):获取任务的执行结果。如果任务还未完成,会阻塞直到任务完成或超时(timeout 为可选的超时时间)。
  • exception(timeout=None):获取任务抛出的异常。如果任务没有抛出异常,返回 None;如果任务还未完成,会阻塞直到任务完成或超时。
  • add_done_callback(fn):为任务添加一个回调函数 fn,当任务完成时会自动调用该回调函数,并将 Future 对象作为参数传递给它。
  • cancel():尝试取消任务。如果任务还未开始,会成功取消;如果任务已经开始或完成,取消失败。返回 True 表示取消成功,False 表示取消失败。
  • cancelled():判断任务是否已被取消。返回 True 表示已取消,False 表示未取消。
  • running():判断任务是否正在执行。返回 True 表示正在执行,False 表示未执行或已完成。
  • done():判断任务是否已完成(包括正常完成、抛出异常或被取消)。返回 True 表示已完成,False 表示未完成。

示例

1. example1_single_task.py

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square, 5)
    result = future.result()
    print(result)

此代码的主要功能是向线程池提交单个任务,并获取该任务的执行结果。具体步骤为:

  • 定义一个 square 函数,该函数用于计算输入值的平方。
  • 创建一个仅包含 1 个工作线程的线程池。
  • 利用 submit 方法将 square 函数和参数 5 提交给线程池,获取一个 Future 对象。
  • 调用 Future 对象的 result 方法获取任务的执行结果并打印。

2. example2_multiple_tasks.py

from concurrent.futures import ThreadPoolExecutor

def multiply(x, y):
    return x * y

tasks = [(2, 3), (4, 5), (6, 7)]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(multiply, *task) for task in tasks]
    for future in futures:
        print(future.result())

该代码的功能是向线程池提交多个任务,并获取每个任务的执行结果。详细步骤如下:

  • 定义一个 multiply 函数,用于计算两个输入值的乘积。
  • 准备一个包含多个任务参数元组的列表 tasks
  • 创建一个包含 3 个工作线程的线程池。
  • 使用列表推导式将每个任务参数元组提交给线程池,得到一个 Future 对象列表。
  • 遍历 Future 对象列表,调用 result 方法获取每个任务的执行结果并打印。

3. example3_map_function.py

from concurrent.futures import ThreadPoolExecutor

def add_one(x):
    return x + 1

numbers = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(add_one, numbers)
    for result in results:
        print(result)

此代码使用线程池的 map 方法批量处理任务。具体操作如下:

  • 定义一个 add_one 函数,用于将输入值加 1。
  • 准备一个包含多个数值的列表 numbers
  • 创建一个包含 3 个工作线程的线程池。
  • 使用 map 方法将 add_one 函数应用到 numbers 列表的每个元素上,得到一个结果迭代器。
  • 遍历结果迭代器,打印每个任务的执行结果。

4. example4_wait_for_all.py

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def task(n):
    time.sleep(n)
    return n

tasks = [1, 2, 3]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in tasks]
    wait(futures, return_when=ALL_COMPLETED)
    for future in futures:
        print(future.result())

该代码的功能是等待线程池中的所有任务完成,并获取它们的执行结果。步骤如下:

  • 定义一个 task 函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。
  • 准备一个包含多个任务参数的列表 tasks
  • 创建一个包含 3 个工作线程的线程池。
  • 使用列表推导式将每个任务参数提交给线程池,得到一个 Future 对象列表。
  • 调用 wait 函数,传入 Future 对象列表和 ALL_COMPLETED 参数,等待所有任务完成。
  • 遍历 Future 对象列表,调用 result 方法获取每个任务的执行结果并打印。

5. example5_first_completed.py

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def task(n):
    time.sleep(n)
    return n

tasks = [3, 1, 2]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in tasks]
    done, _ = wait(futures, return_when=FIRST_COMPLETED)
    for future in done:
        print(future.result())

此代码的主要功能是获取线程池中第一个完成的任务的结果。具体实现步骤为:

  • 定义一个 task 函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。
  • 准备一个包含多个任务参数的列表 tasks
  • 创建一个包含 3 个工作线程的线程池。
  • 使用列表推导式将每个任务参数提交给线程池,得到一个 Future 对象列表。
  • 调用 wait 函数,传入 Future 对象列表和 FIRST_COMPLETED 参数,等待第一个任务完成。
  • 遍历已完成的 Future 对象列表,调用 result 方法获取任务的执行结果并打印。

6. example6_callback_function.py

from concurrent.futures import ThreadPoolExecutor

def task(x):
    return x * 2

def callback(future):
    print(f"任务结果: {future.result()}")

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task, 3)
    future.add_done_callback(callback)

该代码为线程池中的任务添加了回调函数。具体步骤如下:

  • 定义一个 task 函数,用于将输入值乘以 2。
  • 定义一个 callback 函数,用于打印任务的执行结果。
  • 创建一个包含 1 个工作线程的线程池。
  • 使用 submit 方法将 task 函数和参数 3 提交给线程池,获取一个 Future 对象。
  • 调用 Future 对象的 add_done_callback 方法,为任务添加回调函数。当任务完成时,会自动调用回调函数。

7. example7_exception_handling.py

from concurrent.futures import ThreadPoolExecutor

def task(x):
    if x == 2:
        raise ValueError("输入值不能为 2")
    return x * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [1, 2, 3]]
    for future in futures:
        try:
            print(future.result())
        except ValueError as e:
            print(f"捕获到异常: {e}")

此代码的功能是处理线程池任务中抛出的异常。具体操作如下:

  • 定义一个 task 函数,当输入值为 2 时,抛出 ValueError 异常,否则将输入值乘以 2。
  • 创建一个包含 3 个工作线程的线程池。
  • 使用列表推导式将不同的任务参数提交给线程池,得到一个 Future 对象列表。
  • 遍历 Future 对象列表,使用 try-except 语句捕获并处理可能抛出的异常,若没有异常则打印任务的执行结果。

8. example8_timeout_handling.py

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task, 3)
    try:
        result = future.result(timeout=2)
        print(result)
    except TimeoutError:
        print("任务超时")

该代码用于设置线程池任务的超时时间。具体步骤如下:

  • 定义一个 task 函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。
  • 创建一个包含 1 个工作线程的线程池。
  • 使用 submit 方法将 task 函数和参数 3 提交给线程池,获取一个 Future 对象。
  • 调用 Future 对象的 result 方法,并设置超时时间为 2 秒。若任务在 2 秒内未完成,会抛出 TimeoutError 异常,捕获该异常并打印提示信息。

9. example9_task_cancellation.py

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task, 3)
    cancelled = future.cancel()
    if cancelled:
        print("任务已取消")
    else:
        print("任务无法取消")

此代码的主要功能是尝试取消线程池中的任务。具体实现步骤为:

  • 定义一个 task 函数,该函数会根据输入的秒数进行休眠,模拟耗时操作,最后返回输入值。
  • 创建一个包含 1 个工作线程的线程池。
  • 使用 submit 方法将 task 函数和参数 3 提交给线程池,获取一个 Future 对象。
  • 调用 Future 对象的 cancel 方法尝试取消任务,根据返回结果判断任务是否成功取消,并打印相应的提示信息。

10. example10_iterate_results.py

from concurrent.futures import ThreadPoolExecutor

def task(x):
    return x * 3

numbers = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(task, numbers):
        print(result)

该代码使用线程池的 map 方法批量处理任务,并迭代获取任务的执行结果。具体步骤如下:

  • 定义一个 task 函数,用于将输入值乘以 3。
  • 准备一个包含多个数值的列表 numbers
  • 创建一个包含 3 个工作线程的线程池。
  • 使用 map 方法将 task 函数应用到 numbers 列表的每个元素上,得到一个结果迭代器。
  • 遍历结果迭代器,打印每个任务的执行结果。

相关文章:

  • java-正则表达式-集合-泛型
  • VScode的debug
  • AUTOSAR与arxml的文档解析
  • 【Ratis】ratis-grpc支持零拷贝系列之引入ZeroCopyMessageMarshaller工具类
  • 【6】组合计数学习笔记
  • 2.FastAPI 中的路由与路径操作
  • Python中的字典:深度解析与应用实践
  • 破解验证码新利器:基于百度OCR与captcha-killer-modified插件的免费调用教程
  • 数据驱动进化:AI Agent如何重构手机交互范式?
  • CUDAOpenCV Hessain矩阵计算
  • 虚拟电商-延迟任务系统的微服务改造(二)
  • Linux内核Netfilter使用实战案例分析
  • 利用labelme进行图片标注
  • Redis BitMap 用户签到
  • numpy学习笔记12:实现数组的归一化(0-1范围)
  • 力扣 797. 所有可能的路径 解析JS、Java、python、Go、c++
  • 第2章:容器核心原理:深入理解Namespace、Cgroup与联合文件系统
  • 自动化测试框架pytest+requests+allure
  • Lambda 表达式的语法:
  • 【STL】string类用法介绍及部分接口的模拟实现
  • 习近平同俄罗斯总统普京举行会谈
  • 公募基金解读“一揽子金融政策”:增量财政空间或打开,有助于维持A股活力
  • “五一”从昆明机场出境1.4万人次,较去年增长7.7%
  • 马克思主义理论研究教学名师系列访谈|丁晓强:马克思主义学者要更关注社会现实的需要
  • 微软上财季净利增长18%:云业务增速环比提高,业绩指引高于预期
  • 光明日报社论:用你我的匠心,托举起繁盛的中国