当前位置: 首页 > news >正文

Boost.Asio学习(3):异步读写

异步读写与同步读写一样,有读/写各有三种api,合计六种函数

分类函数名所属完整性用途简介
async_write_some成员函数❌ 不完整写部分数据
async_write非成员函数✅ 完整写全部数据
async_send成员函数❌ 不完整更贴近 socket 概念
async_read_some成员函数❌ 不完整读部分数据
async_read非成员函数✅ 完整读全部数据
async_receive成员函数❌ 不完整更贴近 socket 概念

所有的回调类型都是:

void handler(const boost::system::error_code& error, // Result of operation.std::size_t bytes_transferred           // 成功读或写的字节数
);
  • 返回值是void类型
  • 参数是const boost::system::error_code&,std::size_t

异步函数不会阻塞程序,只是执行一个注册步骤,读写在后台执行,执行完成后会调用回调函数

async_write_some

template<typename ConstBufferSequence,typename WriteHandler>
async_write_some(const ConstBufferSequence & buffers,WriteHandler && handler);

参数

  • buffers: 可发送的 buffer(如 boost::asio::buffer(data, len)

  • handler: 回调函数 (const boost::system::error_code&, std::size_t),表示写入状态和写入的字节数

功能逻辑

  • 仅尝试写一次,系统底层调用 send(),可能不写完;

  • 如果写不完,你要在回调中继续发剩下的

  • 不自动重试,也不保证写满。

socket.async_write_some(boost::asio::buffer(data, len),[](boost::system::error_code ec, std::size_t bytes_written) {if (!ec) std::cout << "写了 " << bytes_written << " 字节\n";});

async_write(封装)

头文件:<boost/asio/write.hpp>

template <typename AsyncWriteStream, typename ConstBufferSequence, typename WriteHandler>
void async_write(AsyncWriteStream& stream,const ConstBufferSequence& buffers,WriteHandler &&handler);

参数

  • stream: 写对象(如 socket)

  • buffers: buffer,可是多个

  • handler: 写完之后的回调

功能逻辑

  • 会自动调用多次 async_write_some直到完整写完为止

  • 常用于发送固定大小数据;

  • 可传入 completion condition 自定义“写够了”的条件。

boost::asio::async_write(socket, boost::asio::buffer(data, len),[](boost::system::error_code ec, std::size_t total) {if (!ec) std::cout << "全部写完:" << total << " 字节\n";});

async_send

template<typename ConstBufferSequence,typename WriteHandler>
async_send(const ConstBufferSequence & buffers,WriteHandler && handler);
//带标志
template<typename ConstBufferSequence,typename WriteHandler>
async_send(const ConstBufferSequence & buffers,socket_base::message_flags flags,WriteHandler && handler);
  • buffers: buffer

  • flags: socket 的发送 flags

  • handler: 回调 (ec, bytes_sent)

功能逻辑

  • 更接近底层 BSD socket 的 send(),可以指定 flags;

  • 也是不保证写满,只写一次;

  • 支持 UDP 和 TCP。

socket.async_send(boost::asio::buffer(msg),[](boost::system::error_code ec, std::size_t len) {std::cout << "send() 发送了 " << len << " 字节\n";});

 async_read_some

template<typename MutableBufferSequence,typename ReadHandler>
async_read_some(const MutableBufferSequence & buffers,ReadHandler && handler);

参数

  • buffers: 接收的 buffer(如 std::array<char, 1024>

  • handler: 回调函数 (error_code, std::size_t),表示读取成功的字节数

功能逻辑

  • 尝试从 socket 中读取数据;

  • 如果 socket 中已有数据,马上触发

  • 不会保证读满,读多少算多少;

  • 最基础、性能最好的读接口。

std::array<char, 1024> buf;
socket.async_read_some(boost::asio::buffer(buf),[](boost::system::error_code ec, std::size_t len) {std::cout << "收到数据:" << len << " 字节\n";});

async_read(封装)

头文件:<boost/asio/read.hpp>

template <typename AsyncReadStream, typename MutableBufferSequence, typename ReadHandler>
void async_read(AsyncReadStream& s, const MutableBufferSequence& buffers, ReadHandler &&handler);
//追加完成条件
template <typename AsyncReadStream, typename MutableBufferSequence, typename CompletionCondition, typename ReadHandler>
void async_read(AsyncReadStream& s, const MutableBufferSequence& buffers, CompletionCondition c, ReadHandler &&handler);

参数

  • s: socket 或流对象

  • buffers: 接收缓冲区

  • handler: 成功或失败回调

  • completion_condition(可选):自定义读取结束的条件

功能逻辑

  • 会调用 async_read_some 多次;

  • 直到 buffer 被填满,或者满足条件;

  • 自动完成剩余读取,不用你手动继续调用

std::vector<char> recv_buf(1024);
boost::asio::async_read(socket, boost::asio::buffer(recv_buf),[](boost::system::error_code ec, std::size_t total) {std::cout << "完整接收:" << total << " 字节\n";});

async_receive

template<typename MutableBufferSequence,typename ReadHandler>
async_receive(const MutableBufferSequence & buffers,ReadHandler && handler);template<typename MutableBufferSequence,typename ReadHandler>
async_receive(const MutableBufferSequence & buffers,socket_base::message_flags flags,ReadHandler && handler);

参数

  • buffers: buffer

  • flags: MSG_PEEK, MSG_WAITALL 等 socket flags

  • handler: 回调 (error_code, size_t)

功能逻辑

  • 更贴近 socket 接口的 recv()

  • 可设置特殊标志位;

  • 不一定读满,只读一次

  • 可用于 UDP、TCP、原始 socket。

回调函数都是执行在哪个线程的?

Boost.Asio 的回调函数(即你传给 async_* 的 handler)并不会自动开新线程,它的执行线程依赖于谁在调用 io_context.run()。 

在哪个线程里调用了 io_context.run(),回调就在哪个线程里执行。 

boost::asio::io_context io;
boost::asio::ip::tcp::socket socket(io);
std::thread t1([&] { io.run(); });
std::thread t2([&] { io.run(); });
...

回调函数会在多个线程中由 Asio 自动调度谁空闲谁执行。只有一个线程会抢到一个待执行的回调任务

boost::asio::strand

strand 是 Boost.Asio 提供的用于串行化异步回调的工具,让你即使在多线程 io_context.run() 中,也可以保证某组回调顺序执行,不会并发执行

Boost.Asio 的异步操作默认是线程安全的,但你的回调函数不一定是

// 多线程调用 run()
io.run(); // 在线程 t1
io.run(); // 在线程 t2// 注册异步写和异步读
socket.async_write_some(..., handler1);
socket.async_read_some(..., handler2);

如果 handler1 和 handler2 都在修改类的同一成员变量?比如 _recv_buf_send_offset

那么问题来了:

  • handler1 和 handler2 可能同时在不同线程被执行!

  • 导致竞态条件(race condition),产生数据错误、崩溃、未定义行为

strand 的作用

让多个异步操作的回调,在多个线程中执行时,依然“按顺序、一个一个地执行”,而不是并发执行。

  • 它不是锁,不是互斥量;

  • 它是 Boost.Asio 提供的调度器

  • 可以理解为一个“任务队列”,队列中的任务由线程池执行,但一次只允许一个线程执行其中任务。

 创建 strand:

boost::asio::strand<boost::asio::io_context::executor_type> strand(io.get_executor());

bind_executor(strand, handler) 绑定你的回调:

socket.async_read_some(buffer,boost::asio::bind_executor(strand,[](const boost::system::error_code& ec, std::size_t len) {// 被 strand 保证的线程串行安全区}));

所有通过 bind_executor(strand, handler) 注册的回调,会被放入 strand 维护的串行队列,不会同时在两个线程中执行

你可以把 strand 想象成:

一个“只允许单线程访问的任务队列”。

  • 即使队列里排满了回调函数

  • 也必须等当前一个 handler 执行完毕

  • 才允许调度执行下一个

如果你自己已经保证了对同一个对象的状态访问是线程安全的,那你就可以不用 boost::asio::strand

特性strand锁(mutex)
性能高,避免上下文切换、阻塞低,容易造成等待
死锁风险有可能
粒度精确控制在哪些回调中串行作用范围广,不易控
用途异步回调顺序控制任意共享资源保护
场景是否需要 strand
单线程 io.run()❌ 不需要,默认就串行
多线程 io.run(),多个异步操作可能访问相同成员变量✅ 必须使用 strand
多个连接,但每个连接只有一个线程访问❌ 可不加 strand
高性能服务器✅ 推荐使用 strand 代替锁

 async_accept

它是异步接收连接的函数,是acceptor类的一个成员函数

简化的函数原型

template <typename AcceptHandler>
void async_accept(socket_type& peer, AcceptHandler &&handler);

还有一些重载方法

基本用法

//这个acceptor构造时已经指定端口
acceptor.async_accept(socket, handler);
  • acceptor: 类型为 tcp::acceptor

  • socket: 类型为 tcp::socket,用于保存新建立的连接(与客户端对话的socket)

  • handler: 回调函数,类型为 void(const boost::system::error_code&)

// 指定 endpoint:
acceptor.async_accept(socket, endpoint, handler);
//省略 socket(内部会创建)
acceptor.async_accept(handler);

此时 handler 的类型为:

void (const boost::system::error_code& error,boost::asio::ip::tcp::socket peer)

这种方式 自动创建 socket 对象并传递给回调函数

会话类:Session

下面我们用异步读写,创建一个回声的echo会话

我们把 Session 类设计成管理一次客户端连接的生命周期,每个Session独占一个用于通信的socket

Session.h头文件

#pragma once
#include<iostream>
#include<boost\asio.hpp>
#include<memory>
using boost::asio::ip::tcp;
class Session
{
public://构造函数,传递ioc给socketSession(boost::asio::io_context& ioc):_socket(ioc){}//获取sockettcp::socket& Socket() {return _socket;}//会话开启void Start();
private:void handle_read(const boost::system::error_code &error,size_t bytes_transferred);//读回调void handle_write(const boost::system::error_code& error, size_t bytes_transferred);//写回调tcp::socket _socket;//与客户端通信的socketstatic const int max_length = 1024;char _data[max_length] = {'\0'};//缓存
};

Session.cpp源文件

#include "Session.h"using namespace std;void Session::Start(){//清空缓存memset(_data, '\0', max_length);//注册异步读事件,通过lambda表达式创建回调函数(也可以通过bind)_socket.async_read_some(boost::asio::buffer(_data, max_length), //捕获this有隐患[this](const boost::system::error_code& error, size_t bytes_transferred) {//回调的逻辑封装成成员函数this->handle_read(error, bytes_transferred);});
}void Session::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{if (!error) {//error为0,没出错,成功读入数据到_data//打印来自客户端的数据cout << "server receive data is" << _data << endl;//注册异步写事件,将传过来的信息传回去(echo)boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), [this](const boost::system::error_code& error, size_t bytes_transferred) {this->handle_write(error, bytes_transferred);});}else {//有错误cout << "read error" <<error.message()<< endl;}
}void Session::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{if (!error) {//写成功//清空缓存,准备下一次接收memset(_data, '\0', max_length);//注册新的读事件_socket.async_read_some(boost::asio::buffer(_data, max_length),[this](const boost::system::error_code& error, size_t bytes_transferred) {this->handle_read(error, bytes_transferred);});}else {//有错误cout << "write error" <<error.message()<< endl;}
}

实际上,在 当前的 Session 设计中,读写是串行的 —— 而这也是推荐的写法!

首先要明确一个常见误区

误区真相
异步调用 async_xxx() 就是并发❌ 不对:异步是 非阻塞注册,而非并发执行
每次 async_write_some 会立刻跑在其他线程❌ 错,它只是在等待写缓冲区准备好
回调函数并发执行❌ 默认情况下(io_context单线程 run)所有回调都是串行调用的

 在 io_context.run() 是单线程的前提下,所有 async 操作 + 回调 都是 顺序串行执行 的。

当前的Session类的执行过程实际上如下:

Start() → async_read_some(...)→ handle_read()→ async_write(...)→ handle_write()→ async_read_some(...)

这就形成了一个严格串行的逻辑链条

  • 只有 async_read_some() 完成(回调触发),才会发起 async_write()

  • 只有 async_write() 完成,才会再次发起 async_read_some()

  • 永远只有一个异步操作在“挂着”,保证逻辑清晰、数据一致性强。

那什么时候需要“并发”?

场景建议
多核服务器,需要并发处理大量连接多线程 + 每个连接独立
每个连接内还要并行发多个消息使用 strand 协调多 async
各个 Session 要同时并发 read + writeasio::strand 分离写锁

同时注意到:异步发送用 boost::asio::async_write,而接收用 async_read_some

async_write对于“我要发完就结束”的逻辑,非常合适!

async_read_some适合边读边处理、流式解析;比如协议帧解包、行读取、json流等

服务器类:Server

我们将服务器端的监听socket(Asio可以直接用acceptor)封装成一个Server类,同时使用前面介绍的异步接收async_accept。

声明:

class Server {
public://构造函数传递ioc和监听的端口Server(boost::asio::io_context& ioc, short port);
private://开始监听void start_accept();void handle_accept(std::shared_ptr<Session> new_session,const boost::system::error_code &error);//连接的回调函数//ioc不可拷贝,只能引用;因为要在构造Session时使用ioc构造,所以类里使用ioc引用作为一个成员boost::asio::io_context& _ioc;//acceptor接收客户端的连接boost::asio::ip::tcp::acceptor _acceptor;
};

定义:


Server::Server(boost::asio::io_context& ioc, short port):_ioc(ioc),
_acceptor(_ioc,tcp::endpoint(tcp::v4(),port))//构造acceptor,构造完成即启动监听等待连接
{start_accept();//从连接队列里取出连接
}void Server::start_accept()
{//这里使用智能指针,结合Session类,有错误隐患,先提及一下std::shared_ptr<Session> new_session = std::make_shared<Session>(_ioc);//异步接收连接_acceptor.async_accept(new_session->Socket(), [this,new_session](const boost::system::error_code& error) {this->handle_accept(new_session, error);});
}void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& error)
{if (!error) {//为0,成功new_session->Start();//会话开启}else {//否则释放Session资源new_session.reset();}//继续取出下一个连接start_accept();
}

这里注意,new_session采用智能指针,但是Session类的回调函数捕获的是裸的this指针,将会有严重问题:野指针问题。

出错的流程如下:

start_accept函数通过智能指针的方式创建了一个Session变量为new_session,通过回调的lambda表达式进行捕获,注册回调函数;之后new_session结束作用域,但是因为lambda表达式依然捕获了该智能指针,Session对象并不会销毁

accept完成,执行回调函数lambda表达式,其中handle_accept里调用了Session类的Start函数。在Start函数中,注册了异步读操作,该回调lambda表达式捕获Session类本身的this指针。之后handle_accept执行结束,即lambda表达式执行结束,所有有关new_session的智能指针都销毁了,智能指针引用计数清零,Session对象释放。

异步读操作执行完毕,调用回调函数,该回调lambda表达式中的this指针指向的Session对象已经被销毁了,于是回调函数访问到了野指针this。

那怎么办?

1.在Server类里,不通过智能指针管理Session,而是通过裸指针管理,这样不会因为引用计数的问题导致Session对象自动释放。

2.Session类里的回调函数不是捕获this裸指针,而是通过this构造的新的智能指针,用来增加引用计数来保活

shared_from_this

以下是使用了shared_from_this的新的会话类Session

//.hclass Session : public std::enable_shared_from_this<Session> {
public:Session(boost::asio::io_context& ioc) : _socket(ioc) {}tcp::socket& Socket() { return _socket; }void Start();private:void handle_read(const boost::system::error_code& error, size_t bytes_transferred);void handle_write(const boost::system::error_code& error, size_t bytes_transferred);tcp::socket _socket;static const int max_length = 1024;char _data[max_length] = { '\0' };
};
//.cppvoid Session::Start() {auto self = shared_from_this();  // 保持 Session 活着直到回调memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_read(error, bytes_transferred);});
}void Session::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {if (!error) {cout << "server receive data is: " << string(_data, bytes_transferred) << endl;auto self = shared_from_this();boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_write(error, bytes_transferred);});}else {cout << "read error: " << error.message() << endl;}
}void Session::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {if (!error) {memset(_data, 0, max_length);auto self = shared_from_this();_socket.async_read_some(boost::asio::buffer(_data, max_length),[self](const boost::system::error_code& error, size_t bytes_transferred) {self->handle_read(error, bytes_transferred);});}else {cout << "write error: " << error.message() << endl;}
}

服务器main函数

#include <boost/asio.hpp>
#include <iostream>
#include"Session.h"
int main() {try {boost::asio::io_context ioc;Server s(ioc, 8080);ioc.run();}catch (const std::exception& ex) {std::cerr << "Exception: " << ex.what() << std::endl;return 1;}return 0;
}

oost::asio::io_context::run() 是阻塞的

  • io_context::run()一直阻塞,直到:

    1. 所有的 工作(handler、异步任务)都完成了

    2. 或者调用了 io_context::stop()

  • 如果 没有任何挂起的异步操作,它会立即返回。

所以要先构造Server对象,再调用run函数

对端关闭

对端关闭连接时,Boost.Asio 的异步读写操作都会触发回调函数,并且回调的 boost::system::error_code 会反映出连接关闭的状态。

具体来说:

  • 异步读(如 async_read_someasync_read

    • 如果对端关闭连接,读操作的回调会被调用,error_code 会被设置为 boost::asio::error::eof,表示连接已经关闭,没有更多数据可读。

    • 此时,bytes_transferred 通常为0,表示没有读取到数据。

  • 异步写(如 async_write_someasync_write

    • 如果对端已经关闭连接,写操作的回调也会被调用,error_code 会反映写失败的错误(如连接重置 boost::asio::error::connection_reset)。

    • 写操作可能因为对端关闭而失败。

所以不要在回调里delete this,如果同时注册异步读写函数,有可能对本对象进行重复释放

本文的回声服务器的例子中,同一时间永远只有一个异步操作被注册;然而实际上,同时注册异步读写是需要的(实现全双工通信),这要再以后学习了 

http://www.dtcms.com/a/272492.html

相关文章:

  • windows对\和/敏感吗?
  • 小白成长之路-NFS文件存储及论坛项目搭建(php)
  • C++之unordered_set和unordered_map基本介绍
  • jmeter如何让一个线程组中的多个请求同时触发
  • PyTorch中torch.eq()、torch.argmax()函数的详解和代码示例
  • 多线程交替打印ABC
  • Windows安装DevEco Studio
  • 解决问题:在cmd中能查看到pnpm版本,在vscode终端中却报错
  • [5种方法] 如何将iPhone短信保存到电脑
  • 搜索算法在前端的实践
  • G5打卡——Pix2Pix算法
  • Vue前端导出页面为PDF文件
  • 【HDLBits习题 2】Circuit - Sequential Logic(4)More Circuits
  • AI驱动的业务系统智能化转型:从静态配置到动态认知的范式革命
  • 基础 IO
  • Spring Boot中的中介者模式:终结对象交互的“蜘蛛网”困境
  • JAVA JVM的内存区域划分
  • Redis的常用命令及`SETNX`实现分布式锁、幂等操作
  • Redis Stack扩展功能
  • K8S数据流核心底层逻辑剖析
  • AI进化论06:连接主义的复兴——神经网络的“蛰伏”与“萌动”
  • k8s集群--证书延期
  • 项目进度管控依赖Excel,如何提升数字化能力
  • 调度器与闲逛进程详解,(操作系统OS)
  • UI前端与数字孪生结合案例分享:智慧城市的智慧能源管理系统
  • 数据结构笔记10:排序算法
  • Windows 本地 使用mkcert 配置HTTPS 自签名证书
  • Java并发 - 阻塞队列详解
  • XSS(ctfshow)
  • 文心大模型4.5开源测评:保姆级部署教程+多维度测试验证