当前位置: 首页 > news >正文

《Muduo网络库:实现one loop per thread设计模式》

自此,我们掌握了EventLoop事件循环类,事件到来之后可以跨线程任务调度从而避免了直接操作EventLoop非线程安全状态造成的线程安全问题。

EventLoop是运行在一个线程中的,并且一个线程运行一个EventLoop,one loop per thread是如何实现的呢?

实现Thread线程类

专门处理一个线程。

Thread.h

#pragma once#include "noncopyable.h"#include <functional>
#include <thread>
#include <memory>
#include <unistd.h>
#include <string>
#include <atomic>class Thread : noncopyable
{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = std::string());~Thread();void start();void join();bool started() const { return started_; }pid_t tid() const { return tid_; }const std::string &name() const { return name_; }static int numCreated() { return numCreated_; }private:// 生成默认线程名void setDefaultName();bool started_;bool joined_;std::shared_ptr<std::thread> thread_;pid_t tid_;ThreadFunc func_;std::string name_;static std::atomic_int numCreated_; // 静态原子变量确保线程数量和线程名唯一
};

 Thread.cc

#include "Thread.h"
#include "CurrentThread.h"
#include <semaphore.h>Thread::Thread(ThreadFunc func, const std::string &name): started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name)
{setDefaultName();
}Thread::~Thread()
{if (started_ && !joined_){thread_->detach(); // thread类提供的设置分离线程的方法}
}void Thread::start() // 一个Thread对象,记录的就是一个新线程的详细信息
{started_ = true;sem_t sem;sem_init(&sem, false, 0);// 开启新线程,新线程的执行逻辑thread_ = std::shared_ptr<std::thread>(new std::thread([&](){// 获取线程tid值tid_=CurrentThread::tid();// 发送信号,告知主线程“tid以获取”sem_post(&sem);  // 开启的新线程,专门执行该线程函数func_(); }));// 这里主线程必须等待获取上面创建的新线程的tid值sem_wait(&sem);
}void Thread::join()
{joined_ = true;thread_->join();
}// 生成默认线程名"Thread1""Thread2"
void Thread::setDefaultName()
{int num = ++numCreated_;if (name_.empty()){char buf[32] = {0};snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}

使用std::shared_ptr<std::thread>管理线程对象,提供更灵活的生命周期管理。

来分析一下Thread类实现的核心方法,start()方法:

1、使用信号量同步线程创建

2、new创建新线程,在Lambda表达式中:

  • 获取新线程id值并存储
  • 发送信号量通知主线程已获取id
  • 执行用户提供的线程函数

3、使用信号量主动等待,确保start()方法返回前主线程已获取到新线程的id

实现EventLoopThread类

Thread类实现完成,如何在一个线程中运行一个EventLoop?也就是怎么结合EventLoop和Thread实现one loop per thread模型?

Muduo实现了EventLoopThread事件循环线程类,它是一个将线程与事件循环EventLoop绑定的一个类,核心实现one loop per thread,每个线程独立运行一个EventLoop。

EventLoopThread.h

#pragma once#include "noncopyable.h"
#include "Thread.h"#include <functional>
#include <mutex>
#include <condition_variable>
#include <string>class EventLoop;class EventLoopThread : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(), const std::string &name = std::string());~EventLoopThread();EventLoop *startLoop();private:void threadFunc();EventLoop *loop_;  // 将在新线程中指向创建的EventLoop对象bool exiting_;  // 标记是否退出Thread thread_;  // 创建底层线程对象,绑定线程的入口函数为当前类的成员函数threadFunc,并指定线程名称std::mutex mutex_;std::condition_variable cond_;ThreadInitCallback callback_;  // 保存用户传入的初始化回调,用于在EventLoop启动前做额外配置
};

EventLoopThread.cc

#include "EventLoopThread.h"
#include "EventLoop.h"EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, const std::string &name): loop_(nullptr), exiting_(false), thread_(std::bind(&EventLoopThread::threadFunc, this), name), mutex_(), cond_(), callback_(cb)
{
}EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != nullptr){loop_->quit();  // EventLoop退出事件循环thread_.join(); // 主线程主动等待新线程结束,回收线程资源}
}EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 启动(创建)底层的新线程,触发threadFunc()执行EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);// 当loop_为空时,必须主动等待新线程中loop_被初始化while (loop_ == nullptr){cond_.wait(lock); // 主动等待,释放锁并阻塞,直到被唤醒}loop = loop_; // 获取初始化完成的EventLoop指针}return loop;
}// 创建的新线程执行的线程函数
void EventLoopThread::threadFunc()
{// 在新线程栈上创建EventLoop对象(与该线程绑定),one loop per threadEventLoop loop;if (callback_){callback_(&loop); // 执行用户初始化回调(设置线程名称)}{std::unique_lock<std::mutex> lock(mutex_);loop_ = &loop;      // 将loop_指向新创建的EventLoopcond_.notify_one(); // 通知主线程,loop_已初始化完成}loop.loop(); // 启动事件循环,执行EventLoop的loop()=> 开启Poller.poll()(阻塞,直到loop_.quit()被调用)// 事件循环退出后清理std::unique_lock<std::mutex> lock(mutex_);loop_ = nullptr;
}

这种设计就是高性能网络库Muduo的典型实现,通过“one loop per thread”充分利用多核CPU,同时简化线程间同步。

来分析一下其核心方法,startLoop()方法:

启动线程并等待EventLoop初始化完成,返回其指针。

1、调用thread_.start()启动底层线程,此时新线程会执行threadFunc()函数

2、主线程通过条件变量等待新线程中的EventLoop完全初始化:

  • 加锁后检查loop_是否为空(初始状态)
  • 若未初始化,则在条件变量上等待并释放锁阻塞,直到新线程通知loop_已赋值

3、最终返回初始化完成的EventLoop*,确保外部拿到的指针一定有效。

再来分析一下线程入口函数,threadFunc()方法:

新线程的执行逻辑,负责创建EventLoop对象并运行事件循环。

1、创建EventLoop:在新线程的栈上创建EventLoop对象,生命周期与线程绑定,线程结束则对象销毁。

2、执行初始化回调:若用户传入callback_,则在事件循环启动前执行。

3、通知主线程:

  • 加锁后将loop_指向新创建的EventLoop对象
  • 调用cond_.notify_one()唤醒主线程的cond_.wait(lock),告知loop_已就绪

4、启动事件循环:调用loop.loop()进入事件循环(处理IO事件,直到loop_->quie()被调用)。

5、清理工作:事件循环退出后,将loop_重置为nullptr(线程即将结束)。

整体EventLoopThread流程

负责启动新线程,并在其线程栈上创建EventLoop对象与其绑定。

1、创建对象:EventLoop构造时,绑定线程入口函数threadFunc(),但线程未启动。

2、启动流程:

  • 外部调用startLoop(),触发thread_.start()启动新线程,新线程执行threadFunc()
  • threadFunc()中创建EventLoop并通过条件变量通知主线程,主线程从startLoop()返回EventLoop*(与新线程绑定的EventLoop对象指针)

3、运行阶段:EventLoop在新线程中通过loop.loop()持续运行,处理事件。

3、销毁流程:

  • 析构EventLoopThread时,设置exiting_=true,调用loop_.quit()退出事件循环
  • 新线程执行loop_=nullptr后结束,主线程通过thread_。join()等待线程回收

核心设计亮点

线程与事件循环绑定:EventLoop对象在新线程栈上创建,确保其生命周期与线程一致,避免跨线程访问问题。

同步机制:通过互斥锁+条件变量解决主线程与新线程的竞态条件,确保startLoop()返回的EventLoop*一定有效,保证了线程安全

可扩展性:支持线程初始化回调ThreadInitCallback(),允许用户在事件循环启动前自定义配置(如设置线程名称、注册初始事件等)

实现EventLoopThreadPool线程池类

在多线程工作环境下肯定存在多个EventLoopThread对象,如何管理多个EventLoopThread对象?

Muduo实现了EventLoopThreadPool事件循环线程池类,用于管理多个EventLoopThread对象,实现了one loop per thread模式的线程池版本。可以根据配置创建多个事件循环线程EventLoopThread,并提供EventLoop的分配机制。

EventLoopThreadPool.h

#pragma once#include "noncopyable.h"#include <functional>
#include <string>
#include <vector>
#include <memory>class EventLoop;
class EventLoopThread;class EventLoopThreadPool : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);~EventLoopThreadPool();// 设置线程数量void setThreadNum(int numThreads) { numThreads_ = numThreads; }// 启动线程池void start(const ThreadInitCallback &cb = ThreadInitCallback());// 获取下一个处理事件的EventLoopEventLoop *getNextLoop();// 获取所有EventLoopstd::vector<EventLoop *> getAllLoops();bool started() const { return started_; }const std::string &name() const { return name_; }private:EventLoop *baseLoop_; // 用户自己的(主线程的)EventLoop, EventLoop loop,不参与线程池管理std::string name_;    // 线程池名字bool started_;        // 标志线程池是否启动int numThreads_;      // 线程池大小,要创建的事件循环线程数量int next_;            // 轮询算法的索引,用于分配下一个事件循环std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop *> loops_; // 存储所有子线程的事件循环的指针
};

EventLoopThreadPool.cc

#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"#include <memory>EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg): baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0)
{
}EventLoopThreadPool::~EventLoopThreadPool()
{/*** 不需要显示释放资源* 1. threads_存储的是unique_ptr管理的EventLoopThread,会自动释放EventLoopThread对象* 2. EventLoop对象在EventLoopThread的线程栈上创建的,线程结束时会自动销毁*/
}void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{started_ = true;for (int i = 0; i < numThreads_; i++){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i); // 为每个线程生成唯一名称:主线程名+线程编号EventLoopThread *t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t)); // 将每一个EventLoopThread存入threads_// 启动每一个EventLoopThread并将返回的EventLoop*存入loops_loops_.push_back(t->startLoop()); // 底层启动创建线程,绑定一个新的EventLoop,并返回该loop的地址}// 如果线程数为0表示整个服务端只有一个主线程线程,直接在主线程的baseLoop_上执行初始化回调if (numThreads_ == 0 && cb){cb(baseLoop_);}
}// 如果工作在多线程中,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{EventLoop *loop = baseLoop_;if (!loops_.empty()) // 通过轮询(按顺序循环)获取下一个处理事件的loop{loop = loops_[next_];++next_;if (next_ > loops_.size()){next_ = 0;}}return loop;
}std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{if (loops_.empty()){return std::vector<EventLoop *>(1, baseLoop_);}else{return loops_;}
}

整体工作流程

1、初始化线程池,设置线程数量

2、调用start()方法启动线程池,创建指定数量的EventLoopThread事件循环线程对象

3、调用每一个EventLoopThread对象的startLoop()方法,创建一个线程和其对应的EventLoop

4、通过getNext Loop()方法以轮询(按顺序)的方式获取事件循环,分配任务

5、内部通过loops_容器管理所有子线程的事件循环,start Loop()方法会返回子线程绑定的EventLoop的指针

核心设计亮点

分层设计:线程池不直接管理线程,而是通过EventLoopThread间接管理,职责清晰

灵活配置:可通过setThreadNum()动态设置线程数量

负载均衡:通过轮询算法为每个事件循环分配任务,简单高效

线程安全:通过EventLoopThread内部的同步机制保证线程安全

兼容性:支持单线程模式(线程数为0时使用主线程的事件循环)

以上,就完成了EventLoopThreadPool事件循环线程池类,这种设计,通过多线程事件循环充分利用多核CPU,提高了并发处理能力。

http://www.dtcms.com/a/486120.html

相关文章:

  • 怎么注册网站卖东西哪有培训网站开发
  • makefile概述
  • 用R处理nc文件
  • GaussDB DN动态内存使用满导致DN主备切换
  • 湖南微网站开发北京市建设规划网站
  • TCP与UDP:传输层双雄的核心对比
  • 安化网站建设怎样建个人网站 步骤
  • 并查集-547.省份的数量-力扣(LeetCode)
  • 生命周期全景图:从componentDidMount到getSnapshotBeforeUpdate
  • p2p做网站plc编程入门基础知识
  • 学院个人信息|基于SprinBoot+vue的学院个人信息管理系统(源码+数据库+文档)
  • Unity AB包加载与依赖管理全解析
  • 基于Springboot的游戏网站的设计与实现45nuv3l8(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 深入理解 Vue.js 原理
  • 基于bert-base-chinese的外卖评论情绪分类项目
  • OpenSSL EVP编程介绍
  • 网站服务器组建中国国际贸易网站
  • 上新!功夫系列高通量DPU卡 CONFLUX®-2200P 全新升级,带宽升 40% IOPS提60%,赋能多业务场景。
  • Spring Boot 3零基础教程,properties文件中配置和类的属性绑定,笔记14
  • 以数据智能重构 OTC 连锁增长逻辑,覆盖网络与合作生态双维赛跑
  • 【推荐100个unity插件】基于节点的程序化无限地图生成器 —— MapMagic 2
  • 71_基于深度学习的布料瑕疵检测识别系统(yolo11、yolov8、yolov5+UI界面+Python项目源码+模型+标注好的数据集)
  • 工控机做网站服务器网络模块
  • Mac——文件夹压缩的简便方法
  • Playwright自动化实战一
  • 电商网站开发面临的技术问题做seo网站诊断书怎么做
  • 【Qt】QTableWidget 自定义排序功能实现
  • WPF 疑点汇总2.HorizontalAlignment和 HorizontalContentAlignment
  • 【Qt】3.认识 Qt Creator 界面
  • 垂直网站建设付费小说网站怎么做