当前位置: 首页 > news >正文

C++20 线程协调类:从入门到精通

文章目录

    • 1. 初识线程协调
    • 2. std::barrier:多线程同步的屏障
      • 2.1 核心函数
      • 2.2 示例代码
      • 2.3 高级用法
      • 2.4 适用场景
    • 3. std::latch:一次性同步原语
      • 3.1 核心函数
      • 3.2 示例代码
      • 3.3 高级用法
      • 3.4 适用场景
    • 4. std::counting_semaphore:可重用的同步原语
      • 4.1 核心函数
      • 4.2 示例代码
      • 4.3 高级用法
      • 4.4 适用场景
    • 5. 性能与优化
    • 6. 实际应用案例
      • 6.1 并行矩阵乘法
      • 6.2 线程池
    • 7. 总结

在多线程编程中,线程之间的协调是一个关键问题。C++20 引入了三种新的同步原语: std::barrierstd::latchstd::counting_semaphore,它们极大地简化了线程间的同步操作。本文将从入门到精通,逐步深入地介绍这三种同步原语的使用方法和适用场景。

1. 初识线程协调

在多线程编程中,线程协调是指控制多个线程的执行顺序,确保它们在特定的点上同步或互斥。常见的线程协调问题包括:

  • 同步执行:多个线程需要在某个点上同步,然后一起继续执行。
  • 资源限制:限制同时访问某个资源的线程数量。
  • 任务完成:等待多个线程完成任务后继续执行。

C++20 引入的三种同步原语正是为了解决这些问题而设计的。

2. std::barrier:多线程同步的屏障

std::barrier 是一种用于多线程同步的机制,它允许一组线程在某个点(称为屏障点)上同步。当线程到达屏障点时,它会被阻塞,直到所有线程都到达该点,然后所有线程同时继续执行。

2.1 核心函数

  • arrive_and_wait():线程到达屏障点并等待,直到所有线程都到达屏障点。
  • arrive_and_drop():线程到达屏障点,减少一个期待线程数,并重置屏障。

2.2 示例代码

以下代码展示了如何使用 std::barrier 来同步多个线程:

#include <iostream>
#include <thread>
#include <barrier>
#include <vector>

void worker(std::barrier& barrier, int id) {
    std::cout << "Thread " << id << " is working.\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000 * id));
    std::cout << "Thread " << id << " reached the barrier.\n";
    barrier.arrive_and_wait();
}

int main() {
    const int num_threads = 3;
    std::barrier barrier(num_threads);
    std::vector<std::thread> threads;

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, std::ref(barrier), i + 1);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "All threads have finished.\n";
    return 0;
}

2.3 高级用法

std::barrier 支持一个可选的回调函数,当所有线程到达屏障点时,回调函数会被自动调用。这可以用于执行一些同步操作,例如更新状态或清理资源。

std::barrier barrier(num_threads, [](std::size_t) {
    std::cout << "All threads have reached the barrier. Executing callback.\n";
});

2.4 适用场景

  • 分阶段任务:适用于多线程分阶段任务,每个阶段结束后同步。
  • 并行算法:例如矩阵乘法等多阶段并行算法。

3. std::latch:一次性同步原语

std::latch 是一种一次性同步原语,用于确保一组线程在某个条件满足后继续执行。它类似于 std::counting_semaphore,但只能使用一次。

3.1 核心函数

  • count_down():减少计数器的值。
  • wait():阻塞线程,直到计数器减少到零。

3.2 示例代码

以下代码展示了如何使用 std::latch 等待多个线程完成任务:

#include <iostream>
#include <thread>
#include <latch>
#include <vector>

void worker(std::latch& latch, int id) {
    std::cout << "Thread " << id << " is working.\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000 * id));
    std::cout << "Thread " << id << " finished. Counting down latch.\n";
    latch.count_down();
}

int main() {
    const int num_threads = 3;
    std::latch latch(num_threads);

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, std::ref(latch), i + 1);
    }

    latch.wait();
    std::cout << "All threads have finished.\n";

    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

3.3 高级用法

std::latch 的计数器可以初始化为任意值,但一旦计数器减少到零,后续的 wait() 调用将立即返回,而不会阻塞。

3.4 适用场景

  • 一次性同步:适用于一次性同步,确保所有线程完成某个任务后继续。
  • 资源初始化:例如初始化资源或等待所有任务完成。

4. std::counting_semaphore:可重用的同步原语

std::counting_semaphore 是一种计数信号量,用于控制对共享资源的访问。它类似于 std::latch,但可以多次使用。

4.1 核心函数

  • acquire():减少信号量的计数,阻塞直到计数大于零。
  • release():增加信号量的计数。

4.2 示例代码

以下代码展示了如何使用 std::counting_semaphore 控制线程的执行:

#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>

void worker(std::counting_semaphore<>& sem, int id) {
    std::cout << "Thread " << id << " is waiting.\n";
    sem.acquire();
    std::cout << "Thread " << id << " is working.\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000 * id));
    std::cout << "Thread " << id << " finished.\n";
}

int main() {
    const int num_threads = 3;
    std::counting_semaphore<> sem(2); // 允许同时运行两个线程

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, std::ref(sem), i + 1);
    }

    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

4.3 高级用法

std::counting_semaphore 的计数器可以初始化为任意值,且可以通过 try_acquire() 尝试非阻塞地获取信号量。

4.4 适用场景

  • 资源限制:适用于控制对共享资源的访问。
  • 线程池:例如限制同时运行的线程数量。

5. 性能与优化

虽然 std::barrierstd::latchstd::counting_semaphore 提供了强大的同步功能,但过度使用同步原语可能会导致性能问题。以下是一些优化建议:

  1. 减少同步点:尽量减少线程同步的次数,避免不必要的阻塞。
  2. 使用局部变量:尽量使用线程局部变量(thread_local),减少线程间的竞争。
  3. 避免死锁:确保线程同步的顺序一致,避免死锁。
  4. 使用无锁编程:在可能的情况下,使用无锁编程技术,减少同步开销。

6. 实际应用案例

6.1 并行矩阵乘法

以下代码展示了如何使用 std::barrier 实现并行矩阵乘法:

#include <iostream>
#include <vector>
#include <thread>
#include <barrier>

void multiply(const std::vector<std::vector<int>>& A, const std::vector<std::vector<int>>& B, std::vector<std::vector<int>>& result, int row, int col, std::barrier& barrier) {
    for (size_t i = 0; i < A.size(); ++i) {
        result[row][col] += A[row][i] * B[i][col];
    }
    barrier.arrive_and_wait();
}

int main() {
    const int size = 4;
    std::vector<std::vector<int>> A(size, std::vector<int>(size, 1));
    std::vector<std::vector<int>> B(size, std::vector<int>(size, 2));
    std::vector<std::vector<int>> result(size, std::vector<int>(size, 0));

    std::barrier barrier(size * size);
    std::vector<std::thread> threads;

    for (size_t i = 0; i < size; ++i) {
        for (size_t j = 0; j < size; ++j) {
            threads.emplace_back(multiply, std::ref(A), std::ref(B), std::ref(result), i, j, std::ref(barrier));
        }
    }

    for (auto& t : threads) {
        t.join();
    }

    for (const auto& row : result) {
        for (const auto& val : row) {
            std::cout << val << " ";
        }
        std::cout << "\n";
    }

    return 0;
}

6.2 线程池

以下代码展示了如何使用 std::counting_semaphore 实现一个简单的线程池:

#include <iostream>
#include <thread>
#include <semaphore>
#include <queue>
#include <functional>
#include <vector>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : semaphore(num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) {
                            return;
                        }
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& t : threads) {
            t.join();
        }
    }

    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
    std::counting_semaphore<> semaphore;
};

int main() {
    ThreadPool pool(4);

    auto result1 = pool.enqueue([] { return 42; });
    auto result2 = pool.enqueue([] { return 43; });

    std::cout << "Result 1: " << result1.get() << "\n";
    std::cout << "Result 2: " << result2.get() << "\n";

    return 0;
}

7. 总结

C++20 引入的 std::barrierstd::latchstd::counting_semaphore 提供了强大的线程协调机制,简化了多线程编程中的同步操作。它们各具特色,适用于不同的场景:

  • std::barrier:适用于多线程分阶段任务,每个阶段结束后同步。
  • std::latch:适用于一次性同步,确保所有线程完成某个任务后继续。
  • std::counting_semaphore:适用于控制对共享资源的访问。

通过合理使用这些同步原语,可以显著提高代码的可读性和性能,同时减少死锁和竞态条件的可能性。

相关文章:

  • 大模型知识补充
  • 【APT攻击】针对渗透测试人员的大规模钓鱼攻击,涉及38个Github账号,你中招了吗?
  • 【大模型基础_毛玉仁】2.6 非 Transformer 架构
  • 【Go】运算符笔记
  • MFC开发:图形的绘制
  • C++八大常见的设计模式的实现与实践指南
  • 【亲测有效,已顺利上线】你好,你的小程序涉及提供播放、观看等服务,请补充选择:文娱-其他视频类目。(多种没有资质的解决方案)
  • 【EDA】Altium Designer关于禁网设置的那些事
  • 数据库系统概论
  • 【大模型基础_毛玉仁】3.1 Prompt 工程简介
  • 从0到1彻底掌握Trae:手把手带你实战开发AI Chatbot,提升开发效率的必备指南!
  • 活码在实际操作中的具体场景有哪些?怎么应用?
  • 蓝桥杯嵌入式组第十四届省赛题目解析+STM32G431RBT6实现源码
  • 整合百款经典街机游戏的模拟器介绍
  • 算法刷题记录——LeetCode篇(3) [第201~300题](持续更新)
  • 谱分析方法
  • 为什么“连接断开可能导致锁未释放”
  • 3.17 模拟赛总结(虚树求交,FWT/容斥, 后缀数组SA)
  • 【八股文】从浏览器输入一个url到服务器的流程
  • 密度估计:参数与非参数
  • 国新办将就2025年4月份国民经济运行情况举行新闻发布会
  • 秦洪看盘|指标股发力,A股渐有突破态势
  • 第十二届警博会在京开幕:12个国家和地区835家企业参展
  • 125%→10%、24%税率暂停90天,对美关税开始调整
  • 陕西河南山西等地将现“干热风”灾害,小麦产区如何防范?
  • 北京今日白天超30℃晚间下冰雹,市民称“没见过这么大颗的”