在 Muduo 网络库中,Buffer类用于处理网络 I/O 中的数据缓冲。防止应用程序读写太快而网络链路收发速度慢导致的速度不匹配问题。这个类封装了一个内部缓冲区(使用了vector<char>
),并提供了一系列方法来操作这个缓冲区,如读取、写入、扩容等。
设计目标
Buffer类的设计目标主要有以下几点:
- 高性能:通过减少内存拷贝和分配次数来提高性能。
- 易用性:提供简洁的API来方便地进行数据的读写操作。
- 灵活性:支持动态扩容,以适应不同大小的数据。
主要成员变量
std::vector<char> buffer_
:存储数据的内部缓冲区。size_t readerIndex_
:指向可读数据的起始位置。size_t writerIndex_
:指向可写区域的下一个位置。static const size_t kCheapPrepend = 8
:预留的前置空间大小,用于优化小数据的写入。static const size_t kInitialSize = 1024
:初始缓冲区大小。
缓冲区结构
/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | 缓冲区头 | (CONTENT) | 可写空间 |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
/// @endcode
static const size_t kCheapPrepend = 8; // 缓冲区头长度
static const size_t kInitialSize = 1024; // 默认初始化大小
设计思想
Buffer类的设计思想主要体现在以下几个方面:
- 双指针设计:使用
readerIndex_
和writerIndex_
两个指针来分别追踪可读数据和可写区域的位置,实现了读写区域的分离。 - 预留空间:通过预留一定的前置空间(
kCheapPrepend
),可以减少小数据写入时的内存移动操作,提高性能。 - 动态扩容:当可写空间不足时,Buffer类会自动进行扩容,以满足写入需求。
重要成员函数
- 构造函数:初始化Buffer对象,设置初始大小和指针位置。
- readableBytes():返回可读字节数。
- writableBytes():返回可写字节数。
- prependableBytes():返回预留空间的大小。
- peek():返回指向可读数据的指针。
- retrieve(size_t len):将已读数据从缓冲区中移除,并更新
readerIndex_
。 - retrieveAll():移除所有已读数据,重置指针位置。
- append(const char data, size_t len):将数据追加到缓冲区末尾,并更新
writerIndex_
。 - ensureWritableBytes(size_t len):确保至少有
len
字节的可写空间,如果不足则进行扩容。 - makeSpace(size_t len):根据需要扩容或移动数据以腾出足够的可写空间。
扩容策略
Buffer类的扩容策略是其设计的一个亮点。当可写空间不足时,它首先检查整个缓冲区(包括预留空间和已读数据区域)是否足够容纳新的数据。如果足够,它会通过移动数据来腾出空间,而不是直接扩容。这样可以减少不必要的内存分配和拷贝操作,提高性能。只有当整个缓冲区都不足时,它才会进行扩容。
readFd和writeFd方法
- readFd(int fd, int saveErrno):从文件描述符
fd
中读取数据到Buffer中。这个方法使用了readv
系统调用来提高读取性能,特别是在读取大量数据时。它首先尝试将数据读取到Buffer的可写区域中,如果不够则使用额外的栈缓冲区来存储剩余的数据。最多可以读取 128k-1 个字节。 - writeFd(int fd, int saveErrno):将Buffer中的数据写入到文件描述符
fd
对应的发送缓冲区中。这个方法使用了write
系统调用来发送数据。它直接将指向可读数据的指针传递给write
函数,避免了不必要的内存拷贝。
源码
Buffer.h
#pragma once
#include <vector>
#include <string>
/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
/// 模仿
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | 缓冲区头 | (CONTENT) | 可写空间 |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
/// @endcode
// 网络库底层的缓冲区类型
class Buffer
{
public:
static const size_t kCheapPrepend = 8; // 缓冲区头长度
static const size_t kInitialSize = 1024; // 默认初始化大小
Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{
}
~Buffer() = default;
// writerIndex_ - readerIndex_
size_t readableBytes() const
{
return writerIndex_ - readerIndex_;
}
// buffer_.size() - writerIndex_
size_t writableBytes() const
{
return buffer_.size() - writerIndex_;
}
// return readerIndex_
size_t prependableBytes() const
{
return readerIndex_; // 随着上层的读取,readerIndex_会变
}
// 返回第一个可读的数据地址
const char *peek() const
{
// char * + readerIndex_
return begin() + readerIndex_;
}
// 将readerIndex_向后移动已经读走的len长度的字节
void retrieve(size_t len)
{
if (len < readableBytes())// 长度小于可读大小
{
readerIndex_ += len;
}
else // 否则即为读取所有
{
retrieveAll();
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
std::string retrieveAllAsString()
{
return retrieveAsString(readableBytes());
}
std::string retrieveAsString(size_t len)
{
std::string result(peek(), len); // 第一个可读数据的地址开始的len长度
retrieve(len);
return result;
}
/// @brief 从原位置data处开始的len个字节数据追加到缓冲区中
/// @param data 源数据位置
/// @param len 要追加的字节个数
void append(const char* /*restrict*/ data, size_t len)
{
ensureWritableBytes(len); // 确保容量足够
std::copy(data, data+len, beginWrite());
writerIndex_ += len;
}
void append(const void* /*restrict*/ data, size_t len)
{
append(static_cast<const char*>(data), len);
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
}
char* beginWrite(){
return begin() + writerIndex_;
}
const char* beginWrite() const
{ return begin() + writerIndex_; }
// 通过fd读取数据:从fd中read数据,写到缓冲区中的writable区域
ssize_t readFd(int fd, int* savedErrno);
// 通过fd发送数据:将buffer的readable区域的数据,写入到fd中
ssize_t writeFd(int fd, int* savedErrno);
private:
char* begin()
{ return &*buffer_.begin(); }
const char *begin() const
{
// *buffer_.begin()获取到vector第一个元素,使用&取地址
return &*buffer_.begin();
}
// buffer扩容
void makeSpace(size_t len)
{
// 若可写大小 + 0~可读位置 不够 要写的数据长度,则申请空间
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
// FIXME: move readable data
buffer_.resize(writerIndex_+len);
}
else // 否则不申请空间,将未读的数据向前移动到紧挨包头之后
{
// move readable data to the front, make space inside buffer
size_t readable = readableBytes(); // 还未读的数据长度
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend); // 紧挨包头之后
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};
Buffer.cc
#include "Buffer.h"
#include <sys/uio.h>
#include <sys/socket.h>
#include <unistd.h>
//
/**
* 从fd中read数据,写到缓冲区中的writable区域
* Poller工作在LT模式,数据不会丢
* Buffer缓冲区有大小,但是从fd读数据的时候不知道tcp数据最终大小
* 所以使用 iovec + extrabuf 使用额外空间
*/
ssize_t Buffer::readFd(int fd, int *savedErrno)
{
char extrabuf[65536] = {0}; // 栈上64k额外数据空间,防止读fd的时候buffer_数组不够用
/**
* struct iovec
* {
* void *iov_base; // Pointer to data.
* size_t iov_len; // Length of data.
* };
* 将iovcnt个iovec传入ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
* 可以在从fd读取的时候,顺序依次写入每个iovec中
*/
iovec vec[2]; // 创建两个iovec
const size_t writable = writableBytes(); // Buffer底层缓冲区剩余的可写大小
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if(n < 0){
*savedErrno = errno;
}
else if(n < writable){ // Buffer底层缓冲区剩余的可写大小 已经足够
writerIndex_ += n;
}
else{ // 使用了extrabuf,需要将其移动到Buffer中
writerIndex_ = buffer_.size(); // buffer_已经写满
// 将extrabuf中数据移动到buffer_中
append(extrabuf, n - writable);
}
return n;
}
/**
* 将buffer的readable区域的数据,写入到fd中
*/
ssize_t Buffer::writeFd(int fd, int *savedErrno)
{
ssize_t n = ::write(fd, peek(), readableBytes());
if(n <= 0){
*savedErrno = errno;
}
return n;
}