【工具】多线程任务执行函数
1. 代码示例
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_tasks_concurrently(task_func, tasks, max_workers=10):"""通用多线程任务执行函数参数:- task_func: 任务函数,接受单个任务参数- tasks: 任务列表- max_workers: 最大并发线程数,默认10返回:- results: 任务结果列表,顺序不一定,与tasks顺序无关任务异常时返回 (f"[ERROR] {e}", {})"""results = []with ThreadPoolExecutor(max_workers=max_workers) as pool:futures = {pool.submit(task_func, task): task for task in tasks}for fut in as_completed(futures):try:results.append(fut.result())except Exception as e:results.append((f"[ERROR] {e}", {}))return results
您可用如下方式调用:
def my_query_fn(q):# 处理单个查询q的函数...queries = [...]
results = run_tasks_concurrently(my_query_fn, queries, max_workers=5)
2. 详细讲解
1. pool
是什么?
pool
是一个ThreadPoolExecutor
的实例,代表一个线程池管理器。线程池可以管理多个线程,有效复用线程资源,提升并发执行效率。
2. submit()
方法作用
submit()
是ThreadPoolExecutor
提供的方法,用于向线程池提交一个任务。任务是一个函数调用(可调用对象)及其参数。
submit()
非阻塞,调用后立即返回一个Future
对象。
3. 参数解析
第一个参数是可调用对象,如函数或实例方法,比如
self._run_single_query
表示实例的一个方法。后续参数是传给此函数的参数,示例中传给
_run_single_query
方法的参数是q
。
4. Future
对象
submit()
返回的Future
对象代表异步执行的结果。可以调用
future.result()
来阻塞等待结果完成并获取返回值。也可以通过
future.done()
判断任务是否完成。提供了异常捕获和取消任务等操作接口。
5. 实际运行逻辑
此调用把方法 _run_single_query(q)
提交给线程池线程执行,立即返回句柄 Future
。线程池负责调度实际线程去调用此方法,主线程可以同时提交多个任务异步执行。