PyQt6/PySide6 线程间通信(Signal/Slot)
一、线程通信核心机制
1.1 信号与槽基础
# PyQt6 信号声明
from PyQt6.QtCore import QObject, pyqtSignal
class Worker(QObject):
progress_updated = pyqtSignal(int) # 进度更新信号
result_ready = pyqtSignal(object) # 结果就绪信号
# PySide6 信号声明(语法差异)
from PySide6.QtCore import QObject, Signal
class Worker(QObject):
progress_updated = Signal(int)
result_ready = Signal(object)
1.2 连接方式对比
连接类型 | 行为描述 | 适用场景 |
---|---|---|
自动连接(Auto) | 根据线程关系自动选择直接或队列连接 | 默认模式(推荐) |
直接连接 | 立即在发送者线程执行 | 同线程内通信 |
队列连接 | 将事件放入接收者事件队列 | 跨线程通信(默认) |
阻塞队列连接 | 发送线程等待接收线程处理完成 | 需要同步的场景 |
# 显式指定连接类型示例
worker.progress_updated.connect(update_ui, Qt.ConnectionType.QueuedConnection)
二、完整多线程通信案例
2.1 文件处理工作线程
class FileProcessor(QThread):
progress = Signal(int)
finished = Signal(list)
error = Signal(str)
def __init__(self, file_list):
super().__init__()
self.files = file_list
self._is_running = True
def run(self):
results = []
try:
for i, file in enumerate(self.files):
if not self._is_running:
break
# 模拟耗时操作
time.sleep(0.5)
result = process_file(file)
results.append(result)
# 发送进度(0-100)
self.progress.emit(int((i+1)/len(self.files)*100))
self.finished.emit(results)
except Exception as e:
self.error.emit(f"处理失败: {str(e)}")
def stop(self):
self._is_running = False
2.2 主线程控制器
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.worker = FileProcessor([])
# 信号连接
self.worker.progress.connect(self.update_progress_bar)
self.worker.finished.connect(self.show_results)
self.worker.error.connect(self.show_error)
def start_processing(self, files):
if self.worker.isRunning():
QMessageBox.warning(self, "警告", "当前有任务正在执行")
return
self.worker.files = files
self.worker.start() # 启动线程
def update_progress_bar(self, value):
self.progressBar.setValue(value) # 安全更新UI
def cancel_processing(self):
self.worker.stop()
三、进阶通信模式
3.1 复杂数据传递
# 发送字典数据
class DataSender(QThread):
complex_data = Signal(dict)
def run(self):
sensor_data = {
'timestamp': datetime.now(),
'values': [random.random() for _ in range(10)],
'status': 0x1A3
}
self.complex_data.emit(sensor_data)
# 接收处理
def handle_data(data: dict):
print(f"收到传感器数据:{data['timestamp']}")
plot_graph(data['values'])
3.2 多参数信号
# 自定义参数类型信号
multi_args_signal = Signal(int, str, list) # PyQt6语法
# 发射信号
self.multi_args_signal.emit(100, "完成", ["file1", "file2"])
# 连接槽函数
@Slot(int, str, list)
def handle_multi_args(progress, status, files):
print(f"进度:{progress}% 状态:{status}")
print("已处理文件:", files)
四、线程安全最佳实践
4.1 资源互斥访问
from threading import Lock
shared_data = []
data_lock = Lock()
class DataWriter(QThread):
def run(self):
global shared_data
for i in range(1000):
with data_lock: # 使用上下文管理器自动加锁
shared_data.append(i)
class DataReader(QThread):
data_ready = Signal(list)
def run(self):
while True:
with data_lock:
if len(shared_data) > 0:
self.data_ready.emit(shared_data.copy())
shared_data.clear()
time.sleep(0.1)
4.2 优雅停止线程
class StoppableWorker(QThread):
def __init__(self):
super().__init__()
self._stop_flag = False
def run(self):
while not self._stop_flag:
# 执行任务
time.sleep(1)
self.do_work()
def stop(self):
self._stop_flag = True
if not self.wait(3000): # 等待3秒
self.terminate() # 强制终止(不推荐)
五、性能优化技巧
- 信号频率控制:当处理高频数据时(如传感器数据),使用缓冲机制
class DataAggregator:
def __init__(self):
self.buffer = []
self.timer = QTimer()
self.timer.timeout.connect(self.flush_buffer)
self.timer.start(100) # 每100ms发送一次
def add_data(self, point):
self.buffer.append(point)
def flush_buffer(self):
if self.buffer:
emit_data(self.buffer)
self.buffer = []
- 线程池管理:使用QThreadPool避免频繁创建线程
class Task(QRunnable):
def __init__(self, task_id):
super().__init__()
self.task_id = task_id
self.signals = WorkerSignals()
def run(self):
try:
result = heavy_computation(self.task_id)
self.signals.result.emit(result)
except Exception as e:
self.signals.error.emit(str(e))
# 使用线程池
pool = QThreadPool.globalInstance()
for i in range(10):
task = Task(i)
pool.start(task)
六、调试与问题排查
6.1 常见错误案例
错误示例:跨线程直接修改UI
class WrongWorker(QThread):
def run(self):
# 错误:直接操作UI组件
main_window.label.setText("处理中...") # 将引发异常
正确做法:
class CorrectWorker(QThread):
status_update = Signal(str)
def run(self):
self.status_update.emit("处理中...") # 通过信号传递
# 主窗口连接信号
worker.status_update.connect(lambda s: label.setText(s))
6.2 调试技巧
- 使用线程ID验证执行位置:
print(f"当前线程:{QThread.currentThread().objectName()}")
- 信号连接验证:
print("连接状态:", self.signal.receivers()) # 检查信号是否连接成功
- 使用QThread的finished信号进行资源清理:
worker.finished.connect(lambda: print("线程结束"))
worker.finished.connect(worker.deleteLater) # 自动内存管理
掌握这些核心要点后,开发者可以构建出响应迅速、稳定可靠的GUI应用程序。关键是要始终遵循Qt的线程模型,合理利用信号机制进行线程间通信,同时注意资源管理和异常处理。