MultiSizePool
MultiSizePool 意思是说我们会分配多个不同大小的chunk,来满足不同size的分配的要求。就像下面的图一样, chunk1~chunk5是不同大小size的chunk,每个chunk下面都有一个block链表来存储数据,同样我们会把空闲的block 放到freelist 中去。
为了增加性能,我们同样也开辟了了线程缓存,每个线程都有一份自己的缓存,每次取的时候都会先从ThreadCache中先取空闲的block,如果没有了我们再从对应大小的chunk中的freelist 来分配。
Chunk的数组
定义了一个chunk的不同size的数组,可以看到size从8 byte到2k byte 大小不等,这些为小对象的size, 对于超过这些size的,我们直接分配对应的大小(大对象)。 这里的每个size都是一个chunk, 每个chunk都存储了若干个block;block 会预先放到freelist 里面。这里会记录一个block data的大小(block_size), 还有总的大小(block 结构体 + data size);我们会使用 ALIGNMENT
对齐,这里我们是根据C++标准定义’std::max_align_t’,我们这里是16. block_count 的大小我们就可以计算出来啦,CHUNK_SIZE 我们预定义为64k。
我们再看下FreeBlock结构体的定义,定义了一个指向next的指针和size的大小。这里会把存储的数据区放到结构体的后面。
内存结构: struct FreeBlock + data
所以这里有两个函数用于从结构体的首地址指向data 数据区的指针; 和从数据区转到指向结构体地址的指针。
static constexpr std::array<size_t, 16> SIZE_CALSSES = { //max 2k
8, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048
};
struct ChunkClass {
std::atomic<FreeBlock*> free_list;
std::atomic<size_t> allocated_count;
std::atomic<size_t> deallocated_count;
size_t block_size; // 每个block data size
size_t total_block_size; // 一个block + data的大小
size_t block_count; // 每个chunk的block数量
ChunkClass() = default;
explicit ChunkClass(size_t size):
free_list(nullptr),
block_size(size),
total_block_size(align_of(sizeof(FreeBlock) + size, ALIGNMENT)),
block_count(CHUNK_SIZE / total_block_size)
{}
};
// 内存结构: FreeBlock + data
struct FreeBlock {
size_t size;
std::atomic<FreeBlock*> next;
FreeBlock(size_t s):size(s), next(nullptr) {}
void* data() { // 指向data 数据
return reinterpret_cast<std::byte*>(this) + sizeof(FreeBlock);
}
static FreeBlock* from_data(void* ptr) { // 从data 数据获取FreeBlock指针
return reinterpret_cast<FreeBlock*>(reinterpret_cast<std::byte*>(ptr) - sizeof(FreeBlock));
}
};
MultiSizeThreadCache
是一个高效的线程本地缓存实现,它在无锁内存池中扮演着至关重要的角色。这个结构通过减少线程间的竞争,显著提升内存分配和释放的性能。
- 线程局部存储: 使用
thread_local
关键字确保每个线程拥有自己独立的缓存实例 - 分层缓存结构: 为每种内存块大小(SIZE_CLASSES)维护独立的缓存队列
- 高效访问: 通过数组实现O(1)时间复杂度的块访问
这里我们定义了一个ClassCache来缓存freelist的空闲block,每个chunk size都有一个大小32的block的缓存, count是记录还剩多少的缓存数量。
内存回收机制
当线程终止时,析构函数负责将缓存的所有内存块归还到全局池:
这个过程包含三个关键步骤:
- 链表构建: 将离散的块连接成一个链表,准备批量归还
- 原子交换: 使用
compare_exchange_weak
确保线程安全地将链表插入全局链表头部 - 计数重置: 归还完成后将缓存计数归零
通过这种线程本地缓存设计,内存池实现了高吞吐量和低延迟的内存分配,特别适合高并发环境下的性能关键型应用。
struct MultiSizeThreadCache {
struct ClassCache {
FreeBlock* blocks[32]; // 每个大小类缓存块
int count; // 当前缓存的数量
};
ClassCache caches[SIZE_CALSSES.size()];
LockFreeMultiSizePool* pool_ptr; // 指向全局内存池
~MultiSizeThreadCache() {
if (!pool_ptr) return;
// 遍历所有大小类
for (size_t index = 0; index < SIZE_CALSSES.size(); ++index) {
auto& class_cache = caches[index];
if (class_cache.count == 0) continue;
// 将块连接成链表
for (int i = 0; i < class_cache.count - 1; ++i) {
class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);
}
auto& chunk_class = pool_ptr->chunk_classes[index];
FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_relaxed);
FreeBlock* cache_head = class_cache.blocks[0];
FreeBlock* cache_tail = class_cache.blocks[class_cache.count - 1];
// 归还到全局链表
do {
cache_tail->next.store(old_head, std::memory_order_relaxed);
} while (!chunk_class.free_list.compare_exchange_weak(old_head, cache_head,
std::memory_order_release,
std::memory_order_relaxed));
class_cache.count = 0;
}
}
};
std::array<ChunkClass, SIZE_CALSSES.size()> chunk_classes;
LockFreeStack<std::unique_ptr<std::byte[]>> allocated_chunks;
static constexpr size_t ALIGNMENT = alignof(std::max_align_t);
static constexpr size_t CHUNK_SIZE = 64 * 1024; // 64k
static inline thread_local MultiSizeThreadCache thread_cache;
初始化
我们在构造的时候就把每个chunk对象的size分配好,这样我们在调用allocate的函数的时候就不需要分配了。注意这里的new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]);
不是分配操作,而是placement new
操作符,在已分配的内存地址上构造对象,而不进行新的分配。
LockFreeMultiSizePool::LockFreeMultiSizePool() {
for (size_t i = 0; i < SIZE_CALSSES.size(); ++i) {
new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]);
}
thread_cache.pool_ptr = this;
}
填充缓存
在我们调用allocate分配函数之前,先熟悉下填充缓存的操作。主要的操作有:
- 检查索引的有效性和缓存是否已有足够内存块
- 计算批量获取的数量(缓存容量的一半)
- 尝试从全局空闲列表中获取一串连续的内存块,如果全局空闲列表为空,则分配新的内存块(调用
allocate_chunk_for_size_class
) - 使用无锁方式更新全局空闲列表头部
- 将获取的内存块放入线程本地缓存
bool LockFreeMultiSizePool::fill_class_cache(size_t index) {
if (index >= SIZE_CALSSES.size()) return false;
ChunkClass& chunk_class = chunk_classes[index];
auto& class_cache = thread_cache.caches[index];
// 已经有缓存,不需要填充
if (class_cache.count > 0) return true;
int BATCH_SIZE = std::size(class_cache.blocks) / 2;
// 尝试从全局链表获取多个块
FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_acquire);
if (!old_head) {
allocate_chunk_for_size_class(index);
old_head = chunk_class.free_list.load(std::memory_order_acquire);
if (!old_head) return false;
}
//获取一串块
FreeBlock* current = old_head;
FreeBlock* new_head = nullptr;
int count = 0;
for (int i = 0; i < BATCH_SIZE - 1 && current->next.load(std::memory_order_relaxed); ++i) {
current = current->next.load(std::memory_order_relaxed);
++count;
}
new_head = current->next.load(std::memory_order_relaxed);
// 更新链表头
if(!chunk_class.free_list.compare_exchange_strong(old_head, new_head, std::memory_order_release, std::memory_order_relaxed)) {
return false;
}
//将获取的块放入本地缓存
current = old_head;
while (current != new_head) {
class_cache.blocks[class_cache.count++] = current;
current = current->next.load(std::memory_order_relaxed);
}
return true;
}
void LockFreeMultiSizePool::allocate_chunk_for_size_class(size_t index) {
if (index >= SIZE_CALSSES.size()) return; // 分配大对象
ChunkClass& chunk_class = chunk_classes[index];
size_t block_count = chunk_class.block_count;
if (block_count == 0) block_count = 1;
size_t actual_chunk_size = chunk_class.total_block_size * block_count;
auto chunk = std::make_unique<std::byte[]>(actual_chunk_size); // 分配一个chunk
std::byte* ptr = chunk.get();
FreeBlock* first_block = nullptr;
FreeBlock* prev_block = nullptr;
FreeBlock* block = nullptr;
// 将chunk 添加到对应大小的free_list 里面
for (size_t i = 0; i < block_count; ++i) {
// 获取每个block
block = new(ptr) FreeBlock(chunk_class.block_size);
ptr += chunk_class.total_block_size;
if (i == 0) {
first_block = block;
}
if (prev_block) {
prev_block->next.store(block, std::memory_order_relaxed);
}
prev_block = block;
}
// 将block 添加到free_list
FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);
do {
block->next.store(old_block, std::memory_order_relaxed);
} while(!chunk_class.free_list.compare_exchange_weak(old_block, first_block, std::memory_order_release, std::memory_order_relaxed));
allocated_chunks.push(std::move(chunk)); // 将chunk 添加到allocated_chunks 管理
}
size_t LockFreeMultiSizePool::get_size_class_index(size_t size) {
// 二分查找到对应的index
size_t left = 0;
size_t right = SIZE_CALSSES.size() - 1;
while (left <= right) {
size_t mid = left + (right - left) / 2;
if (SIZE_CALSSES[mid] < size)
left = mid + 1;
else if (SIZE_CALSSES[mid] > size && mid > 0)
right = mid - 1;
else
return mid;
}
return (left < SIZE_CALSSES.size()) ? left : SIZE_CALSSES.size();
}
allocate 操作
这个函数是内存池的核心操作,我们通过下面步骤:
- 大小对齐与分类:首先将请求的内存大小按照指定对齐值(ALIGNMENT)进行对齐,然后确定对应的大小类别索引。
- 分配策略层次:
- 首先尝试从线程本地缓存获取内存块
- 如果本地缓存为空,尝试批量填充本地缓存
- 如果仍然无法获取,从全局共享的无锁空闲列表获取
- 最后,如果空闲列表为空,则分配新的内存块
- 原子操作保证:使用无锁算法(CAS操作)确保在多线程环境中安全分配内存。
void* LockFreeMultiSizePool::allocate(size_t size) {
if (size == 0) return nullptr;
thread_cache.pool_ptr = this; // 设置当前线程的内存池实例
size_t aligned_size = align_of(size, ALIGNMENT);
size_t index = get_size_class_index(aligned_size);
if (index >= SIZE_CALSSES.size()) { // 分配大对象
void* ptr = std::aligned_alloc(ALIGNMENT, aligned_size);
return ptr;
}
ChunkClass& chunk_class = chunk_classes[index];
FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_acquire);
FreeBlock* block = nullptr;
// 尝试从本地缓存分配
auto& class_cache = thread_cache.caches[index];
if (class_cache.count > 0) {
FreeBlock* block = class_cache.blocks[--class_cache.count];
chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
return block->data(); // 返回数据指针
}
// 尝试批量获取块到本地缓存
if (fill_class_cache(index)) {
FreeBlock* block = class_cache.blocks[--class_cache.count];
chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
return block->data(); // 返回数据指针
}
// 尝试从空闲列表获取
while(old_block) {
if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {
block = old_block;
break;
}
}
if (!block) {
allocate_chunk_for_size_class(index);
old_block = chunk_class.free_list.load(std::memory_order_acquire);
// 再次尝试从空闲列表获取
while(old_block) {
if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {
block = old_block;
break;
}
}
}
if(block) {
chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);
return block->data(); // 返回数据指针
}
return nullptr;
}
deallocate 操作
deallocate 其实和allocate是一对相反的操作。主要把我们的分配的内存指针放回到cache里面中,如果是大对象,我们直接会释放。对于小对象,我们这里做了一个策略,如果检测到缓存已经满了,我们会先把缓存的空间归还到全局链表中,然后再放回到缓存,这样一次操作大大提高后面的释放的效率。
void LockFreeMultiSizePool::deallocate(void* ptr, size_t size) {
if (ptr == nullptr) return;
thread_cache.pool_ptr = this; // 设置当前线程的内存池实例
size_t align_size = align_of(size, ALIGNMENT);
size_t index = get_size_class_index(align_size);
if (index >= SIZE_CALSSES.size()) {
std::free(ptr); // 大对象直接释放
return;
}
ChunkClass& chunk_class = chunk_classes[index];
FreeBlock* block_ptr = FreeBlock::from_data(ptr); // 获取block
// 尝试先放入本地缓存
auto& class_cache = thread_cache.caches[index];
if (class_cache.count < static_cast<int>(sizeof(class_cache.blocks) / sizeof(class_cache.blocks[0]))) {
class_cache.blocks[class_cache.count++] = block_ptr;
} else {
// 本地缓存已满,将一半缓存归还全局链表
int half_count = class_cache.count / 2;
// 构建链表
for (int i = 0; i < half_count - 1; ++i) {
class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);
}
// 将新块添加到链表尾部
class_cache.blocks[half_count - 1]->next.store(block_ptr, std::memory_order_relaxed);
// 归还给全局链表
FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);
do {
block_ptr->next.store(old_block, std::memory_order_relaxed);
} while(!chunk_class.free_list.compare_exchange_weak(old_block, class_cache.blocks[0], std::memory_order_release, std::memory_order_relaxed));
// 压缩剩余缓存 更新class_cache
for (int i = 0; i < class_cache.count - half_count; ++i) {
class_cache.blocks[i] = class_cache.blocks[i + half_count];
}
class_cache.count -= half_count;
}
chunk_class.deallocated_count.fetch_add(1, std::memory_order_relaxed);
}
最后我们还封装了一个类来简单使用内存池,用户只需要调用allocate
, deallocate
就好。
class MemoryPoolAllocater {
private:
LockFreeMultiSizePool pool;
public:
void* allocate(size_t size) {
return pool.allocate(size);
}
void deallocate(void* ptr, size_t size) {
pool.deallocate(ptr, size);
}
template<typename T, typename... Args>
T* create(Args&&... args) {
void* ptr = allocate(sizeof(T));
if (ptr) {
return new (ptr) T(std::forward<Args>(args)...);
}
return nullptr;
}
template<typename T>
void destroy(T* ptr) {
if (ptr) {
ptr->~T();
deallocate(ptr, sizeof(T));
}
}
void print_stats() const {
pool.print_stats();
}
void reset_global_state() {
LockFreeMultiSizePool::reset_global_state();
}
};
详细代码见: MultiSizePool