Boost.Asio学习(3):异步读写
异步读写与同步读写一样,有读/写各有三种api,合计六种函数
分类 | 函数名 | 所属 | 完整性 | 用途简介 |
---|---|---|---|---|
写 | async_write_some | 成员函数 | ❌ 不完整 | 写部分数据 |
写 | async_write | 非成员函数 | ✅ 完整 | 写全部数据 |
写 | async_send | 成员函数 | ❌ 不完整 | 更贴近 socket 概念 |
读 | async_read_some | 成员函数 | ❌ 不完整 | 读部分数据 |
读 | async_read | 非成员函数 | ✅ 完整 | 读全部数据 |
读 | async_receive | 成员函数 | ❌ 不完整 | 更贴近 socket 概念 |
所有的回调类型都是:
void handler(const boost::system::error_code& error, // Result of operation.std::size_t bytes_transferred // 成功读或写的字节数
);
- 返回值是void类型
- 参数是const boost::system::error_code&,std::size_t
异步函数不会阻塞程序,只是执行一个注册步骤,读写在后台执行,执行完成后会调用回调函数
async_write_some
template<typename ConstBufferSequence,typename WriteHandler>
async_write_some(const ConstBufferSequence & buffers,WriteHandler && handler);
参数
buffers
: 可发送的 buffer(如boost::asio::buffer(data, len)
)handler
: 回调函数(const boost::system::error_code&, std::size_t)
,表示写入状态和写入的字节数
功能逻辑
仅尝试写一次,系统底层调用
send()
,可能不写完;如果写不完,你要在回调中继续发剩下的;
不自动重试,也不保证写满。
socket.async_write_some(boost::asio::buffer(data, len),[](boost::system::error_code ec, std::size_t bytes_written) {if (!ec) std::cout << "写了 " << bytes_written << " 字节\n";});
async_write
(封装)
头文件:<boost/asio/write.hpp>
template <typename AsyncWriteStream, typename ConstBufferSequence, typename WriteHandler>
void async_write(AsyncWriteStream& stream,const ConstBufferSequence& buffers,WriteHandler &&handler);
参数
stream
: 写对象(如 socket)buffers
: buffer,可是多个handler
: 写完之后的回调
功能逻辑
会自动调用多次
async_write_some
,直到完整写完为止;常用于发送固定大小数据;
可传入 completion condition 自定义“写够了”的条件。
boost::asio::async_write(socket, boost::asio::buffer(data, len),[](boost::system::error_code ec, std::size_t total) {if (!ec) std::cout << "全部写完:" << total << " 字节\n";});
async_send
template<typename ConstBufferSequence,typename WriteHandler>
async_send(const ConstBufferSequence & buffers,WriteHandler && handler);
//带标志
template<typename ConstBufferSequence,typename WriteHandler>
async_send(const ConstBufferSequence & buffers,socket_base::message_flags flags,WriteHandler && handler);
buffers
: bufferflags
: socket 的发送 flagshandler
: 回调(ec, bytes_sent)
功能逻辑
更接近底层 BSD socket 的
send()
,可以指定 flags;也是不保证写满,只写一次;
支持 UDP 和 TCP。
socket.async_send(boost::asio::buffer(msg),[](boost::system::error_code ec, std::size_t len) {std::cout << "send() 发送了 " << len << " 字节\n";});
async_read_some
template<typename MutableBufferSequence,typename ReadHandler>
async_read_some(const MutableBufferSequence & buffers,ReadHandler && handler);
参数
buffers
: 接收的 buffer(如std::array<char, 1024>
)handler
: 回调函数(error_code, std::size_t)
,表示读取成功的字节数
功能逻辑
尝试从 socket 中读取数据;
如果 socket 中已有数据,马上触发;
不会保证读满,读多少算多少;
最基础、性能最好的读接口。
std::array<char, 1024> buf;
socket.async_read_some(boost::asio::buffer(buf),[](boost::system::error_code ec, std::size_t len) {std::cout << "收到数据:" << len << " 字节\n";});
async_read
(封装)
头文件:<boost/asio/read.hpp>
template <typename AsyncReadStream, typename MutableBufferSequence, typename ReadHandler>
void async_read(AsyncReadStream& s, const MutableBufferSequence& buffers, ReadHandler &&handler);
//追加完成条件
template <typename AsyncReadStream, typename MutableBufferSequence, typename CompletionCondition, typename ReadHandler>
void async_read(AsyncReadStream& s, const MutableBufferSequence& buffers, CompletionCondition c, ReadHandler &&handler);
参数
s
: socket 或流对象buffers
: 接收缓冲区handler
: 成功或失败回调completion_condition
(可选):自定义读取结束的条件
功能逻辑
会调用
async_read_some
多次;直到 buffer 被填满,或者满足条件;
自动完成剩余读取,不用你手动继续调用。
std::vector<char> recv_buf(1024);
boost::asio::async_read(socket, boost::asio::buffer(recv_buf),[](boost::system::error_code ec, std::size_t total) {std::cout << "完整接收:" << total << " 字节\n";});
async_receive
template<typename MutableBufferSequence,typename ReadHandler>
async_receive(const MutableBufferSequence & buffers,ReadHandler && handler);template<typename MutableBufferSequence,typename ReadHandler>
async_receive(const MutableBufferSequence & buffers,socket_base::message_flags flags,ReadHandler && handler);
参数
buffers
: bufferflags
:MSG_PEEK
,MSG_WAITALL
等 socket flagshandler
: 回调(error_code, size_t)
功能逻辑
更贴近 socket 接口的
recv()
;可设置特殊标志位;
不一定读满,只读一次;
可用于 UDP、TCP、原始 socket。
回调函数都是执行在哪个线程的?
Boost.Asio 的回调函数(即你传给 async_*
的 handler),并不会自动开新线程,它的执行线程依赖于谁在调用 io_context.run()
。
在哪个线程里调用了
io_context.run()
,回调就在哪个线程里执行。
boost::asio::io_context io;
boost::asio::ip::tcp::socket socket(io);
std::thread t1([&] { io.run(); });
std::thread t2([&] { io.run(); });
...
回调函数会在多个线程中由 Asio 自动调度,谁空闲谁执行。只有一个线程会抢到一个待执行的回调任务
boost::asio::strand
strand
是 Boost.Asio 提供的用于串行化异步回调的工具,让你即使在多线程 io_context.run()
中,也可以保证某组回调顺序执行,不会并发执行。
Boost.Asio 的异步操作默认是线程安全的,但你的回调函数不一定是。
// 多线程调用 run()
io.run(); // 在线程 t1
io.run(); // 在线程 t2// 注册异步写和异步读
socket.async_write_some(..., handler1);
socket.async_read_some(..., handler2);
如果 handler1 和 handler2 都在修改类的同一成员变量?比如
_recv_buf
、_send_offset
?
那么问题来了:
handler1 和 handler2 可能同时在不同线程被执行!
导致竞态条件(race condition),产生数据错误、崩溃、未定义行为。
strand
的作用
让多个异步操作的回调,在多个线程中执行时,依然“按顺序、一个一个地执行”,而不是并发执行。
它不是锁,不是互斥量;
它是 Boost.Asio 提供的调度器;
可以理解为一个“任务队列”,队列中的任务由线程池执行,但一次只允许一个线程执行其中任务。
创建 strand:
boost::asio::strand<boost::asio::io_context::executor_type> strand(io.get_executor());
用 bind_executor(strand, handler)
绑定你的回调:
socket.async_read_some(buffer,boost::asio::bind_executor(strand,[](const boost::system::error_code& ec, std::size_t len) {// 被 strand 保证的线程串行安全区}));
所有通过 bind_executor(strand, handler)
注册的回调,会被放入 strand
维护的串行队列,不会同时在两个线程中执行。
你可以把 strand
想象成:
一个“只允许单线程访问的任务队列”。
即使队列里排满了回调函数;
也必须等当前一个 handler 执行完毕;
才允许调度执行下一个。
如果你自己已经保证了对同一个对象的状态访问是线程安全的,那你就可以不用 boost::asio::strand
。
特性 | strand | 锁(mutex) |
---|---|---|
性能 | 高,避免上下文切换、阻塞 | 低,容易造成等待 |
死锁风险 | 无 | 有可能 |
粒度 | 精确控制在哪些回调中串行 | 作用范围广,不易控 |
用途 | 异步回调顺序控制 | 任意共享资源保护 |
场景 | 是否需要 strand |
---|---|
单线程 io.run() | ❌ 不需要,默认就串行 |
多线程 io.run() ,多个异步操作可能访问相同成员变量 | ✅ 必须使用 strand |
多个连接,但每个连接只有一个线程访问 | ❌ 可不加 strand |
高性能服务器 | ✅ 推荐使用 strand 代替锁 |
async_accept
它是异步接收连接的函数,是acceptor类的一个成员函数
简化的函数原型
template <typename AcceptHandler>
void async_accept(socket_type& peer, AcceptHandler &&handler);
还有一些重载方法
基本用法
//这个acceptor构造时已经指定端口
acceptor.async_accept(socket, handler);
acceptor
: 类型为tcp::acceptor
socket
: 类型为tcp::socket
,用于保存新建立的连接(与客户端对话的socket)handler
: 回调函数,类型为void(const boost::system::error_code&)
// 指定 endpoint:
acceptor.async_accept(socket, endpoint, handler);
//省略 socket(内部会创建)
acceptor.async_accept(handler);
此时 handler
的类型为:
void (const boost::system::error_code& error,boost::asio::ip::tcp::socket peer)
这种方式 自动创建 socket 对象并传递给回调函数。
会话类:Session
下面我们用异步读写,创建一个回声的echo会话
我们把 Session
类设计成管理一次客户端连接的生命周期,每个Session独占一个用于通信的socket
Session.h头文件
#pragma once
#include<iostream>
#include<boost\asio.hpp>
#include<memory>
using boost::asio::ip::tcp;
class Session
{
public://构造函数,传递ioc给socketSession(boost::asio::io_context& ioc):_socket(ioc){}//获取sockettcp::socket& Socket() {return _socket;}//会话开启void Start();
private:void handle_read(const boost::system::error_code &error,size_t bytes_transferred);//读回调void handle_write(const boost::system::error_code& error, size_t bytes_transferred);//写回调tcp::socket _socket;//与客户端通信的socketstatic const int max_length = 1024;char _data[max_length] = {'\0'};//缓存
};
Session.cpp源文件
#include "Session.h"using namespace std;void Session::Start(){//清空缓存memset(_data, '\0', max_length);//注册异步读事件,通过lambda表达式创建回调函数(也可以通过bind)_socket.async_read_some(boost::asio::buffer(_data, max_length), //捕获this有隐患[this](const boost::system::error_code& error, size_t bytes_transferred) {//回调的逻辑封装成成员函数this->handle_read(error, bytes_transferred);});
}void Session::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{if (!error) {//error为0,没出错,成功读入数据到_data//打印来自客户端的数据cout << "server receive data is" << _data << endl;//注册异步写事件,将传过来的信息传回去(echo)boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), [this](const boost::system::error_code& error, size_t bytes_transferred) {this->handle_write(error, bytes_transferred);});}else {//有错误cout << "read error" <<error.message()<< endl;}
}void Session::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{if (!error) {//写成功//清空缓存,准备下一次接收memset(_data, '\0', max_length);//注册新的读事件_socket.async_read_some(boost::asio::buffer(_data, max_length),[this](const boost::system::error_code& error, size_t bytes_transferred) {this->handle_read(error, bytes_transferred);});}else {//有错误cout << "write error" <<error.message()<< endl;}
}
实际上,在 当前的 Session
设计中,读写是串行的 —— 而这也是推荐的写法!
首先要明确一个常见误区:
误区 | 真相 |
---|---|
异步调用 async_xxx() 就是并发 | ❌ 不对:异步是 非阻塞注册,而非并发执行 |
每次 async_write_some 会立刻跑在其他线程 | ❌ 错,它只是在等待写缓冲区准备好 |
回调函数并发执行 | ❌ 默认情况下(io_context单线程 run)所有回调都是串行调用的 |
在 io_context.run()
是单线程的前提下,所有 async 操作 + 回调 都是 顺序串行执行 的。
当前的Session类的执行过程实际上如下:
Start() → async_read_some(...)→ handle_read()→ async_write(...)→ handle_write()→ async_read_some(...)
这就形成了一个严格串行的逻辑链条:
只有
async_read_some()
完成(回调触发),才会发起async_write()
;只有
async_write()
完成,才会再次发起async_read_some()
;永远只有一个异步操作在“挂着”,保证逻辑清晰、数据一致性强。
那什么时候需要“并发”?
场景 | 建议 |
---|---|
多核服务器,需要并发处理大量连接 | 多线程 + 每个连接独立 |
每个连接内还要并行发多个消息 | 使用 strand 协调多 async |
各个 Session 要同时并发 read + write | 用 asio::strand 分离写锁 |
同时注意到:异步发送用 boost::asio::async_write
,而接收用 async_read_some
async_write
对于“我要发完就结束”的逻辑,非常合适!
async_read_some
适合边读边处理、流式解析;比如协议帧解包、行读取、json流等
服务器类:Server
我们将服务器端的监听socket(Asio可以直接用acceptor)封装成一个Server类,同时使用前面介绍的异步接收async_accept。
声明:
class Server {
public://构造函数传递ioc和监听的端口Server(boost::asio::io_context& ioc, short port);
private://开始监听void start_accept();void handle_accept(std::shared_ptr<Session> new_session,const boost::system::error_code &error);//连接的回调函数//ioc不可拷贝,只能引用;因为要在构造Session时使用ioc构造,所以类里使用ioc引用作为一个成员boost::asio::io_context& _ioc;//acceptor接收客户端的连接boost::asio::ip::tcp::acceptor _acceptor;
};
定义:
Server::Server(boost::asio::io_context& ioc, short port):_ioc(ioc),
_acceptor(_ioc,tcp::endpoint(tcp::v4(),port))//构造acceptor,构造完成即启动监听等待连接
{start_accept();//从连接队列里取出连接
}void Server::start_accept()
{//这里使用智能指针,结合Session类,有错误隐患,先提及一下std::shared_ptr<Session> new_session = std::make_shared<Session>(_ioc);//异步接收连接_acceptor.async_accept(new_session->Socket(), [this,new_session](const boost::system::error_code& error) {this->handle_accept(new_session, error);});
}void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& error)
{if (!error) {//为0,成功new_session->Start();//会话开启}else {//否则释放Session资源new_session.reset();}//继续取出下一个连接start_accept();
}
这里注意,new_session采用智能指针,但是Session类的回调函数捕获的是裸的this指针,将会有严重问题:野指针问题。
出错的流程如下:
start_accept函数通过智能指针的方式创建了一个Session变量为new_session,通过回调的lambda表达式进行捕获,注册回调函数;之后new_session结束作用域,但是因为lambda表达式依然捕获了该智能指针,Session对象并不会销毁
accept完成,执行回调函数lambda表达式,其中handle_accept里调用了Session类的Start函数。在Start函数中,注册了异步读操作,该回调lambda表达式捕获Session类本身的this指针。之后handle_accept执行结束,即lambda表达式执行结束,所有有关new_session的智能指针都销毁了,智能指针引用计数清零,Session对象释放。
异步读操作执行完毕,调用回调函数,该回调lambda表达式中的this指针指向的Session对象已经被销毁了,于是回调函数访问到了野指针this。
那怎么办?
1.在Server类里,不通过智能指针管理Session,而是通过裸指针管理,这样不会因为引用计数的问题导致Session对象自动释放。
2.Session类里的回调函数不是捕获this裸指针,而是通过this构造的新的智能指针,用来增加引用计数来保活
shared_from_this
以下是使用了shared_from_this的新的会话类Session
//.hclass Session : public std::enable_shared_from_this<Session> {
public:Session(boost::asio::io_context& ioc) : _socket(ioc) {}tcp::socket& Socket() { return _socket; }void Start();private:void handle_read(const boost::system::error_code& error, size_t bytes_transferred);void handle_write(const boost::system::error_code& error, size_t bytes_transferred);tcp::socket _socket;static const int max_length = 1024;char _data[max_length] = { '\0' };
};
//.cppvoid Session::Start() {auto self = shared_from_this(); // 保持 Session 活着直到回调memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_read(error, bytes_transferred);});
}void Session::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {if (!error) {cout << "server receive data is: " << string(_data, bytes_transferred) << endl;auto self = shared_from_this();boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_write(error, bytes_transferred);});}else {cout << "read error: " << error.message() << endl;}
}void Session::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {if (!error) {memset(_data, 0, max_length);auto self = shared_from_this();_socket.async_read_some(boost::asio::buffer(_data, max_length),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_read(error, bytes_transferred);});}else {cout << "write error: " << error.message() << endl;}
}
服务器main函数
#include <boost/asio.hpp>
#include <iostream>
#include"Session.h"
int main() {try {boost::asio::io_context ioc;Server s(ioc, 8080);ioc.run();}catch (const std::exception& ex) {std::cerr << "Exception: " << ex.what() << std::endl;return 1;}return 0;
}
oost::asio::io_context::run()
是阻塞的。
io_context::run()
会 一直阻塞,直到:所有的 工作(handler、异步任务)都完成了;
或者调用了
io_context::stop()
。
如果 没有任何挂起的异步操作,它会立即返回。
所以要先构造Server对象,再调用run函数
对端关闭
对端关闭连接时,Boost.Asio 的异步读写操作都会触发回调函数,并且回调的 boost::system::error_code
会反映出连接关闭的状态。
具体来说:
异步读(如
async_read_some
、async_read
):如果对端关闭连接,读操作的回调会被调用,
error_code
会被设置为boost::asio::error::eof
,表示连接已经关闭,没有更多数据可读。此时,
bytes_transferred
通常为0,表示没有读取到数据。
异步写(如
async_write_some
、async_write
):如果对端已经关闭连接,写操作的回调也会被调用,
error_code
会反映写失败的错误(如连接重置boost::asio::error::connection_reset
)。写操作可能因为对端关闭而失败。
所以不要在回调里delete this,如果同时注册异步读写函数,有可能对本对象进行重复释放
本文的回声服务器的例子中,同一时间永远只有一个异步操作被注册;然而实际上,同时注册异步读写是需要的(实现全双工通信),这要再以后学习了