当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列(3)(详解使用muduo库)

C++ - 仿 RabbitMQ 实现消息队列(3)(详解使用muduo库)

  • muduo库的基层原理
        • 核心概念总结:
        • 通俗例子:餐厅模型
        • 优势体现
        • 典型场景
  • muduo库中的主要类
    • Eventloop
      • Muduo 的 `EventLoop` 核心解析
        • 1. 核心机制:事件循环(Reactor 模式)
        • 2. 线程绑定:One Loop Per Thread
        • 3. 跨线程任务调度
        • 4. 定时器功能
        • 5. 唤醒机制
        • 6. Channel 管理
      • 核心设计思想
    • TcpServer
      • Muduo 的 `TcpServer` 核心解析
    • 1. 核心机制:主从 Reactor 模型
      • (1)`Acceptor` 处理新连接
      • (2)`EventLoopThreadPool` 线程池管理
    • 2. 连接管理
      • (1)`TcpConnection` 封装 TCP 连接
      • (2)连接移除
    • 3. 服务器控制
      • (1)启动服务器
      • (2)`ReusePort` 支持
    • 1. 三大核心回调函数
      • (1)`setConnectionCallback` —— 连接建立/关闭回调
      • (2)`setMessageCallback` —— 数据到达回调(最常用!)
      • (3)`setWriteCompleteCallback` —— 数据发送完成回调
    • 2. 回调函数的底层机制
      • (1)回调存
      • (2)回调触发流程
    • 3. 完整示例代码
  • 服务器的大致结构
    • TcpClient
    • 1. 核心功能
    • 2. 关键成员变量
    • 3. 连接生命周期管理
      • (1) 连接建立流程
      • (2) 连接断开处理
    • 4. 回调函数
    • 5. 线程安全性
    • 6. 断线重连机制
    • 7. 典型使用场景
      • (1) 简单客户端
      • (2) 带重连的客户端
  • Buffer
        • 1. 核心回调类型总览
  • 客户端大致结构
      • 为什么使用 `EventLoopThread` 而不是 `EventLoop`?
        • 1. `EventLoopThread` 的核心作用
        • 2. 与直接使用 `EventLoop` 的关键区别
        • 3. 在 `TranslateClient` 中的必要性
      • `muduo::CountDownLatch _latch` 的作用
        • 1. `CountDownLatch` 的核心功能
        • 2. 在 `TranslateClient` 中的用途
        • 3. 典型工作流程

我们前面简单介绍了一下protobuf和muduo库,对他们有了一个基本的了解,如果还不熟悉的小伙伴可以点击这里:

https://blog.csdn.net/qq_67693066/article/details/147979379?spm=1011.2415.3001.5331

我们今天的任务就是对muduo库进行比较细致的了解,然后搭建一个简单的翻译服务器。

muduo库的基层原理

muduo库的基层是基于主从Reactor模型的网络库
在这里插入图片描述

核心概念总结:
  1. 主从 Reactor 模型

    • 主 Reactor(通常由 main EventLoop 负责):监听新连接(accept),将新连接分发给从 Reactor
    • 从 Reactor(每个线程一个 EventLoop):负责已建立连接的读写事件(read/write)和定时任务。
  2. One Loop Per Thread

    • 每个线程独立运行一个事件循环(EventLoop),处理自己的 IO 和定时事件。
    • 一个 TCP 连接从建立到销毁,全程由同一个线程管理,避免多线程竞争。
  3. 非阻塞 IO + 事件驱动

    • 通过 epoll(Linux)监听文件描述符(FD)事件,数据到来时触发回调,不阻塞线程。

通俗例子:餐厅模型

想象一个高并发的餐厅(服务器),采用 Muduo 的工作模式:

  1. 主 Reactor(前台经理)

    • 专职站在门口接待新顾客(accept 新连接)。
    • 每来一个新顾客,经理分配一个专属服务员(从 Reactor 线程)全程服务。
  2. 从 Reactor(专属服务员)

    • 每个服务员(线程)负责固定几桌顾客(TCP 连接),全程处理点菜、上菜、结账(read/write)。
    • 服务员在自己的工作区(EventLoop)循环检查负责的餐桌是否有需求(事件驱动)。
    • 如果某桌顾客长时间不点菜(空闲连接),服务员会主动检查(定时任务)。
  3. 为什么不用多服务员服务一桌?

    • 避免两个服务员同时给同一桌上菜时撞翻盘子(多线程竞争 FD)。
    • 专属服务员更熟悉顾客的需求(连接状态管理更简单)。

优势体现
  • 高并发:前台经理快速分配,每个服务员专注自己的餐桌,不互相干扰。
  • 低延迟:服务员非阻塞工作,没菜上时就去做其他事(如清理餐具)。
  • 线程安全:每桌数据由专属服务员处理,无需加锁。
典型场景
  • 聊天服务器:每个用户连接固定由一个线程处理消息。
  • 游戏服务器:玩家 TCP 连接的读写和逻辑在同一线程中完成。

Muduo 的设计,简单来说就是开了一家餐馆,门口会有一个负责揽客的(主Reactor),把客人招进来之后,会有专门的服务员(从Reactor)对客人进行服务。

muduo库中的主要类

Eventloop

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H#include <atomic>
#include <functional>
#include <vector>#include <boost/any.hpp>#include "muduo/base/Mutex.h"
#include "muduo/base/CurrentThread.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/TimerId.h"namespace muduo
{
namespace net
{class Channel;
class Poller;
class TimerQueue;///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : noncopyable
{public:typedef std::function<void()> Functor;EventLoop();~EventLoop();  // force out-line dtor, for std::unique_ptr members.////// Loops forever.////// Must be called in the same thread as creation of the object.///void loop();/// Quits loop.////// This is not 100% thread safe, if you call through a raw pointer,/// better to call through shared_ptr<EventLoop> for 100% safety.void quit();////// Time when poll returns, usually means data arrival.///Timestamp pollReturnTime() const { return pollReturnTime_; }int64_t iteration() const { return iteration_; }/// Runs callback immediately in the loop thread./// It wakes up the loop, and run the cb./// If in the same loop thread, cb is run within the function./// Safe to call from other threads.void runInLoop(Functor cb);/// Queues callback in the loop thread./// Runs after finish pooling./// Safe to call from other threads.void queueInLoop(Functor cb);size_t queueSize() const;// timers////// Runs callback at 'time'./// Safe to call from other threads.///TimerId runAt(Timestamp time, TimerCallback cb);////// Runs callback after @c delay seconds./// Safe to call from other threads.///TimerId runAfter(double delay, TimerCallback cb);////// Runs callback every @c interval seconds./// Safe to call from other threads.///TimerId runEvery(double interval, TimerCallback cb);////// Cancels the timer./// Safe to call from other threads.///void cancel(TimerId timerId);// internal usagevoid wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);// pid_t threadId() const { return threadId_; }void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }// bool callingPendingFunctors() const { return callingPendingFunctors_; }bool eventHandling() const { return eventHandling_; }void setContext(const boost::any& context){ context_ = context; }const boost::any& getContext() const{ return context_; }boost::any* getMutableContext(){ return &context_; }static EventLoop* getEventLoopOfCurrentThread();private:void abortNotInLoopThread();void handleRead();  // waked upvoid doPendingFunctors();void printActiveChannels() const; // DEBUGtypedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;const pid_t threadId_;Timestamp pollReturnTime_;std::unique_ptr<Poller> poller_;std::unique_ptr<TimerQueue> timerQueue_;int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variablesChannelList activeChannels_;Channel* currentActiveChannel_;mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};}  // namespace net
}  // namespace muduo#endif  // MUDUO_NET_EVENTLOOP_H

Muduo 的 EventLoop 核心解析

EventLoop 是 Muduo 网络库的核心,实现了 Reactor 模式的事件循环,采用 one loop per thread 模型。以下是其最核心的部分:


1. 核心机制:事件循环(Reactor 模式)
  • loop() 方法
    核心事件循环,通过 Poller(底层用 epoll/poll)监听文件描述符(FD)事件,触发回调。
    void loop();  // 永不退出,直到调用 quit()
    
  • quit() 方法
    安全退出事件循环(通过原子变量 quit_ 控制)。
    std::atomic<bool> quit_;  // 线程安全标志
    

2. 线程绑定:One Loop Per Thread
  • 每个 EventLoop 仅属于一个线程
    const pid_t threadId_;  // 创建 EventLoop 的线程 ID
    bool isInLoopThread() const;  // 检查当前线程是否属于该 EventLoop
    
    • 通过 assertInLoopThread() 确保线程安全,禁止跨线程操作。

3. 跨线程任务调度
  • runInLoop(Functor cb)
    立即在 EventLoop 所在线程执行回调(如果当前线程是 EventLoop 线程,直接执行;否则唤醒 EventLoop 并排队)。
  • queueInLoop(Functor cb)
    将回调函数加入任务队列(通过 pendingFunctors_),由事件循环下次迭代时执行。
    std::vector<Functor> pendingFunctors_;  // 待执行任务队列
    MutexLock mutex_;  // 保护任务队列的互斥锁
    

4. 定时器功能
  • TimerQueue 定时器管理
    通过 runAtrunAfterrunEvery 注册定时任务,底层用 timerfd 或时间堆实现。
    std::unique_ptr<TimerQueue> timerQueue_;  // 定时器队列
    TimerId runAt(Timestamp time, TimerCallback cb);  // 在指定时间触发回调
    

5. 唤醒机制
  • wakeup() 方法
    通过 eventfd(或管道)唤醒阻塞在 epoll 上的 EventLoop,用于处理跨线程任务或立即退出。
    int wakeupFd_;  // 用于唤醒的文件描述符
    std::unique_ptr<Channel> wakeupChannel_;  // 封装 wakeupFd_ 的 Channel
    

6. Channel 管理
  • Channel 是事件处理的封装
    每个 FD 对应一个 Channel,注册读写事件回调。
    void updateChannel(Channel* channel);  // 更新监听事件
    void removeChannel(Channel* channel);  // 移除监听
    

核心设计思想

  1. 线程隔离

    • 一个 EventLoop 仅由一个线程操作,避免锁竞争。
    • 跨线程调用通过 runInLoop/queueInLoop 安全派发任务。
  2. 事件驱动

    • 所有 IO 和定时任务均由 Poller 监听,回调在事件循环中触发。
  3. 高效唤醒

    • 通过 wakeupFd_ 打破 epoll 阻塞,及时处理新任务。

TcpServer

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H#include "muduo/base/Atomic.h"
#include "muduo/base/Types.h"
#include "muduo/net/TcpConnection.h"#include <map>namespace muduo
{
namespace net
{class Acceptor;
class EventLoop;
class EventLoopThreadPool;///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : noncopyable
{public:typedef std::function<void(EventLoop*)> ThreadInitCallback;enum Option{kNoReusePort,kReusePort,};//TcpServer(EventLoop* loop, const InetAddress& listenAddr);TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option = kNoReusePort);~TcpServer();  // force out-line dtor, for std::unique_ptr members.const string& ipPort() const { return ipPort_; }const string& name() const { return name_; }EventLoop* getLoop() const { return loop_; }/// Set the number of threads for handling input.////// Always accepts new connection in loop's thread./// Must be called before @c start/// @param numThreads/// - 0 means all I/O in loop's thread, no thread will created.///   this is the default value./// - 1 means all I/O in another thread./// - N means a thread pool with N threads, new connections///   are assigned on a round-robin basis.void setThreadNum(int numThreads);void setThreadInitCallback(const ThreadInitCallback& cb){ threadInitCallback_ = cb; }/// valid after calling start()std::shared_ptr<EventLoopThreadPool> threadPool(){ return threadPool_; }/// Starts the server if it's not listening.////// It's harmless to call it multiple times./// Thread safe.void start();/// Set connection callback./// Not thread safe.void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }/// Set message callback./// Not thread safe.void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }/// Set write complete callback./// Not thread safe.void setWriteCompleteCallback(const WriteCompleteCallback& cb){ writeCompleteCallback_ = cb; }private:/// Not thread safe, but in loopvoid newConnection(int sockfd, const InetAddress& peerAddr);/// Thread safe.void removeConnection(const TcpConnectionPtr& conn);/// Not thread safe, but in loopvoid removeConnectionInLoop(const TcpConnectionPtr& conn);typedef std::map<string, TcpConnectionPtr> ConnectionMap;EventLoop* loop_;  // the acceptor loopconst string ipPort_;const string name_;std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptorstd::shared_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;ThreadInitCallback threadInitCallback_;AtomicInt32 started_;// always in loop threadint nextConnId_;ConnectionMap connections_;
};}  // namespace net
}  // namespace muduo#endif  // MUDUO_NET_TCPSERVER_H

Muduo 的 TcpServer 核心解析

TcpServer 是 Muduo 网络库的核心类,用于构建 TCP 服务器,支持 单线程线程池 两种模型。以下是其最核心的部分:


1. 核心机制:主从 Reactor 模型

(1)Acceptor 处理新连接

  • Acceptor
    负责监听新连接(listen + accept),通过 Channel 封装 listen_fd,将新连接分发给 EventLoop
    std::unique_ptr<Acceptor> acceptor_;  // 监听新连接的组件
    
  • newConnection 回调
    当新连接到达时,Acceptor 调用 newConnection(),创建 TcpConnection 并分配 IO 线程。
    void newConnection(int sockfd, const InetAddress& peerAddr);  // 在新连接到达时调用
    

(2)EventLoopThreadPool 线程池管理

  • threadPool_
    管理多个 EventLoop 线程(从 Reactor),用于处理已建立的连接。
    std::shared_ptr<EventLoopThreadPool> threadPool_;  // IO 线程池
    
  • 线程数量设置
    • setThreadNum(0):所有连接由主线程(acceptor_loop)处理(单线程)。
    • setThreadNum(1):单独一个 IO 线程处理所有连接。
    • setThreadNum(N):线程池模式,轮询分配连接。
    void setThreadNum(int numThreads);  // 必须在 start() 前调用
    

2. 连接管理

(1)TcpConnection 封装 TCP 连接

  • ConnectionMap
    存储所有活跃的 TcpConnection,以 name 为键。
    typedef std::map<string, TcpConnectionPtr> ConnectionMap;
    ConnectionMap connections_;  // 当前所有连接
    
  • 连接生命周期回调
    • connectionCallback_:连接建立/关闭时触发。
    • messageCallback_:收到数据时触发。
    • writeCompleteCallback_:数据发送完成时触发。
    ConnectionCallback connectionCallback_;   // 连接状态变化回调
    MessageCallback messageCallback_;         // 数据到达回调
    WriteCompleteCallback writeCompleteCallback_;  // 数据发送完成回调
    

(2)连接移除

  • removeConnection
    线程安全地移除连接(可能跨线程调用)。
    void removeConnection(const TcpConnectionPtr& conn);  // 线程安全
    
  • removeConnectionInLoop
    在正确的 EventLoop 线程中销毁连接。
    void removeConnectionInLoop(const TcpConnectionPtr& conn);  // 必须在 IO 线程执行
    

3. 服务器控制

(1)启动服务器

  • start()
    开始监听端口,启动 Acceptor
    void start();  // 启动服务器(线程安全)
    
  • started_ 原子标志
    防止重复启动。
    AtomicInt32 started_;  // 是否已启动
    

(2)ReusePort 支持

  • Option 选项
    支持 SO_REUSEPORT,允许多个进程/线程绑定相同端口(提高 accept 性能)。
    enum Option { kNoReusePort, kReusePort };
    TcpServer(..., Option option = kNoReusePort);  // 构造函数选项
    

TcpServer中最重要的是三个回调函数,用来处理不同情况下的消息处理:


1. 三大核心回调函数

(1)setConnectionCallback —— 连接建立/关闭回调

  • 触发时机
    • 当新连接建立时(onConnection)。
    • 当连接关闭时(onClose)。
  • 典型用途
    • 记录连接日志。
    • 管理连接状态(如用户上线/下线)。
  • 示例
    server.setConnectionCallback([](const TcpConnectionPtr& conn) {if (conn->connected()) {LOG_INFO << "New connection: " << conn->peerAddress().toIpPort();} else {LOG_INFO << "Connection closed: " << conn->peerAddress().toIpPort();}
    });
    

(2)setMessageCallback —— 数据到达回调(最常用!)

  • 触发时机
    • 当收到对端发送的数据时(onMessage)。
  • 典型用途
    • 解析协议(如 HTTP、Redis 命令)。
    • 业务逻辑处理(如聊天消息转发)。
  • 示例
    server.setMessageCallback([](const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) {// 从 buf 中读取数据string msg = buf->retrieveAllAsString();LOG_INFO << "Received " << msg.size() << " bytes: " << msg;conn->send(msg); // 回显数据
    });
    

(3)setWriteCompleteCallback —— 数据发送完成回调

  • 触发时机
    • 当数据全部写入内核缓冲区(send 完成)时。
  • 典型用途
    • 流量控制(如高水位回调配合使用)。
    • 发送完成后的日志记录。
  • 示例
    server.setWriteCompleteCallback([](const TcpConnectionPtr& conn) {LOG_INFO << "Data sent to " << conn->peerAddress().toIpPort();
    });
    

2. 回调函数的底层机制

(1)回调存

TcpServer 中,这三个回调通过成员变量保存:

ConnectionCallback connectionCallback_;     // 连接回调
MessageCallback messageCallback_;         // 数据回调
WriteCompleteCallback writeCompleteCallback_; // 发送完成回调

(2)回调触发流程

  1. 连接建立
    • Acceptor 接受新连接 → 创建 TcpConnection → 调用 connectionCallback_
  2. 数据到达
    • EventLoop 监听到 sockfd 可读 → TcpConnection::handleRead() → 调用 messageCallback_
  3. 数据发送完成
    • TcpConnection::sendInLoop() 完成写入 → 调用 writeCompleteCallback_

3. 完整示例代码

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>using namespace muduo;
using namespace muduo::net;int main() {EventLoop loop;InetAddress listenAddr(8888);TcpServer server(&loop, listenAddr, "EchoServer");// 设置回调server.setConnectionCallback([](const TcpConnectionPtr& conn) {if (conn->connected()) {LOG_INFO << "New connection from " << conn->peerAddress().toIpPort();} else {LOG_INFO << "Connection closed: " << conn->peerAddress().toIpPort();}});server.setMessageCallback([](const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) {string msg = buf->retrieveAllAsString();LOG_INFO << "Received: " << msg;conn->send(msg); // 回显});server.setThreadNum(4); // 4个IO线程server.start();loop.loop(); // 启动事件循环
}

服务器的大致结构

如果我们想要通过muduo库来搭建一个简单的服务器,大概的框架就是这样的:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>class TranslateServer{public:TranslateServer(int port):_sever(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"TranslateSever",muduo::net::TcpServer::kNoReusePort){}//开始运行void start(){}private:void onConnection(const muduo::net::TcpConnectionPtr& conn){//新建链接时的回调函数}void onMessage(const TcpConnectionPtr& conn,Buffer*buffer,muduo::Timestamp){//收到消息时的回调函数}private:muduo::net::EventLoop _baseloop; //基本事件循环muduo::net::TcpServer _server; //翻译服务器
};int main()
{TranslateServer server(8085);server.start(); //开始运行
}

添加完整应该是这个样子:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <iostream>
#include <functional>
#include <unordered_map>class TranslateServer
{
public:TranslateServer(int port): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"TranslateSever", muduo::net::TcpServer::kNoReusePort){// 绑定std::bind// 类的成员设定为服务器的回调处理函数_server.setConnectionCallback(std::bind(&TranslateServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&TranslateServer::onMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));}// 开始运行void start(){_server.start();  // 开始事件监听_baseloop.loop(); // 开启事件监控}private:void onConnection(const muduo::net::TcpConnectionPtr &conn){// 新建链接时的回调函数if (conn->connected() == true){std::cout << "新连接建立成功" << std::endl;}else{std::cout << "新连接关闭" << std::endl;}}std::string translate(const std::string &str){static const std::unordered_map<std::string, std::string> dict_map = {{"Hello", "你好"},{"hello", "你好"},{"你好", "Hello"},{"hi", "嗨"} // 扩展示例};auto it = dict_map.find(str);if (it == dict_map.end()){std::cout << "未识别的输入: " << str << std::endl;return "未识别的输入"; // 必须返回默认值!}return it->second;}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp){// 收到消息时的回调函数// 1.从buffer中把请求的数据拿出来std::string str = buffer->retrieveAllAsString();// 2.调用接口std::string resp = translate(str);// 3.对客户端进行响应conn->send(resp);}private:muduo::net::EventLoop _baseloop; // 基本事件循环muduo::net::TcpServer _server;   // 翻译服务器
};int main()
{TranslateServer server(8085);server.start(); // 开始运行
}

TcpClient

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H#include "muduo/base/Mutex.h"
#include "muduo/net/TcpConnection.h"namespace muduo
{
namespace net
{class Connector;
typedef std::shared_ptr<Connector> ConnectorPtr;class TcpClient : noncopyable
{public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop* loop,const InetAddress& serverAddr,const string& nameArg);~TcpClient();  // force out-line dtor, for std::unique_ptr members.void connect();void disconnect();void stop();TcpConnectionPtr connection() const{MutexLockGuard lock(mutex_);return connection_;}EventLoop* getLoop() const { return loop_; }bool retry() const { return retry_; }void enableRetry() { retry_ = true; }const string& name() const{ return name_; }/// Set connection callback./// Not thread safe.void setConnectionCallback(ConnectionCallback cb){ connectionCallback_ = std::move(cb); }/// Set message callback./// Not thread safe.void setMessageCallback(MessageCallback cb){ messageCallback_ = std::move(cb); }/// Set write complete callback./// Not thread safe.void setWriteCompleteCallback(WriteCompleteCallback cb){ writeCompleteCallback_ = std::move(cb); }private:/// Not thread safe, but in loopvoid newConnection(int sockfd);/// Not thread safe, but in loopvoid removeConnection(const TcpConnectionPtr& conn);EventLoop* loop_;ConnectorPtr connector_; // avoid revealing Connectorconst string name_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;bool retry_;   // atomicbool connect_; // atomic// always in loop threadint nextConnId_;mutable MutexLock mutex_;TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};}  // namespace net
}  // namespace muduo#endif  // MUDUO_NET_TCPCLIENT_H

1. 核心功能

方法作用
connect()发起连接(非阻塞,异步完成)
disconnect()断开当前连接
stop()停止客户端(不再重连)
connection()获取当前连接的 TcpConnectionPtr(线程安全)
setXXXCallback()设置连接、消息、写完成的回调函数

2. 关键成员变量

变量作用
connector_负责实际连接操作的 Connector 对象(内部用 socket + non-blocking connect
connection_当前活跃的 TcpConnection 对象(受互斥锁保护)
retry_是否启用断线自动重连(默认关闭)
nextConnId_为每个连接分配唯一 ID(用于日志跟踪)

3. 连接生命周期管理

(1) 连接建立流程

TcpClient client(loop, serverAddr, "Client1");
client.setConnectionCallback(onConnection);
client.setMessageCallback(onMessage);
client.connect();  // 触发连接
  1. connect()
    • 调用 connector_->start(),开始异步连接(非阻塞)。
  2. newConnection(int sockfd)(回调)
    • 连接成功后,创建 TcpConnection 对象。
    • 设置用户回调(connectionCallback_messageCallback_)。
  3. 连接就绪
    • 通过 connectionCallback_ 通知用户。

(2) 连接断开处理

void removeConnection(const TcpConnectionPtr& conn);
  • 当连接关闭时,由 TcpConnection 回调触发。
  • 如果 retry_=true,会自动重新发起连接。

4. 回调函数

回调类型触发时机典型用途
ConnectionCallback连接建立或关闭时记录日志、状态管理
MessageCallback收到数据时(Buffer* 包含数据)解析协议、业务处理
WriteCompleteCallback数据完全写入内核缓冲区时流量控制、发送完成通知

示例:设置回调

client.setMessageCallback([](const TcpConnectionPtr& conn, Buffer* buf, Timestamp) {std::string msg = buf->retrieveAllAsString();LOG_INFO << "Received: " << msg;
});

5. 线程安全性

  • connection() 方法通过 mutex_ 保证线程安全。
  • 回调函数 的执行始终在 EventLoop 绑定的线程中(无竞态条件)。
  • connect()/disconnect() 可在任意线程调用,但实际操作会派发到 EventLoop 线程。

6. 断线重连机制

  • 默认关闭:需调用 enableRetry() 启用。
  • 重试逻辑:由 Connector 实现,采用指数退避策略(避免频繁重连)。

7. 典型使用场景

(1) 简单客户端

EventLoop loop;
InetAddress serverAddr("127.0.0.1", 8888);
TcpClient client(&loop, serverAddr, "DemoClient");client.setConnectionCallback(onConnection);
client.setMessageCallback(onMessage);
client.connect();loop.loop();  // 启动事件循环

(2) 带重连的客户端

client.enableRetry();  // 启用断线重连
client.connect();

Buffer

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.#ifndef MUDUO_NET_CALLBACKS_H
#define MUDUO_NET_CALLBACKS_H#include "muduo/base/Timestamp.h"#include <functional>
#include <memory>namespace muduo
{using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;// should really belong to base/Types.h, but <memory> is not included there.template<typename T>
inline T* get_pointer(const std::shared_ptr<T>& ptr)
{return ptr.get();
}template<typename T>
inline T* get_pointer(const std::unique_ptr<T>& ptr)
{return ptr.get();
}// Adapted from google-protobuf stubs/common.h
// see License in muduo/base/Types.h
template<typename To, typename From>
inline ::std::shared_ptr<To> down_pointer_cast(const ::std::shared_ptr<From>& f) {if (false){implicit_cast<From*, To*>(0);}#ifndef NDEBUGassert(f == NULL || dynamic_cast<To*>(get_pointer(f)) != NULL);
#endifreturn ::std::static_pointer_cast<To>(f);
}namespace net
{// All client visible callbacks go here.class Buffer;
class TcpConnection;
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void()> TimerCallback;
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&)> CloseCallback;
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;// the data has been read to (buf, len)
typedef std::function<void (const TcpConnectionPtr&,Buffer*,Timestamp)> MessageCallback;void defaultConnectionCallback(const TcpConnectionPtr& conn);
void defaultMessageCallback(const TcpConnectionPtr& conn,Buffer* buffer,Timestamp receiveTime);}  // namespace net
}  // namespace muduo#endif  // MUDUO_NET_CALLBACKS_H

1. 核心回调类型总览
回调类型触发时机参数说明
ConnectionCallback连接建立/关闭时TcpConnectionPtr&
MessageCallback收到数据时TcpConnectionPtr&, Buffer*, Timestamp
WriteCompleteCallback数据完全写入内核缓冲区时TcpConnectionPtr&
HighWaterMarkCallback发送缓冲区超过高水位线时TcpConnectionPtr&, size_t(水位值)
CloseCallback连接关闭时(更精细的控制)TcpConnectionPtr&

简单了解之后,我们可以理清客户端的一个基本框架了:

客户端大致结构

#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include "../base/CountDownLatch.h"
#include <muduo/net/EventLoopThread.h>
#include <iostream>
#include <functional>class TranslateClient{public:TranslateClient(const std::string& sip,int port){}void connect() //连接服务器{}void send(const std::string& msg) //发送数据{}private:void onConnection(const muduo::net::TcpConnectionPtr& conn);void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer*buffer,muduo::Timestamp)private:muduo::net::EventLoopThread _loopthread; //客户端的Event对象muduo::CountDownLatch _latch;muduo::net::TcpClient _client; //客户端对象muduo::net::TcpConnection _conn; //连接对象
};int main()
{TranslateClient client("127.0.0.1",8085);client.connect();while(1){std::string buf;std::cin >> buf;client.send(buf);}return 0;
}

这里给解答两个疑点:
在这里插入图片描述

为什么使用 EventLoopThread 而不是 EventLoop

1. EventLoopThread 的核心作用

EventLoopThread 是 Muduo 提供的一个封装类,它:

  • 自动创建并管理一个 EventLoop(在独立线程中运行)
  • 提供线程安全的 EventLoop 获取接口(通过 startLoop()
2. 与直接使用 EventLoop 的关键区别
对比项EventLoopEventLoopThread
线程模型必须在当前线程创建和运行自动在新线程中创建和运行 EventLoop
线程安全性非线程安全(只能在其所属线程操作)通过 startLoop() 安全获取 EventLoop
典型用途单线程程序需要后台运行事件循环的多线程程序
3. 在 TranslateClient 中的必要性
  • 客户端需要非阻塞:如果直接在主线程使用 EventLooploop.loop() 会阻塞主线程,导致无法响应终端输入。
  • 自动线程管理EventLoopThread 简化了多线程下 EventLoop 的生命周期管理。

muduo::CountDownLatch _latch 的作用

1. CountDownLatch 的核心功能
  • 线程同步工具:允许一个或多个线程等待,直到其他线程完成某些操作。
  • 关键方法
    • countDown():计数器减1
    • wait():阻塞直到计数器归零
2. 在 TranslateClient 中的用途

通常用于确保 EventLoopThreadEventLoop 已初始化完成

TranslateClient::TranslateClient(...) : _loopthread([](EventLoop* loop) { /* 初始化代码 */ }),_latch(1)  // 初始计数器为1
{// 启动 EventLoop 线程_loopthread.startLoop();// 等待 EventLoop 初始化完成_latch.wait();
}
3. 典型工作流程
  1. 主线程创建 EventLoopThread 并启动
  2. EventLoopThread 在新线程中初始化 EventLoop
  3. 初始化完成后调用 _latch.countDown()
  4. 主线程通过 _latch.wait() 解除阻塞

客户端补充完之后应该是这样的:

#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include "../base/CountDownLatch.h"
#include "../net/EventLoopThread.h"
#include <iostream>
#include <functional>class TranslateClient {
public:TranslateClient(const std::string& sip, int port): _latch(1),_client(_loopthread.startLoop(), muduo::net::InetAddress(sip, port), "TranslateClient"){_client.setConnectionCallback(std::bind(&TranslateClient::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&TranslateClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void connect() // 连接服务器{_client.connect();_latch.wait(); // 阻塞等待,直到连接建立成功}bool send(const std::string& msg) // 发送数据{if (_conn && _conn->connected()) // 检查连接是否有效{_conn->send(msg);return true;}return false;}private:void onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){_latch.countDown(); // 唤醒主线程中的阻塞_conn = conn;}else{// 连接关闭std::cout << "新连接关闭" << std::endl;_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp){std::string msg = buffer->retrieveAllAsString();std::cout << "翻译结果:" << msg << std::endl;}private:muduo::net::EventLoopThread _loopthread; // 客户端的 EventLoop 线程muduo::CountDownLatch _latch;muduo::net::TcpClient _client; // 客户端对象muduo::net::TcpConnectionPtr _conn; // 连接对象
};int main()
{TranslateClient client("127.0.0.1", 8085);client.connect();while (true){std::string buf;std::getline(std::cin, buf); // 使用 getline 读取整行输入if (!client.send(buf)){std::cerr << "发送失败" << std::endl;}}return 0;
}

最后效果展示:
在这里插入图片描述
在这里插入图片描述
附上CMakeLists.txt:

cmake_minimum_required(VERSION 3.10)
project(TranslateServer)# 设置 C++ 标准
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)# 查找 muduo 网络库
set(MUDUO_INCLUDE_DIRS ./include)
set(MUDUO_LIBRARY_DIR ./lib)# 添加可执行文件
add_executable(translate_server./server/server.cpp  # 假设你的代码保存在 main.cpp 文件中
)add_executable(translate_client./client/client.cpp  # 假设你的代码保存在 main.cpp 文件中
)# 链接 muduo 库
target_link_libraries(translate_servermuduo_netmuduo_basepthread  # muduo 需要 pthread 库
)# 链接 muduo 库
target_link_libraries(translate_clientmuduo_netmuduo_basepthread  # muduo 需要 pthread 库
)

相关文章:

  • docker面试题(5)
  • 【C++ Primer 学习札记】智能指针
  • selenium——元素定位
  • Java 定时任务中Cron 表达式与固定频率调度的区别及使用场景
  • Unity-编辑器扩展-其二
  • auto关键字解析
  • 【算法】滑动窗口(细节探究,易错解析)5.21
  • 使用Vite创建一个动态网页的前端项目
  • IGBT的结电容大小对实际使用有哪些影响,双脉冲测试验证
  • Cmake 使用教程
  • 【时时三省】Python 语言----文件
  • 使用MATLAB输出1000以内所有完美数
  • golang库源码学习——Pond,小而精的工作池库
  • 移动端前端调试调研纪实:从痛点出发,到 WebDebugX 的方案落地
  • 【C++ 真题】P1075 [NOIP 2012 普及组] 质因数分解
  • 论文篇-1.4.一篇好的论文是改出来的
  • 【18. 四数之和 】
  • 内存屏障指令
  • 人工智能价值:技术革命下的职业新坐标
  • 信息系统项目进度管理实践:从规划到控制的全流程解析
  • 木门行业做网站有什么好处/产品推广宣传方案
  • wordpress图片上浮特效/seo还能赚钱吗
  • 如何用wordpress做网站/外贸网站建设优化推广