生产者-消费者问题与 QWaitCondition
生产者-消费者问题是一个经典的线程同步问题。Qt 中的 QWaitCondition
类允许线程在某些条件满足时唤醒其他线程,是实现线程间同步的强大工具之一。它与 QMutex
配合使用,可以高效地解决生产者-消费者问题。
下面是一个使用 QWaitCondition
实现生产者-消费者模型的详细示例代码。
代码示例:使用 QWaitCondition 的生产者-消费者模型
这个例子创建了一个固定大小的缓冲区。生产者线程向缓冲区填充数据,当缓冲区满时自动等待;消费者线程从缓冲区取出数据,当缓冲区空时自动等待。两者通过 QWaitCondition
和 QMutex
进行同步。
#include <QtCore/QCoreApplication>
#include <QWaitCondition>
#include <QThread>
#include <QMutex>
#include <QDebug>
#include <QRandomGenerator>// 全局变量定义
const int DataSize = 100; // 生产者要生产的数据总量
const int BufferSize = 10; // 环形缓冲区的大小
char buffer[BufferSize]; // 环形缓冲区QWaitCondition bufferNotEmpty; // 条件变量:缓冲区不为空(有数据可消费时唤醒消费者)
QWaitCondition bufferNotFull; // 条件变量:缓冲区不满(有空位可生产时唤醒生产者)
QMutex mutex; // 保护缓冲区的互斥锁int numUsedBytes = 0; // 缓冲区中已使用的字节数(关键共享资源,需互斥访问)
int producerIndex = 0; // 生产者下次写入的位置
int consumerIndex = 0; // 消费者下次读取的位置// 生产者线程类
class Producer : public QThread
{
public:explicit Producer(QObject *parent = nullptr) : QThread(parent) {}void run() override{qDebug() << "Producer thread started.";for (int i = 0; i < DataSize; ++i) {// 1. 获取互斥锁,保护共享资源 numUsedBytes 和缓冲区访问mutex.lock();// 2. 检查条件:如果缓冲区已满,则等待 bufferNotFull 条件while (numUsedBytes == BufferSize) {qDebug() << "Buffer full, Producer waiting...";bufferNotFull.wait(&mutex); // 等待时会临时释放mutex,被唤醒后重新获取}// 3. 生产数据(模拟:随机生成一个字符)char produceChar = "ACGT"[QRandomGenerator::global()->bounded(4)];buffer[producerIndex] = produceChar;qDebug() << "Produced: '" << produceChar << "' at position: " << producerIndex << ", Total used: " << numUsedBytes + 1;// 4. 更新生产索引和已使用字节计数producerIndex = (producerIndex + 1) % BufferSize; // 环形缓冲区,循环使用++numUsedBytes;// 5. 通知消费者:缓冲区不再为空(可能已经有数据了)if (numUsedBytes > 0) {bufferNotEmpty.wakeAll(); // 或使用 wakeOne() 唤醒一个消费者}// 6. 释放互斥锁mutex.unlock();// 模拟生产所需时间msleep(10);}qDebug() << "Producer thread finished.";}
};// 消费者线程类
class Consumer : public QThread
{Q_OBJECT
public:explicit Consumer(QObject *parent = nullptr) : QThread(parent) {}void run() override{qDebug() << "Consumer thread started.";for (int i = 0; i < DataSize; ++i) {// 1. 获取互斥锁mutex.lock();// 2. 检查条件:如果缓冲区为空,则等待 bufferNotEmpty 条件while (numUsedBytes == 0) {qDebug() << "Buffer empty, Consumer waiting...";bufferNotEmpty.wait(&mutex); // 等待时会临时释放mutex,被唤醒后重新获取}// 3. 消费数据char consumeChar = buffer[consumerIndex];qDebug() << "Consumed: '" << consumeChar << "' from position: " << consumerIndex << ", Total used: " << numUsedBytes - 1;// 4. 更新消费索引和已使用字节计数consumerIndex = (consumerIndex + 1) % BufferSize; // 环形缓冲区,循环使用--numUsedBytes;// 5. 通知生产者:缓冲区不再已满(可能已经有空位了)if (numUsedBytes < BufferSize) {bufferNotFull.wakeAll(); // 或使用 wakeOne() 唤醒一个生产者}// 6. 释放互斥锁mutex.unlock();// 模拟消费所需时间(可能比生产慢)msleep(20);}qDebug() << "Consumer thread finished.";emit done();}signals:void done();
};#include "main.moc" // 如果 Consumer 类在cpp中定义且含Q_OBJECT,则需要包含.mocint main(int argc, char *argv[])
{QCoreApplication app(argc, argv);qDebug() << "Starting producer and consumer threads...";Producer producer;Consumer consumer;QObject::connect(&consumer, &Consumer::done, &app, &QCoreApplication::quit);producer.start();consumer.start();// 等待两个线程执行完毕producer.wait();consumer.wait();qDebug() << "All threads finished. Application exiting.";return 0;
}
关键点说明
全局变量:
buffer
: 一个固定大小的环形缓冲区,用于在生产者和消费者之间共享数据。numUsedBytes
: 关键共享资源,表示缓冲区中已使用的字节数,必须用互斥锁保护。producerIndex
和consumerIndex
: 记录生产者和消费者在环形缓冲区中的当前位置。mutex
: 用于保护所有对共享资源(主要是numUsedBytes
和缓冲区)的访问。bufferNotEmpty
和bufferNotFull
: 条件变量,用于线程间通信和同步。
**Producer::run()**:
获取锁 (
mutex.lock()
)。检查条件:使用
while
循环(非if
)检查缓冲区是否已满。这是为了防止虚假唤醒(spurious wakeup)。如果满,则调用bufferNotFull.wait(&mutex)
释放互斥锁并等待。生产数据:向缓冲区写入数据。
更新状态:更新生产索引和
numUsedBytes
。发送信号:调用
bufferNotEmpty.wakeAll()
或bufferNotEmpty.wakeOne()
唤醒可能正在等待"缓冲区不为空"条件的消费者线程。释放锁 (
mutex.unlock()
)。
**Consumer::run()**:
获取锁 (
mutex.lock()
)。检查条件:使用
while
循环检查缓冲区是否为空。如果空,则调用bufferNotEmpty.wait(&mutex)
释放互斥锁并等待。消费数据:从缓冲区读取数据。
更新状态:更新消费索引和
numUsedBytes
。发送信号:调用
bufferNotFull.wakeAll()
或bufferNotFull.wakeOne()
唤醒可能正在等待"缓冲区不满"条件的生产者线程。释放锁 (
mutex.unlock()
)。
QWaitCondition::wait(QMutex *lockedMutex)
的重要性:它接受一个已锁定的互斥锁。
它会原子性地解锁
lockedMutex
并使调用线程进入等待状态。原子性操作避免了条件竞争。当线程被唤醒(通过
wakeOne()
或wakeAll()
)后,在从wait()
返回之前,它会重新获取互斥锁。
使用
while
循环检查条件: 这是必须的,而不能用if
语句替代。原因有二:虚假唤醒:即使没有线程调用
wakeOne()
或wakeAll()
,等待的线程也可能被操作系统唤醒。条件可能再次改变:在被唤醒到重新获取锁的间隙,其他线程可能已经改变了条件状态(例如,一个消费者在被唤醒后、获取锁之前,另一个消费者可能抢先取走了数据)。
QWaitCondition 与 QSemaphore 的对比
特性 | QWaitCondition + QMutex | QSemaphore |
---|---|---|
核心机制 | 基于条件的等待和通知 | 基于信号量的计数(PV操作) |
灵活性 | 更高 。可以等待复杂的条件(不仅仅是缓冲区大小) | 相对较低 。主要管理固定数量的资源访问 |
控制粒度 | 更细粒度的控制,例如可以基于多个条件进行等待和唤醒 | 主要基于可用资源数量(信号量计数) |
典型应用场景 | 需要等待特定条件成立(如缓冲区有数据/有空位、任务完成、状态变化等) | 控制对一组相同资源(如缓冲区槽位、数据库连接、IO设备)的访问 |
代码复杂度 | 相对高一些,需要手动管理互斥锁和条件检查 | 相对简单直观,特别是在经典的生产者-消费者模型中(缓冲区计数直接映射到信号量计数) |
防止虚假唤醒 | 需要开发者使用 | 信号量机制本身一定程度上减少了此类问题 |
注意事项与最佳实践
总是与互斥锁一起使用:
QWaitCondition
必须与QMutex
(或QReadWriteLock
) 结合使用,以保护共享数据。总是用 while 循环检查条件:防止虚假唤醒和条件状态意外改变。
**考虑 wakeOne() 与 wakeAll()**:
wakeOne()
随机唤醒一个等待该条件的线程,效率更高,可能减少不必要的竞争。wakeAll()
唤醒所有等待该条件的线程。通常在你认为多个线程可能都需要响应条件变化时使用(例如,多个消费者等待数据,并且新生产的数据可以被多个消费者处理)。过度使用wakeAll()
可能导致性能下降(惊群效应)。
性能考量:锁的粒度要尽可能小。在持有锁的情况下不要进行耗时的操作(如文件 I/O、网络操作等)。
线程安全:确保所有对共享数据的访问都在互斥锁的保护之下。
优雅退出:在实际应用中,你需要一种机制来通知线程优雅地退出循环,例如设置一个标志位(也需用锁保护),并在退出前唤醒所有可能等待的线程。
这个例子展示了如何使用 QWaitCondition
和 QMutex
高效地解决生产者-消费者问题。通过条件变量,线程可以在条件不满足时主动等待,并在条件可能满足时被其他线程唤醒,从而避免了忙等待,提高了 CPU 利用率和程序的响应性。