当前位置: 首页 > news >正文

moduo之缓冲区Buffer

简介

Buffer作为发送和接收网络数据的缓存区

结构

类定义为

class Buffer : public muduo::copyable
{public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){assert(readableBytes() == 0);assert(writableBytes() == initialSize);assert(prependableBytes() == kCheapPrepend);}// implicit copy-ctor, move-ctor, dtor and assignment are fine// NOTE: implicit move-ctor is added in g++ 4.6void swap(Buffer& rhs){buffer_.swap(rhs.buffer_);std::swap(readerIndex_, rhs.readerIndex_);std::swap(writerIndex_, rhs.writerIndex_);}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() + readerIndex_; }const char* findCRLF() const{// FIXME: replace with memmem()?const char* crlf = std::search(peek(), beginWrite(), kCRLF, kCRLF+2);return crlf == beginWrite() ? NULL : crlf;}const char* findCRLF(const char* start) const{assert(peek() <= start);assert(start <= beginWrite());// FIXME: replace with memmem()?const char* crlf = std::search(start, beginWrite(), kCRLF, kCRLF+2);return crlf == beginWrite() ? NULL : crlf;}const char* findEOL() const{const void* eol = memchr(peek(), '\n', readableBytes());return static_cast<const char*>(eol);}const char* findEOL(const char* start) const{assert(peek() <= start);assert(start <= beginWrite());const void* eol = memchr(start, '\n', beginWrite() - start);return static_cast<const char*>(eol);}// retrieve returns void, to prevent// string str(retrieve(readableBytes()), readableBytes());// the evaluation of two functions are unspecifiedvoid retrieve(size_t len){assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}void retrieveUntil(const char* end){assert(peek() <= end);assert(end <= beginWrite());retrieve(end - peek());}void retrieveInt64(){retrieve(sizeof(int64_t));}void retrieveInt32(){retrieve(sizeof(int32_t));}void retrieveInt16(){retrieve(sizeof(int16_t));}void retrieveInt8(){retrieve(sizeof(int8_t));}void retrieveAll(){readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}string retrieveAllAsString(){return retrieveAsString(readableBytes());}string retrieveAsString(size_t len){assert(len <= readableBytes());string result(peek(), len);retrieve(len);return result;}StringPiece toStringPiece() const{return StringPiece(peek(), static_cast<int>(readableBytes()));}void append(const StringPiece& str){append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}void append(const void* /*restrict*/ data, size_t len){append(static_cast<const char*>(data), len);}void ensureWritableBytes(size_t len){if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}char* beginWrite(){ return begin() + writerIndex_; }const char* beginWrite() const{ return begin() + writerIndex_; }void hasWritten(size_t len){assert(len <= writableBytes());writerIndex_ += len;}void unwrite(size_t len){assert(len <= readableBytes());writerIndex_ -= len;}////// Append int64_t using network endian///void appendInt64(int64_t x){int64_t be64 = sockets::hostToNetwork64(x);append(&be64, sizeof be64);}////// Append int32_t using network endian///void appendInt32(int32_t x){int32_t be32 = sockets::hostToNetwork32(x);append(&be32, sizeof be32);}void appendInt16(int16_t x){int16_t be16 = sockets::hostToNetwork16(x);append(&be16, sizeof be16);}void appendInt8(int8_t x){append(&x, sizeof x);}////// Read int64_t from network endian////// Require: buf->readableBytes() >= sizeof(int32_t)int64_t readInt64(){int64_t result = peekInt64();retrieveInt64();return result;}////// Read int32_t from network endian////// Require: buf->readableBytes() >= sizeof(int32_t)int32_t readInt32(){int32_t result = peekInt32();retrieveInt32();return result;}int16_t readInt16(){int16_t result = peekInt16();retrieveInt16();return result;}int8_t readInt8(){int8_t result = peekInt8();retrieveInt8();return result;}////// Peek int64_t from network endian////// Require: buf->readableBytes() >= sizeof(int64_t)int64_t peekInt64() const{assert(readableBytes() >= sizeof(int64_t));int64_t be64 = 0;::memcpy(&be64, peek(), sizeof be64);return sockets::networkToHost64(be64);}////// Peek int32_t from network endian////// Require: buf->readableBytes() >= sizeof(int32_t)int32_t peekInt32() const{assert(readableBytes() >= sizeof(int32_t));int32_t be32 = 0;::memcpy(&be32, peek(), sizeof be32);return sockets::networkToHost32(be32);}int16_t peekInt16() const{assert(readableBytes() >= sizeof(int16_t));int16_t be16 = 0;::memcpy(&be16, peek(), sizeof be16);return sockets::networkToHost16(be16);}int8_t peekInt8() const{assert(readableBytes() >= sizeof(int8_t));int8_t x = *peek();return x;}////// Prepend int64_t using network endian///void prependInt64(int64_t x){int64_t be64 = sockets::hostToNetwork64(x);prepend(&be64, sizeof be64);}////// Prepend int32_t using network endian///void prependInt32(int32_t x){int32_t be32 = sockets::hostToNetwork32(x);prepend(&be32, sizeof be32);}void prependInt16(int16_t x){int16_t be16 = sockets::hostToNetwork16(x);prepend(&be16, sizeof be16);}void prependInt8(int8_t x){prepend(&x, sizeof x);}void prepend(const void* /*restrict*/ data, size_t len){assert(len <= prependableBytes());readerIndex_ -= len;const char* d = static_cast<const char*>(data);std::copy(d, d+len, begin()+readerIndex_);}void shrink(size_t reserve){// FIXME: use vector::shrink_to_fit() in C++ 11 if possible.Buffer other;other.ensureWritableBytes(readableBytes()+reserve);other.append(toStringPiece());swap(other);}size_t internalCapacity() const{return buffer_.capacity();}/// Read data directly into buffer.////// It may implement with readv(2)/// @return result of read(2), @c errno is savedssize_t readFd(int fd, int* savedErrno);private:char* begin(){ return &*buffer_.begin(); }const char* begin() const{ return &*buffer_.begin(); }void makeSpace(size_t len){if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// move readable data to the front, make space inside bufferassert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}private:std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;static const char kCRLF[];
};

主要包含三个成员

private:std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;

buffer_:数据的存储
readerIndex_:缓存区的读开始位置
writerIndex_:缓存区的写开始位置
writerIndex_-readerIndex_为缓存区的有效数据
在这里插入图片描述
缓存区预留8个字节。

更新读位置 retrieve

 void retrieve(size_t len){assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}void retrieveAll(){readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}

追加数据到缓存 append

将数据添加到缓存区中,同时更新写位置

  void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}void hasWritten(size_t len){assert(len <= writableBytes());writerIndex_ += len;}

从读位置前追加数据prepend

将数据添加到读位置前,并且更新读位置

 void prepend(const void* /*restrict*/ data, size_t len){assert(len <= prependableBytes());readerIndex_ -= len;const char* d = static_cast<const char*>(data);std::copy(d, d+len, begin()+readerIndex_);}

扩容

当缓存区空间不足以容纳将添加的数据时

  • 看从可写位置开始到结束+从开始位置到可读位置的容量不足以容量数据时,则vector容器扩容
  • 否则将缓存区的数据移动到8字节开始位置
  void ensureWritableBytes(size_t len){if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}
void makeSpace(size_t len){if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// move readable data to the front, make space inside bufferassert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}

读数据到缓存readFd

当缓存区的不能容纳65535字节时,通过批量读,然后检查是否可以通过移动方式调整缓存区达到添加数据

ssize_t Buffer::readFd(int fd, int* savedErrno)
{// saved an ioctl()/FIONREAD call to tell how much to readchar extrabuf[65536];struct iovec vec[2];const size_t writable = writableBytes();vec[0].iov_base = begin()+writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extrabuf;vec[1].iov_len = sizeof extrabuf;// when there is enough space in this buffer, don't read into extrabuf.// when extrabuf is used, we read 128k-1 bytes at most.const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;const ssize_t n = sockets::readv(fd, vec, iovcnt);if (n < 0){*savedErrno = errno;}else if (implicit_cast<size_t>(n) <= writable){writerIndex_ += n;}else{writerIndex_ = buffer_.size();append(extrabuf, n - writable);}// if (n == writable + sizeof extrabuf)// {//   goto line_30;// }return n;
}

相关文章:

  • Ubuntu网络数据包发送工具大全
  • MT4完全操作指南:从零基础到EA自动交易
  • LLM复杂记忆存储-多会话隔离案例实战
  • 高斯混合模型(Gaussian Mixture Model, GMM)
  • Spark SQL to_json 函数介绍
  • Riverpod原理解析(实现一个自己的Riverpod)
  • 蜂鸟代理IP+云手机:跨境电商多账号运营的“隐形风控引擎”
  • 从提示工程(Prompt Engineering)到上下文工程(Context Engineering)
  • C++ 第三阶段:语言改进 - 第四节:nullptr vs NULL
  • Reactor Handle
  • MessagesPlaceholder和多轮AI翻译助手实战
  • ubuntu 远程桌面 xrdp + frp
  • 物奇微WQ5007A上手指南
  • opensbi从0到1入门学习
  • 基于dockerfile构建java springboot项目镜像
  • Java+Vue开发的SRM企业招采管理系统,一站式管理招采,助力企业高效运营
  • 系统分析师案例知识点
  • ​​Deepoc大模型在光电研发中的核心技术突破与应用​
  • 单例设计模式详解
  • vue3 定时刷新