Brief Introduction to the Problem
Competition page: 4th Global Database Competition, Track 1: Cloud-Native Shared-Memory Database Performance Optimization
The gist of the problem:
Local memory is fast to read and write but small; remote memory is slow but large (remote data is accessed over eRDMA).
In the preliminary round, implement a simplified, efficient KV storage engine with Write and Read interfaces; keys and values are fixed-size.
In the final round, additionally implement a Delete interface and a rebuild feature; values become variable-length.
The evaluator runs in two phases:
1) Correctness verification
Verifies the correctness of the KV operations (including the encryption/decryption path). This phase does not count toward the running time. If the correctness test fails, evaluation aborts and the submission fails.
2) Performance evaluation
The engine is limited to 8 GB of local memory and 32 GB of remote memory.
Stage 1: each thread writes about 12M KV objects with 16-byte keys and 80-1024-byte values, with selective read verification.
Stage 2: the threads delete concurrently, 10M keys per thread; delete time counts toward the running time.
Stage 3: each thread writes about 10M more KV objects with 16-byte keys and 80-256-byte values. Then a mixed read/write test runs: 16 threads issue 64M calls at a 75%:25% read/write ratio. The reads are skewed toward hot spots, with most reads concentrated on a small set of keys. The final score is the total elapsed time of all the above operations.
Data distribution: at any moment the total length of all value bytes is guaranteed not to exceed 30 GB. In the 12M pure writes, roughly 70% of values are 80-128 bytes, roughly 20% are 129-256 bytes, and roughly 10% are 257-1024 bytes. In the 64M mixed operations, every Set has a value of at most 128 bytes.
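Since the evaluator log below mentions "Generating the ZipFian PDF", here is a minimal sketch of how such a probability mass function is typically built; the exponent s and the key count are illustrative assumptions, not the evaluator's actual parameters.
#include <cmath>
#include <cstdio>
#include <vector>

// Minimal sketch of a Zipf PDF over n ranked keys (assumed exponent s).
std::vector<double> zipf_pdf(size_t n, double s) {
  std::vector<double> pdf(n);
  double norm = 0.0;
  for (size_t k = 1; k <= n; ++k) norm += 1.0 / std::pow((double)k, s);
  for (size_t k = 1; k <= n; ++k) pdf[k - 1] = 1.0 / std::pow((double)k, s) / norm;
  return pdf;
}

int main() {
  auto pdf = zipf_pdf(1000000, 1.0);
  // With s = 1 the top-ranked keys absorb most of the probability mass,
  // which is exactly the "hot key" skew described above.
  printf("P(rank 1) = %.4f, P(rank 100) = %.6f\n", pdf[0], pdf[99]);
  return 0;
}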
The evaluator's output looks roughly like this:
Start local encryption evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh
Starting Write-Read Testing.
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 4
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 13
##################### Start Write from index: 14
##################### Start Write from index: 15
##################### End Write from index: 0
##################### End Write from index: 13
##################### End Write from index: 14
##################### End Write from index: 15
##################### End Write from index: 11
##################### End Write from index: 4
##################### End Write from index: 8
##################### End Write from index: 5
##################### End Write from index: 2
##################### End Write from index: 10
##################### End Write from index: 9
##################### End Write from index: 3
##################### End Write from index: 6
##################### End Write from index: 7
##################### End Write from index: 1
##################### End Write from index: 12
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 6
##################### Start Read from index: 7
##################### Start Read from index: 8
##################### Start Read from index: 9
##################### Start Read from index: 10
##################### Start Read from index: 12
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 15
##################### Start Read from index: 11
##################### End Read from index: 0
##################### End Read from index: 13
##################### End Read from index: 14
##################### End Read from index: 15
##################### End Read from index: 11
##################### End Read from index: 4
##################### End Read from index: 8
##################### End Read from index: 5
##################### End Read from index: 2
##################### End Read from index: 10
##################### End Read from index: 9
##################### End Read from index: 3
##################### End Read from index: 7
##################### End Read from index: 1
##################### End Read from index: 6
##################### End Read from index: 12
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 14
##################### Start Write from index: 13
##################### Start Write from index: 15
##################### Start Write from index: 4
##################### End Write from index: 12
##################### End Write from index: 11
##################### End Write from index: 14
##################### End Write from index: 0
##################### End Write from index: 4
##################### End Write from index: 1
##################### End Write from index: 8
##################### End Write from index: 13
##################### End Write from index: 3
##################### End Write from index: 7
##################### End Write from index: 10
##################### End Write from index: 9
##################### End Write from index: 6
##################### End Write from index: 15
##################### End Write from index: 5
##################### End Write from index: 2
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 7
##################### Start Read from index: 6
##################### Start Read from index: 8
##################### Start Read from index: 10
##################### Start Read from index: 11
##################### Start Read from index: 12
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 15
##################### Start Read from index: 9
##################### End Read from index: 12
##################### End Read from index: 11
##################### End Read from index: 14
##################### End Read from index: 0
##################### End Read from index: 4
##################### End Read from index: 1
##################### End Read from index: 8
##################### End Read from index: 13
##################### End Read from index: 3
##################### End Read from index: 7
##################### End Read from index: 10
##################### End Read from index: 9
##################### End Read from index: 6
##################### End Read from index: 15
##################### End Read from index: 5
##################### End Read from index: 2
##################### End The Write-Read Test ##############################
Time for Write-Read Test 32.000000 seconds
LocalEngine::stop finsh
##################### Evaluation Success ##############################
Start local perf evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh
Starting Write-Read Testing.
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 4
##################### Start Write from index: 3
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 13
##################### Start Write from index: 14
##################### Start Write from index: 15
##################### End Write from index: 7
##################### End Write from index: 8
##################### End Write from index: 1
##################### End Write from index: 14
##################### End Write from index: 10
##################### End Write from index: 12
##################### End Write from index: 3
##################### End Write from index: 6
##################### End Write from index: 11
##################### End Write from index: 13
##################### End Write from index: 15
##################### End Write from index: 9
##################### End Write from index: 0
##################### End Write from index: 5
##################### End Write from index: 2
##################### End Write from index: 4
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 7
##################### Start Read from index: 9
##################### Start Read from index: 6
##################### Start Read from index: 11
##################### Start Read from index: 8
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 12
##################### Start Read from index: 15
##################### Start Read from index: 10
##################### End Read from index: 7
##################### End Read from index: 8
##################### End Read from index: 1
##################### End Read from index: 14
##################### End Read from index: 10
##################### End Read from index: 12
##################### End Read from index: 3
##################### End Read from index: 6
##################### End Read from index: 11
##################### End Read from index: 13
##################### End Read from index: 15
##################### End Read from index: 9
##################### End Read from index: 0
##################### End Read from index: 5
##################### End Read from index: 2
##################### End Read from index: 4
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 4
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 13
##################### Start Write from index: 12
##################### Start Write from index: 14
##################### Start Write from index: 5
##################### Start Write from index: 15
##################### End Write from index: 13
##################### End Write from index: 12
##################### End Write from index: 5
##################### End Write from index: 14
##################### End Write from index: 7
##################### End Write from index: 6
##################### End Write from index: 15
##################### End Write from index: 2
##################### End Write from index: 10
##################### End Write from index: 4
##################### End Write from index: 11
##################### End Write from index: 3
##################### End Write from index: 9
##################### End Write from index: 0
##################### End Write from index: 8
##################### End Write from index: 1
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 6
##################### Start Read from index: 7
##################### Start Read from index: 8
##################### Start Read from index: 9
##################### Start Read from index: 10
##################### Start Read from index: 12
##################### Start Read from index: 14
##################### Start Read from index: 13
##################### Start Read from index: 5
##################### Start Read from index: 11
##################### Start Read from index: 15
##################### End Read from index: 13
##################### End Read from index: 12
##################### End Read from index: 5
##################### End Read from index: 14
##################### End Read from index: 7
##################### End Read from index: 6
##################### End Read from index: 15
##################### End Read from index: 2
##################### End Read from index: 10
##################### End Read from index: 4
##################### End Read from index: 11
##################### End Read from index: 3
##################### End Read from index: 9
##################### End Read from index: 0
##################### End Read from index: 8
##################### End Read from index: 1
##################### Start Update from index: 0
##################### Start Update from index: 1
##################### Start Update from index: 2
##################### Start Update from index: 3
##################### Start Update from index: 4
##################### Start Update from index: 5
##################### Start Update from index: 7
##################### Start Update from index: 6
##################### Start Update from index: 8
##################### Start Update from index: 10
##################### Start Update from index: 11
##################### Start Update from index: 12
##################### Start Update from index: 9
##################### Start Update from index: 13
##################### Start Update from index: 14
##################### Start Update from index: 15
##################### After updating, Start Reading from index: 15
##################### End Read in Updating from index: 15
##################### After updating, Start Reading from index: 10
##################### End Read in Updating from index: 10
##################### After updating, Start Reading from index: 7
##################### End Read in Updating from index: 7
##################### After updating, Start Reading from index: 5
##################### End Read in Updating from index: 5
##################### After updating, Start Reading from index: 4
##################### End Read in Updating from index: 4
##################### After updating, Start Reading from index: 11
##################### End Read in Updating from index: 11
##################### After updating, Start Reading from index: 8
##################### End Read in Updating from index: 8
##################### After updating, Start Reading from index: 1
##################### End Read in Updating from index: 1
##################### After updating, Start Reading from index: 0
##################### End Read in Updating from index: 0
##################### After updating, Start Reading from index: 6
##################### End Read in Updating from index: 6
##################### After updating, Start Reading from index: 2
##################### End Read in Updating from index: 2
##################### After updating, Start Reading from index: 3
##################### End Read in Updating from index: 3
##################### After updating, Start Reading from index: 9
##################### End Read in Updating from index: 9
##################### After updating, Start Reading from index: 12
##################### End Read in Updating from index: 12
##################### After updating, Start Reading from index: 14
##################### End Read in Updating from index: 14
##################### After updating, Start Reading from index: 13
##################### End Read in Updating from index: 13
##################### End The Write-Read Test ##############################
Time for Write-Read Test 158.000000 seconds
Starting Deleting Testing.
##################### Start Delete from index: 0
##################### Start Delete from index: 1
##################### Start Delete from index: 2
##################### Start Delete from index: 3
##################### Start Delete from index: 4
##################### Start Delete from index: 6
##################### Start Delete from index: 7
##################### Start Delete from index: 5
##################### Start Delete from index: 9
##################### Start Delete from index: 10
##################### Start Delete from index: 11
##################### Start Delete from index: 14
##################### Start Delete from index: 8
##################### Start Delete from index: 15
##################### Start Delete from index: 13
##################### Start Delete from index: 12
##################### After Deleting, Start Reading from index: 7
##################### After Deleting, Start Reading from index: 8
##################### After Deleting, Start Reading from index: 1
##################### After Deleting, Start Reading from index: 14
##################### After Deleting, Start Reading from index: 12
##################### After Deleting, Start Reading from index: 10
##################### After Deleting, Start Reading from index: 6
##################### After Deleting, Start Reading from index: 3
##################### After Deleting, Start Reading from index: 11
##################### After Deleting, Start Reading from index: 15
##################### After Deleting, Start Reading from index: 13
##################### After Deleting, Start Reading from index: 9
##################### After Deleting, Start Reading from index: 0
##################### After Deleting, Start Reading from index: 5
##################### After Deleting, Start Reading from index: 2
##################### After Deleting, Start Reading from index: 4
##################### ReInserting from index: 8
##################### ReInserting from index: 7
##################### ReInserting from index: 1
##################### ReInserting from index: 12
##################### ReInserting from index: 6
##################### ReInserting from index: 11
##################### ReInserting from index: 10
##################### ReInserting from index: 13
##################### ReInserting from index: 15
##################### ReInserting from index: 9
##################### ReInserting from index: 14
##################### ReInserting from index: 0
##################### ReInserting from index: 3
##################### ReInserting from index: 5
##################### ReInserting from index: 2
##################### ReInserting from index: 4
##################### End Delete Test from index: 11
##################### End Delete Test from index: 13
##################### End Delete Test from index: 8
##################### End Delete Test from index: 5
##################### End Delete Test from index: 1
##################### End Delete Test from index: 7
##################### End Delete Test from index: 6
##################### End Delete Test from index: 10
##################### End Delete Test from index: 0
##################### End Delete Test from index: 14
##################### End Delete Test from index: 2
##################### End Delete Test from index: 9
##################### End Delete Test from index: 3
##################### End Delete Test from index: 15
##################### End Delete Test from index: 12
##################### End Delete Test from index: 4
##################### End The Delete Test ##############################
Time for Delete Test 47.000000 seconds
Starting HOT Data Testing.
##################### Start test from index: 0
##################### Start test from index: 1
##################### Start test from index: 2
##################### Start test from index: 3
##################### Start test from index: 4
##################### Start test from index: 6
##################### Start test from index: 5
##################### Start test from index: 8
##################### Start test from index: 9
##################### Start test from index: 10
##################### Start test from index: 11
##################### Start test from index: 12
##################### Start test from index: 13
##################### Start test from index: 7
##################### Start test from index: 14
##################### Start test from index: 15
##################### End The Hot Data Test ##############################
Time for Hot Data Test 46.000000 seconds
Hot:46
LocalEngine::stop finsh
Total:253
##################### Evaluation Success ##############################
Success evaluation, update score...37.2979146325685141834147
We placed 20th in the final round, which happened to be the very last Geek Award slot, hehe.
Competition Experience
For the preliminary round the organizers provided a simple demo that keeps keys and their remote addresses locally and stores all values remotely. By the end of the preliminary round our architecture looked roughly like this:
KV data is stored in pages. Locally we keep the key metadata (key, (page id, index)) (5 GB), a small cache of pages (2 GB), and the remote addresses of remote pages; most page data lives remotely (30 GB).
On a write:
Insert: put the record into a freshly allocated page and record the metadata (key, (page id, page index)).
Update: if the record's page currently lives remotely, treat the write as an insert and update the key's metadata;
if the page is local, update the value in place.
Update the LRU list.
On a read:
If the page lives remotely, read it from the remote side and add it to the local cache.
If the page is local, read the value directly.
Update the LRU list.
Eviction:
A background thread, whenever the number of locally cached pages exceeds a threshold, writes the least recently used pages to the remote side and records their remote addresses.
Hashing:
To reduce lock contention we create a number of request-handling entities and dispatch each request by hashing its key
(LocalEngine->LocalEngineEntity); a second hash on the key then locates the key's metadata inside the entity.
Since STL maps carry a large space overhead, the organizers supplied a simple hash table implementation: it allocates enough space up front and chains slots with equal hash values, so no dynamic resizing is needed.
At the end of the preliminary round we scored only 9 points. The main culprit: the first and second hashes used the same hash function (std::hash), so a large part of the custom hash table inside each LocalEngineEntity could never be reached (every hash value arriving there carries the LocalEngineEntity's own index in its low bits), which sharply increased the collision rate.
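A minimal sketch of that failure mode (the constants here are illustrative): when both levels mask the same std::hash value with power-of-two sizes, every key routed to a given entity shares its low bits, so only kBuckets / kEntities of the inner buckets are ever reachable.
#include <cstdio>
#include <functional>
#include <set>
#include <string>

int main() {
  const size_t kEntities = 64;      // first-level fan-out (power of two)
  const size_t kBuckets = 1 << 21;  // inner table buckets (power of two)
  std::set<size_t> used;
  for (int i = 0; i < 1000000; ++i) {
    size_t h = std::hash<std::string>()("key-" + std::to_string(i));
    if ((h & (kEntities - 1)) != 0) continue;  // keys dispatched to entity 0
    used.insert(h & (kBuckets - 1));           // same hash, wider mask
  }
  // Every surviving h has its low 6 bits fixed, so at most
  // kBuckets / kEntities = 32768 distinct buckets can appear here.
  printf("buckets used: %zu of %zu\n", used.size(), kBuckets);
  return 0;
}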
Building on the preliminary-round code, the final round added three features: value encryption, the delete operation, and the rebuild operation (discard records marked invalid and compact the live data).
Encryption: the value encryption is a simple routine on top of the IPP-Crypto API, only a few lines of code.
bool LocalEngine::set_aes() {
// Current algorithm is not supported, just for demonstration.
m_aes_.algo = CBC;
m_aes_.key_len = 16;
m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
if (m_aes_.key == nullptr) return false;
m_aes_.blk_size = 16;
m_aes_.piv_len = 16;
m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
if (m_aes_.piv == nullptr) return false;
int ctxSize; // AES context size
ippsAESGetSize(&ctxSize); // evaluating AES context size
// allocating memory for AES context
m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
// AES context initialization
ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
return true;
}
// a simple encryption routine based on the reference PDF
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
// encrypting plaintext
ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
encrypt_value = std::move(tmp);
return true;
}
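For completeness, the matching decryption path might look like the sketch below; decrypted() is my naming, not from the original code, and it mirrors encrypted() by calling IPP-Crypto's ippsAESDecryptCBC with the same context and IV.
// Hypothetical counterpart to encrypted(); a sketch, not the original code.
bool LocalEngine::decrypted(const std::string &encrypt_value, std::string &value) {
  Ipp8u plain[(encrypt_value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
  // decrypting ciphertext with the same context and IV used by encrypted()
  ippsAESDecryptCBC((Ipp8u *)encrypt_value.c_str(), plain, encrypt_value.size(), m_aes_.ctx, m_aes_.piv);
  value = std::string(reinterpret_cast<const char *>(plain), encrypt_value.size());
  return true;
}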
Delete: the data path is key -> (page, index) -> local cache m_data_map -> remote address m_addr_map. Implementing delete therefore starts with removing the key -> (page, index) mapping, which only requires adding a remove operation to the custom hash table. Note that the freed slot is pushed onto a separate list so it can be reused.
data_info_t hash_map_t::remove(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
hash_map_slot *parent = nullptr;
if (cur == nullptr) {
return kNullInfo;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
// unlink this slot from its bucket
if (parent == nullptr) {
m_bucket_[index] = cur->next;
} else {
parent->next = cur->next;
}
// push the slot onto the free list
cur->next = m_slot_head_->next;
m_slot_head_->next = cur;
return cur->info;
}
parent = cur;
cur = cur->next;
}
return kNullInfo;
}
Once the mapping is gone, the value can no longer be reached through its key, but it still occupies storage, so its slot must be marked deleted so that rebuild knows not to migrate it.
std::bitset<kMaxIndex> m_bitmap_[kBitmapSize]; // 1 = deleted, 0 = live
Each page gets a bitmap marking whether its records are valid; kMaxIndex is the maximum number of records in a page, and kBitmapSize is the largest page id that can appear during a run.
The other bitmap-related operation is update. If the data being updated currently lives remotely, fetching it back just to update it locally would be too slow, so the operation is split into deleting the remote record plus inserting the new one; the old remote record must then also be marked invalid.
bool LocalEngineEntity::deleteK(const std::string &key) {
m_delete_envent_ = true;
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
m_mutex_.lock();
data_info_t info = m_page_map_.remove(key, hash_index); // remove the key's metadata
m_mutex_.unlock();
m_bitmap_[info.page_id].set(info.index, true); // mark the record as deleted
return true;
}
// update path when the record lives in remote memory
m_bitmap_[slot->info.page_id].set(slot->info.index, true); // mark the old record as deleted
slot->info = info; // refresh the metadata
Rebuild:
Pages are the unit of exchange between the local cache and remote memory. As the program runs, invalid records pile up, so all pages are periodically scanned: live records are copied into fresh pages and the old pages are freed, much like garbage collection.
When reading a remote page here, we first fetch its header metadata and then fetch only the live records instead of the whole page; at rebuild time live records are a small fraction of a remote page, so this cuts the read volume considerably.
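Rough arithmetic on the saving (my own estimate, assuming the constants from type.h shown later: a 64 KB remote region, kMaxIndex = 640 records, and only ~10% of records still live at rebuild time): the header read costs about (640 + 5) x 2 B, roughly 1.3 KB, and ~64 live records of ~100 B each cost another ~6.4 KB, so roughly 8 KB of RDMA traffic replaces a 64 KB full-page read, close to an 8x reduction.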
void LocalEngineEntity::rebuild_index() {
std::lock_guard<std::mutex> lk(m_mutex_);
std::string key;
std::string value;
data_info_t data_info;
std::shared_ptr<Page> page;
std::unordered_map<page_id_t, remote_info_t> tmp_addr_map; // temporarily holds the page-id -> remote-address mapping
uint32_t new_cache_size = 1;
std::vector<page_id_t> local_id = m_lru_list_.clear();
auto new_page = m_cur_page_;
page_id_t new_page_id = m_cur_page_id_;
// handle locally cached pages
for (auto &page_id : local_id) {
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // no deleted records, nothing to do
m_lru_list_.insert(page_id);
new_cache_size++;
} else if (!bitmap.all()) { // some records are still live
auto page = m_data_map_[page_id];
int record_num = page->record_number();
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) { // record not deleted
if (new_page->is_full()) { // page full, keep it locally
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
new_page = std::make_shared<Page>();
new_page_id++;
}
// copy the record into the new page and update its metadata
key = page->read_key(i);
data_info.index = new_page->insert(key, page->read_value(i));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
m_data_map_[page_id] = nullptr; // drop the old page
} else {
m_data_map_[page_id] = nullptr; // drop the old page
}
}
page_id_t page_id;
remote_info_t info;
char head_data[kMaxIndex * 10];
char kv_data[2 * kMaxValueSize];
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
uint16_t head_length;
uint16_t offset, length;
// handle pages in remote memory
for (auto &kv : m_addr_map_) {
page_id = kv.first;
info = kv.second;
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // no deleted records, nothing to do
tmp_addr_map.insert({page_id, info});
} else if (bitmap.all()) { // no live records, recycle the remote region onto the free-address list
m_addr_list_.emplace(info);
} else {
bool avai_info = true; // whether to recycle this remote region onto the free-address list
head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
// read the page header
m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
int record_num = m_max_index_[page_id];
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) {
if (new_page->is_full()) {
if (new_cache_size > kPageThreshold) { // local cache full, write to remote
std::string &&page_data = new_page->to_string();
uint32_t len = page_data.length();
m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
tmp_addr_map.insert({new_page_id, info}); // temporarily record the page-id -> remote-address mapping
avai_info = false;
} else { // keep it in the local cache
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
}
new_page = std::make_shared<Page>();
new_page_id++;
}
// copy the record into the new page and update its metadata
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
key = std::string(kv_data, kv_data + 16);
value = std::string(kv_data + 16, kv_data + length);
data_info.index = new_page->insert(key, std::move(value));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
if (avai_info) {
m_addr_list_.emplace(info);
}
}
}
m_addr_map_ = std::move(tmp_addr_map);
m_cur_page_id_ = new_page_id;
m_cur_page_ = new_page;
m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_cache_size_ = new_cache_size;
m_last_update_id_ = kNullPage;
m_data_map_[new_page_id] = new_page;
}
Some Details
Evolution of the variable-length record encoding
auto &&vicitm_page_data = victim_page->to_string(); // snapshot the victim page's data, then write it to remote memory
m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), len, remote_info.remote_addr,
remote_info.rkey);
KV data in the local cache is stored as string arrays (the Page class). When the cache exceeds its threshold, pages that have not been accessed for a long time must be written to the remote side as one big string, so the string array has to be serialized into a single buffer. Since the page may be read back later, each record's size must be encoded into the string as well. At first I encoded each record as record size + '\0' + record content.
/*
The page's serialized layout is:
index count (record count) '\0'
page size (byte count) '\0'
record size '\0' record content
record size '\0' record content
...
'\0' (end-of-page marker)
*/
class Page {
public:
Page(std::string &data); // build a page from its serialized string
std::string to_string(); // serialize the page into a string
private:
int get_size(const std::string &data, int &start); // parse a number from the string
std::string get_string(int size); // render a number as a string
std::vector<std::string> m_value_; // records stored in a vector
std::vector<std::string> m_key_; // keys stored in a vector
uint32_t m_cur_size_; // current page data size
uint32_t m_cur_index_; // current index
};
// parse the number starting at offset `start`
int Page::get_size(const std::string &data, int &start) {
std::string size_str = "";
int value_size = 0;
while (true) {
if (data[start] != '\0') { // no terminator yet, keep accumulating digits
size_str.push_back(data[start]);
} else {
if (!size_str.empty()) { // non-empty digits, convert to a number
value_size = std::stoi(size_str);
}
break;
}
start++;
}
start++; // skip the terminator
return value_size; // the end-of-page terminator yields 0
}
// render a number as a string followed by a terminator
std::string Page::get_string(int size) {
auto res = std::to_string(size);
res.push_back('\0');
return res;
}
Page::Page(std::string &data) {
int start = 0;
int value_num = get_size(data, start); // read the record count
int page_size = get_size(data, start); // read the page size
m_key_.reserve(value_num);
m_value_.reserve(value_num);
m_cur_index_ = value_num;
m_cur_size_ = page_size;
std::string kv;
int value_size;
while (true) { // read the records one by one
value_size = get_size(data, start);
if (value_size <= 0) { // stop at the end-of-page terminator
break;
}
kv = data.substr(start, value_size);
start += value_size;
m_key_.emplace_back(kv.substr(0, 16));
m_value_.emplace_back(kv.substr(16));
}
}
std::string Page::to_string() {
assert(this != nullptr);
std::string data;
data.reserve(m_cur_size_ + m_cur_index_ * 6); // reserve string space up front
data.append(get_string(m_cur_index_)); // append the record count
data.append(get_string(m_cur_size_)); // append the page size
assert(m_cur_index_ == m_key_.size());
assert(m_key_.size() == m_value_.size());
for (uint32_t i = 0; i < m_cur_index_; i++) {
data.append(get_string(16 + m_value_[i].length())); // append the record size
data.append(m_key_[i]); // append the record payload
data.append(m_value_[i]);
}
data.push_back('\0'); // end-of-page terminator
return data;
}
I then tweaked the format slightly, moving all record sizes to the header and packing the record payloads at the tail.
/*
The page's serialized layout is:
index count (record count) '\0'
page size (byte count) '\0'
record size '\0'
record size '\0'
record content (key value)
record content (key value)
keys are kept so that metadata can be fixed up during rebuild
...
*/
class Page {
public:
Page();
Page(std::string &data); // build a page from its serialized string
std::string to_string(); // serialize the page into a string
private:
std::string get_string(int size); // render a number as a string
std::string m_key_[kMaxIndex];
std::string m_value_[kMaxIndex];
uint32_t m_cur_size_; // current page data size
uint32_t m_cur_index_; // current index
// std::vector<std::string> m_value_; // keys kept in a vector so rebuild can fix up metadata
// std::vector<std::string> m_key_; // records kept in a vector
};
std::string Page::to_string() {
std::string data;
data.reserve(m_cur_size_ + m_cur_index_ * 4); // reserve string space up front
data.append(get_string(m_cur_index_)); // append the record count
data.append(get_string(m_cur_size_)); // append the page size
for (uint32_t i = 0; i < m_cur_index_; i++) { // append each record's size
data.append(get_string(16 + m_value_[i].size()));
}
for (uint32_t i = 0; i < m_cur_index_; i++) {
data.append(m_key_[i]); // append the record payloads
data.append(m_value_[i]);
}
return data;
}
Page::Page(std::string &str) {
std::string::size_type pos;
std::string::size_type size = str.size();
// read the record count and page size
int start = 0;
pos = str.find('\0', start);
m_cur_index_ = std::stoi(str.substr(start, pos - start));
start = pos + 1;
pos = str.find('\0', start);
m_cur_size_ = std::stoi(str.substr(start, pos - start));
start = pos + 1;
// read each record's length
std::vector<int> record_lengths(m_cur_index_);
for (uint32_t i = 0; i < m_cur_index_; i++) {
pos = str.find('\0', start);
record_lengths[i] = std::stoi(str.substr(start, pos - start));
start = pos + 1;
}
// read each record's payload
int record_num = record_lengths.size();
for (int i = 0; i < record_num; i++) {
m_key_[i] = str.substr(start, 16);
m_value_[i] = str.substr(start + 16, record_lengths[i] - 16);
start += record_lengths[i];
}
}
Encoding and decoding were now a bit simpler (get_size was gone), but encoding lengths as decimal text still felt inefficient. Then it hit me: why not treat the big string as an array of integers and just assign to its elements? That is, 123 does not need to be stored as '1' '2' '3' in the array (3 bytes); it can be written directly into a 2-byte short. With that, no terminator characters are needed as markers at all.
The head of the character buffer stores the metadata; the tail stores the actual key-value payloads.
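A tiny illustration of the saving (the values here are illustrative):
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>

int main() {
  // Decimal text: "123" plus a '\0' terminator costs 4 bytes and needs a
  // scan + std::stoi pass to decode.
  std::string text = std::to_string(123);
  text.push_back('\0');
  // Binary: the same length fits in one 2-byte uint16_t, written and read
  // back with a single memcpy or pointer cast.
  uint16_t bin = 123;
  char buf[sizeof(bin)];
  memcpy(buf, &bin, sizeof(bin));
  printf("text encoding: %zu bytes, binary encoding: %zu bytes\n", text.size(), sizeof(buf));
  return 0;
}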
/*
stored as u16:
record count
page size
per-record offsets
stored as char:
each record (kv)
*/
std::string Page::to_string() {
std::string data;
data.resize(kAllocSize);
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // actual payload starts at this offset
char *char_pointer = const_cast<char *>(data.c_str()); // cast away constness
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // reinterpret as a u16 pointer
u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // write each record's start offset
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // final offset past the payload
char *p = char_pointer + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
return data;
}
Page::Page(std::string &data) {
char *char_pointer = const_cast<char *>(data.c_str()); // cast away constness
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // reinterpret as a u16 pointer
m_cur_index_ = u16_pointer[0];
m_cur_size_ = u16_pointer[1];
uint16_t offset, length;
for (uint16_t i = 0; i < m_cur_index_; i++) {
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_key_[i] = data.substr(offset, 16);
m_value_[i] = data.substr(offset + 16, length - 16);
}
m_is_dirty_ = false;
}
Misusing #pragma pack
Of the 8 GB of local memory, 5 GB goes to key metadata. The relevant structs are defined as follows:
using page_id_t = uint16_t;
using index_t = uint16_t;
struct data_info_t { // data info
page_id_t page_id; // page id
index_t index; // index
};
/* One slot stores the key and the meta info of the value which
describes the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};
#pragma pack(4)
struct hash_map_slot_test {
char key[16];
data_info_t info;
hash_map_slot *next;
};
// missing the closing #pragma pack()
As shown, struct data_info_t occupies 4 bytes, but struct hash_map_slot occupies 32 bytes because of alignment padding. Naturally, to save space, one can force 4-byte alignment on the struct, saving 4 bytes per slot, i.e. 1/8 of the space. But, not being familiar with #pragma pack, I never closed the #pragma pack(4) with a #pragma pack(). The program then segfaulted as soon as it ran; in gdb, bt showed a call stack where frame 3 passed a 32-bit argument and frame 2 suddenly saw it truncated to 16 bits. The failure felt far too spooky for such a small change, so I never touched #pragma pack again before the contest ended; in general I would not recommend using it either.
Example program:
#include <cstdint>
#include <cstdio>
#include <iostream>
using namespace std;
using page_id_t = uint16_t;
using index_t = uint16_t;
struct data_info_t { // data info
page_id_t page_id; // page id
index_t index; // index
};
/* One slot stores the key and the meta info of the value which
describes the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};
#pragma pack(4)
struct hash_map_slot_test {
char key[16];
data_info_t info;
hash_map_slot_test *next;
};
#pragma pack()
int main() {
cout << sizeof(hash_map_slot) << endl;
cout << sizeof(hash_map_slot_test) << endl;
hash_map_slot slot1;
hash_map_slot_test slot2;
printf("hash_map_slot layout:\n%p \n%p \n%p\n", &(slot1.key), &(slot1.info), &(slot1.next));
printf("hash_map_slot test layout:\n%p \n%p \n%p\n", &(slot2.key), &(slot2.info), &(slot2.next));
}
/*
Output:
32
28
hash_map_slot layout:
0x7ffef416d1f0
0x7ffef416d200
0x7ffef416d208
hash_map_slot test layout:
0x7ffef416d1d0
0x7ffef416d1e0
0x7ffef416d1e4
*/
Related posts:
C/C++中结构体内存对齐(边界对齐),#pragma pack设置
关于#pragma pack(n)引发的一系列问题
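In hindsight, the safer idiom scopes the packing with push/pop so the directive cannot leak into later declarations; a minimal sketch, reusing the structs above:
#pragma pack(push, 4)  // save the current alignment, then pack to 4 bytes
struct hash_map_slot_packed {
  char key[16];
  data_info_t info;
  hash_map_slot_packed *next;
};
#pragma pack(pop)      // restore whatever alignment was in effect before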
An rvalue reference is itself an lvalue
1: void func(Test&& t);
2: void func(Test& t);
Test t1;
Test&& t2 = std::move(t1);
func(t2);
I long held a misconception about rvalue references: that an rvalue reference is an rvalue. Take the snippet above: the argument t2 has type Test&&, so it is tempting to assume the call resolves to the first function (Test&&), when in fact it resolves to the second (Test&). That is because an rvalue reference is itself an lvalue. More precisely, an expression of rvalue-reference type can behave as either an lvalue or an rvalue; the rule of thumb is: if it has a name, it is an lvalue, otherwise it is an rvalue.
Example program:
#include <bits/stdc++.h>
using namespace std;
class Test {
public:
Test() = default;
Test(const Test &test) : str(test.str) { printf("enter copy construction\n"); }
Test(Test &&test) : str(std::move(test.str)) { printf("enter move construction\n"); }
Test &operator=(const Test &test) {
str = test.str;
printf("enter copy assign\n");
return *this;
}
Test &operator=(Test &&test) {
str = std::move(test.str);
printf("enter move assign\n");
return *this;
}
private:
string str;
};
int main() {
Test p1, p2, p3;
Test &&p4 = std::move(p3);
Test p5(p1); // calls the copy constructor
Test p6(std::move(p2)); // calls the move constructor
Test p7(p4); // calls the copy constructor
Test t1, t2, t3, t4;
Test &&t5 = std::move(t4);
t1 = t2; // calls copy assignment
t1 = std::move(t3); // calls move assignment
t1 = t5; // calls copy assignment (the misconception)
}
/*
Output:
enter copy construction
enter move construction
enter copy construction
enter copy assign
enter move assign
enter copy assign
*/
In practice, simply hovering over the call site in VS Code shows which overload is being invoked.
Do not use a variable's value after moving from it
One of my bugs came from calling an object's size() method after std::move-ing from it; the moved-from object is left in a valid but unspecified state, so the result is meaningless.
template<class T>
void swap(T& a, T& b)
{
T tmp(std::move(a));
a = std::move(b);
b = std::move(tmp);
}
X a, b; // X is any movable type
swap(a, b);
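A minimal sketch of the bug class I hit (the container here is illustrative):
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

int main() {
  std::vector<std::string> sink;
  std::string value = "some payload";
  size_t len = value.size();           // take the length *before* the move
  sink.emplace_back(std::move(value));
  // BUG (the mistake described above): value is now in a valid but
  // unspecified state, so value.size() here could legally return anything.
  printf("saved length: %zu\n", len);
  return 0;
}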
Reference:
详解C++右值引用
Other Optimizations
Reader mutex
When a read misses, the program must fetch the serialized page from the remote side, decode it into a string array (the Page class), insert it into the local cache, and only then read the value. That work is slow and does not need to hold the main lock. But if several requests hit the same remote page at once, letting each of them fetch the page wastes IO bandwidth for no benefit. So each page gets a reader mutex guaranteeing that at most one reader is fetching a given remote page; other readers of the same page block on the mutex, and once the fetching reader has inserted the page into the local cache, they find it there and skip the remote read entirely.
Relevant code:
if (m_data_map_[info.page_id] == nullptr) { // the data lives in remote memory
m_mutex_.unlock();
m_same_reader_mutex_[info.page_id].lock(); // one reader per page
if (m_data_map_[info.page_id] == nullptr) { // double check
auto remote_info = m_addr_map_[info.page_id];
std::string &&page_data = std::string(remote_info.size, '0');
m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
auto new_page = std::make_shared<Page>(page_data); // build the cached page
m_mutex_.lock();
m_data_map_[info.page_id] = new_page;
m_cache_size_++;
m_lru_list_.insert(info.page_id);
need_update_lru = false;
m_addr_map_.erase(info.page_id); // drop the entry from the remote-address map
m_addr_list_.emplace(remote_info); // recycle the region onto the free-address list
m_mutex_.unlock();
// try to wake the background thread
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}
m_same_reader_mutex_[info.page_id].unlock();
m_mutex_.lock();
}
Check footprints with sizeof now and then: the extra reader mutex costs 40 bytes per page, a small fraction of the total.
int main() {
cout << "mutex:" << sizeof(std::mutex) << endl;
cout << "shared_ptr:" << sizeof(std::shared_ptr<int>) << endl;
}
/*
Output:
mutex:40
shared_ptr:16
*/
LRU optimization
Both reads and writes end by updating the LRU list. To reduce contention, the LRU has its own lock and is never updated while holding m_mutex_. The current page (the freshly allocated one) is not placed on the LRU list and cannot be evicted; it is inserted only once it fills up. And the page touched by the previous LRU update is not updated again (it is already at the head).
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}
Replacing map and vector with plain arrays
Since the maximum page id is known, map<page_id, page> became page[max_page_id]; since the maximum number of records per page is known, std::vector<std::string> m_value_ became std::string m_value_[kMaxIndex]. Fixed-size structures avoid the overhead of dynamic growth, as the sketch below shows.
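A minimal sketch of the swap (kBitmapSize here is a stand-in for the real computed constant):
#include <cstdint>
#include <memory>
#include <unordered_map>

using page_id_t = uint16_t;
struct Page {};
constexpr size_t kBitmapSize = 1 << 16;  // illustrative bound on page ids

// before: hash lookups, per-node allocation, occasional rehashing
std::unordered_map<page_id_t, std::shared_ptr<Page>> data_map_old;
// after: page ids are bounded, so a flat array indexed by page id gives
// O(1) access with no allocation or rehashing at all
std::shared_ptr<Page> data_map_new[kBitmapSize];

int main() {
  data_map_new[42] = std::make_shared<Page>();
  return data_map_new[42] ? 0 : 1;
}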
Program Notes
Correctness preconditions
1: At runtime, page ids must not exceed kBitmapSize, records per page must not exceed kMaxIndex, and a page's serialized string must not exceed kAllocSize.
2: The program goes through a contiguous run of deletes before returning to reads and writes, which maximizes what rebuild can reclaim. Before the deletes, remote memory is large enough to hold all the data.
3: Even though every insert checks whether the current page is full, nothing stops updates from growing a page's size, so a page exceeding kAllocSize would break things.
4: With kAllocSize below 65536, page fields fit in uint16; anything larger needs uint32.
5: Work done outside the mutex is extremely short and completes before rebuild processing starts.
General takeaways:
1: When debugging, have the system print the needed information generously and efficiently.
2: The simpler the architecture, the harder it is to get wrong.
3: Simple ad-hoc test cases are enough to validate a module's correctness.
4: sizeof and system_clock give quick estimates of memory footprint and execution speed.
5: An rvalue-reference variable used in an expression is an lvalue; never use a value after moving from it.
int &&x = 1;
f(x); // calls f(int& x)
f(std::move(x)); // calls f(int&& x)
6: When writing code, be explicit about the invariant each lock maintains, not just that operations are atomic.
m_mutex_: guarantees atomicity over the protected members, and the invariant that a page's data lives in exactly one of local storage (m_data_map_) and remote storage (m_addr_map_).
m_remote_read_lock_: during eviction, blocks requests for a page until its data has actually been written out.
Relevant Code
Engine
// kv_engine.h
namespace kv {
/* Encryption algorithm competitor can choose. */
enum aes_algorithm { CTR = 0, CBC, CBC_CS1, CBC_CS2, CBC_CS3, CFB, OFB };
/* Algorithm relate message. */
typedef struct crypto_message_t {
aes_algorithm algo;
Ipp8u *key;
Ipp32u key_len;
Ipp8u *counter;
Ipp32u counter_len;
Ipp8u *piv;
Ipp32u piv_len;
Ipp32u blk_size;
Ipp32u counter_bit;
IppsAESSpec *ctx;
} crypto_message_t;
/* Abstract base engine */
class Engine {
public:
virtual ~Engine(){};
virtual bool start(const std::string addr, const std::string port) = 0;
virtual void stop() = 0;
virtual bool alive() = 0;
};
class LocalEngineEntity; // forward declaration
/* Local-side engine */
class LocalEngine : public Engine {
public:
~LocalEngine();
bool start(const std::string addr, const std::string port) override;
void stop() override;
bool alive() override;
/* Init aes context message. */
bool set_aes();
bool encrypted(const std::string &value, std::string &encrypt_value);
/* Evaluation problem will call this function. */
crypto_message_t *get_aes() { return &m_aes_; }
bool write(const std::string &key, const std::string &value, bool use_aes = false);
bool read(const std::string &key, std::string &value);
/** The delete interface */
bool deleteK(const std::string &key);
/** Rebuild the hash index to recycle the unused memory */
void rebuild_index();
private:
// static inline int get_index(const std::string &key) { return std::hash<std::string>()(key) & (kSharedNumber - 1); }
static inline int get_index(const std::string &key) { return CityHash16(key.c_str()) & (kSharedNumber - 1); }
kv::ConnectionManager *m_rdma_conn_;
// /* NOTE: should use some concurrent data structure, and also should take the
// * extra memory overhead into consideration */
// RDMAMemPool *m_rdma_mem_pool_;
crypto_message_t m_aes_;
LocalEngineEntity *m_entity_[kSharedNumber]; // request-handling entities
};
/* Remote-side engine */
class RemoteEngine : public Engine {
public:
struct WorkerInfo {
CmdMsgBlock *cmd_msg;
CmdMsgRespBlock *cmd_resp_msg;
struct ibv_mr *msg_mr;
struct ibv_mr *resp_mr;
rdma_cm_id *cm_id;
struct ibv_cq *cq;
};
~RemoteEngine(){};
bool start(const std::string addr, const std::string port) override;
void stop() override;
bool alive() override;
private:
void handle_connection();
int create_connection(struct rdma_cm_id *cm_id);
struct ibv_mr *rdma_register_memory(void *ptr, uint64_t size);
int remote_write(WorkerInfo *work_info, uint64_t local_addr, uint32_t lkey, uint32_t length, uint64_t remote_addr, uint32_t rkey);
int allocate_and_register_memory(uint64_t &addr, uint32_t &rkey, uint64_t size);
void worker(WorkerInfo *work_info, uint32_t num);
struct rdma_event_channel *m_cm_channel_;
struct rdma_cm_id *m_listen_id_;
struct ibv_pd *m_pd_;
struct ibv_context *m_context_;
bool m_stop_;
std::thread *m_conn_handler_;
WorkerInfo **m_worker_info_;
uint32_t m_worker_num_;
std::thread **m_worker_threads_;
};
} // namespace kv
// local_engine.cc
#include "assert.h"
#include "atomic"
#include "kv_engine.h"
#include "local_engine_entity.h"
#include <iostream>
namespace kv {
/**
* @description: start local engine service
* @param {string} addr the address string of RemoteEngine to connect
* @param {string} port the port of RemoteEngine to connect
* @return {bool} true for success
*/
bool LocalEngine::start(const std::string addr, const std::string port) {
m_rdma_conn_ = new ConnectionManager();
if (m_rdma_conn_ == nullptr) return false;
if (m_rdma_conn_->init(addr, port, 4, 72)) return false;
for (int i = 0; i < kSharedNumber; i++) {
m_entity_[i] = new LocalEngineEntity(this, m_rdma_conn_);
}
printf("LocalEngine::start finsh\n");
auto watcher = std::thread([]() {
sleep(60 * 9);
printf("timeout\n");
fflush(stdout);
abort();
});
watcher.detach();
return true;
}
/**
* @description: stop local engine service
* @return {void}
*/
void LocalEngine::stop() {
for (int i = 0; i < kSharedNumber; i++) {
delete m_entity_[i];
}
delete m_rdma_conn_;
printf("LocalEngine::stop finsh\n");
};
/**
* @description: get engine alive state
* @return {bool} true for alive
*/
bool LocalEngine::alive() { return true; }
/**
* @description: provide message about the aes_ecb mode
* @return {bool} true for success
*/
bool LocalEngine::set_aes() {
// Current algorithm is not supported, just for demonstration.
m_aes_.algo = CBC;
m_aes_.key_len = 16;
m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
if (m_aes_.key == nullptr) return false;
m_aes_.blk_size = 16;
m_aes_.piv_len = 16;
m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
if (m_aes_.piv == nullptr) return false;
int ctxSize; // AES context size
ippsAESGetSize(&ctxSize); // evaluating AES context size
// allocating memory for AES context
m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
// AES context initialization
ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
return true;
}
// a simple encryption routine based on the reference PDF
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
// encrypting plaintext
ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
encrypt_value = std::move(tmp);
return true;
}
bool LocalEngine::write(const std::string &key, const std::string &value, bool use_aes) {
int index = get_index(key);
return m_entity_[index]->write(key, value, use_aes);
}
/**
* @description: read value from engine via key
* @param {string} key
* @param {string} &value
* @return {bool} true for success
*/
bool LocalEngine::read(const std::string &key, std::string &value) {
int index = get_index(key);
return m_entity_[index]->read(key, value);
}
bool LocalEngine::deleteK(const std::string &key) {
int index = get_index(key);
return m_entity_[index]->deleteK(key);
}
/* When we delete a number of KV pairs, we should rebuild the index to
* reallocate remote addr for each KV to recycle fragments. This will block
* front request processing. This solution should be optimized. */
void LocalEngine::rebuild_index() {
/** rebuild all the index to recycle the fragment */
/** step-1: block the database and not allowe any writes
* step-2: transverse the index to read each value
* step-3: read the value from the remote and write it to a new remote addr
* step-4: free all old addr
*/
printf("*********rebuild_index*********\n");
for (int i = 0; i < kSharedNumber; i++) {
m_entity_[i]->rebuild_index();
}
}
LocalEngine::~LocalEngine() {
if (nullptr != m_aes_.key) delete[] m_aes_.key;
if (nullptr != m_aes_.counter) delete[] m_aes_.counter;
if (nullptr != m_aes_.piv) delete[] m_aes_.piv;
if (nullptr != m_aes_.ctx) delete (Ipp8u *)m_aes_.ctx;
m_aes_.key = nullptr;
m_aes_.counter = nullptr;
m_aes_.piv = nullptr;
m_aes_.ctx = nullptr;
}
} // namespace kv
type
// type.h
namespace kv {
using page_id_t = uint16_t;
using index_t = uint16_t;
struct remote_info_t { // remote memory info
uint64_t remote_addr;
uint32_t rkey;
uint32_t size;
};
struct data_info_t { // data info
page_id_t page_id; // page id
index_t index; // index
};
/* One slot stores the key and the meta info of the value which
describes the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};
const page_id_t kNullPage = 0;
const data_info_t kNullInfo = {0xffff, 0xffff}; // invalid info
const int kSharedNumber = 1 << 6; // number of engine entities
const uint32_t kBucketNum = 1 << 21; // buckets per hash table
const uint32_t kSlotSize = (1 << 22) * 1.2 * 32 / kSharedNumber; // slots per hash table
const uint64_t kMaxDataSize = (uint64_t)1 << 36; // test data size
const uint32_t kPageSize = 60 * 1 << 10; // page capacity
const uint32_t kMaxValueSize = 1024; // max value size
const uint32_t kMinValueSize = 80; // min value size
const uint32_t kMaxIndex = kPageSize / 96; // max records per page, 96 = 80 + 16
const uint64_t kBitmapSize = 1.4 * kMaxDataSize / kSharedNumber / kPageSize; // max page id during a run
const uint32_t kAllocSize = 1 << 16; // size of each remote allocation
const uint32_t kPageThreshold = 1 << 8; // local page-cache threshold
const uint32_t kEvictNumber = kPageThreshold * 0.05; // pages evicted per round
const uint32_t kPrintFreq = 1 << 24; // logging frequency
const uint64_t kRebuildThreshold = (uint64_t)(1 << 30) * 36 / kSharedNumber; // rebuild threshold
const uint32_t kEralyRegisterNumber = (uint64_t)(1 << 30) * 30 / kSharedNumber / kAllocSize;
class hash_map_t {
public:
/* Initialize all the pointers to nullptr. */
hash_map_t();
// index is the key's hash value
/* Find the corresponding key. */
hash_map_slot *find(const std::string &key, int index);
/* Insert into the head of the list. */
void insert(const std::string &key, const data_info_t &info, int index);
data_info_t remove(const std::string &key, int index);
// used only during rebuild
void update(const std::string &key, const data_info_t &info);
private:
hash_map_slot *m_bucket_[kBucketNum];
hash_map_slot *m_slot_head_; // head of the free-slot list, linking slots of deleted entries
hash_map_slot m_hash_slot_array_[kSlotSize]; // slot array
uint32_t m_slot_cnt_;
};
class Page {
public:
Page(std::string &data); // build a page from its serialized string
std::string to_string(); // serialize the page into a string
void to_string(char *ptr);
Page() : m_cur_size_{0}, m_cur_index_{0}, m_is_dirty_{true} {};
bool is_full() { return m_cur_size_ > kPageSize; } // is the page full?
uint16_t page_size() { return m_cur_size_; } // current page size
index_t record_number() { return m_cur_index_; } // record count
index_t insert(const std::string &key, const std::string &value) {
m_cur_size_ += 16 + value.size(); // account for the key length
m_key_[m_cur_index_] = key;
m_value_[m_cur_index_] = value;
return m_cur_index_++; // return the current index, then advance
}
index_t insert(const std::string &key, std::string &&value) {
m_cur_size_ += 16 + value.size(); // account for the key length
m_key_[m_cur_index_] = key;
m_value_[m_cur_index_] = std::move(value);
return m_cur_index_++; // return the current index, then advance
}
void update(index_t index, const std::string &value) {
m_is_dirty_ = true;
m_cur_size_ -= m_value_[index].size();
m_cur_size_ += value.size(); // refresh the current page size
m_value_[index] = value;
}
void update(index_t index, std::string &&value) {
m_is_dirty_ = true;
m_cur_size_ -= m_value_[index].size();
m_cur_size_ += value.size(); // refresh the current page size
m_value_[index] = std::move(value);
}
std::string read_value(index_t index) { return m_value_[index]; }
std::string read_key(index_t index) { return m_key_[index]; }
bool is_dirty() { return m_is_dirty_; }
private:
std::string m_key_[kMaxIndex];
std::string m_value_[kMaxIndex];
uint16_t m_cur_size_; // current page data size
index_t m_cur_index_; // current index
bool m_is_dirty_;
// std::vector<std::string> m_value_; // keys kept in a vector so rebuild can fix up metadata
// std::vector<std::string> m_key_; // records kept in a vector
};
class LRUList {
public:
LRUList() = default;
void insert(page_id_t hit_id) {
m_mutex_.lock();
m_list_.emplace_front(hit_id);
m_speed_map_.insert({hit_id, m_list_.begin()});
m_mutex_.unlock();
}
void update(page_id_t hit_id) {
m_mutex_.lock();
auto iter = m_speed_map_[hit_id];
if (iter != m_list_.begin()) { // move the page to the front
m_list_.erase(iter);
m_list_.emplace_front(hit_id);
m_speed_map_[hit_id] = m_list_.begin();
}
m_mutex_.unlock();
}
page_id_t evict() {
m_mutex_.lock();
page_id_t vicitm_page_id = m_list_.back();
m_list_.pop_back();
m_speed_map_.erase(vicitm_page_id);
m_mutex_.unlock();
return vicitm_page_id;
}
std::vector<page_id_t> clear() { // drain every element from the LRU list
m_mutex_.lock();
std::vector<page_id_t> res(m_list_.begin(), m_list_.end());
m_list_.clear();
m_speed_map_.clear();
m_mutex_.unlock();
return res;
}
int size() { return m_list_.size(); }
private:
std::mutex m_mutex_;
std::list<page_id_t> m_list_; // the LRU list
std::unordered_map<page_id_t, std::list<page_id_t>::iterator> m_speed_map_; // speeds up access into the list
};
} // namespace kv
// type.c
#include <cassert>
#include "type.h"
namespace kv {
hash_map_t::hash_map_t() {
memset(m_bucket_, 0, sizeof(m_bucket_));
m_slot_head_ = &m_hash_slot_array_[0];
m_slot_head_->next = nullptr;
m_slot_cnt_ = 1;
}
/* Find the corresponding key. */
hash_map_slot *hash_map_t::find(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
if (cur == nullptr) {
return nullptr;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
return cur;
}
cur = cur->next;
}
return nullptr;
}
/* Insert into the head of the list. */
void hash_map_t::insert(const std::string &key, const data_info_t &info, int index) {
hash_map_slot *new_slot;
// prefer reusing slots freed by deletes
if (m_slot_head_->next == nullptr) {
new_slot = &m_hash_slot_array_[m_slot_cnt_++];
assert(m_slot_cnt_ < kSlotSize);
} else {
new_slot = m_slot_head_->next;
m_slot_head_->next = new_slot->next;
}
new_slot->next = nullptr;
memcpy(new_slot->key, key.c_str(), 16);
new_slot->info = info;
if (!m_bucket_[index]) {
m_bucket_[index] = new_slot;
} else {
/* Insert into the head. */
hash_map_slot *tmp = m_bucket_[index];
m_bucket_[index] = new_slot;
new_slot->next = tmp;
}
}
// called only during rebuild
void hash_map_t::update(const std::string &key, const data_info_t &info) {
int index = std::hash<std::string>()(key) & (kBucketNum - 1);
hash_map_slot *cur = m_bucket_[index];
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
cur->info = info;
return;
}
cur = cur->next;
}
}
data_info_t hash_map_t::remove(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
hash_map_slot *parent = nullptr;
if (cur == nullptr) {
return kNullInfo;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
// unlink this slot from its bucket
if (parent == nullptr) {
m_bucket_[index] = cur->next;
} else {
parent->next = cur->next;
}
// push the slot onto the free list
cur->next = m_slot_head_->next;
m_slot_head_->next = cur;
return cur->info;
}
parent = cur;
cur = cur->next;
}
return kNullInfo;
}
/*
stored as u16:
record count
page size
per-record offsets
stored as char:
each record (kv)
*/
std::string Page::to_string() {
std::string data;
data.resize(kAllocSize);
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // actual payload starts at this offset
char *char_pointer = const_cast<char *>(data.c_str()); // cast away constness
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // reinterpret as a u16 pointer
u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // write each record's start offset
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // final offset past the payload
char *p = char_pointer + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
return data;
}
void Page::to_string(char *ptr) {
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // actual payload starts at this offset
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(ptr); // reinterpret as a u16 pointer
u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // write each record's start offset
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // final offset past the payload
char *p = ptr + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
}
Page::Page(std::string &data) {
char *char_pointer = const_cast<char *>(data.c_str()); // cast away constness
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // reinterpret as a u16 pointer
m_cur_index_ = u16_pointer[0];
m_cur_size_ = u16_pointer[1];
uint16_t offset, length;
for (uint16_t i = 0; i < m_cur_index_; i++) {
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_key_[i] = data.substr(offset, 16);
m_value_[i] = data.substr(offset + 16, length - 16);
}
m_is_dirty_ = false;
}
} // namespace kv
local_engine_entity
// local_engine_entity.h
#pragma once
#include <condition_variable>
#include <bitset>
#include "type.h"
#include "rdma_conn_manager.h"
#include "rdma_mem_pool.h"
namespace kv {
class LocalEngine; // forward declaration
class LocalEngineEntity {
public:
LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn);
~LocalEngineEntity();
bool write(const std::string &key, const std::string &value, bool use_aes = false);
bool read(const std::string &key, std::string &value);
bool deleteK(const std::string &key);
void rebuild_index();
std::vector<uint64_t> print_memory(); // print memory-usage statistics
private:
void start_thread(); // start the background thread
remote_info_t request_signle_info(); // request a single remote region
void register_remote_memory(); // register remote memory ahead of time
hash_map_t m_page_map_; // key -> metadata mapping
std::shared_ptr<Page> m_data_map_[kBitmapSize]; // page id -> page (data held locally)
uint32_t m_cache_size_; // local cache size
LRUList m_lru_list_; // the LRU list
std::unordered_map<page_id_t, remote_info_t> m_addr_map_; // page id -> remote info (data held remotely)
uint16_t m_max_index_[kBitmapSize]; // max index of each remote page
std::queue<remote_info_t> m_addr_list_; // unused remote regions
std::mutex m_mutex_; // protects the members above
// careful: the max page id must never exceed kBitmapSize!
std::bitset<kMaxIndex> m_bitmap_[kBitmapSize]; // 1 = deleted, 0 = live
std::mutex m_same_reader_mutex_[kBitmapSize]; // one remote reader per page
page_id_t m_cur_page_id_; // current page id, not on the LRU list
std::shared_ptr<Page> m_cur_page_; // current page
page_id_t m_vicitm_id_; // victim page id
std::shared_ptr<Page> m_vicitm_page_; // victim page
page_id_t m_last_update_id_; // page id of the last LRU update
bool m_alive_; // controls the background thread's lifetime
std::condition_variable m_cv_; // wakes the background thread
std::thread *m_backup_thread_; // the background thread
std::mutex m_useless_mutex_; // only exists to satisfy the condition_variable API
LocalEngine *m_engine_;
ConnectionManager *m_rdma_conn_;
RDMAMemPool *m_rdma_mem_pool_;
// statistics
uint64_t m_alloc_memory_{0}; // total remote memory allocated
bool m_rebuild_allow_{true};
bool m_delete_envent_{false};
bool m_rw_envent_after_delete_{false};
};
} // namespace kv
// local_engine_entity.cc
#include <set>
#include <future>
#include <functional>
#include "local_engine_entity.h"
#include "kv_engine.h"
namespace kv {
LocalEngineEntity::LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn) : m_engine_(engine), m_rdma_conn_(rdma_conn) {
m_rdma_mem_pool_ = new RDMAMemPool(m_rdma_conn_);
if (m_rdma_mem_pool_ == nullptr) {
printf("alloc rdma_mem_pool failed");
}
auto page = std::make_shared<Page>(); // allocate the first page
m_cur_page_id_ = 1;
m_cur_page_ = page;
m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_last_update_id_ = kNullPage;
m_data_map_[m_cur_page_id_] = page;
m_cache_size_ = 1;
m_alive_ = true;
// pre-allocate remote memory in the background
auto requester = std::thread([&]() { register_remote_memory(); });
requester.detach();
// start the background eviction thread
start_thread();
}
// Pre-allocate kEralyRegisterNumber remote regions of kAllocSize bytes each
void LocalEngineEntity::register_remote_memory() {
std::queue<remote_info_t> list;
remote_info_t remote_info;
remote_info.size = kAllocSize;
for (uint32_t i = 0; i < kEralyRegisterNumber; i++) {
m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
list.emplace(remote_info);
}
m_alloc_memory_ += kAllocSize * kEralyRegisterNumber;
m_mutex_.lock();
while (!m_addr_list_.empty()) {
list.emplace(m_addr_list_.front());
m_addr_list_.pop();
}
m_addr_list_ = std::move(list);
m_mutex_.unlock();
}
remote_info_t LocalEngineEntity::request_signle_info() {
remote_info_t remote_info;
// first try the standby list of pre-allocated regions
if (!m_addr_list_.empty()) {
remote_info = m_addr_list_.front();
m_addr_list_.pop();
return remote_info;
}
// otherwise allocate fresh remote memory
remote_info.size = kAllocSize;
m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
m_alloc_memory_ += remote_info.size;
return remote_info;
}
void LocalEngineEntity::start_thread() {
auto backup_func = [&]() {
std::unique_lock<std::mutex> useless_lock(m_useless_mutex_); // hold the dummy lock required by wait()
while (m_alive_) {
// Sleep while the cache is at or below the threshold and no rebuild is pending.
// ('<=' rather than '<' avoids a busy spin when the size lands exactly on the
// threshold, since the eviction branch below only fires once the size exceeds it.)
while (m_cache_size_ <= kPageThreshold && !(m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_)) {
m_cv_.wait(useless_lock);
if (!m_alive_) { // the engine is shutting down; the background thread exits too
return;
}
}
if (m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_) {
rebuild_index();
m_rebuild_allow_ = false;
}
if (m_cache_size_ > kPageThreshold) {
for (uint32_t i = 0; i < kEvictNumber; i++) {
m_mutex_.lock();
page_id_t victim_id = m_lru_list_.evict();
std::shared_ptr<Page> victim_page = m_data_map_[victim_id];
remote_info_t info = request_signle_info();
// Record the victim's metadata and publish it as the in-flight victim, so that
// read() can still serve this page locally while the remote write is in progress.
// (Without this, a reader could fetch the remote region before the write lands.)
m_vicitm_id_ = victim_id;
m_vicitm_page_ = victim_page;
m_data_map_[victim_id] = nullptr;
m_cache_size_--;
m_addr_map_[victim_id] = info;
m_max_index_[victim_id] = victim_page->record_number();
m_mutex_.unlock();
std::string &&vicitm_page_data = victim_page->to_string(); // snapshot the victim page before writing it out
// write the local page data to remote memory
m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), kAllocSize, info.remote_addr, info.rkey);
m_mutex_.lock();
m_vicitm_id_ = kNullPage; // the write has landed; readers may now fetch the page remotely
m_vicitm_page_ = nullptr;
m_mutex_.unlock();
}
}
}
};
m_backup_thread_ = new std::thread(backup_func);
}
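A caveat worth noting: m_cache_size_ and the rebuild flags are modified without holding m_useless_mutex_, so a notify_one() can in principle race with the wait() and be lost; the engine tolerates this because every later write/read notifies again. For contrast, a minimal sketch of the strictly lost-wakeup-free variant of this producer/worker pattern; all names here are illustrative:
#include <condition_variable>
#include <mutex>
#include <thread>
std::mutex mu; // guards 'pending' and 'alive'
std::condition_variable cv;
int pending = 0;
bool alive = true;
void worker() {
std::unique_lock<std::mutex> lk(mu);
while (alive) {
cv.wait(lk, [] { return pending > 0 || !alive; }); // predicate checked under the lock
if (!alive) return;
pending--; // ... evict one page here ...
}
}
int main() {
std::thread t(worker);
{ std::lock_guard<std::mutex> lk(mu); pending++; } // mutate under the same mutex...
cv.notify_one(); // ...so the wakeup cannot be lost
{ std::lock_guard<std::mutex> lk(mu); alive = false; }
cv.notify_one();
t.join();
return 0;
}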
LocalEngineEntity::~LocalEngineEntity() {
m_alive_ = false;
m_cv_.notify_one();
m_backup_thread_->join();
delete m_backup_thread_;
delete m_rdma_mem_pool_;
}
bool LocalEngineEntity::write(const std::string &key, const std::string &value, bool use_aes) {
if (m_rebuild_allow_ && m_delete_envent_) {
m_rw_envent_after_delete_ = true;
m_cv_.notify_one();
}
// handle encrypted vs. plain values
std::string encrypt_value;
if (use_aes) {
m_engine_->encrypted(value, encrypt_value);
}
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
m_mutex_.lock();
auto slot = m_page_map_.find(key, hash_index);
// first write of this key, or its page is being flushed to remote memory, or it already resides remotely
if (slot == nullptr || m_data_map_[slot->info.page_id] == nullptr) {
if (m_cur_page_->is_full()) { // the current page is full; open a new one
m_lru_list_.insert(m_cur_page_id_); // a page joins the LRU list only once it is full
m_cur_page_id_++;
m_data_map_[m_cur_page_id_] = std::make_shared<Page>();
m_cur_page_ = m_data_map_[m_cur_page_id_];
m_cache_size_++;
// try to wake the background thread
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}
index_t index;
if (use_aes) {
index = m_cur_page_->insert(key, std::move(encrypt_value));
} else {
index = m_cur_page_->insert(key, value);
}
data_info_t info = {m_cur_page_id_, index};
if (slot == nullptr) { // first write: insert the metadata
m_page_map_.insert(key, info, hash_index); // map the key to its metadata
} else {
m_bitmap_[slot->info.page_id].set(slot->info.index, true); // mark the stale record as deleted
slot->info = info; // update the metadata
}
m_mutex_.unlock();
} else { // the page is local and not evicted: update it in place
data_info_t info = slot->info;
if (use_aes) {
m_data_map_[info.page_id]->update(info.index, std::move(encrypt_value));
} else {
m_data_map_[info.page_id]->update(info.index, value);
}
m_mutex_.unlock();
// update the LRU list
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}
}
return true;
}
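write() (and read() below) picks a bucket with std::hash<std::string>()(key) & (kBucketNum - 1), which equals hash % kBucketNum only when kBucketNum is a power of two. A standalone check of that identity; kBuckets is an illustrative stand-in for kBucketNum:
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
int main() {
const std::size_t kBuckets = 1 << 20; // must be a power of two
std::size_t h = std::hash<std::string>()("some-key");
assert((h & (kBuckets - 1)) == (h % kBuckets)); // masking == modulo for powers of two
return 0;
}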
bool LocalEngineEntity::read(const std::string &key, std::string &value) {
if (m_rebuild_allow_ && m_delete_envent_) {
m_rw_envent_after_delete_ = true;
m_cv_.notify_one();
}
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
bool need_update_lru = true;
m_mutex_.lock();
auto slot = m_page_map_.find(key, hash_index);
if (slot == nullptr) { // no metadata: the key does not exist
m_mutex_.unlock();
return false;
}
data_info_t info = slot->info;
if (info.page_id == m_vicitm_id_) {
value = m_vicitm_page_->read_value(info.index);
m_mutex_.unlock();
return true;
}
if (m_data_map_[info.page_id] == nullptr) { // the data lives in remote memory
m_mutex_.unlock();
m_same_reader_mutex_[info.page_id].lock(); // mutual exclusion among readers of the same page
if (m_data_map_[info.page_id] == nullptr) { // double check
auto remote_info = m_addr_map_[info.page_id];
std::string page_data(remote_info.size, '\0');
m_rdma_conn_->remote_read((void *)&page_data[0], remote_info.size, remote_info.remote_addr, remote_info.rkey);
auto new_page = std::make_shared<Page>(page_data); // build the cached page
m_mutex_.lock();
m_data_map_[info.page_id] = new_page;
m_cache_size_++;
m_lru_list_.insert(info.page_id);
need_update_lru = false;
m_addr_map_.erase(info.page_id); // remove the entry from the remote-address map
m_addr_list_.emplace(remote_info); // return the remote region to the standby list
m_mutex_.unlock();
// try to wake the background thread
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}
m_same_reader_mutex_[info.page_id].unlock();
m_mutex_.lock();
}
value = m_data_map_[info.page_id]->read_value(info.index);
m_mutex_.unlock();
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}
return true;
}
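The remote branch of read() is a double-checked fetch: the miss is detected under m_mutex_, then re-checked under the per-page m_same_reader_mutex_, so only one thread pays for the RDMA read while concurrent readers of the same page block briefly and then hit the fresh cache. The same shape in miniature; Slot, fetch_remote, and get are placeholder names, not part of this codebase:
#include <memory>
#include <mutex>
#include <string>
struct Slot {
std::mutex fetch_mu; // per-page: serializes the expensive fetch
std::shared_ptr<std::string> data; // null while the page lives remotely
};
std::mutex table_mu; // global: guards the table entries
std::shared_ptr<std::string> fetch_remote() { // stand-in for remote_read()
return std::make_shared<std::string>("page-bytes");
}
std::shared_ptr<std::string> get(Slot &s) {
{ std::lock_guard<std::mutex> lk(table_mu); if (s.data) return s.data; } // fast path
std::lock_guard<std::mutex> fl(s.fetch_mu); // only one reader performs the fetch
{ std::lock_guard<std::mutex> lk(table_mu); if (s.data) return s.data; } // double check
auto fresh = fetch_remote();
std::lock_guard<std::mutex> lk(table_mu);
s.data = fresh; // publish for later readers
return fresh;
}
int main() { Slot s; return get(s)->empty() ? 1 : 0; }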
bool LocalEngineEntity::deleteK(const std::string &key) {
m_delete_envent_ = true;
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
m_mutex_.lock();
data_info_t info = m_page_map_.remove(key, hash_index); // remove the key's metadata
m_mutex_.unlock();
m_bitmap_[info.page_id].set(info.index, true); // mark the record as deleted in the page bitmap
return true;
}
void LocalEngineEntity::rebuild_index() {
std::lock_guard<std::mutex> lk(m_mutex_);
std::string key;
std::string value;
data_info_t data_info;
std::unordered_map<page_id_t, remote_info_t> tmp_addr_map; // temporary page-id-to-remote-address map
uint32_t new_cache_size = 1; // the active page counts as one cached page
std::vector<page_id_t> local_id = m_lru_list_.clear();
auto new_page = m_cur_page_;
page_id_t new_page_id = m_cur_page_id_;
// process the locally cached pages
for (auto &page_id : local_id) {
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // no deleted records: keep the page as-is
m_lru_list_.insert(page_id);
new_cache_size++;
} else if (!bitmap.all()) { // some live records remain
auto page = m_data_map_[page_id];
int record_num = page->record_number();
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) { // this record is still live
if (new_page->is_full()) { // new page full: keep it in the local cache
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
new_page = std::make_shared<Page>();
new_page_id++;
}
// copy the record into the new page and update the metadata map
key = page->read_key(i);
data_info.index = new_page->insert(key, page->read_value(i));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
m_data_map_[page_id] = nullptr; // drop the old page
} else {
m_data_map_[page_id] = nullptr; // every record was deleted: drop the old page
}
}
page_id_t page_id;
remote_info_t info;
char head_data[kMaxIndex * 10];
char kv_data[2 * kMaxValueSize];
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
uint16_t head_length;
uint16_t offset, length;
// process the pages held in remote memory
for (auto &kv : m_addr_map_) {
page_id = kv.first;
info = kv.second;
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // 不存在删除的记录,不进行操作
tmp_addr_map.insert({page_id, info});
} else if (bitmap.all()) { // 不存在有效记录,将后端地址加入地址列表
m_addr_list_.emplace(info);
} else {
bool avai_info = true; // 是否将该远端地址加入地址列表
head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
// 读取页头部数据
m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
int record_num = m_max_index_[page_id];
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) {
if (new_page->is_full()) {
if (new_cache_size > kPageThreshold) { // local cache full: flush the new page to remote memory
std::string &&page_data = new_page->to_string();
uint32_t len = page_data.length();
// Write to a freshly requested region: reusing 'info' as the destination would
// clobber records of the source page that the loop below still needs to read.
remote_info_t dst = request_signle_info();
m_rdma_conn_->remote_write((void *)page_data.c_str(), len, dst.remote_addr, dst.rkey);
tmp_addr_map.insert({new_page_id, dst}); // record the page-id-to-remote-address mapping
} else { // keep the new page in the local cache
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
}
new_page = std::make_shared<Page>();
new_page_id++;
}
// read the record, insert it into the new page, and update the metadata map
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
key = std::string(kv_data, kv_data + 16);
value = std::string(kv_data + 16, kv_data + length);
data_info.index = new_page->insert(key, std::move(value));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
m_addr_list_.emplace(info); // all live records have been copied out; recycle the source region
}
}
m_addr_map_ = std::move(tmp_addr_map);
m_cur_page_id_ = new_page_id;
m_cur_page_ = new_page;
m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_cache_size_ = new_cache_size;
m_last_update_id_ = kNullPage;
m_data_map_[new_page_id] = new_page;
}
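At its core, rebuild_index() is bitmap-driven compaction: walk every page, skip records whose deleted bit is set, and repack the survivors into fresh pages, which then go to the local cache or remote memory depending on capacity. The kernel of that idea in isolation; Record and the 64-record page size are illustrative:
#include <bitset>
#include <cstddef>
#include <string>
#include <vector>
struct Record { std::string key, value; }; // illustrative record type
// Keep only records whose 'deleted' bit is clear, preserving order.
std::vector<Record> compact(const std::vector<Record> &page, const std::bitset<64> &deleted) {
std::vector<Record> live;
for (std::size_t i = 0; i < page.size(); i++)
if (!deleted.test(i)) live.push_back(page[i]);
return live;
}
int main() {
std::vector<Record> page = {{"k0", "v0"}, {"k1", "v1"}, {"k2", "v2"}};
std::bitset<64> deleted;
deleted.set(1); // mark the middle record deleted
return compact(page, deleted).size() == 2 ? 0 : 1;
}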
std::vector<uint64_t> LocalEngineEntity::print_memory() {
// report a set of memory-usage statistics
uint64_t key_metadata = kSlotSize * 32 + kBucketNum * 8; // hash-map metadata estimate
uint64_t page_id_size = m_cur_page_id_; // number of page ids handed out
uint64_t page_metadata = page_id_size * 48 + kBitmapSize * 1.2 * kMaxIndex; // per-page metadata plus bitmap estimate
uint64_t page_size = m_cache_size_ * kPageSize; // bytes of locally cached page data
return {key_metadata, page_id_size, page_metadata, page_size, m_alloc_memory_};
}
} // namespace kv