PyQt python 异步任务,多线程,进阶版
初级版见:
https://blog.csdn.net/q610098308/article/details/145175625
PyQt 多线程编程模式:通用的多线程执行器
方案一:装饰器方式(最简洁)
import sys
import time
from functools import wraps
from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QTextEdit, QVBoxLayout, QWidget, QProgressBarclass AsyncExecutor(QObject):"""通用的异步执行器"""started = pyqtSignal()finished = pyqtSignal(object)progress = pyqtSignal(int)error = pyqtSignal(str)def __init__(self):super().__init__()self.thread = QThread()self.moveToThread(self.thread)self.thread.start()@pyqtSlot()def execute(self, func, args=(), kwargs={}):"""在子线程中执行函数"""try:self.started.emit()result = func(*args, **kwargs)self.finished.emit(result)except Exception as e:self.error.emit(str(e))def async_execute(on_started=None, on_finished=None, on_error=None, on_progress=None):"""装饰器:让任何函数都能在子线程中异步执行"""def decorator(func):@wraps(func)def wrapper(*args, **kwargs):# 创建执行器executor = AsyncExecutor()# 连接信号if on_started:executor.started.connect(on_started)if on_finished:executor.finished.connect(on_finished)if on_error:executor.error.connect(on_error)if on_progress:executor.progress.connect(on_progress)# 异步执行executor.execute(func, args, kwargs)return executorreturn wrapperreturn decorator# 使用示例
class MainWindow(QMainWindow):def __init__(self):super().__init__()self.init_ui()def init_ui(self):self.setWindowTitle("简化版多线程执行器")central_widget = QWidget()self.setCentralWidget(central_widget)layout = QVBoxLayout(central_widget)self.btn1 = QPushButton("执行耗时计算")self.btn2 = QPushButton("执行文件处理") self.btn3 = QPushButton("执行网络请求")self.text = QTextEdit()layout.addWidget(self.btn1)layout.addWidget(self.btn2)layout.addWidget(self.btn3)layout.addWidget(self.text)# 连接按钮 - 使用装饰器方式self.btn1.clicked.connect(self.on_calculation_task)self.btn2.clicked.connect(self.on_file_task)self.btn3.clicked.connect(self.on_network_task)def log(self, msg):self.text.append(f"[{time.strftime('%H:%M:%S')}] {msg}")# 方法1:直接使用装饰器@async_execute(on_started=lambda self: self.log("计算任务开始..."),on_finished=lambda self, result: self.log(f"计算结果: {result}"),on_error=lambda self, error: self.log(f"计算错误: {error}"))def heavy_calculation(self, n=1000000):"""耗时计算任务"""result = 0for i in range(n):result += i * iif i % 100000 == 0: # 模拟进度更新passreturn f"计算完成,结果: {result}"def on_calculation_task(self):self.heavy_calculation(500000)# 方法2:动态连接方式def on_file_task(self):@async_execute(on_started=lambda: self.log("文件处理开始..."),on_finished=lambda result: self.log(f"文件处理完成: {result}"))def process_files():"""模拟文件处理"""time.sleep(2)return "处理了3个文件"process_files()# 方法3:更灵活的手动连接方式def on_network_task(self):def network_operation():"""模拟网络请求"""time.sleep(3)return "获取到10条数据"executor = AsyncExecutor()executor.started.connect(lambda: self.log("网络请求开始..."))executor.finished.connect(lambda result: self.log(f"网络请求完成: {result}"))executor.execute(network_operation)if __name__ == "__main__":app = QApplication(sys.argv)window = MainWindow()window.show()sys.exit(app.exec_())
方案二:链式调用方式(更直观)
from PyQt5.QtCore import QObject, QThread, pyqtSignalclass EasyAsyncTask:"""链式调用的异步任务执行器"""def __init__(self):self.executor = AsyncExecutor()self._func = Noneself._args = ()self._kwargs = {}def run(self, func, *args, **kwargs):"""设置要执行的函数"""self._func = funcself._args = argsself._kwargs = kwargsreturn selfdef on_start(self, callback):"""任务开始回调"""self.executor.started.connect(callback)return selfdef on_finish(self, callback):"""任务完成回调"""self.executor.finished.connect(callback)return selfdef on_error(self, callback):"""任务错误回调"""self.executor.error.connect(callback)return selfdef start(self):"""开始执行任务"""if self._func:self.executor.execute(self._func, self._args, self._kwargs)return self# 使用示例
class EasyWindow(QMainWindow):def __init__(self):super().__init__()self.init_ui()def init_ui(self):self.setWindowTitle("链式调用异步任务")central_widget = QWidget()self.setCentralWidget(central_widget)layout = QVBoxLayout(central_widget)self.btn = QPushButton("执行多个任务")self.text = QTextEdit()layout.addWidget(self.btn)layout.addWidget(self.text)self.btn.clicked.connect(self.run_multiple_tasks)def log(self, msg):self.text.append(msg)def run_multiple_tasks(self):"""同时执行多个不同类型的任务"""# 任务1:数据处理def data_processing():time.sleep(2)return "数据处理完成"EasyAsyncTask().run(data_processing)\.on_start(lambda: self.log("开始数据处理..."))\.on_finish(lambda result: self.log(result))\.start()# 任务2:图片处理def image_processing():for i in range(5):time.sleep(0.5)return "图片处理完成"EasyAsyncTask().run(image_processing)\.on_start(lambda: self.log("开始图片处理..."))\.on_finish(lambda result: self.log(result))\.start()# 任务3:网络下载def download_file():time.sleep(3)return "文件下载完成"EasyAsyncTask().run(download_file)\.on_start(lambda: self.log("开始下载文件..."))\.on_finish(lambda result: self.log(result))\.start()
方案三:全局线程池方式(最高效)
from PyQt5.QtCore import QThreadPool, QRunnable, pyqtSignal, QObjectclass WorkerSignals(QObject):"""定义工作线程的信号"""started = pyqtSignal()finished = pyqtSignal(object)error = pyqtSignal(str)progress = pyqtSignal(int)class TaskWorker(QRunnable):"""可重用的任务工作器"""def __init__(self, func, args=(), kwargs={}):super().__init__()self.func = funcself.args = argsself.kwargs = kwargsself.signals = WorkerSignals()def run(self):try:self.signals.started.emit()result = self.func(*self.args, **self.kwargs)self.signals.finished.emit(result)except Exception as e:self.signals.error.emit(str(e))class GlobalThreadManager:"""全局线程管理器"""_thread_pool = QThreadPool.globalInstance()@classmethoddef submit(cls, func, args=(), kwargs={}, on_started=None, on_finished=None, on_error=None):"""提交任务到全局线程池"""worker = TaskWorker(func, args, kwargs)if on_started:worker.signals.started.connect(on_started)if on_finished:worker.signals.finished.connect(on_finished)if on_error:worker.signals.error.connect(on_error)cls._thread_pool.start(worker)return worker# 使用示例 - 超级简洁!
class SimpleWindow(QMainWindow):def __init__(self):super().__init__()self.init_ui()def init_ui(self):self.setWindowTitle("全局线程池 - 最简单用法")central_widget = QWidget()self.setCentralWidget(central_widget)layout = QVBoxLayout(central_widget)self.btn = QPushButton("一键执行多个任务")self.text = QTextEdit()layout.addWidget(self.btn)layout.addWidget(self.text)self.btn.clicked.connect(self.simple_async_tasks)def log(self, msg):self.text.append(f"[{time.strftime('%H:%M:%S')}] {msg}")def simple_async_tasks(self):"""最简单的异步任务调用"""# 一行代码执行异步任务!GlobalThreadManager.submit(lambda: time.sleep(2) or "任务1完成",on_started=lambda: self.log("任务1开始"),on_finished=lambda result: self.log(result))GlobalThreadManager.submit(lambda: sum(i*i for i in range(1000000)),on_started=lambda: self.log("任务2开始"), on_finished=lambda result: self.log(f"任务2结果: {result}"))
小结:
方案三(全局线程池)是最推荐的,因为:
-
使用最简单:一行代码搞定异步执行
-
性能最好:重用线程,避免频繁创建销毁
-
资源友好:自动管理线程数量
-
功能完整:支持开始、完成、错误回调
# 最简单的使用示例
def my_heavy_function(param1, param2):# 你的耗时操作time.sleep(2)return f"处理结果: {param1} + {param2}"# 异步调用
GlobalThreadManager.submit(my_heavy_function, args=("hello", "world"),on_started=lambda: print("任务开始"),on_finished=lambda result: print(result)
)
这样封装后,你在任何地方需要异步执行耗时操作时,只需要调用 GlobalThreadManager.submit()
就可以了,非常方便!