当前位置: 首页 > news >正文

使用ASIO的协程实现高并发服务器

使用ASIO的协程实现高并发服务器

在 C++ 网络编程领域,Asio 库提供了两种主要的异步编程范式:传统的回调模式和基于协程的现代模式,传统的回调模式大家都很清楚,这里不多做介绍,本文主要介绍基于协程的模式,Asio 协程基于 C++20 标准协程,提供了更简洁的异步编程模型。

ASIO协程和回调对比

下面先列举一下ASIO使用回调和使用协程的区别,等看完区别后再讲讲ASIO使用协程的核心关键函数有哪些

让我用一个更生动的定时器示例来展示协程如何优雅地解决回调地狱问题。我们将实现一个简单但常见需求:

每隔3秒打印一次消息,共打印5次。

要实现上面这个功能,用传统的回调实现是很麻烦的,下面是用回调实现的代码(为了更清晰,这里不用lambda表达式):

#include <boost/asio.hpp>
#include <iostream>namespace asio = boost::asio;// 前向声明
void timer_callback1(asio::steady_timer* timer, int count);
void timer_callback2(asio::steady_timer* timer, int count);
void timer_callback3(asio::steady_timer* timer, int count);
void timer_callback4(asio::steady_timer* timer, int count);
void timer_callback5(asio::steady_timer* timer, int count);
void final_callback(asio::steady_timer* timer, int count);// 处理第一次定时器到期
void timer_callback1(asio::steady_timer* timer, int count) {std::cout << "回调模式: 第1次打印 - 3秒已过\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback2, timer, count + 1));
}// 处理第二次定时器到期
void timer_callback2(asio::steady_timer* timer, int count) {std::cout << "回调模式: 第2次打印 - 3秒已过\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback3, timer, count + 1));
}// 处理第三次定时器到期
void timer_callback3(asio::steady_timer* timer, int count) {std::cout << "回调模式: 第3次打印 - 3秒已过\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback4, timer, count + 1));
}// 处理第四次定时器到期
void timer_callback4(asio::steady_timer* timer, int count) {std::cout << "回调模式: 第4次打印 - 3秒已过\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback5, timer, count + 1));
}// 处理第五次定时器到期
void timer_callback5(asio::steady_timer* timer, int count) {std::cout << "回调模式: 第5次打印 - 3秒已过\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(final_callback, timer, count + 1));
}// 最终回调
void final_callback(asio::steady_timer* timer, int count) {std::cout << "回调模式: 完成" << count << "次打印\n";delete timer; // 清理资源
}// 启动定时器序列
void start_timer_sequence(asio::io_context& io) {// 在堆上创建定时器(需要在整个序列中保持存活)asio::steady_timer* timer = new asio::steady_timer(io);// 设置第一次等待timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback1, timer, 0));
}int main() {asio::io_context io;// 启动回调地狱start_timer_sequence(io);io.run();return 0;
}

正如上面的例子,每个异步操作需要独立的处理函数,5次操作需要6个函数(5个回调+1个启动函数),逻辑上深层嵌套:start → callback1 → callback2 → ... → final,回调函数形成链式调用,如果业务很复杂,你自己都不知道哪一个回调调用了哪一个,这是非常不友好的。c++11后有了lambda表达式这个情况有所好转,但lambda嵌套lambda看的还是很费劲

上面代码用协程来实现就比较简单了:

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>namespace asio = boost::asio;// 协程版 - 单一函数顺序执行
asio::awaitable<void> sequential_operations() {asio::steady_timer timer(co_await asio::this_coro::executor);// 操作1std::cout << "协程: 启动操作1\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "协程: 操作1完成 - 3秒已过\n";// 操作2std::cout << "协程: 启动操作2\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "协程: 操作2完成 - 3秒已过\n";// 操作3std::cout << "协程: 启动操作3\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "协程: 操作3完成 - 3秒已过\n";// 操作4std::cout << "协程: 启动操作4\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "协程: 操作4完成 - 3秒已过\n";// 操作5std::cout << "协程: 启动操作5\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "协程: 操作5完成 - 3秒已过\n";std::cout << "协程: 所有操作完成\n";co_return;
}int main() {asio::io_context io;asio::co_spawn(io, sequential_operations(), asio::detached);io.run();return 0;
}

使用协程可以让整个序列在一个函数中实现,执行流程一目了然,从上到下顺序执行,无嵌套,无跳转,调整操作顺序只需调整代码顺序,另外,错误处理上,一个 try/catch 块覆盖所有操作

协程与传统回调对比

特性协程回调
代码结构顺序执行,类似同步代码嵌套回调,容易形成"回调地狱"
可读性高,逻辑清晰低,跳转复杂
错误处理使用 try/catch错误码参数
局部变量保持状态需要手动保持状态
控制流标准控制结构手动状态管理
资源管理RAII 自然适用需要额外注意生命周期

Asio 协程通过 C++20 标准协程提供了更简洁、更易读的异步编程模型,同时保持了高性能特性。掌握 co_awaitco_spawn 等关键机制,可以显著提高网络应用的开发效率和可维护性。

ASIO协程的核心概念和关键字

ASIO协程的关键概念和关键字如下:

1. co_await

  • 作用:挂起当前协程,等待异步操作完成

  • 用法:以asio的读取数据的例子举例,异步读取等到读取到数据后继续执行数据处理,传统的回调是这样实现的:

// 读取完成处理函数
void handle_read(boost::system::error_code ec, size_t n, std::shared_ptr<std::array<char, 1024>> buffer) {if (!ec) {std::cout << "读取到 " << n << " 字节数据\n";} else {std::cerr << "读取错误: " << ec.message() << "\n";}
}// 启动读取操作
void start_read(tcp::socket& socket) {// 使用 shared_ptr 管理缓冲区auto buffer = std::make_shared<std::array<char, 1024>>();socket.async_read_some(asio::buffer(*buffer),std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, buffer));
}

使用协程是这样实现的

asio::awaitable<void> read_data(tcp::socket& socket) {char data[1024];// 使用 co_await 等待异步读取size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);std::cout << "读取到 " << n << " 字节数据\n";
}

协程直接"等待"操作完成,代码线性执行,从这个例子也看到协程的另外一个好处,就是可以使用局部变量来管理缓冲区,因为从语法上看,协程它是在一个函数里执行,缓冲区生命周期不会释放,而回调你只能用成员变量或者堆分配空间了,当然,你用lambda表达式也可以实现类型效果

2. co_return和asio::awaitable

  • 作用co_return类似return结束协程执行并返回值,这个返回值实际是asio::awaitable
//这里T是result对应类型
asio::awaitable<T> calculate_value() {co_return result; // 对于有返回值的协程
}asio::awaitable<void>  do_something() {co_return;        // 对于 void 协程
}

下面举例一个具体的例子

//calculate_value函数将延时1秒后返回42
asio::awaitable<int> calculate_value() {asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(std::chrono::seconds(1));co_await timer.async_wait(asio::use_awaitable);// 使用 co_return 返回值co_return 42;
}

协程直接返回结果,类似同步函数,co_return搭配asio::awaitable使用

3. asio::use_awaitable

  • 作用:将异步操作转换为可等待对象,这个作用是告诉asio的函数这是一个协程函数,是为了和异步函数能进行重载区分用的,这个标志能让asio的协程函数和异步回调函数都一样,仅仅是传入的参数不同
//协程版本
asio::awaitable<void> wait_for_timer() {
asio::steady_timer timer(co_await asio::this_coro::executor);// 使用 use_awaitable 使异步操作可等待
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait(asio::use_awaitable);std::cout << "定时器完成\n";
}//回调版本
void wait_for_timer(asio::io_context& io) {auto timer = std::make_shared<asio::steady_timer>(io, std::chrono::seconds(1));timer->async_wait([](const asio::error_code& ec) {if (!ec) {std::cout << "定时器完成\n";} else {std::cerr << "定时器错误: " << ec.message() << "\n";}});
}

4. 启动协程 (co_spawn)

  • 作用co_spawn的作用是启动一个协程
asio::co_spawn(executor,           // 执行上下文 (io_context/strand)coroutine_function, // 协程函数completion_handler   // 完成回调
);

completion_handler是完成处理程序选项有两种:

  • asio::detached: 不关心协程结束(最常用)
asio::co_spawn(executor, session(std::move(socket)), asio::detached);
  • 自定义处理:
asio::co_spawn(executor, session(), [](std::exception_ptr e, int result) {if (e) std::rethrow_exception(e);std::cout << "Result: " << result << "\n";});

这里用ASIO回调方式写一个最简单的tcp服务器是这样的:

// 会话状态管理类
class Session : public std::enable_shared_from_this<Session> {
public:Session(tcp::socket socket) : socket_(std::move(socket)) {}void start() {start_read();}void start_read() {auto self = shared_from_this();auto buffer = std::make_shared<std::array<char, 1024>>();socket_.async_read_some(asio::buffer(*buffer),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, buffer));}void handle_read(boost::system::error_code ec, size_t n, std::shared_ptr<std::array<char, 1024>> buffer) {if (!ec) {// 处理数据...start_read(); // 继续读取}}private:tcp::socket socket_;
};int main() {asio::io_context io;tcp::socket socket(io);// 创建会话并启动auto session = std::make_shared<Session>(std::move(socket));session->start();io.run();
}

用协程可以变为这样:

int main() {asio::io_context io;tcp::socket socket(io);// 使用 co_spawn 启动协程asio::co_spawn(io, session(std::move(socket)), asio::detached);io.run();
}// 协程函数
asio::awaitable<void> session(tcp::socket socket) {// ...协程逻辑...co_return;
}

asio::co_spawn启用一个协程进入session函数,然后等待其完成。

上面例子中asio::detached说明启动这个协程后就不管他的返回了,如果你要获取返回,可以传入一个回调(协程最终还是不能完全避免回调),例如:

// 完成处理函数
void handle_completion(std::exception_ptr e, int result) {if (e) {try {std::rethrow_exception(e);} catch (const std::exception& ex) {std::cerr << "计算错误: " << ex.what() << "\n";}} else {std::cout << "计算结果: " << result << "\n";}
}// 可能失败的协程
asio::awaitable<int> compute_value() {// ...可能抛出异常的计算...co_return 42;
}int main() {asio::io_context io;// 启动协程并指定完成处理函数asio::co_spawn(io, compute_value(), handle_completion);io.run();
}

ASIO协程的其它操作

1. 切换执行上下文

asio::dispatch:协程执行上下文切换,它允许开发者精确控制协程在哪个线程或执行器上运行

// 切换到指定线程的 io_context
co_await asio::dispatch(target_io_context, asio::use_awaitable);

当协程执行到co_await asio::dispatch(...)时:

  • 挂起当前协程:暂停当前执行流程
  • 调度到目标执行器:将协程续体(continuation)提交到目标执行器的队列
  • 在目标上下文恢复:当目标执行器调度该任务时,协程在目标线程恢复执行

例如:网络请求后,有些复杂计算切换到别的线程进行操作:

asio::awaitable<void> process_request() {// 在网络线程池处理请求Request req = co_await receive_request();// 切换到计算线程池处理CPU密集型任务co_await asio::dispatch(compute_pool, asio::use_awaitable);Result res = co_await heavy_computation(req);// 切回网络线程池发送响应co_await asio::dispatch(network_pool, asio::use_awaitable);co_await send_response(res);
}

还有就是线程和GUI的切换,GUI线程一般不会和网络请求在一个线程中

asio::awaitable<void> update_ui() {Data data = co_await fetch_data();// 切换到GUI线程更新界面co_await asio::dispatch(gui_executor, asio::use_awaitable);ui_label.set_text(data.message);ui_chart.update(data.values);
}

2. 协程取消

asio::cancellation_signal cancel_signal;// 启动可取消协程
asio::co_spawn(executor, [](asio::cancellation_signal sig) -> asio::awaitable<void> {asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(10s);// 绑定取消信号co_await timer.async_wait(asio::bind_cancellation_slot(sig.slot(), asio::use_awaitable));}(cancel_signal),asio::detached
);// 在需要时取消
cancel_signal.emit(asio::cancellation_type::all);

协程操作符

协程操作符主要是||&&,允许多个协程同时运行,并等待所有(&&)或者某个(||)协程完成。

1.||操作符

下面举个网络编程中最常见的场景之一"带超时的异步读取"来展示协程的操作符,网络编程经常有一个需求,就是你给对方写一个数据,要n秒内等待对方回复,如果对方n秒内不回复,你要重发,最多重发m次,这个逻辑梳理为:

  • 向对方发送请求数据
  • 等待回复,设置超时时间(如3秒)
  • 如果超时未收到回复,重发请求
  • 最多重发M次(如3次)

用协程这样实现的(注意,这里协程的||操作符要include asio/experimental/awaitable_operators.hpp):

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <system_error>
#include <chrono>namespace asio = boost::asio;
using asio::ip::tcp;
using namespace asio::experimental::awaitable_operators;
namespace chrono = std::chrono;// 带超时重发的可靠请求
asio::awaitable<bool> reliable_request(tcp::socket& socket, const std::string& request,int max_retries = 3,chrono::seconds timeout = 3s) 
{int attempt = 0;while (attempt <= max_retries) {attempt++;std::cout << "尝试 #" << attempt << "/" << max_retries << "\n";try {// 发送请求co_await asio::async_write(socket, asio::buffer(request), asio::use_awaitable);// 准备接收响应std::string response;response.resize(1024);// 设置定时器asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(timeout);// 同时等待读取和超时,这里演示了||操作符,这里面有两个协程函数auto result = co_await (socket.async_read_some(asio::buffer(response), asio::use_awaitable) ||timer.async_wait(asio::use_awaitable));//只要上面有一个返回结果就会往下执行// 处理结果if (result.index() == 0) {  // 收到响应size_t bytes_read = std::get<0>(result).first;response.resize(bytes_read);std::cout << "收到响应: " << response << "\n";co_return true;}else {  // 超时std::cout << "请求超时, ";if (attempt < max_retries) {std::cout << "准备重试...\n";} else {std::cout << "已达最大重试次数\n";}}}catch (const std::exception& e) {std::cerr << "错误: " << e.what() << "\n";co_return false;}}co_return false;  // 所有尝试失败
}// 示例使用
asio::awaitable<void> client_session(tcp::socket socket) {try {// 构造请求数据std::string request = "QUERY_DATA";// 发送可靠请求(最多重试3次,超时3秒)bool success = co_await reliable_request(socket, request, 3, 3s);if (success) {std::cout << "请求成功完成\n";} else {std::cout << "请求失败\n";}}catch (const std::exception& e) {std::cerr << "会话错误: " << e.what() << "\n";}
}int main() {asio::io_context io;// 创建并连接socket(示例)tcp::socket socket(io);// 实际中应连接服务器: socket.connect(endpoint);// 启动客户端会话asio::co_spawn(io, client_session(std::move(socket)), asio::detached);io.run();return 0;
}

上面这个协程实现还是比较清晰的,如果用回调实现,则非常麻烦:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <functional>namespace asio = boost::asio;
using asio::ip::tcp;
namespace chrono = std::chrono;// 前向声明
class ReliableRequest;
void handle_write(boost::system::error_code ec, size_t, std::shared_ptr<ReliableRequest> request);
void handle_read(boost::system::error_code ec, size_t bytes_read,std::shared_ptr<ReliableRequest> request);
void handle_timeout(boost::system::error_code ec,std::shared_ptr<ReliableRequest> request);
void start_request(std::shared_ptr<ReliableRequest> request);// 请求状态管理类
class ReliableRequest : public std::enable_shared_from_this<ReliableRequest> {
public:ReliableRequest(tcp::socket& socket, std::string request, int max_retries, chrono::seconds timeout): socket_(std::move(socket)), request_(std::move(request)), max_retries_(max_retries), timeout_(timeout), timer_(socket_.get_executor()), attempt_(0){}void start() {attempt_ = 0;start_attempt();}private:friend void handle_write(boost::system::error_code, size_t, std::shared_ptr<ReliableRequest>);friend void handle_read(boost::system::error_code, size_t,std::shared_ptr<ReliableRequest>);friend void handle_timeout(boost::system::error_code,std::shared_ptr<ReliableRequest>);void start_attempt() {attempt_++;std::cout << "尝试 #" << attempt_ << "/" << max_retries_ << "\n";auto self = shared_from_this();// 启动异步写入asio::async_write(socket_, asio::buffer(request_),std::bind(handle_write, std::placeholders::_1, std::placeholders::_2, self));}void start_wait() {auto self = shared_from_this();response_buffer_ = std::make_shared<std::array<char, 1024>>();// 启动异步读取socket_.async_read_some(asio::buffer(*response_buffer_),std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, self));// 启动超时定时器timer_.expires_after(timeout_);timer_.async_wait(std::bind(handle_timeout, std::placeholders::_1, self));}void handle_success(size_t bytes_read) {std::string response(response_buffer_->data(), bytes_read);std::cout << "收到响应: " << response << "\n";if (completion_) completion_(true);}void handle_failure() {std::cout << "请求失败\n";if (completion_) completion_(false);}void handle_retry() {if (attempt_ < max_retries_) {std::cout << "准备重试...\n";start_attempt();} else {std::cout << "已达最大重试次数\n";handle_failure();}}public:// 完成回调using CompletionHandler = std::function<void(bool)>;CompletionHandler completion_;tcp::socket socket_;std::string request_;int max_retries_;chrono::seconds timeout_;asio::steady_timer timer_;int attempt_;std::shared_ptr<std::array<char, 1024>> response_buffer_;
};// 写入完成处理
void handle_write(boost::system::error_code ec, size_t, std::shared_ptr<ReliableRequest> request) {if (ec) {std::cerr << "写入错误: " << ec.message() << "\n";request->handle_failure();return;}request->start_wait();
}// 读取完成处理
void handle_read(boost::system::error_code ec, size_t bytes_read,std::shared_ptr<ReliableRequest> request) {// 取消定时器request->timer_.cancel();if (ec == asio::error::operation_aborted) {return; // 超时已处理}if (ec) {std::cerr << "读取错误: " << ec.message() << "\n";request->handle_retry();return;}request->handle_success(bytes_read);
}// 超时处理
void handle_timeout(boost::system::error_code ec,std::shared_ptr<ReliableRequest> request) {if (ec == asio::error::operation_aborted) {return; // 读取已完成}if (ec) {std::cerr << "定时器错误: " << ec.message() << "\n";request->handle_retry();return;}// 取消读取操作request->socket_.cancel();std::cout << "请求超时, ";request->handle_retry();
}// 启动可靠请求
void start_reliable_request(tcp::socket socket, std::string request,ReliableRequest::CompletionHandler completion,int max_retries = 3, chrono::seconds timeout = 3s) {auto request_obj = std::make_shared<ReliableRequest>(std::move(socket), std::move(request), max_retries, timeout);request_obj->completion_ = completion;request_obj->start();
}// 示例使用
void handle_request_complete(bool success) {if (success) {std::cout << "请求成功完成\n";} else {std::cout << "请求失败\n";}
}int main() {asio::io_context io;// 创建socket(示例)tcp::socket socket(io);// 实际中应连接服务器// 启动可靠请求start_reliable_request(std::move(socket), "QUERY_DATA", handle_request_complete,3, chrono::seconds(3));io.run();return 0;
}

2. &&操作符

&&操作符允许你同时启动多个异步操作,等待所有操作完成,以元组形式获取所有结果

auto [result1, result2] = co_await (async_op1(use_awaitable) &&async_op2(use_awaitable)
);

例如要多个接口结果执行完成后才返回,那么久可以用&&操作符

asio::awaitable<SearchResults> search_products(string_view query) {// 并行搜索多个分类auto [electronics, clothing, books] = co_await (product_service.search("electronics", query) &&product_service.search("clothing", query) &&product_service.search("books", query));...//合并到resultsco_return results;
}

总结

Asio协程方便了C++高并发服务器的开发,在保持异步高性能的同时,提供同步代码的简洁性,同步执行流简化调试和性能分析,随着C++20标准的广泛采用,协程已成为构建下一代高性能服务器的首选范式。

但要注意的是,协程并不会提高程序运行的效率,从原理上讲,协程有可能还没有回调高效,但这个性能并不会有太大的差距

相关文章:

  • thinkcmf做网站快不快北京网站定制公司
  • 在网站里怎么做图片超链接潍坊做网站公司
  • 做白酒有没有必要做个网站包就业的培训学校
  • 网站快排是怎么做的百度网盘下载
  • 影楼行业网站营销网络是什么意思
  • 四川做网站的关键词挖掘工具站
  • 数据结构-第三节-树与二叉树
  • 《汇编语言:基于X86处理器》第5章 过程(1)
  • IDE如何快速切换JLINK版本
  • HarmonyOs开发之——TypeScript介绍、入门,及 TypeScript、JavaScript、ArkTs的具体区别解读。
  • 制药行业的精细化管理:GCOM80-2NET自动化解决方案
  • Python+selenium自动化生成测试报告
  • 营业额统计-02.代码开发及功能测试
  • 命名数据网络 | 兴趣包(Interest Packet)
  • GitLab 18.1 正式发布Maven 虚拟仓库、密码泄露检测等功能,可升级体验!
  • 广州华锐互动:技术与创意双驱动的 VR 先锋​
  • Claude 3.7 的 token 预算机制详解:可控深度的混合推理范式
  • HDFS(Hadoop分布式文件系统)总结
  • 【缓存技术】深入分析如果使用好缓存及注意事项
  • 基于SpringBoot和Leaflet的区域冲突可视化-以伊以冲突为例
  • 6.26_JAVA_微服务_Elasticsearch
  • CRON表达式编辑器与定时任务实现技术文档
  • Linux 统一方式安装多版本 JDK 指南
  • LINUX 626 DNS报错
  • 【工具推荐】WaybackLister——发现潜在目录列表
  • JavaEE:分布式session