[muduo] ThreadPool | TcpClient | 异步任务 | 通信测试
第九章:线程池(ThreadPool)
在第八章《TcpServer》中,我们了解到muduo::net::TcpServer
通过EventLoop
线程池处理入站连接。
这些EventLoop
线程主要负责网络I/O:套接字读写和定时器处理,由Poller
和Channel
协调,保持高效响应。
但当
EventLoop
线程中的MessageCallback
函数需要执行耗时操作时会发生什么?
例如:
- 复杂计算
- 数据库访问(可能阻塞等待结果)
- 大文件磁盘写入(可能阻塞)
- 同步网络请求其他服务
若直接在EventLoop
线程执行这些操作,线程将被阻塞,导致该线程管理的所有连接无法处理其他事件,服务器响应能力下降
甚至引发超时。
这正是muduo::ThreadPool
的价值所在。
线程池解决的问题
核心目标是将可能阻塞或耗时的计算任务从关键EventLoop
线程卸载
。
EventLoop
线程必须保持空闲以快速处理网络I/O。
ThreadPool
提供专用线程组执行任意任务。当EventLoop
回调中遇到耗时任务时,将其提交给线程池,由池中的工作线程异步处理,使EventLoop
线程快速返回继续处理网络事件。
类比场景:
EventLoop
线程如同高效的前台接待员,仅处理简短交互(如引导访客、接收邮件)。ThreadPool
则是后勤团队,处理复杂请求(如审核详细申请),前台人员可立即返回岗位继续接待。- (类似于
redis
和mysql
实现分类,还有上一篇文章当中的reactor分离
,也是一个前台+多个处理人员)
ThreadPool:后台任务执行引擎
muduo::ThreadPool
类管理一组待命的工作线程,关键设计要素包括:
- 线程集合:使用
muduo::Thread
类(第二章:线程)创建指定数量的工作线程 - 任务队列:线程安全的双端队列(
std::deque
),通过互斥锁(MutexLock
)和条件变量(Condition
)实现同步 - 工作线程逻辑:每个线程循环执行:取任务→执行→等待新任务
- 任务提交(
run()
):通过run()
方法将函数对象加入队列 - 阻塞等待(
take()
):队列空时,工作线程通过条件变量(notEmpty_
)阻塞休眠 - 唤醒机制:添加新任务时通过
notEmpty_.notify()
唤醒等待线程 - 有界队列(可选):可设置队列最大容量(
maxQueueSize_
),满时run()
可能阻塞(通过notFull_
),防止任务积压 - 启停控制:
start()
启动线程池,stop()
停止并等待线程退出
使用ThreadPool执行异步任务
通过创建实例、启动线程池并调用run()
提交任务即可使用:
#include "muduo/base/ThreadPool.h"
#include "muduo/base/CurrentThread.h"
#include "muduo/base/CountDownLatch.h"
#include <cstdio>
#include <string>
#include <unistd.h>
#include <functional>// 工作线程中运行的函数
void print(const std::string& msg) {printf("线程池任务: %s, 进程ID: %d, 线程ID: %d\n",msg.c_str(), getpid(), muduo::CurrentThread::tid());sleep(1); // 模拟耗时操作
}int main() {printf("主线程启动. 进程ID: %d, 线程ID: %d\n", getpid(), muduo::CurrentThread::tid());muduo::ThreadPool pool("MyWorkerPool"); // 1. 创建线程池pool.setMaxQueueSize(5); // 设置队列最大容量为5pool.start(3); // 2. 启动3个工作线程sleep(1); // 等待线程启动printf("线程池已启动.\n");// 3. 提交10个任务for (int i = 0; i < 10; ++i) {char task_msg[32];snprintf(task_msg, sizeof task_msg, "任务%d", i);pool.run(std::bind(print, std::string(task_msg))); // 提交任务}printf("任务提交完成.\n");sleep(10); // 等待任务执行pool.stop(); // 4. 停止线程池printf("线程池已停止.\n");return 0;
}
(需包含Muduo头文件并链接库)
使用muduo网络库中的线程池组件处理并发任务的基本流程,包含四个关键操作:
创建线程池、启动线程、提交任务、停止线程池。
原理
线程池管理一组工作线程,避免频繁创建销毁线程的开销。
任务提交后进入队列,空闲线程自动获取任务执行。
muduo库的ThreadPool
采用生产者-消费者
模型,主线程提交任务(生产者),工作线程处理任务(消费者)。
解析
初始化阶段
-
muduo::ThreadPool pool("MyWorkerPool")
创建名为"MyWorkerPool"的线程池实例。
pool.setMaxQueueSize(5)
限制任务队列最多容纳5个未处理任务
,防止内存过度消耗。pool.start(3)
启动3个常驻工作线程,这些线程会持续从任务队列获取任务执行。启动后立即进入等待任务状态。
任务提交阶段
-
循环提交10个打印任务,每个任务绑定
print
函数和字符串参数。当任务提交速度超过处理速度时,后提交的任务会因队列满(max=5)而阻塞,直到有线程处理完队列任务。
print
函数显示任务信息、进程PID和线程TID,通过sleep(1)
模拟耗时操作。注意线程ID由muduo::CurrentThread::tid()
获取,这是muduo库提供的跨平台线程标识。
终止阶段
sleep(10)
确保所有任务完成,pool.stop()
安全停止线程池:停止接受新任务,等待正在执行的任务完成,最后回收线程资源。
执行效果说明
程序输出将显示:
- 主线程信息
- 3个工作线程循环执行10个任务(每个任务间隔1秒)
- 由于只有3个线程,10个任务需约4秒完成(3并发+队列等待)
- 最终线程池停止信息
该模式适用于需要批量处理短生命周期任务
的场景,如网络请求处理、日志写入等I/O密集型操作。
运行
理解:货架上有五个空位
,三个三个的往上放
货物
执行流程:
- 主线程创建线程池并启动3个工作线程
- 工作线程启动后立即尝试从空队列取任务,进入阻塞等待
- 主循环提交10个任务,工作线程被唤醒并并发执行
- 输出显示不同线程ID执行任务,主线程快速返回
- 任务队列满时
run()
可能阻塞,直到有空闲 stop()
通知所有线程退出,等待清理
线程池内部机制
核心交互流程:
加锁
到任务队列中,再加锁
的执行任务
关键源码(简化版):
// ThreadPool.h
class ThreadPool : noncopyable {
public:using Task = std::function<void ()>; // 任务类型explicit ThreadPool(const string& name = "ThreadPool");void setMaxQueueSize(int maxSize); // 设置队列容量void start(int numThreads); // 启动线程void stop(); // 停止线程void run(Task f); // 提交任务private:bool isFull() const; // 队列是否满Task take(); // 获取任务void runInThread(); // 工作线程主循环mutable MutexLock mutex_; // 保护队列和状态Condition notEmpty_; // 非空条件变量Condition notFull_; // 非满条件变量std::deque<Task> queue_; // 任务队列size_t maxQueueSize_ = 0; // 队列最大容量bool running_ = false; // 运行状态
};// ThreadPool.cc
void ThreadPool::start(int numThreads) {running_ = true;// 创建numThreads个工作线程threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i) {threads_.emplace_back(new Thread(std::bind(&ThreadPool::runInThread, this)));threads_.back()->start();}
}void ThreadPool::run(Task task) {if (threads_.empty()) {task(); // 无线程时直接执行} else {MutexLockGuard lock(mutex_);while (isFull() && running_) {notFull_.wait(); // 队列满时等待}queue_.push_back(std::move(task));notEmpty_.notify(); // 通知工作线程}
}ThreadPool::Task ThreadPool::take() {MutexLockGuard lock(mutex_);while (queue_.empty() && running_) {notEmpty_.wait(); // 队列空时等待}Task task;if (!queue_.empty()) {task = queue_.front();queue_.pop_front();if (maxQueueSize_ > 0) {notFull_.notify(); // 通知任务提交者}}return task;
}void ThreadPool::runInThread() {while (running_) {Task task(take()); // 循环取任务if (task) task(); // 执行任务}
}
⭕核心功能
任务队列管理
ThreadPool 使用 std::deque<Task>
作为任务队列,通过 maxQueueSize_
控制队列容量。
任务类型为 std::function<void()>
,表示无参数无返回值的可调用对象。
线程工作流程
- 启动时通过
start()
创建多个工作线程,每个线程执行runInThread()
循环。 runInThread()
不断调用take()
获取任务并执行。若队列为空,线程在notEmpty_
条件变量上等待。take()
从队列头部取出任务,若队列从满变为非满,通知notFull_
唤醒可能阻塞的任务提交者。
任务提交逻辑
run()
提交任务时,若队列已满且线程池在运行,提交线程在notFull_
上等待。- 任务入队后通过
notEmpty_.notify()
唤醒一个工作线程。 - 若线程池未启动或无工作线程,任务直接在当前线程执行。
同步机制
MutexLock
保护队列和状态变量(running_
)。- 两个
Condition
变量(notEmpty_
、notFull_
)实现生产者-消费者模型,避免忙等待。
代码设计简洁高效,适合处理大量短期任务,典型应用如网络服务器的请求处理。
总结
功能 | 描述 | 作用/优势 |
---|---|---|
工作线程 | 基于muduo::Thread 的线程集合 | 提供独立于I/O线程的任务执行环境 |
任务队列 | 线程安全的双端队列存储std::function<void () 类型任务 | 解耦任务提交与执行,支持异步处理 |
run()方法 | 提交任务到队列,队列满时可能阻塞 | 用户接口,确保任务安全加入队列 |
take()方法 | 工作线程从队列取任务,队列空时阻塞 | 核心任务获取机制,保证工作线程高效等待 |
runInThread() | 工作线程主循环,执行take() 和任务 | 定义工作线程行为,维持任务处理循环 |
互斥锁 | 保护队列和运行状态 | 确保多线程访问共享数据的正确性 |
条件变量 | notEmpty_ 和notFull_ 协调线程间通信 | 实现高效的任务调度与资源管理 |
有界队列 | 通过maxQueueSize_ 限制队列容量 | 防止任务堆积导致内存溢出,提供背压机制 |
结论
muduo::ThreadPool
是Muduo库中处理异步任务的核心组件,通过线程安全的任务队列和工作线程池,有效隔离耗时操作与网络I/O处理。其价值体现在:
- 资源隔离:保护
EventLoop
线程免受阻塞,确保网络高吞吐 - 弹性扩展:通过线程数配置适应不同计算负载
- 流量控制:有界队列防止资源耗尽
- 简化并发:
封装底层线程同步细节
,提供简洁API
下一章我们将探讨TcpClient
类,了解如何主动连接远程服务并管理TCP连接。
第十章:TcpClient
第10章:TcpClient
-
在上一章第9章:线程池中,我们讨论了
muduo::ThreadPool
如何通过卸载阻塞任务来保持EventLoop
线程的响应能力。 -
在此之前,我们探讨了
TcpServer
(第8章:TcpServer),它允许使用EventLoop
线程池构建监听并管理
多个传入TCP连接的服务器。
但如果想编写主动连接到远程服务器的程序(而不是等待连接)呢?
这就是网络客户端的作用。
-
需要一种方法来连接到特定地址和端口,处理连接建立的
异步性
,管理连接后的数据交换,并可能需要处理连接失败和重试。 -
手动实现这些需要创建套接字、设置为
非阻塞模式
、发起connect(2)
系统调用(在非阻塞模式下会立即返回但稍后完成连接)、使用EventLoop
和Channel
监视套接字的可写性(表示连接完成)或错误,然后管理已连接的套接字
。
这非常复杂,尤其是在需要自动重连
等功能时。
这正是muduo::net::TcpClient
要解决的问题。
TcpClient 解决了什么问题?
muduo::net::TcpClient
为TCP连接的客户端提供了高级抽象。它处理以下完整流程:
- 发起连接:创建非阻塞套接字并调用
connect()
- 监控连接状态:使用
EventLoop
等待连接成功建立或失败 - 管理连接:连接成功后,创建并管理单个
TcpConnection
对象(第6章:TcpConnection)来处理数据传输(发送和接收),使用其EventLoop
和Buffer
(第7章:Buffer) - 处理断开连接:在连接关闭时通知
- 自动重试(可选):如果启用,在初始连接失败或连接丢失时自动尝试重连
将
TcpClient
想象成一位专门代表,其职责是呼叫
另一个办公室(TcpServer
)并建立单条通信线路。
它处理拨号、等待对方接听以及通话接通后的线路管理。
如果线路忙或通话中断,可以配置自动重拨。
TcpClient:连接发起者
muduo::net::TcpClient
是在Muduo中创建TCP客户端应用程序的主要类。
以下是muduo::net::TcpClient
的关键概念:
- 建立单一连接:与
TcpServer
不同,TcpClient
对象设计用于建立和管理到一个特定远程服务器地址的连接,不处理多个传入连接 - 由单个EventLoop拥有:
TcpClient
对象必须在单个EventLoop
线程中存在和使用,通常是客户端的主循环。所有回调和内部操作都在此循环线程中发生 - 拥有Connector:内部使用
Connector
对象,该组件负责异步connect()
系统调用、监视套接字通道的连接完成/失败,并实现重试逻辑 - 拥有TcpConnection(连接时):当
Connector
成功建立连接后,TcpClient
会为新套接字创建TcpConnection
对象,所有数据I/O都通过该对象进行 - 使用相同回调:与
TcpServer
类似,通过熟悉的ConnectionCallback
、MessageCallback
和WriteCompleteCallback
暴露连接生命周期和数据事件 - 连接/断开/停止控制:提供
connect()
、disconnect()
、stop()
方法控制客户端状态 - 自动重试:
enableRetry()
方法允许在连接失败或丢失时启用自动重连
使用TcpClient:连接到回声服务器
让我们编写一个连接到第8章:TcpServer中构建的回声服务器的简单客户端。
该客户端将连接、发送消息、接收回声并断开连接。
需要:
- 客户端的
EventLoop
- 服务器地址的
InetAddress
- 回调函数
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/InetAddress.h"
#include "muduo/base/Logging.h" // 用于LOG_INFO
#include <cstdio> // 用于printf
#include <string> // 用于std::string// 我们*单条连接*的TcpConnectionPtr(shared_ptr指向TcpConnection)
muduo::net::TcpConnectionPtr clientConnection;// 示例连接回调
void onConnection(const muduo::net::TcpConnectionPtr& conn) {LOG_INFO << "客户端连接 " << (conn->connected() ? "建立" : "断开");if (conn->connected()) {// 连接建立时存储连接指针clientConnection = conn;LOG_INFO << "已连接到 " << conn->peerAddress().toIpPort();// 发送初始消息std::string message = "你好,来自客户端!\n";printf("发送: %s", message.c_str());conn->send(message);} else {// 连接断开,清空存储的指针clientConnection.reset();LOG_INFO << "已断开与 " << conn->peerAddress().toIpPort() << " 的连接";// 实际应用中可在此处调用loop.quit()或处理重连conn->getLoop()->quit(); // 简单示例:断开时退出循环}
}// 示例消息回调(处理回声数据)
void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp receiveTime) {LOG_INFO << "从 " << conn->name() << " 接收到 " << buf->readableBytes() << " 字节";// 将回声数据转为字符串std::string msg = buf->retrieveAllAsString();printf("收到回声消息: %s", msg.c_str());// 简单示例:收到回声后发送下一条消息// 实际协议中应处理消息并决定是否响应// std::string nextMessage = "另一条消息...\n";// conn->send(nextMessage); // 按需持续发送
}int main() {LOG_INFO << "main(): 进程ID = " << getpid();muduo::net::EventLoop loop; // 客户端的事件循环// 服务器地址:localhost (127.0.0.1),端口9988muduo::net::InetAddress serverAddr("127.0.0.1", 9988);// 创建TcpClient实例muduo::net::TcpClient client(&loop, serverAddr, "EchoClient");// --- 配置客户端 ---// 设置回调client.setConnectionCallback(onConnection);client.setMessageCallback(onMessage);// 可选:启用连接失败/断开自动重连// client.enableRetry();// LOG_INFO << "已启用重连";// --- 启动连接过程 ---printf("启动客户端,正在连接到 %s...\n", serverAddr.toIpPort().c_str());client.connect();// --- 运行客户端循环 ---printf("运行客户端循环...\n");loop.loop(); // 阻塞直到调用loop.quit()// loop.loop()在连接关闭且onConnection调用loop->quit()后退出printf("客户端已停止。main()退出。\n");return 0;
}
(需要包含必要的Muduo头文件并链接库)
这段代码是使用muduo网络库(一个高性能C++网络库)实现的TCP客户端程序,负责连接到指定服务器
,发送初始消息 并接收服务器返回的数据(回声)。
关键组件说明
必要头文件
TcpClient.h
:客户端核心类EventLoop.h
:事件循环处理类InetAddress.h
:网络地址封装类Logging.h
:日志输出工具
主要执行流程
全局连接对象
muduo::net::TcpConnectionPtr clientConnection; // 保存当前有效连接
连接状态回调
void onConnection(const muduo::net::TcpConnectionPtr& conn) {// 连接建立时:// 1. 存储连接对象// 2. 发送初始问候消息// 连接断开时:// 1. 清除连接对象// 2. 退出事件循环
}
数据接收回调
void onMessage(/*参数略*/) {// 1. 读取接收缓冲区数据// 2. 打印接收内容(示例为回声服务)// 3. 可扩展发送后续消息
}
主程序逻辑
int main() {// 1. 创建事件循环对象// 2. 指定服务器地址(127.0.0.1:9988)// 3. 创建客户端实例// 4. 绑定回调函数// 5. 发起连接请求// 6. 运行事件循环(持续处理网络事件)
}
运行
- 客户端启动后自动连接服务器
- 连接成功后立即发送"你好,来自客户端!"
- 接收服务器返回的相同消息(回声)
- 断开连接时自动退出程序
注:实际使用时需配合对应的回声服务器运行,示例默认使用本地127.0.0.1:9988
main()
关键部分解析:
muduo::net::EventLoop loop;
:创建客户端的单一事件循环muduo::net::InetAddress serverAddr("127.0.0.1", 9988);
:指定服务器地址muduo::net::TcpClient client(...);
:创建关联事件循环和服务器地址的客户端实例client.setConnectionCallback(...);
:设置连接状态变更回调client.connect();
:启动异步连接过程loop.loop();
:启动事件循环
TcpClient内部机制:Connector与状态管理
TcpClient
依赖内部Connector
对象管理连接建立阶段和重试,同时管理状态标志和用于保存连接的shared_ptr
。
调用client.connect()
时的流程:
核心源码解析
TcpClient.h
关键成员(详见附带的代码):
class TcpClient : noncopyable {public:// 构造函数TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);~TcpClient();void connect(); // 启动连接void disconnect(); // 优雅断开void stop(); // 停止连接和重试// 设置用户回调void setConnectionCallback(ConnectionCallback cb);void setMessageCallback(MessageCallback cb);private:EventLoop* loop_;ConnectorPtr connector_; // 连接处理器TcpConnectionPtr connection_; // 当前连接// ... 其他成员
};
Connector
类(详见附带的Connector.h/cc
)关键方法:
start()
:由TcpClient::connect()
调用,加入事件循环队列handleWrite()
:处理连接完成事件retry()
:实现带退避策略的重连机制
总结
功能 | 描述 | 优势 |
---|---|---|
连接发起器 | 启动与远程服务器的连接过程 | 提供高级连接建立抽象 |
EventLoop绑定 | 完全在单个事件循环线程内运行 | 简化并发控制 |
Connector组件 | 处理异步connect 和重试逻辑 | 封装复杂的状态管理 |
单一连接管理 | 通过TcpConnection 管理活动连接 | 简化客户端连接管理逻辑 |
标准回调接口 | 使用与TcpServer 相同的回调机制 | 统一编程模型 |
自动重试机制 | 支持可配置的退避重试策略 | 提升客户端容错能力 |
结论
muduo::net::TcpClient
是构建Muduo客户端应用的核心抽象。
-
通过内部
Connector
封装异步连接建立的复杂性,并通过标准TcpConnection
对象管理通信。 -
通过回调机制定义连接生命周期行为,配合可选的重试功能,可轻松构建高可靠的网络客户端。
-
其
单线程
设计模型与TcpServer
形成完美互补,共同构成Muduo网络库的核心架构。
完结撒花~
test code: https://github.com/lvy010/Common-C-_Lib/tree/main/test