当前位置: 首页 > news >正文

C++ 线程库使用详解

本文初发于 “天目中云的小站”,同步转载于此。’

C++ 线程

核心操作

线程创建

// 需要传入回调函数, 如果有参数紧跟在后面
std::thread t1(print_hello, 1);  
// 当然lambda也可以当作回调函数传进去
std::thread t([]() {
    std::cout << "Hello from lambda thread!" << std::endl;
});
// 也可以创建时传入回调函数
std::thread t(calculate_sum, 3, 5, print_result);

线程等待

t1.join();
  • 为什么需要线程等待?

    目的在于接收信息和同步.

    join中并没有实际和接收信息相关的操作, 信息交流是靠其他几种线程间通信的方式来实现的. 关键在于如果不用join, 将无法安全使用线程间通信得来的信息, 因为如果不等待子线程运行结束, 主线程和子线程并发运行, 主线程没有办法确定什么时候子线程会得到结果并发送过来, 简单来说就是无法实现同步.

  • 可以通过joinable()函数来确定线程是否可join而未被分离.

线程分离

t1.detach();

线程分离就是为了实现和线程等待相反的目的, 也就是不需要同步的情况, 当代码不需要和主线程互通有无的时候便可以调用detach, 让线程自己干自己的去.


线程间通信

这里线程间通信的定义其实很广泛, 可以细化为线程间权限申请, 数据交互, 函数调用等.

  • 互斥锁
mutex mtx;
mtx.lock();
//....
mtx.unlock();
  • RAII风格的锁
mutex mtx;
{
	lock_guard<std::mutex> lock(mtx); // 离开作用域自动调用析构函数解锁
 	// ... 使用lock_guard需要控制作用域   
}

条件变量

template <typename Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

条件变量需要和互斥锁绑定, 并且需要条件撑腰, 在等待时是否原先持有的锁, 被唤醒时重新取回拥有的锁并进行之后的代码, 有两种方式判断条件 :

  • 直接套在while循环里, 只有符合条件才能离开循环, 这里循环不是意味着要频繁判断, wait本身是阻塞的, 而是为了避免虚假唤醒的情况, 可以理解为系统问题.
std::mutex mtx;                    // 互斥量
std::condition_variable cv;         // 条件变量
// ...
std::unique_lock<std::mutex> lck(mtx);
while (!ready) {               // 使用while循环避免虚假唤醒
    cv.wait(lck);               // 等待通知
}
// ... 另一个线程
cv.notify_all(); 
cv.notify_one();
  • 使用wait的第二个参数, 传入一个返回值为bool类型的回调函数, 官方称其为谓词.

    谓词只在第一次触发wait和被唤醒之后发生, 如果谓词返回值为true, 则继续后面的代码, 反之继续阻塞挂起.

condition.wait(lock, [this] {  // 捕获this,使之可以使用成员函数和变量
    return stop || !taskQueue.empty();
});

其实底层就是对C风格互斥锁和条件变量的封装.

共享内存

这是一个非常宽泛的概念, 我们知道一个进程的不同线程间数据块,堆都是共享的, 所以在其中申请的资源都是共享内存, 锁也是为了应对使用共享内存的情况而被设计的.

消息队列

其实就是STL中的queue, 这个容器其实非常强大, 因为其是线程安全的, 例如我们就可以直接在生产者线程中将产物push进queue, 然后直接在消费者线程中从queue中取出, 这其中没有任何线程安全问题!

future和promise

future可以得到未来从其他线程传来的一个结果.

这里只介绍future在线程中的普通用法, 一般和promise共用.

  • future : 未来, 用于表示在未来这里会获取一个结果.
  • promise : 承诺, 可以和future关联, 承诺会设置future要获取的结果.
std::promise<int> prom;  
std::future<int> fut = prom.get_future(); // 将promise和future关联

std::thread t([&prom]() { // 捕获prom传入线程
    int result = 42; // 模拟计算
    prom.set_value(result); // promise将结果传递给future
});

int value = fut.get(); // 阻塞直到获取结果
t.join();
std::cout << "Result: " << value << "\n";
  • async : asyncasynchronous 的缩写,表示“异步”的意思. 不过异步这个意义很宽泛, 很多都可以被视为异步. 有些异步相对复杂, 比如异步I/O, 有些有十分简单, 比如我们现在学的async.

    其实它的实际功效就是上面future + promise + thread的统合, 可以理解为和future强绑定的thread.

    可以向async中传入一个回调函数, async会直接在一个新线程中运行它, 会返回一个future对象, 新线程运行结束会将结果设置到future对象中.

    template< class F, class... Args >
    std::future<std::invoke_result_t<F, Args...>> 
      async(F&& f, Args&&... args);
    
    • f : 用于确定async的启动策略, 一般写std::launch::async, 代表强制任务在新线程中执行.
    • args : 可变参数, 传入回调函数需要的参数.
    // 使用 std::async 启动一个异步任务
    std::future<int> fut = std::async(std::launch::async, []() {
        int result = 42; // 模拟计算
        return result;   // 返回结果
    });
    // 获取结果(会阻塞直到任务完成)
    int value = fut.get();
    std::cout << "Result: " << value << "\n";
    

共享内存 vs 消息队列 vs future

这三个其实都可以做到线程间传递数据, 但是应用场景不同.

  • 共享内存 : 用于多数据, 需求复杂的情况, 需要互斥锁和条件变量维护.
  • 消息队列 : 适合生产者消费者模型, 简单易用, 线程安全, 但是需要排队, 可能需要条件变量配合.
  • future : 只传递单数据, 简单易用, 并且线程安全不需要考虑额外内容.

this_thread

这是 C++11 引入的一个命名空间,用来提供一些和当前线程相关的操作.

void sleep_for(const std::chrono::duration& d); // 使当前线程挂起指定的时间
std::this_thread::sleep_for(std::chrono::seconds(2));  // 休眠2秒

void sleep_until(const std::chrono::time_point& t);  // 使进程挂起到指定的时间
auto wakeup_time = std::chrono::system_clock::now() + std::chrono::seconds(3);  // 3秒后
std::this_thread::sleep_until(wakeup_time);  // 休眠直到指定时间

void yield(); // 一个请求, 表示当前线程愿意让出cpu的时间片, 假如线程间的优先度不同可以使用
std::this_thread::yield();

thread的移动语义

线程对象不可复制, 因为线程本身就不好复制, 从底层来讲thread类的拷贝构造函数本身就是删除的.

但是thread类可以被移动, 其实就相当于转移控制权, 可以使用move转换.

假如我们想把多个线程存入一个vector中, 可以使用emplace_back在vector中直接构建, 也可以选择在外部构建然后用move移动进去.

std::thread t(task);  // 创建一个线程对象 t
threads.push_back(std::move(t));

简易线程池

AI生成仅供学习 :

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>

// 线程池类
class ThreadPool {
public:
    ThreadPool(size_t numThreads) {
        // 启动指定数量的工作线程
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this] {
                            return stop || !taskQueue.empty();
                        }); // 直到return 返回 ture才能进行
                        
                        // 如果线程池停止且任务队列为空,退出线程
                        if (stop && taskQueue.empty()) {
                            return;
                        }
						// 取出队列不能拷贝, 要移动
                        task = std::move(taskQueue.front()); 
                        taskQueue.pop();
                    }
                    // 执行任务
                    task();
                }
            });
        }
    }

    // 提交任务
    template <class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("ThreadPool is stopped");
            }
            taskQueue.push(std::forward<F>(f)); // 完美转发到队列中
        }
        condition.notify_one(); // 传入一个任务, 就激活等待中的一个线程
    }

    // 停止线程池
    void stopPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();

        for (std::thread& worker : workers) {
            worker.join();
        }
    }

    ~ThreadPool() {
        if (!stop) {
            stopPool();
        }
    }

private:
    std::vector<std::thread> workers;           // 工作线程
    std::queue<std::function<void()>> taskQueue;  // 任务队列
    std::mutex queueMutex;                      // 任务队列互斥锁
    std::condition_variable condition;          // 条件变量
    std::atomic<bool> stop{false};              // 是否停止线程池
};

int main() {
    ThreadPool pool(4);  // 创建一个包含4个线程的线程池
    // 提交几个任务给线程池执行
    for (int i = 0; i < 10; ++i) {
        pool.enqueue([i] {
            std::cout << "Task " << i << " is being executed by thread " 
                      << std::this_thread::get_id() << std::endl;
        });
    }
    // 停止线程池
    pool.stopPool();
    return 0;
}

by 天目中云

相关文章:

  • 局部适应的分子标记筛选
  • 京准电钟:NTP精密时钟服务器在自动化系统中的作用
  • 2025年2月最新一区SCI-海市蜃楼搜索优化算法Mirage search optimization-附Matlab免费代码
  • 5、进程间有哪些通信方式【高频】
  • 蓝桥杯之日期问题2
  • 前端如何进行性能优化
  • Hbase分布式——储存机制
  • 人工智能中的特征是什么?
  • MySQL常用命令大全(可复制使用)
  • 深入理解 JavaScript 中的 this 指向
  • android计算屏幕尺寸dpi
  • Java设计模式-基于MVC的WEB设计模式
  • 【Springboot知识】Logback从1.2.x升级到1.3.x需要注意哪些点?
  • 谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理
  • MotionLayout(二):MotionLayout是什么?MotionLayout调试技巧、KeyFrame关键帧等等
  • Desktop Builder 操作概述
  • 前端工程化---ES6
  • LeetCode刷题---栈---946
  • XXE漏洞:原理、危害与修复方法详解
  • AIP-153 导入和导出
  • 巴基斯坦外长:近期军事回应是自卫措施
  • 巴总理召开国家指挥当局紧急会议
  • 101条关于减重的知识,其中一定有你不知道的
  • 屈晓华履新四川省社科联党组书记,此前担任省国动办主任
  • 复旦发文缅怀文科杰出教授裘锡圭:曾提出治学需具备三种精神
  • 央行行长:债券市场“科技板”准备工作基本就绪,目前近百家市场机构计划发行超三千亿科技创新债