当前位置: 首页 > news >正文

QT/C++ 多线程并发下载实践

在python线程池测试例子中,用到了queue的功能,python中,queue是阻塞式获取元素,所以是线程安全的,参考如下的示例:

from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import time

def worker(queue,id):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f" thread {id} Processing {item}")
        # 模拟处理耗时
        time.sleep(0.5)
    print("Worker exiting")

q = Queue()
executor = ThreadPoolExecutor(max_workers=5)

future_list = []
for i in range(5):
    future = executor.submit(worker, q,i)
    future_list.append(future)

# 提交任务
#future = executor.submit(worker, q)
#future = executor.submit(worker, q)

# 生产数据
for i in range(15):
    q.put(i)
for i in range(15):
    q.put(None)  # 发送终止信号

for f in future_list:
    f.result()

executor.shutdown()

在Qt中,QQueue对象并不是线程安全的,所以我们需要通过改造来实现类似的功能。

一、Qt BlockingQueue定义

为了实现多线性并发,我们需要在队列的take方法中启用QMutexLocker来保证线程安全,并启动QWaitCondition的wait方法以及waitone方法来实现元素阻塞式的访问。

定义"blockingQueue.h":

#include <QWaitCondition>
#include <QQueue>
#include <QMutex>

template <typename T>
class blockingQueue
{
public:
    blockingQueue() {}
    void put(const T& value)
    {
        QMutexLocker locker(&m_mutex);
        m_queue.enqueue(value);
        m_condition.wakeOne();   //唤醒等待队列中的一个线程(来自wait)
    }
    T take()
    {
        QMutexLocker locker(&m_mutex);
        //队列为空,则等待,否则直接返回队列头元素
        while (m_queue.isEmpty()) {
            m_condition.wait(&m_mutex);
        }
        return m_queue.dequeue();
    }
    bool isEmpty() const
    {
        QMutexLocker locker(&m_mutex);
        return m_queue.isEmpty();
    }
    int size() const
    {
        QMutexLocker locker(&m_mutex);
        return m_queue.size();
    }

private:
    QQueue<T> m_queue;
    mutable QMutex m_mutex;
    QWaitCondition m_condition;
};

二、下载任务类定义

该类中,定义下载项目和下载任务对象。并在后续的线程池中,作为参数传递给执行函数。

定义"downloadTask.h":

//下载项目
struct downloadItem
{
	QString src;
	QString dest;
	bool quit;
	downloadItem(bool isQuit = false)
	{
		quit = isQuit;
	}
};

//整个下载任务,包含所有下载项目队列
class downloadTask  : public QObject
{
	Q_OBJECT

public:
	downloadTask(QObject *parent = nullptr);
	~downloadTask();
	bool isQuit() { QMutexLocker locker(&m_mutex);return m_quit; };
	void quit() { QMutexLocker locker(&m_mutex); m_quit = true; };
	downloadItem get() { return m_Queue.take(); };
	void add(downloadItem &item) {  m_Queue.put(item); };;
private:
	mutable QMutex m_mutex;
	bool m_quit = false;
	blockingQueue<downloadItem> m_Queue;
};

三、创建下载任务

以下代码模拟重建100个下载任务,并在正常任务后面添加终止标记

void createTask()
{
    downloadTask* task = new downloadTask();
    //创建任务
    for (size_t i = 0; i < 100; i++)
    {
        downloadItem item;
        item.src = QString("%1.jpg").arg(i);
        item.dest = QString("%1-d.jpg").arg(i);
        task->add(item);
    }
    //终止标记(在需要正常结束的时候添加)
    for (size_t i = 0; i < gc_threadCount; i++)
    {
        downloadItem item(true);
        task->add(item);
    }
}

四、执行下载任务

定义下载任务由5个线程并发执行:

static int gc_threadCount = 5;

下载具体执行将有QRunable类操作:

class taskRunable : public QRunnable
{
public:
	taskRunable(downloadTask* task) :m_task(task) {}

	void run() override 
	{
		while (true)
		{
			//任务中途终止
			if (m_task->isQuit())
				break;
			downloadItem item = m_task->get();
			//任务完成后终止
			if (item.quit)
				break;
			//TODO:下载函数处理item
		}
	}

private:
	downloadTask* m_task;
};

下载任务将有线程池执行。

void startDownload(downloadTask* task)
{
    QThreadPool::globalInstance()->setMaxThreadCount(gc_threadCount);
    for (int i = 0; i < gc_threadCount; ++i)
    {
        auto runable = new taskRunable(task);
        QThreadPool::globalInstance()->start(runable);
    }
    QThreadPool::globalInstance()->waitForDone();
    return;
}

如需要中途结束下载任务,则执行task->quit();

五、总结

至此,我们通过改造Qt的队列,实现了阻塞式的元素获取。并通过线城池的方式,实现了并发下载的示例。

相关文章:

  • nx-admin1.2版本发布
  • 【教程】如何使用匿名Github仓库: anonymous.4open.science
  • 【蓝桥杯】单片机设计与开发,RTC实时时钟
  • 微信小程序使用 Vant Weapp 组件库教程
  • 迅为RK3568开发板helloworld 驱动实验-驱动编写
  • Python 自动化:节省时间,更智能地工作
  • Python小练习系列 Vol.12:学生信息排序(sorted + key函数)
  • 1.2 基于卷积神经网络与SE注意力的轴承故障诊断
  • Spring 面经
  • 生物化学笔记:医学免疫学原理11 免疫应答 + 固有免疫应答占位效应 + 适应性免疫应答 IgM和IgG抗体用于判断感染时期
  • 【C语言】深入理解指针(三):C语言中的高级指针应用
  • Linux centos 7 服务器组建与管理
  • 2025年 APP测试要点汇总!
  • docker存储卷及dockers容器源码部署httpd
  • Tomcat中的webapps的访问方式和java -jar内置Tomcat的访问方式的区别
  • 【MVP 和 MVVM 相比 MVC 有哪些优化点?】
  • 【MySQL篇】从零开始:解锁数据库的神秘面纱
  • 多光谱相机在农业中的应用(农作物长势、病虫害、耕地检测等)
  • 程序化广告行业(49/89):平台对接与用户识别技术全解析
  • 论文阅读笔记:Denoising Diffusion Implicit Models (3)
  • 中青报聚焦上海社区心理服务:社工介入让居民“心畅”
  • 浦江潮涌征帆劲,上海以高质量发展服务全国发展大局
  • 大风+暴雨,中央气象台双预警齐发
  • 美联储官员:美国经济增速可能放缓,现行关税政策仍将导致物价上涨
  • 李成钢:近期个别经济体实施所谓“对等关税”,严重违反世贸组织规则
  • 昆明公布3起经济犯罪案例:一人持有820余万假美元被判刑十年