文章目录
1 IO模型
- 如何设计一个高效且易的event loop,在让每一个线程run一个event loop;
2 为什么non-blocking是应用层buffer必需的
- non-blocking IO的核心思想是避免阻塞在IO系统调用上,可以最大限度地复用thread-of-control,让一个线程能服务于多个socket连接;
- IO线程只能阻塞在IO multiplexing函数上;
- 故应用层缓存时必需的;
【TcpConnection必须由output buffer场景】程序通过TCP发送100kb,但只write了80kb,剩下的20kb该如何处理
- 对于应用程序,只管生成数据即可,发送只需交给网络库;
- 网络库应接收20kb,将它存在output buffer,后注册POLLOUT,一旦socket编程可写即立刻发送数据;
- 当然如果20kb没有被完全发送,则继续执行相同操作;
- 如果写完20kb,则应注销POLLOUT,避免造成busy loop;
- 如果此时还有数据要写入50kb,则应追加在没有写完的数据后面;
- 若buffer里有数据,程序想要关闭连接,则需要先将数据发送完毕再关闭;
【TcpConnection必须有input buffer】
- TCP时无边界的字节流,故接收方需要处理“收到的数据不构成一条完整的消息"和”一次收到两条消息的数据“等情况;
- 当发送方send两条1kb,接收方收到数据的可能:
- 一次性收到2kb;
- 分两次收到,第一次600B,第二次1400B;
- 等等;
- 故网络库再处理socket可读时,需要一次性把socket数据读完(从sys的buf搬到应用层buf),否则反复触发POLLIN,造成busy-loop;
- 若收到数据不完整,则先存放再input buffer,等完整的消息再通知程序的业务逻辑;
【moduo采用epoll LT】
- 为了兼容poll,在文件描述符少时,活动fd比例较高,poll较高效;
- LT编程更高效,可继续使用select/poll;
- 读写时不必等候出现EAGAIN,节省系统调用;
3 Buffer的功能需求
- 对外表现为一块连续的内存,便于让客户编写;
- size可自增长,适用不同大小的数据;
- 内部用std::vector<char>来保存数据,并提供相应的访问函数;
- 减少系统调用,一次读取越多的数据越划算;
【TcpConnetcion 中的input buffer与output buffer】
- TcpConnetcion从socket读取数据,在写入input buffer,客户代码从input buffer读取数据;
- 客户代码会把数据写入output buffer,TcpConnection从output buffer读取并写入socket;
【如何设计并适用缓冲区】
- 在栈上准备衣柜好65535字节的extrabuf,利用readv来读取数据,iovec有两块,第一块指向buffer中的writeable字节,另一个指向栈上的extrabuf;若读入字节数不多,则读到buffer;若长度超过writeable字节,则读到栈上的extrabuf里,在将其append到Buffer;
- 利用临时栈上空间,避免每个连接的初始Buffer过大造成的内存浪费,避免反复调用read的系统开销;
【buffer没有线程安全】
- 对于input buffer,onMessage回调始终会发生在TcpConnection所属的IO线程;
4 Buffer数据结构
- 是vecot<char>的连续内存块,有两个index:readIndex、writeIndex;prependable、readable、writeable;
初始化状态
5 Buffer的操作
【基本的read-write cycle】
- 以下可看成发送方发送两条消息,长度为50、350字节,接受方分别收到2次200字节,在进行分包;
- 【自动增长】:适用vector可自动增长
【size和capacity】
适用vector其中的capcity减少了内存分配的次数,内存以指数增长;
可优化,我们使用resize,会将内存进行初始化,但我们并不需要;
【内存腾挪】
【前方添加prepend】
提供prependable,可让程序能以很低的代价在数据前添加几个字节;
6 Buffer从TCP中读取数据
为了避免读取的内容过多,buf存放不下,则先开辟一个栈空间,并使用iovec来存储,让其他自动填充内存块;
当读取好数据后,根据读取的字节数判断buf是否满足,若读取的字节数超过buf的可写长度,说明有部分内容
存放不下,存放与栈空间中,则需要将该栈空间的内容继续扩充到buf中;
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
char extrabuf[65536]; // 先分配一个栈空间的内存,防止buf中内存不够存放
struct iovec vec[2]; // 创建两个iovec
const size_t writable = writableBytes(); // 获取buf中可写长度
vec[0].iov_base = begin()+writerIndex_; // 指定第一块内存起始地址
vec[0].iov_len = writable; // 指定第一块内存长度
vec[1].iov_base = extrabuf; // 指定第二块内存起始地址
vec[1].iov_len = sizeof extrabuf; // 第二块内存长度
// 判断需要使用iovec的个数
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
// 读取数据
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0) // 读取出现错误
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable) // 说明buf的内存足够存储
{
writerIndex_ += n;
}
else // 说明buf的内存不够存储,剩余的内存存储在extrabuf中,需要将extrabuf的内存继续扩充到buf中
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}
7 其他设计方案
不使用STL
- 可以使用指针来操作,但该做法可能会使代码变得复杂不好维护,性能也没有提高;
zero copy - 若无法接收vector中的resize和copy,则可以考虑分段连续的zero copy buffer;
8 sylar实现
8.1 参考知识
具体代码:https://gitee.com/jaibuti/log_server.git
8.2 简介
内部使用链表将各个内存块串起来,方便后续内存分配;
【数据成员】:
- size_t m_baseSize:内存块的大小,固定大小不可改变
- size_t m_position:当前操作位置,读时可设置
- size_t m_capacity:当前的总容量,在写入时会自动增加
- size_t m_size:当前数据的大小,数据添加在更新
- int8_t m_endian:字节序,默认大端
- Node* m_root:第一个内存块指针
- Node* m_cur:当前操作的内存块指针
【成员函数】:
- addCapacity:扩容内存块;
- 若满足则不需要扩容;
- 若不满足,则先计算需要扩容几个内存块,在分配,且需要记录第一个分配的内存块交给m_cur,便于使用;
- toString:将可读的内容都转化为字符串;
- toHexString:将可读的内容都转化为二进制;
- readFromFile:从文件中读取内容;
- writeToFile:将内容写入到文件;
- setPosition:设置操作位置;
- write:
1.会先检查容量
2.获取当前节点可写入的位置,获取当前内存块所剩的大小
3.开始写入:
- 若内存块的容量足够写入,则直接拷贝进去,并修改m_position,若将内存块写满,则当前内存块需要移动到下一个内存块;
- 若不满足,则会选用下一个内存块
4.写入成功后,需要更新当前m_size大小
-read:
1. 若size大于当前可读大小,抛出错误
2. 先计算出需要读取的位置
3. 当m_position开始往后读取
8.3 zigzag
一种压缩算法,将数据压缩去除无意义的数据位,提高数据的传输效率;
如:00000000_00000000_00000000_00000001该数据,我们只需要其中的1,那么我们可以只发送00000001或1就可以;
(计算机的处理数据的基本)
但在负数的情况下,在最高位会出现1,来干扰我们对数据的处理,那么我们不能直接去除,则需要通过zigzag算法来处理;
【1 首先处理负数的问题】
若该数为负数,则左移一位,让最低位的数变为0;
在将原该数,右移31位,将符号位移动到最低位;
在将两个数值进行异或;
int int_to_zigzag(int n) {
return (n << 1) ^ (n >> 31);
}
// 还原
int zigzag_to_int(int n) {
// 注意右移的时候需要用不带符号的,否则如果为负数,则会补1
return (((unsignedint)n) >>1) ^ -(n & 1);
}
【2 压缩数据】
另外,我们需要表示字节的有效长度,在这里zigzag的方法是,使用自己表示自己;
int write_to_buffer(int zz,byte* buf,int size){
int ret =0;
for (int i =0; i < size; i++)
{
// 若除了后7位没有信息后,则将最后8位取出并跳出循环
if ((zz & (~0x7f)) == 0)
{
buf[i] = (byte)zz;
ret = i +1;
break;
// 若有信息,则将zz的第7个bit补上一个1(即0x80)
} else {
buf[i] = (byte)((zz &0x7f) |0x80);
zz = ((unsignedint)zz)>>7;
}
}
return ret;
}
=====> (~0x7f)16 =(11111111_11111111_11111111_10000000)补
- 从倒数第八位开始,高位全为1的数。他的作用:看除开最后七位后,还有没有信息;
还原数据
int read_from_buffer(byte* buf,intmax_size) {
int ret =0;
int offset =0;
for (int i =0; i < max_size; i++, offset +=7)
{
byte n = buf[i];
if ((n &0x80) !=0x80)
{
ret |= (n <<offset);
break;
} else {
ret |= ((n &0x7f) << offset);
}
}
return ret;
}
8.4 代码
/**
* @file bytearray.h
* @brief 二进制数组(序列化/反序列化)
* @author sylar.yin
* @email 564628276@qq.com
* @date 2019-06-05
* @copyright Copyright (c) 2019年 sylar.yin All rights reserved (www.sylar.top)
*/
#ifndef __SYLAR_BYTEARRAY_H__
#define __SYLAR_BYTEARRAY_H__
#include <memory>
#include <string>
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <vector>
namespace sylar {
/**
* @brief 二进制数组,提供基础类型的序列化,反序列化功能
*/
class ByteArray {
public:
typedef std::shared_ptr<ByteArray> ptr;
/**
* @brief ByteArray的存储节点,解析出二进制的内容
*/
struct Node {
/**
* @brief 构造指定大小的内存块
* @param[in] s 内存块字节数
*/
Node(size_t s);
/**
* 无参构造函数
*/
Node();
/**
* 析构函数,释放内存
*/
~Node();
/// 内存块地址指针
char* ptr;
/// 下一个内存块地址
Node* next;
/// 内存块大小
size_t size;
};
/**
* @brief 使用指定长度的内存块构造ByteArray
* @param[in] base_size 内存块大小
*/
ByteArray(size_t base_size = 4096);
/**
* @brief 析构函数
*/
~ByteArray();
/**
* @brief 写入固定长度int8_t类型的数据
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFint8 (int8_t value);
/**
* @brief 写入固定长度uint8_t类型的数据
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFuint8 (uint8_t value);
/**
* @brief 写入固定长度int16_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFint16 (int16_t value);
/**
* @brief 写入固定长度uint16_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFuint16(uint16_t value);
/**
* @brief 写入固定长度int32_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFint32 (int32_t value);
/**
* @brief 写入固定长度uint32_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFuint32(uint32_t value);
/**
* @brief 写入固定长度int64_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFint64 (int64_t value);
/**
* @brief 写入固定长度uint64_t类型的数据(大端/小端)
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFuint64(uint64_t value);
/**
* @brief 写入有符号Varint32类型的数据
* @post m_position += 实际占用内存(1 ~ 5)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeInt32 (int32_t value);
/**
* @brief 写入无符号Varint32类型的数据
* @post m_position += 实际占用内存(1 ~ 5)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeUint32 (uint32_t value);
/**
* @brief 写入有符号Varint64类型的数据
* @post m_position += 实际占用内存(1 ~ 10)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeInt64 (int64_t value);
/**
* @brief 写入无符号Varint64类型的数据
* @post m_position += 实际占用内存(1 ~ 10)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeUint64 (uint64_t value);
/**
* @brief 写入float类型的数据
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeFloat (float value);
/**
* @brief 写入double类型的数据
* @post m_position += sizeof(value)
* 如果m_position > m_size 则 m_size = m_position
*/
void writeDouble (double value);
/**
* @brief 写入std::string类型的数据,用uint16_t作为长度类型
* @post m_position += 2 + value.size()
* 如果m_position > m_size 则 m_size = m_position
*/
void writeStringF16(const std::string& value);
/**
* @brief 写入std::string类型的数据,用uint32_t作为长度类型
* @post m_position += 4 + value.size()
* 如果m_position > m_size 则 m_size = m_position
*/
void writeStringF32(const std::string& value);
/**
* @brief 写入std::string类型的数据,用uint64_t作为长度类型
* @post m_position += 8 + value.size()
* 如果m_position > m_size 则 m_size = m_position
*/
void writeStringF64(const std::string& value);
/**
* @brief 写入std::string类型的数据,用无符号Varint64作为长度类型
* @post m_position += Varint64长度 + value.size()
* 如果m_position > m_size 则 m_size = m_position
*/
void writeStringVint(const std::string& value);
/**
* @brief 写入std::string类型的数据,无长度
* @post m_position += value.size()
* 如果m_position > m_size 则 m_size = m_position
*/
void writeStringWithoutLength(const std::string& value);
/**
* @brief 读取int8_t类型的数据
* @pre getReadSize() >= sizeof(int8_t)
* @post m_position += sizeof(int8_t);
* @exception 如果getReadSize() < sizeof(int8_t) 抛出 std::out_of_range
*/
int8_t readFint8();
/**
* @brief 读取uint8_t类型的数据
* @pre getReadSize() >= sizeof(uint8_t)
* @post m_position += sizeof(uint8_t);
* @exception 如果getReadSize() < sizeof(uint8_t) 抛出 std::out_of_range
*/
uint8_t readFuint8();
/**
* @brief 读取int16_t类型的数据
* @pre getReadSize() >= sizeof(int16_t)
* @post m_position += sizeof(int16_t);
* @exception 如果getReadSize() < sizeof(int16_t) 抛出 std::out_of_range
*/
int16_t readFint16();
/**
* @brief 读取uint16_t类型的数据
* @pre getReadSize() >= sizeof(uint16_t)
* @post m_position += sizeof(uint16_t);
* @exception 如果getReadSize() < sizeof(uint16_t) 抛出 std::out_of_range
*/
uint16_t readFuint16();
/**
* @brief 读取int32_t类型的数据
* @pre getReadSize() >= sizeof(int32_t)
* @post m_position += sizeof(int32_t);
* @exception 如果getReadSize() < sizeof(int32_t) 抛出 std::out_of_range
*/
int32_t readFint32();
/**
* @brief 读取uint32_t类型的数据
* @pre getReadSize() >= sizeof(uint32_t)
* @post m_position += sizeof(uint32_t);
* @exception 如果getReadSize() < sizeof(uint32_t) 抛出 std::out_of_range
*/
uint32_t readFuint32();
/**
* @brief 读取int64_t类型的数据
* @pre getReadSize() >= sizeof(int64_t)
* @post m_position += sizeof(int64_t);
* @exception 如果getReadSize() < sizeof(int64_t) 抛出 std::out_of_range
*/
int64_t readFint64();
/**
* @brief 读取uint64_t类型的数据
* @pre getReadSize() >= sizeof(uint64_t)
* @post m_position += sizeof(uint64_t);
* @exception 如果getReadSize() < sizeof(uint64_t) 抛出 std::out_of_range
*/
uint64_t readFuint64();
/**
* @brief 读取有符号Varint32类型的数据
* @pre getReadSize() >= 有符号Varint32实际占用内存
* @post m_position += 有符号Varint32实际占用内存
* @exception 如果getReadSize() < 有符号Varint32实际占用内存 抛出 std::out_of_range
*/
int32_t readInt32();
/**
* @brief 读取无符号Varint32类型的数据
* @pre getReadSize() >= 无符号Varint32实际占用内存
* @post m_position += 无符号Varint32实际占用内存
* @exception 如果getReadSize() < 无符号Varint32实际占用内存 抛出 std::out_of_range
*/
uint32_t readUint32();
/**
* @brief 读取有符号Varint64类型的数据
* @pre getReadSize() >= 有符号Varint64实际占用内存
* @post m_position += 有符号Varint64实际占用内存
* @exception 如果getReadSize() < 有符号Varint64实际占用内存 抛出 std::out_of_range
*/
int64_t readInt64();
/**
* @brief 读取无符号Varint64类型的数据
* @pre getReadSize() >= 无符号Varint64实际占用内存
* @post m_position += 无符号Varint64实际占用内存
* @exception 如果getReadSize() < 无符号Varint64实际占用内存 抛出 std::out_of_range
*/
uint64_t readUint64();
/**
* @brief 读取float类型的数据
* @pre getReadSize() >= sizeof(float)
* @post m_position += sizeof(float);
* @exception 如果getReadSize() < sizeof(float) 抛出 std::out_of_range
*/
float readFloat();
/**
* @brief 读取double类型的数据
* @pre getReadSize() >= sizeof(double)
* @post m_position += sizeof(double);
* @exception 如果getReadSize() < sizeof(double) 抛出 std::out_of_range
*/
double readDouble();
/**
* @brief 读取std::string类型的数据,用uint16_t作为长度
* @pre getReadSize() >= sizeof(uint16_t) + size
* @post m_position += sizeof(uint16_t) + size;
* @exception 如果getReadSize() < sizeof(uint16_t) + size 抛出 std::out_of_range
*/
std::string readStringF16();
/**
* @brief 读取std::string类型的数据,用uint32_t作为长度
* @pre getReadSize() >= sizeof(uint32_t) + size
* @post m_position += sizeof(uint32_t) + size;
* @exception 如果getReadSize() < sizeof(uint32_t) + size 抛出 std::out_of_range
*/
std::string readStringF32();
/**
* @brief 读取std::string类型的数据,用uint64_t作为长度
* @pre getReadSize() >= sizeof(uint64_t) + size
* @post m_position += sizeof(uint64_t) + size;
* @exception 如果getReadSize() < sizeof(uint64_t) + size 抛出 std::out_of_range
*/
std::string readStringF64();
/**
* @brief 读取std::string类型的数据,用无符号Varint64作为长度
* @pre getReadSize() >= 无符号Varint64实际大小 + size
* @post m_position += 无符号Varint64实际大小 + size;
* @exception 如果getReadSize() < 无符号Varint64实际大小 + size 抛出 std::out_of_range
*/
std::string readStringVint();
/**
* @brief 清空ByteArray
* @post m_position = 0, m_size = 0
*/
void clear();
/**
* @brief 写入size长度的数据
* @param[in] buf 内存缓存指针
* @param[in] size 数据大小
* @post m_position += size, 如果m_position > m_size 则 m_size = m_position
*/
void write(const void* buf, size_t size);
/**
* @brief 读取size长度的数据
* @param[out] buf 内存缓存指针
* @param[in] size 数据大小
* @post m_position += size, 如果m_position > m_size 则 m_size = m_position
* @exception 如果getReadSize() < size 则抛出 std::out_of_range
*/
void read(void* buf, size_t size);
/**
* @brief 读取size长度的数据
* @param[out] buf 内存缓存指针
* @param[in] size 数据大小
* @param[in] position 读取开始位置
* @exception 如果 (m_size - position) < size 则抛出 std::out_of_range
*/
void read(void* buf, size_t size, size_t position) const;
/**
* @brief 返回ByteArray当前位置
*/
size_t getPosition() const { return m_position;}
/**
* @brief 设置ByteArray当前位置
* @post 如果m_position > m_size 则 m_size = m_position
* @exception 如果m_position > m_capacity 则抛出 std::out_of_range
*/
void setPosition(size_t v);
/**
* @brief 把ByteArray的数据写入到文件中
* @param[in] name 文件名
*/
bool writeToFile(const std::string& name) const;
/**
* @brief 从文件中读取数据
* @param[in] name 文件名
*/
bool readFromFile(const std::string& name);
/**
* @brief 返回内存块的大小
*/
size_t getBaseSize() const { return m_baseSize;}
/**
* @brief 返回可读取数据大小
*/
size_t getReadSize() const { return m_size - m_position;}
/**
* @brief 是否是小端
*/
bool isLittleEndian() const;
/**
* @brief 设置是否为小端
*/
void setIsLittleEndian(bool val);
/**
* @brief 将ByteArray里面的数据[m_position, m_size)转成std::string
*/
std::string toString() const;
/**
* @brief 将ByteArray里面的数据[m_position, m_size)转成16进制的std::string(格式:FF FF FF)
*/
std::string toHexString() const;
/**
* @brief 获取可读取的缓存,保存成iovec数组
* @param[out] buffers 保存可读取数据的iovec数组
* @param[in] len 读取数据的长度,如果len > getReadSize() 则 len = getReadSize()
* @return 返回实际数据的长度
*/
uint64_t getReadBuffers(std::vector<iovec>& buffers, uint64_t len = ~0ull) const;
/**
* @brief 获取可读取的缓存,保存成iovec数组,从position位置开始
* @param[out] buffers 保存可读取数据的iovec数组
* @param[in] len 读取数据的长度,如果len > getReadSize() 则 len = getReadSize()
* @param[in] position 读取数据的位置
* @return 返回实际数据的长度
*/
uint64_t getReadBuffers(std::vector<iovec>& buffers, uint64_t len, uint64_t position) const;
/**
* @brief 获取可写入的缓存,保存成iovec数组
* @param[out] buffers 保存可写入的内存的iovec数组
* @param[in] len 写入的长度
* @return 返回实际的长度
* @post 如果(m_position + len) > m_capacity 则 m_capacity扩容N个节点以容纳len长度
*/
uint64_t getWriteBuffers(std::vector<iovec>& buffers, uint64_t len);
/**
* @brief 返回数据的长度
*/
size_t getSize() const { return m_size;}
private:
/**
* @brief 扩容ByteArray,使其可以容纳size个数据(如果原本可以可以容纳,则不扩容)
*/
void addCapacity(size_t size);
/**
* @brief 获取当前的可写入容量
*/
size_t getCapacity() const { return m_capacity - m_position;}
private:
/// 内存块的大小,固定大小不可改变
size_t m_baseSize;
/// 当前操作位置,读时可设置
size_t m_position;
/// 当前的总容量,在写入时会自动增加
size_t m_capacity;
/// 当前数据的大小,数据添加在更新
size_t m_size;
/// 字节序,默认大端
int8_t m_endian;
/// 第一个内存块指针
Node* m_root;
/// 当前操作的内存块指针
Node* m_cur;
};
}
#endif
本文含有隐藏内容,请 开通VIP 后查看