Tianchi Competition Notes

Published: 2022-11-27

Problem Overview

Competition page: 4th Global Database Competition, Track 1: Cloud-Native Shared-Memory Database Performance Optimization

The problem in a nutshell:
Local memory is fast to read and write but small; remote memory is slow but large (remote data is accessed via eRDMA).
Preliminary round: implement a simplified, efficient KV storage engine supporting Write and Read interfaces, with fixed-size keys and values.
Final round: additionally implement a Delete interface and a rebuild capability, with variable-length values.
The evaluation program runs in two phases:
1) Correctness verification
The correctness of the KV operations (including encryption/decryption) is verified; this part does not count toward the running time. If the correctness test fails, the evaluation terminates with a failure.
2) Performance evaluation
The engine may use at most 8 GB of local memory and 32 GB of remote memory.
Stage 1: each thread writes about 12M KV pairs with 16-byte keys and 80-1024-byte values, selectively reading them back for verification.
Stage 2: the threads delete concurrently, 10M keys per thread; deletion time counts toward the running time.
Stage 3: each thread writes about 10M more KV pairs with 16-byte keys and 80-256-byte values; then a mixed read/write test runs, in which 16 threads make 64M calls at a 75%:25% read/write ratio. The 75% of reads are hot-spot skewed: most read accesses concentrate on a small number of keys.
The final score is the total time taken by all of the above operations.

Data distribution: at any moment during this stage, the total length of all live values is guaranteed not to exceed 30 GB. Among the 12M pure-write operations, roughly 70% have value lengths of 80-128 bytes, roughly 20% of 129-256 bytes, and roughly 10% of 257-1024 bytes. In the 64M mixed read/write operations, every Set uses a value of at most 128 bytes.
The evaluation program's output looks roughly like this:

Start local encryption evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh

Starting Write-Read Testing.
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 4
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 13
##################### Start Write from index: 14
##################### Start Write from index: 15
##################### End Write from index: 0
##################### End Write from index: 13
##################### End Write from index: 14
##################### End Write from index: 15
##################### End Write from index: 11
##################### End Write from index: 4
##################### End Write from index: 8
##################### End Write from index: 5
##################### End Write from index: 2
##################### End Write from index: 10
##################### End Write from index: 9
##################### End Write from index: 3
##################### End Write from index: 6
##################### End Write from index: 7
##################### End Write from index: 1
##################### End Write from index: 12
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 6
##################### Start Read from index: 7
##################### Start Read from index: 8
##################### Start Read from index: 9
##################### Start Read from index: 10
##################### Start Read from index: 12
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 15
##################### Start Read from index: 11
##################### End Read from index: 0
##################### End Read from index: 13
##################### End Read from index: 14
##################### End Read from index: 15
##################### End Read from index: 11
##################### End Read from index: 4
##################### End Read from index: 8
##################### End Read from index: 5
##################### End Read from index: 2
##################### End Read from index: 10
##################### End Read from index: 9
##################### End Read from index: 3
##################### End Read from index: 7
##################### End Read from index: 1
##################### End Read from index: 6
##################### End Read from index: 12
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 14
##################### Start Write from index: 13
##################### Start Write from index: 15
##################### Start Write from index: 4
##################### End Write from index: 12
##################### End Write from index: 11
##################### End Write from index: 14
##################### End Write from index: 0
##################### End Write from index: 4
##################### End Write from index: 1
##################### End Write from index: 8
##################### End Write from index: 13
##################### End Write from index: 3
##################### End Write from index: 7
##################### End Write from index: 10
##################### End Write from index: 9
##################### End Write from index: 6
##################### End Write from index: 15
##################### End Write from index: 5
##################### End Write from index: 2
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 7
##################### Start Read from index: 6
##################### Start Read from index: 8
##################### Start Read from index: 10
##################### Start Read from index: 11
##################### Start Read from index: 12
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 15
##################### Start Read from index: 9
##################### End Read from index: 12
##################### End Read from index: 11
##################### End Read from index: 14
##################### End Read from index: 0
##################### End Read from index: 4
##################### End Read from index: 1
##################### End Read from index: 8
##################### End Read from index: 13
##################### End Read from index: 3
##################### End Read from index: 7
##################### End Read from index: 10
##################### End Read from index: 9
##################### End Read from index: 6
##################### End Read from index: 15
##################### End Read from index: 5
##################### End Read from index: 2
##################### End The Write-Read Test ##############################
Time for Write-Read Test 32.000000 seconds
LocalEngine::stop finsh
##################### Evaluation Success ##############################
Start local perf evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh

Starting Write-Read Testing.
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 4
##################### Start Write from index: 3
##################### Start Write from index: 5
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 12
##################### Start Write from index: 13
##################### Start Write from index: 14
##################### Start Write from index: 15
##################### End Write from index: 7
##################### End Write from index: 8
##################### End Write from index: 1
##################### End Write from index: 14
##################### End Write from index: 10
##################### End Write from index: 12
##################### End Write from index: 3
##################### End Write from index: 6
##################### End Write from index: 11
##################### End Write from index: 13
##################### End Write from index: 15
##################### End Write from index: 9
##################### End Write from index: 0
##################### End Write from index: 5
##################### End Write from index: 2
##################### End Write from index: 4
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 5
##################### Start Read from index: 7
##################### Start Read from index: 9
##################### Start Read from index: 6
##################### Start Read from index: 11
##################### Start Read from index: 8
##################### Start Read from index: 13
##################### Start Read from index: 14
##################### Start Read from index: 12
##################### Start Read from index: 15
##################### Start Read from index: 10
##################### End Read from index: 7
##################### End Read from index: 8
##################### End Read from index: 1
##################### End Read from index: 14
##################### End Read from index: 10
##################### End Read from index: 12
##################### End Read from index: 3
##################### End Read from index: 6
##################### End Read from index: 11
##################### End Read from index: 13
##################### End Read from index: 15
##################### End Read from index: 9
##################### End Read from index: 0
##################### End Read from index: 5
##################### End Read from index: 2
##################### End Read from index: 4
##################### Start Write from index: 0
##################### Start Write from index: 1
##################### Start Write from index: 2
##################### Start Write from index: 3
##################### Start Write from index: 4
##################### Start Write from index: 6
##################### Start Write from index: 7
##################### Start Write from index: 8
##################### Start Write from index: 9
##################### Start Write from index: 10
##################### Start Write from index: 11
##################### Start Write from index: 13
##################### Start Write from index: 12
##################### Start Write from index: 14
##################### Start Write from index: 5
##################### Start Write from index: 15
##################### End Write from index: 13
##################### End Write from index: 12
##################### End Write from index: 5
##################### End Write from index: 14
##################### End Write from index: 7
##################### End Write from index: 6
##################### End Write from index: 15
##################### End Write from index: 2
##################### End Write from index: 10
##################### End Write from index: 4
##################### End Write from index: 11
##################### End Write from index: 3
##################### End Write from index: 9
##################### End Write from index: 0
##################### End Write from index: 8
##################### End Write from index: 1
##################### Start Read from index: 0
##################### Start Read from index: 1
##################### Start Read from index: 2
##################### Start Read from index: 3
##################### Start Read from index: 4
##################### Start Read from index: 6
##################### Start Read from index: 7
##################### Start Read from index: 8
##################### Start Read from index: 9
##################### Start Read from index: 10
##################### Start Read from index: 12
##################### Start Read from index: 14
##################### Start Read from index: 13
##################### Start Read from index: 5
##################### Start Read from index: 11
##################### Start Read from index: 15
##################### End Read from index: 13
##################### End Read from index: 12
##################### End Read from index: 5
##################### End Read from index: 14
##################### End Read from index: 7
##################### End Read from index: 6
##################### End Read from index: 15
##################### End Read from index: 2
##################### End Read from index: 10
##################### End Read from index: 4
##################### End Read from index: 11
##################### End Read from index: 3
##################### End Read from index: 9
##################### End Read from index: 0
##################### End Read from index: 8
##################### End Read from index: 1
##################### Start Update from index: 0
##################### Start Update from index: 1
##################### Start Update from index: 2
##################### Start Update from index: 3
##################### Start Update from index: 4
##################### Start Update from index: 5
##################### Start Update from index: 7
##################### Start Update from index: 6
##################### Start Update from index: 8
##################### Start Update from index: 10
##################### Start Update from index: 11
##################### Start Update from index: 12
##################### Start Update from index: 9
##################### Start Update from index: 13
##################### Start Update from index: 14
##################### Start Update from index: 15
##################### After updating, Start Reading from index: 15
##################### End Read in Updating from index: 15
##################### After updating, Start Reading from index: 10
##################### End Read in Updating from index: 10
##################### After updating, Start Reading from index: 7
##################### End Read in Updating from index: 7
##################### After updating, Start Reading from index: 5
##################### End Read in Updating from index: 5
##################### After updating, Start Reading from index: 4
##################### End Read in Updating from index: 4
##################### After updating, Start Reading from index: 11
##################### End Read in Updating from index: 11
##################### After updating, Start Reading from index: 8
##################### End Read in Updating from index: 8
##################### After updating, Start Reading from index: 1
##################### End Read in Updating from index: 1
##################### After updating, Start Reading from index: 0
##################### End Read in Updating from index: 0
##################### After updating, Start Reading from index: 6
##################### End Read in Updating from index: 6
##################### After updating, Start Reading from index: 2
##################### End Read in Updating from index: 2
##################### After updating, Start Reading from index: 3
##################### End Read in Updating from index: 3
##################### After updating, Start Reading from index: 9
##################### End Read in Updating from index: 9
##################### After updating, Start Reading from index: 12
##################### End Read in Updating from index: 12
##################### After updating, Start Reading from index: 14
##################### End Read in Updating from index: 14
##################### After updating, Start Reading from index: 13
##################### End Read in Updating from index: 13
##################### End The Write-Read Test ##############################
Time for Write-Read Test 158.000000 seconds

Starting Deleting Testing.
##################### Start Delete from index: 0
##################### Start Delete from index: 1
##################### Start Delete from index: 2
##################### Start Delete from index: 3
##################### Start Delete from index: 4
##################### Start Delete from index: 6
##################### Start Delete from index: 7
##################### Start Delete from index: 5
##################### Start Delete from index: 9
##################### Start Delete from index: 10
##################### Start Delete from index: 11
##################### Start Delete from index: 14
##################### Start Delete from index: 8
##################### Start Delete from index: 15
##################### Start Delete from index: 13
##################### Start Delete from index: 12
##################### After Deleting, Start Reading from index: 7
##################### After Deleting, Start Reading from index: 8
##################### After Deleting, Start Reading from index: 1
##################### After Deleting, Start Reading from index: 14
##################### After Deleting, Start Reading from index: 12
##################### After Deleting, Start Reading from index: 10
##################### After Deleting, Start Reading from index: 6
##################### After Deleting, Start Reading from index: 3
##################### After Deleting, Start Reading from index: 11
##################### After Deleting, Start Reading from index: 15
##################### After Deleting, Start Reading from index: 13
##################### After Deleting, Start Reading from index: 9
##################### After Deleting, Start Reading from index: 0
##################### After Deleting, Start Reading from index: 5
##################### After Deleting, Start Reading from index: 2
##################### After Deleting, Start Reading from index: 4
##################### ReInserting from index: 8
##################### ReInserting from index: 7
##################### ReInserting from index: 1
##################### ReInserting from index: 12
##################### ReInserting from index: 6
##################### ReInserting from index: 11
##################### ReInserting from index: 10
##################### ReInserting from index: 13
##################### ReInserting from index: 15
##################### ReInserting from index: 9
##################### ReInserting from index: 14
##################### ReInserting from index: 0
##################### ReInserting from index: 3
##################### ReInserting from index: 5
##################### ReInserting from index: 2
##################### ReInserting from index: 4
##################### End Delete Test from index: 11
##################### End Delete Test from index: 13
##################### End Delete Test from index: 8
##################### End Delete Test from index: 5
##################### End Delete Test from index: 1
##################### End Delete Test from index: 7
##################### End Delete Test from index: 6
##################### End Delete Test from index: 10
##################### End Delete Test from index: 0
##################### End Delete Test from index: 14
##################### End Delete Test from index: 2
##################### End Delete Test from index: 9
##################### End Delete Test from index: 3
##################### End Delete Test from index: 15
##################### End Delete Test from index: 12
##################### End Delete Test from index: 4
##################### End The Delete Test ##############################
Time for Delete Test 47.000000 seconds

Starting HOT Data Testing.
##################### Start test from index: 0
##################### Start test from index: 1
##################### Start test from index: 2
##################### Start test from index: 3
##################### Start test from index: 4
##################### Start test from index: 6
##################### Start test from index: 5
##################### Start test from index: 8
##################### Start test from index: 9
##################### Start test from index: 10
##################### Start test from index: 11
##################### Start test from index: 12
##################### Start test from index: 13
##################### Start test from index: 7
##################### Start test from index: 14
##################### Start test from index: 15
##################### End The Hot Data Test ##############################
Time for Hot Data Test 46.000000 seconds
Hot:46
LocalEngine::stop finsh
Total:253
##################### Evaluation Success ##############################
Success evaluation, update score...37.2979146325685141834147

We finished 20th in the final round, which was exactly the last place eligible for the Geek Award, hehe.

Competition Experience

In the preliminary round the organizers provided a simple demo that kept the keys and remote addresses locally and stored all the values remotely. By the end of the preliminary round, our architecture looked roughly like this:

Key-value data is stored as pages. The local side keeps the key metadata (key -> (page id, index), 5 GB), a small cache of pages (2 GB), and the remote addresses of remote pages; the remote side stores most of the page data (30 GB).
On write:
	Insert: put the data into a newly allocated page and record the metadata (key, (page id, page index)).
	Update: if the page currently lives on the remote end, treat the update as an insert and refresh the key's metadata;
		 if the page is local, update the value in place.
	Update the LRU list.
On read:
	If the page is remote, read the remote data and add the page to the local cache.
	If the page is local, read the data directly.
	Update the LRU list.
Eviction:
	A background thread, whenever the number of locally cached pages exceeds a threshold, writes the least recently
	used pages to the remote end and records their remote addresses (a sketch of this loop follows the list).

Hashing:
	To reduce lock contention, we created a number of request-handling entities and dispatched each request by hashing
	its key (LocalEngine -> LocalEngineEntity); the key is then hashed a second time to locate its metadata.
	Since STL's map has a large memory footprint, the organizers provided a simple hash table implementation: it
	allocates all the space it needs at startup and chains entries with equal hash values, so it never has to grow.
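The eviction thread's code is not shown in this post; below is a minimal hypothetical sketch of its loop, reconstructed from the description above. The member names (m_cv_, m_lru_list_, m_addr_map_, request_signle_info, ...) are real and appear in local_engine_entity.h later; the loop body and the name evict_loop are my reconstruction, not the actual competition code.

// Hypothetical reconstruction of the background eviction loop.
void LocalEngineEntity::evict_loop() {
  std::unique_lock<std::mutex> lk(m_useless_mutex_);
  while (m_alive_) {
    // sleep until a foreground request notices the cache passed the threshold
    m_cv_.wait(lk, [this] { return m_cache_size_ > kPageThreshold || !m_alive_; });
    while (m_alive_ && m_cache_size_ > kPageThreshold) {
      page_id_t victim = m_lru_list_.evict();      // least recently used page
      remote_info_t info = request_signle_info();  // grab an unused remote slot
      std::string data = m_data_map_[victim]->to_string();
      m_rdma_conn_->remote_write((void *)data.c_str(), data.size(), info.remote_addr, info.rkey);
      std::lock_guard<std::mutex> guard(m_mutex_);
      m_data_map_[victim] = nullptr;  // drop the local copy
      m_addr_map_[victim] = info;     // record where the page now lives
      m_cache_size_--;
    }
  }
}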

At the end of the preliminary round we scored only 9 points. The biggest reason was that the first and second hashes used the same hash function (std::hash): within one LocalEngineEntity every key's hash is congruent to that entity's index, so a large portion of the entity's custom hash table could never be reached, which sharply increased the probability of hash collisions.
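A small standalone sketch of this effect (the constants mirror kSharedNumber and kBucketNum from type.h; the keys here are made up). With the low 6 bits of the hash fixed by the entity dispatch, the second-level bucket index h & (kBucketNum - 1) can take at most kBucketNum / kSharedNumber = 32768 of the 2^21 possible values:

#include <functional>
#include <iostream>
#include <set>
#include <string>

int main() {
  const size_t kSharedNumber = 1 << 6;  // entity count, as in type.h
  const size_t kBucketNum = 1 << 21;    // buckets per hash table, as in type.h
  std::set<size_t> buckets;
  for (int i = 0; i < 1000000; i++) {
    size_t h = std::hash<std::string>()(std::to_string(i));
    if ((h & (kSharedNumber - 1)) != 0) continue;  // keep only keys dispatched to entity 0
    buckets.insert(h & (kBucketNum - 1));          // second hash: same function, same low bits
  }
  std::cout << "distinct buckets touched: " << buckets.size()
            << " of " << kBucketNum << std::endl;
}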
The final-round code was built on top of the preliminary code and mainly adds three features: value encryption, the delete operation, and rebuild (discard the records marked invalid and compact the valid data so it is laid out more densely).
Encryption: value encryption is a simple algorithm written against the IPP-Crypto interface, only a few lines of code.

bool LocalEngine::set_aes() {
  // Current algorithm is not supported, just for demonstration.
  m_aes_.algo = CBC;
  m_aes_.key_len = 16;
  m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
  if (m_aes_.key == nullptr) return false;
  m_aes_.blk_size = 16;
  m_aes_.piv_len = 16;
  m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
  if (m_aes_.piv == nullptr) return false;

  int ctxSize;               // AES context size
  ippsAESGetSize(&ctxSize);  // evaluating AES context size
  // allocating memory for AES context
  m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
  // AES context initialization
  ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
  return true;
}
// A simple encryption routine following the reference PDF
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
  // buffer size rounded up to a multiple of the AES block size
  Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
  // encrypting plaintext
  ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
  std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
  encrypt_value = std::move(tmp);
  return true;
}

Delete: the data path is key -> (page, index) -> local cache m_data_map_ -> remote address m_addr_map_. So delete starts by removing the key -> (page, index) mapping, which only requires adding a remove operation to the custom hash table; note that the removed slot is pushed onto a separate free list so it can be reused.

data_info_t hash_map_t::remove(const std::string &key, int index) {
  hash_map_slot *cur = m_bucket_[index];
  hash_map_slot *parent = nullptr;
  if (cur == nullptr) {
    return kNullInfo;
  }
  while (cur) {
    if (memcmp(cur->key, key.c_str(), 16) == 0) {
      // unlink the slot from the bucket
      if (parent == nullptr) {
        m_bucket_[index] = cur->next;
      } else {
        parent->next = cur->next;
      }
      // push it onto the free list
      cur->next = m_slot_head_->next;
      m_slot_head_->next = cur;
      return cur->info;
    }
    parent = cur;
    cur = cur->next;
  }
  return kNullInfo;
}

With the mapping gone, the value can no longer be reached through its key, but it still occupies storage. The position therefore has to be marked as deleted so that rebuild knows not to migrate that record.

std::bitset<kMaxIndex> m_bitmap_[kBitmapSize];  // 1 = deleted, 0 = live

Each page gains a bitmap marking which of its records are still valid; kMaxIndex is the maximum number of records in a page and kBitmapSize the largest page id that can occur during a run.
The other bitmap-related operation is update. If the record being updated currently lives on the remote end, reading it back just to modify it locally would be too slow, so the operation is split into deleting the remote record plus inserting the new one; the remote record must then be marked invalid as well.

bool LocalEngineEntity::deleteK(const std::string &key) {
  m_delete_envent_ = true;
  int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
  m_mutex_.lock();
  data_info_t info = m_page_map_.remove(key, hash_index);  // remove the key's metadata
  m_mutex_.unlock();
  m_bitmap_[info.page_id].set(info.index, true);  // mark the record as deleted
  return true;
}
// the update path when the data lives remotely:
m_bitmap_[slot->info.page_id].set(slot->info.index, true);  // mark the old record as deleted
slot->info = info;                                          // update the metadata

Rebuild:
The page is the unit of exchange between the local cache and remote memory. As the program runs, invalid records pile up, so all pages are periodically scanned: valid records are copied into fresh pages and the old pages are freed, a form of garbage collection.
When reading a remote page during rebuild, we first fetch its header metadata and then read only the valid records, instead of the whole page; remote pages contain few valid records at rebuild time, so this cuts the read volume.

void LocalEngineEntity::rebuild_index() {
  std::lock_guard<std::mutex> lk(m_mutex_);
  std::string key;
  std::string value;
  data_info_t data_info;
  std::shared_ptr<Page> page;
  std::unordered_map<page_id_t, remote_info_t> tmp_addr_map;  // temporary page id -> remote address map
  uint32_t new_cache_size = 1;
  std::vector<page_id_t> local_id = m_lru_list_.clear();
  auto new_page = m_cur_page_;
  page_id_t new_page_id = m_cur_page_id_;
  // handle the locally cached pages
  for (auto &page_id : local_id) {
    auto &bitmap = m_bitmap_[page_id];
    if (bitmap.none()) {  // no deleted records; leave the page alone
      m_lru_list_.insert(page_id);
      new_cache_size++;
    } else if (!bitmap.all()) {  // some valid records remain
      auto page = m_data_map_[page_id];
      int record_num = page->record_number();
      for (int i = 0; i < record_num; i++) {
        if (!bitmap.test(i)) {        // record not deleted
          if (new_page->is_full()) {  // new page full; keep it locally
            m_data_map_[new_page_id] = new_page;
            new_cache_size++;
            m_lru_list_.insert(new_page_id);
            new_page = std::make_shared<Page>();
            new_page_id++;
          }
          // copy the record into the new page and update its metadata
          key = page->read_key(i);
          data_info.index = new_page->insert(key, page->read_value(i));
          data_info.page_id = new_page_id;
          m_page_map_.update(key, data_info);
        }
      }
      m_data_map_[page_id] = nullptr;  // drop the old page
    } else {
      m_data_map_[page_id] = nullptr;  // drop the old page
    }
  }
  page_id_t page_id;
  remote_info_t info;
  char head_data[kMaxIndex * 10];
  char kv_data[2 * kMaxValueSize];
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
  uint16_t head_length;
  uint16_t offset, length;

  // handle the remote pages
  for (auto &kv : m_addr_map_) {
    page_id = kv.first;
    info = kv.second;
    auto &bitmap = m_bitmap_[page_id];
    if (bitmap.none()) {  // no deleted records; keep the mapping
      tmp_addr_map.insert({page_id, info});
    } else if (bitmap.all()) {  // no valid records; recycle the remote address
      m_addr_list_.emplace(info);
    } else {
      bool avai_info = true;  // whether to recycle this remote address afterwards
      head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
      // read the page header
      m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
      int record_num = m_max_index_[page_id];
      for (int i = 0; i < record_num; i++) {
        if (!bitmap.test(i)) {
          if (new_page->is_full()) {
            if (new_cache_size > kPageThreshold) {  // local cache full; write the new page remotely
              std::string &&page_data = new_page->to_string();
              uint32_t len = page_data.length();
              m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
              tmp_addr_map.insert({new_page_id, info});  // reuse the old page's remote slot for the new page
              avai_info = false;
            } else {  // keep the new page in the local cache
              m_data_map_[new_page_id] = new_page;
              new_cache_size++;
              m_lru_list_.insert(new_page_id);
            }
            new_page = std::make_shared<Page>();
            new_page_id++;
          }
          // read the record, insert it into the new page, and update its metadata
          offset = u16_pointer[i + 2];
          length = u16_pointer[i + 3] - u16_pointer[i + 2];
          m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
          key = std::string(kv_data, kv_data + 16);
          value = std::string(kv_data + 16, kv_data + length);
          data_info.index = new_page->insert(key, std::move(value));
          data_info.page_id = new_page_id;
          m_page_map_.update(key, data_info);
        }
      }
      if (avai_info) {
        m_addr_list_.emplace(info);
      }
    }
  }
  m_addr_map_ = std::move(tmp_addr_map);
  m_cur_page_id_ = new_page_id;
  m_cur_page_ = new_page;
  m_vicitm_id_ = kNullPage;
  m_vicitm_page_ = nullptr;
  m_cache_size_ = new_cache_size;
  m_last_update_id_ = kNullPage;
  m_data_map_[new_page_id] = new_page;
}

Some Details

Evolution of the variable-length record encoding

auto &&vicitm_page_data = victim_page->to_string();  // serialize the victim page; about to write it to remote memory
m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), len, remote_info.remote_addr,
                                           remote_info.rkey);  

KV data is cached locally as arrays of strings (the Page class). When the local cache reaches its threshold, long-unvisited pages must be written to the remote end as one large string, so the string array has to be serialized into a single big string. Since the page may be accessed again later, each record's size must be encoded into the string as well. At first I encoded each record as record size + '\0' + record content.

/*
A page's string layout is:
page index count (record count) '\0'
page capacity (bytes) '\0'
record size '\0' record content
record size '\0' record content
...
'\0' (end-of-page marker)
*/
class Page {
 public:
  Page(std::string &data);  // fill a page from a string
  std::string to_string();  // serialize the page into a string
 private:
  int get_size(const std::string &data, int &start);  // parse a number from the string
  std::string get_string(int size);                   // render a number as a string

  std::vector<std::string> m_value_;  // values stored in a vector
  std::vector<std::string> m_key_;    // keys stored in a vector
  uint32_t m_cur_size_;               // current page data size
  uint32_t m_cur_index_;              // current index
};

// parse the number starting at position start
int Page::get_size(const std::string &data, int &start) {
  std::string size_str = "";
  int value_size = 0;
  while (true) {
    if (data[start] != '\0') {  // not at the terminator yet; accumulate digits
      size_str.push_back(data[start]);
    } else {
      if (!size_str.empty()) {  // non-empty; convert to a number
        value_size = std::stoi(size_str);
      }
      break;
    }
    start++;
  }
  start++;            // skip the terminator
  return value_size;  // the end-of-page terminator yields 0
}

// render a number as a string followed by a terminator
std::string Page::get_string(int size) {
  auto res = std::to_string(size);
  res.push_back('\0');
  return res;
}

Page::Page(std::string &data) {
  int start = 0;
  int value_num = get_size(data, start);  // read the record count
  int page_size = get_size(data, start);  // read the page size
  m_key_.reserve(value_num);
  m_value_.reserve(value_num);

  m_cur_index_ = value_num;
  m_cur_size_ = page_size;
  std::string kv;
  int value_size;
  while (true) {  // read records in a loop
    value_size = get_size(data, start);
    if (value_size <= 0) {  // stop at the end-of-page terminator
      break;
    }
    kv = data.substr(start, value_size);
    start += value_size;
    m_key_.emplace_back(kv.substr(0, 16));
    m_value_.emplace_back(kv.substr(16));
  }
}

std::string Page::to_string() {
  assert(this != nullptr);
  std::string data;
  data.reserve(m_cur_size_ + m_cur_index_ * 6);  // reserve space up front
  data.append(get_string(m_cur_index_));         // append the record count
  data.append(get_string(m_cur_size_));          // append the page size
  assert(m_cur_index_ == m_key_.size());
  assert(m_key_.size() == m_value_.size());
  for (uint32_t i = 0; i < m_cur_index_; i++) {
    data.append(get_string(16 + m_value_[i].length()));  // append the record size
    data.append(m_key_[i]);                              // append the record data
    data.append(m_value_[i]);
  }
  data.push_back('\0');  // trailing end-of-page terminator
  return data;
}

Later the encoding changed slightly: all the record sizes moved to the header, with the record contents at the tail.

/*
A page's string layout is:
page index count (record count) '\0'
page capacity (bytes) '\0'
record size '\0'
record size '\0'
record content (key value)
record content (key value)
...

Keys are kept so the metadata can be updated during rebuild.
*/
class Page {
 public:
  Page();
  Page(std::string &data);            // fill a page from a string
  std::string to_string();            // serialize the page into a string
 private:
  std::string get_string(int size);  // render a number as a string

  std::string m_key_[kMaxIndex];
  std::string m_value_[kMaxIndex];
  uint32_t m_cur_size_;   // current page data size
  uint32_t m_cur_index_;  // current index

  // std::vector<std::string> m_value_;  // earlier vector-based storage
  // std::vector<std::string> m_key_;    // keys kept so rebuild can update the metadata
};

std::string Page::to_string() {
  std::string data;
  data.reserve(m_cur_size_ + m_cur_index_ * 4);  // reserve space up front
  data.append(get_string(m_cur_index_));         // append the record count
  data.append(get_string(m_cur_size_));          // append the page size
  for (uint32_t i = 0; i < m_cur_index_; i++) {  // append each record size
    data.append(get_string(16 + m_value_[i].size()));
  }
  for (uint32_t i = 0; i < m_cur_index_; i++) {
    data.append(m_key_[i]);  // append the record data
    data.append(m_value_[i]);
  }
  return data;
}
Page::Page(std::string &str) {
  std::string::size_type pos;
  std::string::size_type size = str.size();
  // read the record count and page size
  int start = 0;
  pos = str.find('\0', start);
  m_cur_index_ = std::stoi(str.substr(start, pos - start));
  start = pos + 1;
  pos = str.find('\0', start);
  m_cur_size_ = std::stoi(str.substr(start, pos - start));
  start = pos + 1;
  // read each record's length
  std::vector<int> record_lengths(m_cur_index_);
  for (uint32_t i = 0; i < m_cur_index_; i++) {
    pos = str.find('\0', start);
    record_lengths[i] = std::stoi(str.substr(start, pos - start));
    start = pos + 1;
  }
  // read each record's content
  int record_num = record_lengths.size();
  for (int i = 0; i < record_num; i++) {
    m_key_[i] = str.substr(start, 16);
    m_value_[i] = str.substr(start + 16, record_lengths[i] - 16);
    start += record_lengths[i];
  }
}

Encoding and decoding are now slightly simpler (get_size is gone), but encoding lengths as text still felt inefficient. Then it struck me: why not treat the big string directly as an integer array and simply assign to its elements? In other words, 123 need not be stored as the characters '1' '2' '3' (3 bytes); it can be written straight into a short (2 bytes). That also removes the need for terminator characters as markers.

The head of the character array holds the metadata; the tail holds the actual key-value data.

/*
stored as u16:
record count
page size
each record's start offset
stored as char:
each record (kv)
*/
std::string Page::to_string() {
  std::string data;
  data.resize(kAllocSize);
  uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);         // actual data starts at this offset
  char *char_pointer = const_cast<char *>(data.c_str());               // cast away constness
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // reinterpret as a u16 pointer

  u16_pointer[0] = m_cur_index_;
  u16_pointer[1] = m_cur_size_;
  uint16_t offset = data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {  // store each record's start offset
    u16_pointer[i + 2] = offset;
    offset += 16 + m_value_[i].size();
  }
  u16_pointer[m_cur_index_ + 2] = offset;  // end offset of the actual data
  char *p = char_pointer + data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {
    memcpy(p, m_key_[i].c_str(), 16);
    memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
    p += 16 + m_value_[i].size();
  }
  return data;
}
Page::Page(std::string &data) {
  char *char_pointer = const_cast<char *>(data.c_str());               // cast away constness
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // reinterpret as a u16 pointer
  m_cur_index_ = u16_pointer[0];
  m_cur_size_ = u16_pointer[1];
  uint16_t offset, length;
  for (uint16_t i = 0; i < m_cur_index_; i++) {
    offset = u16_pointer[i + 2];
    length = u16_pointer[i + 3] - u16_pointer[i + 2];
    m_key_[i] = data.substr(offset, 16);
    m_value_[i] = data.substr(offset + 16, length - 16);
  }
  m_is_dirty_ = false;
}

A #pragma pack mistake

Of the 8 GB of local memory, 5 GB stores key metadata. The structures are defined as follows:

using page_id_t = uint16_t;
using index_t = uint16_t;

struct data_info_t {  // data info
  page_id_t page_id;  // page id
  index_t index;      // index
};

/* One slot stores the key and the meta info of the value which
   describes the remote addr, size, remote-key on the remote end. */
struct hash_map_slot {
  char key[16];
  data_info_t info;
  hash_map_slot *next;
};

#pragma pack(4)
struct hash_map_slot_test {
  char key[16];
  data_info_t info;
  hash_map_slot *next;
};
// missing the closing #pragma pack()

As shown, struct data_info_t occupies 4 bytes, but struct hash_map_slot occupies 32 bytes because of alignment padding. Naturally, to save space one can force 4-byte alignment on the struct, shaving 4 bytes off each slot, i.e. 1/8 of the space. But, unfamiliar with #pragma pack, I never closed the #pragma pack(4) with #pragma pack(), so the packing directive leaked into every declaration that followed it, and different translation units could end up with different layouts for the same type. The program segfaulted as soon as it ran; in gdb, bt showed a call stack in which frame 3 passed a 32-bit argument that frame 2 abruptly truncated to 16 bits. The failure felt so bizarre for a mere alignment tweak that I never touched #pragma pack again before the competition ended; in general, #pragma pack is best avoided.
Example program:

#include <cstdint>
#include <cstdio>
#include <iostream>
using namespace std;

using page_id_t = uint16_t;
using index_t = uint16_t;

struct data_info_t {  // data info
  page_id_t page_id;  // page id
  index_t index;      // index
};

/* One slot stores the key and the meta info of the value which
   describes the remote addr, size, remote-key on the remote end. */
struct hash_map_slot {
  char key[16];
  data_info_t info;
  hash_map_slot *next;
};

#pragma pack(4)
struct hash_map_slot_test {
  char key[16];
  data_info_t info;
  hash_map_slot_test *next;
};
#pragma pack()

int main() {
  cout << sizeof(hash_map_slot) << endl;
  cout << sizeof(hash_map_slot_test) << endl;
  hash_map_slot slot1;
  hash_map_slot_test slot2;

  printf("hash_map_slot layout:\n%p  \n%p  \n%p\n", &(slot1.key), &(slot1.info), &(slot1.next));
  printf("hash_map_slot test layout:\n%p  \n%p  \n%p\n", &(slot2.key), &(slot2.info), &(slot2.next));
}

/*
Output:
32
28
hash_map_slot layout:
0x7ffef416d1f0  
0x7ffef416d200  
0x7ffef416d208
hash_map_slot test layout:
0x7ffef416d1d0  
0x7ffef416d1e0  
0x7ffef416d1e4
*/

Related blog posts:
C/C++中结构体内存对齐(边界对齐),#pragma pack设置
关于#pragma pack(n)引发的一系列问题

An rvalue reference is itself an lvalue

1: void func(Test&& t);
2: void func(Test& t);

Test t1;
Test&& t2 = std::move(t1);
func(t2);

I long held a misconception about rvalue references: that an rvalue reference is an rvalue. In the snippet above the argument's type is Test&&, so it is tempting to assume the first function is called, yet the second one actually is. An rvalue reference is itself an lvalue. More precisely, an expression of rvalue-reference type can behave as either an lvalue or an rvalue; the rule of thumb is: if it has a name, it is an lvalue; otherwise it is an rvalue.

Example program:

#include <bits/stdc++.h>
using namespace std;

class Test {
 public:
  Test() = default;
  Test(const Test &test) : str(test.str) { printf("enter copy construction\n"); }
  Test(Test &&test) : str(std::move(test.str)) { printf("enter move construction\n"); }

  Test &operator=(const Test &test) {
    str = test.str;
    printf("enter copy assign\n");
    return *this;
  }
  Test &operator=(Test &&test) {
    str = std::move(test.str);
    printf("enter move assign\n");
    return *this;
  }

 private:
  string str;
};

int main() {
  Test p1, p2, p3;
  Test &&p4 = std::move(p3);

  Test p5(p1);             // copy construction
  Test p6(std::move(p2));  // move construction
  Test p7(p4);             // copy construction (p4 is named, hence an lvalue)

  Test t1, t2, t3, t4;
  Test &&t5 = std::move(t4);
  t1 = t2;             // copy assignment
  t1 = std::move(t3);  // move assignment
  t1 = t5;             // copy assignment (the misconception)
}
/*
Output:
enter copy construction
enter move construction
enter copy construction
enter copy assign
enter move assign
enter copy assign
*/

In fact, hovering over a call site in VS Code shows which overload is selected.
Note: a variable should not be used after being moved from.
One of my bugs was calling an object's size() method after std::move-ing it, resulting in undefined behavior. The classic move-based swap below is fine, because the moved-from objects are only assigned to, never read:

template<class T>
void swap(T& a, T& b)
{
  T tmp(std::move(a));
  a = std::move(b);
  b = std::move(tmp);
}

X a, b;
swap(a, b);
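My bug was the opposite of the pattern above; here is a minimal hypothetical reconstruction of it (not the actual engine code):

#include <cstdio>
#include <string>
#include <utility>

int main() {
  std::string value = "payload";
  size_t len = value.size();  // correct: capture the size before moving
  std::string stored = std::move(value);
  // Wrong: `value` is now in a valid but unspecified state. For std::string,
  // calling size() is legal but the result is meaningless; for arbitrary
  // types, use-after-move can be genuine undefined behavior.
  printf("after move: %zu (meaningless), captured before move: %zu\n", value.size(), len);
}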

Reference blog post:
详解C++右值引用

Other Optimizations

Reader mutex
When a read misses, the program has to fetch the serialized page from the remote end, decode it into a string array (the Page class), insert it into the local cache, and only then read the value. That work takes a while and does not need to hold the main lock. But if several requests want the same remote page at once, letting each of them fetch it wastes IO with no real benefit. So each page gets a reader mutex guaranteeing that only one reader is fetching it from the remote end; other readers of the same page block on the mutex. Once the fetching reader has inserted the page into the local cache, the waiting readers find it there and skip the remote read.
Relevant code:

  if (m_data_map_[info.page_id] == nullptr) {  // the page lives in remote memory
    m_mutex_.unlock();
    m_same_reader_mutex_[info.page_id].lock();   // reader mutex
    if (m_data_map_[info.page_id] == nullptr) {  // double check
      auto remote_info = m_addr_map_[info.page_id];
      std::string &&page_data = std::string(remote_info.size, '0');
      m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
      auto new_page = std::make_shared<Page>(page_data);  // build the cached page
      m_mutex_.lock();
      m_data_map_[info.page_id] = new_page;
      m_cache_size_++;
      m_lru_list_.insert(info.page_id);
      need_update_lru = false;
      m_addr_map_.erase(info.page_id);    // remove the entry from the remote address map
      m_addr_list_.emplace(remote_info);  // push the address onto the free list
      m_mutex_.unlock();
      // try to wake the background thread
      if (m_cache_size_ > kPageThreshold) {
        m_cv_.notify_one();
      }
    }
    m_same_reader_mutex_[info.page_id].unlock();
    m_mutex_.lock();
  }

Check footprints with sizeof from time to time: a reader mutex adds 40 bytes per page, a small share of the total.

#include <iostream>
#include <memory>
#include <mutex>
using namespace std;

int main() {
  cout << "mutex:" << sizeof(std::mutex) << endl;
  cout << "shared_ptr:" << sizeof(std::shared_ptr<int>) << endl;
}
/*
Output:
mutex:40
shared_ptr:16
*/

LRU optimization
Both reads and writes end by updating the LRU list. To reduce contention the LRU has its own lock and is never updated while m_mutex_ is held. The current page (the freshly allocated one) is not placed on the LRU list and cannot be evicted; it is inserted once it fills up. The page moved to the front by the previous LRU update is not updated again (it is already at the head).

  if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
    m_lru_list_.update(info.page_id);
    m_last_update_id_ = info.page_id;
  }

map to array, vector to array
Since the maximum page id is known, map<page_id, page> became page[max_page_id]; since the maximum record count per page is known, std::vector<std::string> m_value_ became std::string m_value_[kMaxIndex]. Fixed-size structures avoid the overhead of dynamic growth. A before/after sketch:
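The constants below are simplified stand-ins for the real kBitmapSize and kMaxIndex computed in type.h; only the shape of the change matters here.

#include <cstdint>
#include <memory>
#include <string>

using page_id_t = uint16_t;
constexpr size_t kBitmapSize = 1 << 15;  // stand-in for the computed constant in type.h
constexpr size_t kMaxIndex = 640;        // stand-in for kPageSize / 96
struct Page {};                          // stand-in for the real Page class

// Before: dynamically growing containers.
// std::unordered_map<page_id_t, std::shared_ptr<Page>> m_data_map_;
// std::vector<std::string> m_value_;

// After: fixed-size arrays indexed directly by page id / record index.
std::shared_ptr<Page> m_data_map_[kBitmapSize];
std::string m_value_[kMaxIndex];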

Program Notes

Correctness assumptions (a sketch of asserting them follows this list):
1. During a run, page ids must not exceed kBitmapSize, the records in a page must not exceed kMaxIndex, and a page's serialized string must not exceed kAllocSize.
2. The program enters the read/write phase only after a contiguous run of deletes, which maximizes rebuild's effect. Before the deletes, remote memory is large enough to hold all the data.
3. Although every insert checks whether the current page is full, nothing stops updates from growing a page's size, so a page larger than kAllocSize would cause problems.
4. With kAllocSize below 65536, page members can be stored as uint16; larger values would require uint32.
5. Work done outside the mutex is very short and finishes before rebuild processing begins.
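A minimal sketch of pinning these assumptions down with asserts; check_page is a hypothetical helper, not part of the engine, and it assumes the constants and Page class from type.h:

#include <cassert>
#include <string>
#include "type.h"

// Assumption 4: offsets inside a serialized page must fit in uint16_t.
static_assert(kv::kAllocSize <= 65536, "page offsets must fit in uint16_t");

// Assumptions 1 and 3, checked whenever a page is serialized.
void check_page(kv::page_id_t page_id, kv::Page &page, const std::string &serialized) {
  assert(page_id < kv::kBitmapSize);              // page id stays within the bitmap range
  assert(page.record_number() <= kv::kMaxIndex);  // records per page stay bounded
  assert(serialized.size() <= kv::kAllocSize);    // serialized page fits one remote allocation
}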

Takeaways:
1. When debugging, print the needed information systematically and thoroughly.
2. The simpler the architecture, the harder it is to get wrong.
3. Small test cases can validate each module's correctness.
4. sizeof and system_clock give quick estimates of memory usage and execution speed.
5. An rvalue-reference variable used in an expression is an lvalue, and a variable should not be used after being moved from:
int &&x = 1;
f(x);             // calls f(int& x)
f(std::move(x));  // calls f(int&& x)
6. When writing code, be explicit about the invariant each lock maintains, not merely the atomicity of the operations:
m_mutex_: guarantees atomicity of operations on the members it protects, and that a page's data exists in exactly one of the local cache (m_data_map_) and the remote end (m_addr_map_).
m_remote_read_lock_: during eviction, blocks requests for a page until its data has actually been written.

Relevant Code

Engine

// kv_engine.h
namespace kv {

/* Encryption algorithm competitor can choose. */
enum aes_algorithm { CTR = 0, CBC, CBC_CS1, CBC_CS2, CBC_CS3, CFB, OFB };

/* Algorithm relate message. */
typedef struct crypto_message_t {
  aes_algorithm algo;
  Ipp8u *key;
  Ipp32u key_len;
  Ipp8u *counter;
  Ipp32u counter_len;
  Ipp8u *piv;
  Ipp32u piv_len;
  Ipp32u blk_size;
  Ipp32u counter_bit;

  IppsAESSpec *ctx;
} crypto_message_t;

/* Abstract base engine */
class Engine {
 public:
  virtual ~Engine(){};

  virtual bool start(const std::string addr, const std::string port) = 0;

  virtual void stop() = 0;

  virtual bool alive() = 0;
};

class LocalEngineEntity;  // forward declaration
/* Local-side engine */
class LocalEngine : public Engine {
 public:
  ~LocalEngine();

  bool start(const std::string addr, const std::string port) override;

  void stop() override;

  bool alive() override;

  /* Init aes context message. */
  bool set_aes();

  bool encrypted(const std::string &value, std::string &encrypt_value);

  /* Evaluation problem will call this function. */
  crypto_message_t *get_aes() { return &m_aes_; }

  bool write(const std::string &key, const std::string &value, bool use_aes = false);

  bool read(const std::string &key, std::string &value);
  /** The delete interface */
  bool deleteK(const std::string &key);

  /** Rebuild the hash index to recycle the unused memory */
  void rebuild_index();

 private:
  // static inline int get_index(const std::string &key) { return std::hash<std::string>()(key) & (kSharedNumber - 1); }
  static inline int get_index(const std::string &key) { return CityHash16(key.c_str()) & (kSharedNumber - 1); }
  kv::ConnectionManager *m_rdma_conn_;
  // /* NOTE: should use some concurrent data structure, and also should take the
  //  * extra memory overhead into consideration */
  // RDMAMemPool *m_rdma_mem_pool_;

  crypto_message_t m_aes_;

  LocalEngineEntity *m_entity_[kSharedNumber];  // request-handling entities
};

/* Remote-side engine */
class RemoteEngine : public Engine {
 public:
  struct WorkerInfo {
    CmdMsgBlock *cmd_msg;
    CmdMsgRespBlock *cmd_resp_msg;
    struct ibv_mr *msg_mr;
    struct ibv_mr *resp_mr;
    rdma_cm_id *cm_id;
    struct ibv_cq *cq;
  };

  ~RemoteEngine(){};

  bool start(const std::string addr, const std::string port) override;
  void stop() override;
  bool alive() override;

 private:
  void handle_connection();

  int create_connection(struct rdma_cm_id *cm_id);

  struct ibv_mr *rdma_register_memory(void *ptr, uint64_t size);

  int remote_write(WorkerInfo *work_info, uint64_t local_addr, uint32_t lkey, uint32_t length, uint64_t remote_addr, uint32_t rkey);

  int allocate_and_register_memory(uint64_t &addr, uint32_t &rkey, uint64_t size);

  void worker(WorkerInfo *work_info, uint32_t num);

  struct rdma_event_channel *m_cm_channel_;
  struct rdma_cm_id *m_listen_id_;
  struct ibv_pd *m_pd_;
  struct ibv_context *m_context_;
  bool m_stop_;
  std::thread *m_conn_handler_;
  WorkerInfo **m_worker_info_;
  uint32_t m_worker_num_;
  std::thread **m_worker_threads_;
};

}  // namespace kv


// local_engine.cc
#include "assert.h"
#include "atomic"
#include "kv_engine.h"
#include "local_engine_entity.h"
#include <iostream>

namespace kv {

/**
 * @description: start local engine service
 * @param {string} addr    the address string of RemoteEngine to connect
 * @param {string} port   the port of RemoteEngine to connect
 * @return {bool} true for success
 */
bool LocalEngine::start(const std::string addr, const std::string port) {
  m_rdma_conn_ = new ConnectionManager();
  if (m_rdma_conn_ == nullptr) return false;
  if (m_rdma_conn_->init(addr, port, 4, 72)) return false;
  for (int i = 0; i < kSharedNumber; i++) {
    m_entity_[i] = new LocalEngineEntity(this, m_rdma_conn_);
  }
  printf("LocalEngine::start finsh\n");
  auto watcher = std::thread([]() {
    sleep(60 * 9);
    printf("timeout\n");
    fflush(stdout);
    abort();
  });
  watcher.detach();
  return true;
}

/**
 * @description: stop local engine service
 * @return {void}
 */
void LocalEngine::stop() {
  for (int i = 0; i < kSharedNumber; i++) {
    delete m_entity_[i];
  }
  delete m_rdma_conn_;
  printf("LocalEngine::stop finsh\n");
};

/**
 * @description: get engine alive state
 * @return {bool}  true for alive
 */
bool LocalEngine::alive() { return true; }

/**
 * @description: provide message about the aes_ecb mode
 * @return {bool}  true for success
 */
bool LocalEngine::set_aes() {
  // Current algorithm is not supported, just for demonstration.
  m_aes_.algo = CBC;
  m_aes_.key_len = 16;
  m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
  if (m_aes_.key == nullptr) return false;
  m_aes_.blk_size = 16;
  m_aes_.piv_len = 16;
  m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
  if (m_aes_.piv == nullptr) return false;

  int ctxSize;               // AES context size
  ippsAESGetSize(&ctxSize);  // evaluating AES context size
  // allocating memory for AES context
  m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
  // AES context initialization
  ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
  return true;
}
// A simple encryption routine following the reference PDF
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
  // buffer size rounded up to a multiple of the AES block size
  Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
  // encrypting plaintext
  ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
  std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
  encrypt_value = std::move(tmp);
  return true;
}

bool LocalEngine::write(const std::string &key, const std::string &value, bool use_aes) {
  int index = get_index(key);
  return m_entity_[index]->write(key, value, use_aes);
}
/**
 * @description: read value from engine via key
 * @param {string} key
 * @param {string} &value
 * @return {bool}  true for success
 */

bool LocalEngine::read(const std::string &key, std::string &value) {
  int index = get_index(key);
  return m_entity_[index]->read(key, value);
}

bool LocalEngine::deleteK(const std::string &key) {
  int index = get_index(key);
  return m_entity_[index]->deleteK(key);
}
/* When we delete a number of KV pairs, we should rebuild the index to
 * reallocate remote addr for each KV to recycle fragments. This will block
 * front request processing. This solution should be optimized. */
void LocalEngine::rebuild_index() {
  /** rebuild all the index to recycle the fragment */
  /** step-1: block the database and not allowe any writes
   *  step-2: transverse the index to read each value
   *  step-3: read the value from the remote and write it to a new remote addr
   *  step-4: free all old addr
   */
  printf("*********rebuild_index*********\n");
  for (int i = 0; i < kSharedNumber; i++) {
    m_entity_[i]->rebuild_index();
  }
}

LocalEngine::~LocalEngine() {
  if (nullptr != m_aes_.key) delete[] m_aes_.key;
  if (nullptr != m_aes_.counter) delete[] m_aes_.counter;
  if (nullptr != m_aes_.piv) delete[] m_aes_.piv;
  if (nullptr != m_aes_.ctx) delete (Ipp8u *)m_aes_.ctx;
  m_aes_.key = nullptr;
  m_aes_.counter = nullptr;
  m_aes_.piv = nullptr;
  m_aes_.ctx = nullptr;
}
}  // namespace kv

type

// type.h
namespace kv {
using page_id_t = uint16_t;
using index_t = uint16_t;
struct remote_info_t {  // remote memory info
  uint64_t remote_addr;
  uint32_t rkey;
  uint32_t size;
};

struct data_info_t {  // data info
  page_id_t page_id;  // page id
  index_t index;      // index
};

/* One slot stores the key and the meta info of the value which
   describes the remote addr, size, remote-key on the remote end. */
struct hash_map_slot {
  char key[16];
  data_info_t info;
  hash_map_slot *next;
};
const page_id_t kNullPage = 0;
const data_info_t kNullInfo = {0xffff, 0xffff};  // invalid info

const int kSharedNumber = 1 << 6;                                             // number of engine entities
const uint32_t kBucketNum = 1 << 21;                                          // buckets per hash table
const uint32_t kSlotSize = (1 << 22) * 1.2 * 32 / kSharedNumber;              // slots per hash table
const uint64_t kMaxDataSize = (uint64_t)1 << 36;                              // test data size
const uint32_t kPageSize = 60 * 1 << 10;                                      // page capacity
const uint32_t kMaxValueSize = 1024;                                          // max value size
const uint32_t kMinValueSize = 80;                                            // min value size
const uint32_t kMaxIndex = kPageSize / 96;                                    // max records per page, 96 = 80 + 16
const uint64_t kBitmapSize = 1.4 * kMaxDataSize / kSharedNumber / kPageSize;  // max page id during a run
const uint32_t kAllocSize = 1 << 16;                                          // size of each remote allocation
const uint32_t kPageThreshold = 1 << 8;                                       // local page-cache threshold
const uint32_t kEvictNumber = kPageThreshold * 0.05;                          // pages evicted per round
const uint32_t kPrintFreq = 1 << 24;                                          // logging frequency
const uint64_t kRebuildThreshold = (uint64_t)(1 << 30) * 36 / kSharedNumber;  // rebuild threshold
const uint32_t kEralyRegisterNumber = (uint64_t)(1 << 30) * 30 / kSharedNumber / kAllocSize;

class hash_map_t {
 public:
  /* Initialize all the pointers to nullptr. */
  hash_map_t();
  // index is the key's hash value
  /* Find the corresponding key. */
  hash_map_slot *find(const std::string &key, int index);

  /* Insert into the head of the list. */
  void insert(const std::string &key, const data_info_t &info, int index);

  data_info_t remove(const std::string &key, int index);
  // used only during rebuild
  void update(const std::string &key, const data_info_t &info);

 private:
  hash_map_slot *m_bucket_[kBucketNum];
  hash_map_slot *m_slot_head_;                  // head of the free-slot list, linking slots of deleted entries
  hash_map_slot m_hash_slot_array_[kSlotSize];  // slot array
  uint32_t m_slot_cnt_;
};

class Page {
 public:
  Page(std::string &data);  // fill a page from a string

  std::string to_string();  // serialize the page into a string
  
  void to_string(char *ptr);
  Page() : m_cur_size_{0}, m_cur_index_{0}, m_is_dirty_{true} {};

  bool is_full() { return m_cur_size_ > kPageSize; }  // is the page full?

  uint16_t page_size() { return m_cur_size_; }  // current page size

  index_t record_number() { return m_cur_index_; }  // record count

  index_t insert(const std::string &key, const std::string &value) {
    m_cur_size_ += 16 + value.size();  // plus the key length
    m_key_[m_cur_index_] = key;
    m_value_[m_cur_index_] = value;
    return m_cur_index_++;  // return the current index, then advance it
  }

  index_t insert(const std::string &key, std::string &&value) {
    m_cur_size_ += 16 + value.size();  // plus the key length
    m_key_[m_cur_index_] = key;
    m_value_[m_cur_index_] = std::move(value);
    return m_cur_index_++;  // return the current index, then advance it
  }

  void update(index_t index, const std::string &value) {
    m_is_dirty_ = true;
    m_cur_size_ -= m_value_[index].size();
    m_cur_size_ += value.size();  // update the current page size
    m_value_[index] = value;
  }

  void update(index_t index, std::string &&value) {
    m_is_dirty_ = true;
    m_cur_size_ -= m_value_[index].size();
    m_cur_size_ += value.size();  // update the current page size
    m_value_[index] = std::move(value);
  }

  std::string read_value(index_t index) { return m_value_[index]; }

  std::string read_key(index_t index) { return m_key_[index]; }

  bool is_dirty() { return m_is_dirty_; }

 private:
  std::string m_key_[kMaxIndex];
  std::string m_value_[kMaxIndex];
  uint16_t m_cur_size_;  // current page data size
  index_t m_cur_index_;  // current index
  bool m_is_dirty_;

  // std::vector<std::string> m_value_;  // earlier vector-based storage
  // std::vector<std::string> m_key_;    // keys kept so rebuild can update the metadata
};

class LRUList {
 public:
  LRUList() = default;
  void insert(page_id_t hit_id) {
    m_mutex_.lock();
    m_list_.emplace_front(hit_id);
    m_speed_map_.insert({hit_id, m_list_.begin()});
    m_mutex_.unlock();
  }
  void update(page_id_t hit_id) {
    m_mutex_.lock();
    auto iter = m_speed_map_[hit_id];
    if (iter != m_list_.begin()) {  // move the page to the front
      m_list_.erase(iter);
      m_list_.emplace_front(hit_id);
      m_speed_map_[hit_id] = m_list_.begin();
    }
    m_mutex_.unlock();
  }
  page_id_t evict() {
    m_mutex_.lock();
    page_id_t vicitm_page_id = m_list_.back();
    m_list_.pop_back();
    m_speed_map_.erase(vicitm_page_id);
    m_mutex_.unlock();
    return vicitm_page_id;
  }
  std::vector<page_id_t> clear() {  // drain the entire LRU list
    m_mutex_.lock();
    std::vector<page_id_t> res(m_list_.begin(), m_list_.end());
    m_list_.clear();
    m_speed_map_.clear();
    m_mutex_.unlock();
    return res;
  }
  int size() { return m_list_.size(); }

 private:
  std::mutex m_mutex_;
  std::list<page_id_t> m_list_;                                                // the LRU list
  std::unordered_map<page_id_t, std::list<page_id_t>::iterator> m_speed_map_;  // speeds up access into the list
};

}  // namespace kv

// type.c
#include <cassert>
#include "type.h"
namespace kv {
hash_map_t::hash_map_t() {
  memset(m_bucket_, 0, sizeof(m_bucket_));
  m_slot_head_ = &m_hash_slot_array_[0];
  m_slot_head_->next = nullptr;
  m_slot_cnt_ = 1;
}

/* Find the corresponding key. */
hash_map_slot *hash_map_t::find(const std::string &key, int index) {
  hash_map_slot *cur = m_bucket_[index];
  if (cur == nullptr) {
    return nullptr;
  }
  while (cur) {
    if (memcmp(cur->key, key.c_str(), 16) == 0) {
      return cur;
    }
    cur = cur->next;
  }
  return nullptr;
}

/* Insert into the head of the list. */
void hash_map_t::insert(const std::string &key, const data_info_t &info, int index) {
  hash_map_slot *new_slot;
  // prefer reusing the slots of deleted entries
  if (m_slot_head_->next == nullptr) {
    new_slot = &m_hash_slot_array_[m_slot_cnt_++];
    assert(m_slot_cnt_ < kSlotSize);
  } else {
    new_slot = m_slot_head_->next;
    m_slot_head_->next = new_slot->next;
  }
  new_slot->next = nullptr;

  memcpy(new_slot->key, key.c_str(), 16);
  new_slot->info = info;
  if (!m_bucket_[index]) {
    m_bucket_[index] = new_slot;
  } else {
    /* Insert into the head. */
    hash_map_slot *tmp = m_bucket_[index];
    m_bucket_[index] = new_slot;
    new_slot->next = tmp;
  }
}

// called only during rebuild
void hash_map_t::update(const std::string &key, const data_info_t &info) {
  int index = std::hash<std::string>()(key) & (kBucketNum - 1);
  hash_map_slot *cur = m_bucket_[index];
  while (cur) {
    if (memcmp(cur->key, key.c_str(), 16) == 0) {
      cur->info = info;
      return;
    }
    cur = cur->next;
  }
}

data_info_t hash_map_t::remove(const std::string &key, int index) {
  hash_map_slot *cur = m_bucket_[index];
  hash_map_slot *parent = nullptr;
  if (cur == nullptr) {
    return kNullInfo;
  }
  while (cur) {
    if (memcmp(cur->key, key.c_str(), 16) == 0) {
      // unlink the slot from the bucket
      if (parent == nullptr) {
        m_bucket_[index] = cur->next;
      } else {
        parent->next = cur->next;
      }
      // push it onto the free list
      cur->next = m_slot_head_->next;
      m_slot_head_->next = cur;
      return cur->info;
    }
    parent = cur;
    cur = cur->next;
  }
  return kNullInfo;
}
/*
stored as u16:
record count
page size
each record's start offset
stored as char:
each record (kv)
*/
std::string Page::to_string() {
  std::string data;
  data.resize(kAllocSize);
  uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);         // actual data starts at this offset
  char *char_pointer = const_cast<char *>(data.c_str());               // cast away constness
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // reinterpret as a u16 pointer

  u16_pointer[0] = m_cur_index_;
  u16_pointer[1] = m_cur_size_;
  uint16_t offset = data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {  // store each record's start offset
    u16_pointer[i + 2] = offset;
    offset += 16 + m_value_[i].size();
  }
  u16_pointer[m_cur_index_ + 2] = offset;  // end offset of the actual data
  char *p = char_pointer + data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {
    memcpy(p, m_key_[i].c_str(), 16);
    memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
    p += 16 + m_value_[i].size();
  }
  return data;
}
void Page::to_string(char *ptr) {
  uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t);  // record bytes start at this offset
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(ptr);    // reinterpret as a u16 array

  u16_pointer[0] = m_cur_index_;
  u16_pointer[1] = m_cur_size_;
  uint16_t offset = data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {  // write each record's start offset
    u16_pointer[i + 2] = offset;
    offset += 16 + m_value_[i].size();
  }
  u16_pointer[m_cur_index_ + 2] = offset;  // end offset of the record area
  char *p = ptr + data_start;
  for (uint16_t i = 0; i < m_cur_index_; i++) {
    memcpy(p, m_key_[i].c_str(), 16);
    memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
    p += 16 + m_value_[i].size();
  }
}
Page::Page(std::string &data) {
  char *char_pointer = &data[0];                                       // writable pointer into the string
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer);  // reinterpret as a u16 array
  m_cur_index_ = u16_pointer[0];
  m_cur_size_ = u16_pointer[1];
  uint16_t offset, length;
  for (uint16_t i = 0; i < m_cur_index_; i++) {
    offset = u16_pointer[i + 2];
    length = u16_pointer[i + 3] - u16_pointer[i + 2];
    m_key_[i] = data.substr(offset, 16);
    m_value_[i] = data.substr(offset + 16, length - 16);
  }
  m_is_dirty_ = false;
}

}  // namespace kv
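To make the page layout above concrete, here is a small self-contained round-trip check: it builds a buffer following the same rules as Page::to_string (u16 record count, u16 page size, u16 offset table with one trailing end offset, then raw 16-byte-key + value bytes) and parses it back the way Page::Page(std::string &) does. The buffer size and record contents are illustrative only:

// page_layout_sketch.cc -- round-trip check of the page layout above
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

int main() {
  std::vector<std::string> keys = {std::string(16, 'a'), std::string(16, 'b')};
  std::vector<std::string> values = {"one", "twooo"};
  const uint16_t n = 2;  // record count

  std::string buf(256, '\0');  // illustrative size; the engine uses kAllocSize
  uint16_t *u16 = reinterpret_cast<uint16_t *>(&buf[0]);
  uint16_t data_start = (n + 5) * sizeof(uint16_t);  // same rule as Page::to_string
  u16[0] = n;           // record count
  u16[1] = data_start;  // stand-in for the page-size field (m_cur_size_)
  uint16_t off = data_start;
  for (uint16_t i = 0; i < n; i++) {  // offset table, one entry per record...
    u16[i + 2] = off;
    off += 16 + values[i].size();
  }
  u16[n + 2] = off;  // ...plus the trailing end offset
  for (uint16_t i = 0; i < n; i++) {  // raw key + value bytes
    memcpy(&buf[u16[i + 2]], keys[i].data(), 16);
    memcpy(&buf[u16[i + 2] + 16], values[i].data(), values[i].size());
  }

  // parse it back the way Page::Page(std::string &) does
  for (uint16_t i = 0; i < n; i++) {
    uint16_t o = u16[i + 2];
    uint16_t len = u16[i + 3] - u16[i + 2];
    assert(buf.substr(o, 16) == keys[i]);
    assert(buf.substr(o + 16, len - 16) == values[i]);
  }
  return 0;
}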

local_engine_entity

// local_engine_entity.h
#pragma once
#include <condition_variable>
#include <bitset>
#include "type.h"
#include "rdma_conn_manager.h"
#include "rdma_mem_pool.h"
namespace kv {

class LocalEngine;  // forward declaration
class LocalEngineEntity {
 public:
  LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn);
  ~LocalEngineEntity();
  bool write(const std::string &key, const std::string &value, bool use_aes = false);
  bool read(const std::string &key, std::string &value);
  bool deleteK(const std::string &key);
  void rebuild_index();
  std::vector<uint64_t> print_memory();  // report memory-usage statistics

 private:
  void start_thread();                  // start the background eviction thread
  remote_info_t request_single_info();  // request a single remote address
  void register_remote_memory();        // pre-register remote memory

  hash_map_t m_page_map_;                                    // key -> record metadata
  std::shared_ptr<Page> m_data_map_[kBitmapSize];            // page id -> page (data held locally)
  uint32_t m_cache_size_;                                    // number of locally cached pages
  LRUList m_lru_list_;                                       // the LRU list
  std::unordered_map<page_id_t, remote_info_t> m_addr_map_;  // page id -> remote info (data held remotely)
  uint16_t m_max_index_[kBitmapSize];                        // highest record index of each remote page
  std::queue<remote_info_t> m_addr_list_;                    // unused remote memory chunks
  std::mutex m_mutex_;                                       // protects the members above

  // NB: the largest page id must stay below kBitmapSize!
  std::bitset<kMaxIndex> m_bitmap_[kBitmapSize];  // 1 = deleted, 0 = live
  std::mutex m_same_reader_mutex_[kBitmapSize];   // serialises remote reads of the same page

  page_id_t m_cur_page_id_;           // id of the page currently being filled; never in the LRU list
  std::shared_ptr<Page> m_cur_page_;  // the page currently being filled

  page_id_t m_victim_id_;                // id of the page currently being evicted
  std::shared_ptr<Page> m_victim_page_;  // the page currently being evicted
  page_id_t m_last_update_id_;           // page id of the most recent LRU update

  bool m_alive_;                  // controls the background thread's lifetime
  std::condition_variable m_cv_;  // wakes the background thread
  std::thread *m_backup_thread_;  // the background thread
  std::mutex m_useless_mutex_;    // placeholder lock; exists only so the condition variable can be waited on

  LocalEngine *m_engine_;
  ConnectionManager *m_rdma_conn_;
  RDMAMemPool *m_rdma_mem_pool_;

  // statistics
  uint64_t m_alloc_memory_{0};  // total remote memory allocated so far

  bool m_rebuild_allow_{true};
  bool m_delete_event_{false};
  bool m_rw_event_after_delete_{false};
};
}  // namespace kv
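One pattern in the implementation below is worth calling out in advance: read() faults a remote page into the local cache with a double-checked test around m_same_reader_mutex_, so concurrent readers of the same page trigger only one RDMA read. A stripped-down sketch of that shape with toy stand-in types (like the engine's version, it leans on the surrounding per-page lock rather than atomics, so treat it as an illustration of the idiom rather than a textbook-perfect DCL):

// dcl_sketch.cc -- double-checked fetch: check, lock, check again, fetch once
#include <memory>
#include <mutex>
#include <string>

struct ToyPage {
  std::string bytes;
};

std::shared_ptr<ToyPage> g_page;  // null while the page lives "remotely"
std::mutex g_page_mutex;          // per-page "same reader" mutex

std::shared_ptr<ToyPage> fetch_page() {
  if (!g_page) {  // cheap first check without the lock
    std::lock_guard<std::mutex> lk(g_page_mutex);
    if (!g_page) {  // re-check: another reader may have fetched it meanwhile
      auto p = std::make_shared<ToyPage>();
      p->bytes = "...bytes that remote_read() would fetch...";  // stand-in
      g_page = p;  // publish only once fully built
    }
  }
  return g_page;  // every caller sees the result of a single fetch
}

int main() { return fetch_page() ? 0 : 1; }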

// local_engine_entity.cc
#include <set>
#include <future>
#include <functional>
#include "local_engine_entity.h"
#include "kv_engine.h"

namespace kv {
LocalEngineEntity::LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn) : m_engine_(engine), m_rdma_conn_(rdma_conn) {
  m_rdma_mem_pool_ = new RDMAMemPool(m_rdma_conn_);
  if (m_rdma_mem_pool_ == nullptr) {
    printf("alloc rdma_mem_pool failed\n");
  }
  auto page = std::make_shared<Page>();  // allocate the first page
  m_cur_page_id_ = 1;
  m_cur_page_ = page;

  m_victim_id_ = kNullPage;
  m_victim_page_ = nullptr;
  m_last_update_id_ = kNullPage;

  m_data_map_[m_cur_page_id_] = page;
  m_cache_size_ = 1;

  m_alive_ = true;
  // pre-register remote memory in a detached thread
  auto requester = std::thread([this]() { register_remote_memory(); });
  requester.detach();
  // start the background eviction thread
  start_thread();
}
// pre-allocate kEralyRegisterNumber remote chunks of kAllocSize bytes each
void LocalEngineEntity::register_remote_memory() {
  std::queue<remote_info_t> list;
  remote_info_t remote_info;
  remote_info.size = kAllocSize;
  for (uint32_t i = 0; i < kEralyRegisterNumber; i++) {
    m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
    list.emplace(remote_info);
  }
  m_alloc_memory_ += kAllocSize * kEralyRegisterNumber;

  m_mutex_.lock();
  while (!m_addr_list_.empty()) {
    list.emplace(m_addr_list_.front());
    m_addr_list_.pop();
  }
  m_addr_list_ = std::move(list);
  m_mutex_.unlock();
}

remote_info_t LocalEngineEntity::request_single_info() {
  remote_info_t remote_info;
  // prefer an address from the standby list
  if (!m_addr_list_.empty()) {
    remote_info = m_addr_list_.front();
    m_addr_list_.pop();
    return remote_info;
  }
  // otherwise allocate fresh remote memory
  remote_info.size = kAllocSize;
  m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
  m_alloc_memory_ += remote_info.size;
  return remote_info;
}

void LocalEngineEntity::start_thread() {
  auto backup_func = [this]() {
    std::unique_lock<std::mutex> useless_lock(m_useless_mutex_);  // placeholder lock required by wait()
    while (m_alive_) {
      // sleep while the cache is at or below the threshold and no rebuild is pending
      while (m_cache_size_ <= kPageThreshold && !(m_rebuild_allow_ && m_delete_event_ && m_rw_event_after_delete_)) {
        m_cv_.wait(useless_lock);
        if (!m_alive_) {  // the engine is shutting down, so this thread exits too
          return;
        }
      }
      if (m_rebuild_allow_ && m_delete_event_ && m_rw_event_after_delete_) {
        rebuild_index();
        m_rebuild_allow_ = false;
      }
      if (m_cache_size_ > kPageThreshold) {
        for (uint32_t i = 0; i < kEvictNumber; i++) {
          m_mutex_.lock();
          page_id_t victim_id = m_lru_list_.evict();
          std::shared_ptr<Page> victim_page = m_data_map_[victim_id];
          remote_info_t info = request_single_info();

          // record the victim page's new remote location
          m_data_map_[victim_id] = nullptr;
          m_cache_size_--;
          m_addr_map_[victim_id] = info;
          m_max_index_[victim_id] = victim_page->record_number();
          // expose the in-flight victim so concurrent reads are served from
          // the local copy until the remote write completes (read() checks
          // m_victim_id_ first)
          m_victim_id_ = victim_id;
          m_victim_page_ = victim_page;
          m_mutex_.unlock();

          std::string victim_page_data = victim_page->to_string();  // serialise the victim page
          // write the page to remote memory
          m_rdma_conn_->remote_write((void *)victim_page_data.c_str(), kAllocSize, info.remote_addr, info.rkey);

          // the remote copy is now complete; clear the in-flight victim
          m_mutex_.lock();
          m_victim_id_ = kNullPage;
          m_victim_page_ = nullptr;
          m_mutex_.unlock();
        }
      }
    }
  };
  m_backup_thread_ = new std::thread(backup_func);
}
LocalEngineEntity::~LocalEngineEntity() {
  {
    // flip the flag while holding the wait lock, so the worker cannot check
    // m_alive_ and then miss the wakeup
    std::lock_guard<std::mutex> lk(m_useless_mutex_);
    m_alive_ = false;
  }
  m_cv_.notify_one();
  m_backup_thread_->join();
  delete m_backup_thread_;
  delete m_rdma_mem_pool_;
}

bool LocalEngineEntity::write(const std::string &key, const std::string &value, bool use_aes) {
  if (m_rebuild_allow_ && m_delete_event_) {  // first access after a delete phase: request a rebuild
    m_rw_event_after_delete_ = true;
    m_cv_.notify_one();
  }

  // encrypt the value if requested
  std::string encrypt_value;
  if (use_aes) {
    m_engine_->encrypted(value, encrypt_value);
  }
  int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);

  m_mutex_.lock();
  auto slot = m_page_map_.find(key, hash_index);

  // first write of this key, or its page is being written to / already held in remote memory
  if (slot == nullptr || m_data_map_[slot->info.page_id] == nullptr) {
    if (m_cur_page_->is_full()) {          // the current page is full
      m_lru_list_.insert(m_cur_page_id_);  // a page only enters the LRU list once it is full
      m_cur_page_id_++;
      m_data_map_[m_cur_page_id_] = std::make_shared<Page>();
      m_cur_page_ = m_data_map_[m_cur_page_id_];
      m_cache_size_++;
      // try to wake the background thread
      if (m_cache_size_ > kPageThreshold) {
        m_cv_.notify_one();
      }
    }

    index_t index;
    if (use_aes) {
      index = m_cur_page_->insert(key, std::move(encrypt_value));
    } else {
      index = m_cur_page_->insert(key, value);
    }

    data_info_t info = {m_cur_page_id_, index};
    if (slot == nullptr) {                        // first write: insert the metadata
      m_page_map_.insert(key, info, hash_index);  // map the key to its metadata
    } else {
      m_bitmap_[slot->info.page_id].set(slot->info.index, true);  // mark the old record as deleted
      slot->info = info;                                          // point the metadata at the new record
    }
    m_mutex_.unlock();
  } else {  // the page is local and not evicted: update it in place
    data_info_t info = slot->info;
    if (use_aes) {
      m_data_map_[info.page_id]->update(info.index, std::move(encrypt_value));
    } else {
      m_data_map_[info.page_id]->update(info.index, value);
    }
    m_mutex_.unlock();
    // refresh the LRU list
    if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_) {
      m_lru_list_.update(info.page_id);
      m_last_update_id_ = info.page_id;
    }
  }
  return true;
}
bool LocalEngineEntity::read(const std::string &key, std::string &value) {
  if (m_rebuild_allow_ && m_delete_event_) {  // first access after a delete phase: request a rebuild
    m_rw_event_after_delete_ = true;
    m_cv_.notify_one();
  }
  int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
  bool need_update_lru = true;
  m_mutex_.lock();

  auto slot = m_page_map_.find(key, hash_index);
  if (slot == nullptr) {  // no metadata: the key does not exist
    m_mutex_.unlock();
    return false;
  }
  data_info_t info = slot->info;
  if (info.page_id == m_victim_id_) {  // the page is mid-eviction: serve the read from the retained local copy
    value = m_victim_page_->read_value(info.index);
    m_mutex_.unlock();
    return true;
  }
  if (m_data_map_[info.page_id] == nullptr) {  // the page lives in remote memory
    m_mutex_.unlock();
    m_same_reader_mutex_[info.page_id].lock();   // serialise readers of the same page
    if (m_data_map_[info.page_id] == nullptr) {  // double-checked: another reader may have fetched it already
      auto remote_info = m_addr_map_[info.page_id];
      std::string page_data(remote_info.size, '0');
      m_rdma_conn_->remote_read((void *)&page_data[0], remote_info.size, remote_info.remote_addr, remote_info.rkey);
      auto new_page = std::make_shared<Page>(page_data);  // rebuild the page from the fetched bytes
      m_mutex_.lock();
      m_data_map_[info.page_id] = new_page;
      m_cache_size_++;
      m_lru_list_.insert(info.page_id);
      need_update_lru = false;
      m_addr_map_.erase(info.page_id);    // drop the remote mapping
      m_addr_list_.emplace(remote_info);  // recycle the remote chunk onto the standby list
      m_mutex_.unlock();
      // try to wake the background thread
      if (m_cache_size_ > kPageThreshold) {
        m_cv_.notify_one();
      }
    }
    m_same_reader_mutex_[info.page_id].unlock();
    m_mutex_.lock();
  }
  value = m_data_map_[info.page_id]->read_value(info.index);
  m_mutex_.unlock();
  if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
    m_lru_list_.update(info.page_id);
    m_last_update_id_ = info.page_id;
  }
  return true;
}

bool LocalEngineEntity::deleteK(const std::string &key) {
  m_delete_event_ = true;
  int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
  m_mutex_.lock();
  data_info_t info = m_page_map_.remove(key, hash_index);  // drop the key's metadata
  m_mutex_.unlock();
  if (info.page_id == kNullPage) return false;    // key was never written (assuming kNullInfo carries kNullPage)
  m_bitmap_[info.page_id].set(info.index, true);  // mark the record as deleted
  return true;
}

void LocalEngineEntity::rebuild_index() {
  std::lock_guard<std::mutex> lk(m_mutex_);
  std::string key;
  std::string value;
  data_info_t data_info;
  std::unordered_map<page_id_t, remote_info_t> tmp_addr_map;  // temporary page-id -> remote-address map
  uint32_t new_cache_size = 1;
  std::vector<page_id_t> local_id = m_lru_list_.clear();
  auto new_page = m_cur_page_;
  page_id_t new_page_id = m_cur_page_id_;
  // pass 1: compact the locally cached pages
  for (auto &page_id : local_id) {
    auto &bitmap = m_bitmap_[page_id];
    if (bitmap.none()) {  // no deleted records: keep the page untouched
      m_lru_list_.insert(page_id);
      new_cache_size++;
    } else if (!bitmap.all()) {  // some records survive
      auto page = m_data_map_[page_id];
      int record_num = page->record_number();
      for (int i = 0; i < record_num; i++) {
        if (!bitmap.test(i)) {        // the record is still live
          if (new_page->is_full()) {  // new page full: keep it in the local cache
            m_data_map_[new_page_id] = new_page;
            new_cache_size++;
            m_lru_list_.insert(new_page_id);
            new_page = std::make_shared<Page>();
            new_page_id++;
          }
          // copy the record into the new page and update its metadata
          key = page->read_key(i);
          data_info.index = new_page->insert(key, page->read_value(i));
          data_info.page_id = new_page_id;
          m_page_map_.update(key, data_info);
        }
      }
      m_data_map_[page_id] = nullptr;  // drop the old page
    } else {
      m_data_map_[page_id] = nullptr;  // every record was deleted: drop the old page
    }
  }
  page_id_t page_id;
  remote_info_t info;
  char head_data[kMaxIndex * 10];
  char kv_data[2 * kMaxValueSize];
  uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(head_data);  // view the header as a u16 array
  uint16_t head_length;
  uint16_t offset, length;

  // pass 2: compact the pages held in remote memory
  for (auto &kv : m_addr_map_) {
    page_id = kv.first;
    info = kv.second;
    auto &bitmap = m_bitmap_[page_id];
    if (bitmap.none()) {  // no deleted records: keep the remote page untouched
      tmp_addr_map.insert({page_id, info});
    } else if (bitmap.all()) {  // nothing survives: recycle the remote chunk
      m_addr_list_.emplace(info);
    } else {
      head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
      // read the page header (the u16 offset table)
      m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
      int record_num = m_max_index_[page_id];
      for (int i = 0; i < record_num; i++) {
        if (!bitmap.test(i)) {  // the record is still live
          if (new_page->is_full()) {
            if (new_cache_size > kPageThreshold) {  // local cache full: flush the new page to remote memory
              // NB: write to a fresh chunk rather than reusing `info`, whose
              // remaining records are still being read below
              remote_info_t new_info = request_single_info();
              std::string &&page_data = new_page->to_string();
              uint32_t len = page_data.length();
              m_rdma_conn_->remote_write((void *)page_data.c_str(), len, new_info.remote_addr, new_info.rkey);
              tmp_addr_map.insert({new_page_id, new_info});  // remember the page's remote location
            } else {  // room left: keep the new page in the local cache
              m_data_map_[new_page_id] = new_page;
              new_cache_size++;
              m_lru_list_.insert(new_page_id);
            }
            new_page = std::make_shared<Page>();
            new_page_id++;
          }
          // copy the record out of remote memory, insert it into the new page,
          // and update its metadata
          offset = u16_pointer[i + 2];
          length = u16_pointer[i + 3] - u16_pointer[i + 2];
          m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
          key = std::string(kv_data, kv_data + 16);
          value = std::string(kv_data + 16, kv_data + length);
          data_info.index = new_page->insert(key, std::move(value));
          data_info.page_id = new_page_id;
          m_page_map_.update(key, data_info);
        }
      }
      m_addr_list_.emplace(info);  // every live record has been copied out: recycle this chunk
    }
  }
  m_addr_map_ = std::move(tmp_addr_map);
  m_cur_page_id_ = new_page_id;
  m_cur_page_ = new_page;
  m_victim_id_ = kNullPage;
  m_victim_page_ = nullptr;
  m_cache_size_ = new_cache_size;
  m_last_update_id_ = kNullPage;
  m_data_map_[new_page_id] = new_page;
}

std::vector<uint64_t> LocalEngineEntity::print_memory() {
  // gather a handful of memory statistics
  uint64_t key_metadata = kSlotSize * 32 + kBucketNum * 8;
  uint64_t page_id_size = m_cur_page_id_;
  uint64_t page_metadata = page_id_size * 48 + kBitmapSize * 1.2 * kMaxIndex;
  uint64_t page_size = m_cache_size_ * kPageSize;
  return {key_metadata, page_id_size, page_metadata, page_size, m_alloc_memory_};
}
}  // namespace kv
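The rebuild pass is easier to see on toy data: deleteK only flips a bit in m_bitmap_, and rebuild_index later repacks the surviving records into densely filled pages. A self-contained miniature of that idea (simplified page type and an illustrative 4-records-per-page capacity, not the engine's kMaxIndex):

// compaction_sketch.cc -- deletes only set a bit; a later pass repacks survivors
#include <bitset>
#include <cstdio>
#include <string>
#include <vector>

constexpr size_t kRecordsPerPage = 4;  // illustrative page capacity

struct ToyPage {
  std::vector<std::string> records;
};

int main() {
  std::vector<ToyPage> pages = {{{"a", "b", "c", "d"}}, {{"e", "f", "g", "h"}}};
  std::vector<std::bitset<kRecordsPerPage>> deleted(pages.size());
  deleted[0].set(1);  // "delete" b
  deleted[0].set(3);  // "delete" d
  deleted[1].set(0);  // "delete" e

  std::vector<ToyPage> rebuilt(1);  // start with one empty page to fill
  for (size_t p = 0; p < pages.size(); p++) {
    if (deleted[p].none()) {  // untouched pages are kept as-is
      rebuilt.push_back(pages[p]);
      continue;
    }
    for (size_t i = 0; i < kRecordsPerPage; i++) {
      if (deleted[p].test(i)) continue;  // skip tombstoned records
      if (rebuilt.back().records.size() == kRecordsPerPage) rebuilt.emplace_back();
      rebuilt.back().records.push_back(pages[p].records[i]);  // repack the survivor
    }
  }
  for (auto &pg : rebuilt)
    for (auto &r : pg.records) std::printf("%s ", r.c_str());
  std::printf("\n");  // prints: a c f g h
  return 0;
}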