B+树 BulkLoading 多核并行设计
目录
数据库系统课程设计报告 1
B+树 BulkLoading 多核并行设计 1
Ⅰ . B+树 BulkLoading 过程理解 1
1.1 BulkLoading 的基本思想 1
1.2 结合实现理解 BulkLoading 1
1.3 BulkLoading 的利弊 2
Ⅱ . 并行设计思路 3
2.1 基本想法 3
Ⅲ . 算法流程图 4
4.1 连续分配块 5
4.2 连续写磁盘 6
Ⅴ . 关键代码描述 6
5.2 12
Ⅵ . 实验结果分析 28
6.0 并行和性能调优的代码说明 28
6.0.3 如何测试代码 29
6.1 并行实验结果代码及分析 29
6.1.1 影响因素 29
6.1.2 实验方法 29
测试代码 29
6.2 性能调优和创新优化实验结果及分析 36
6.2.1 性能调优的原因 36
6.2.2 数据量的大小 36
6.2.3 线程数 38
6.2.4 Buffer大小 38
1.3BulkLoading 的利弊
相比于通过插入节点构造B+树,BulkLoding充分利用了数据有序的特性,让每一个节点构造过程的 复杂度都为O(1),构造过程中不存在节点的分叉、合并等调整,生成B+树的过程非常快。
通过BulkLoading构造B+树也可能存在潜在的问题:因为每一个节点从左到右都是尽可能填满的, 当需要插入新节点时,B+树结构的变动可能会非常大,会产生较大的时间开销。
或许我们可以加入一个填充因子来控制BulkLoading每次为一个叶子节点填出的数据项的个数,让 叶子节点的数据项不是满的,这样可以减弱插入节点对B+树结构造成的影响。
Ⅱ . 并行设计思路
2.1基本想法
首先,需要思考的问题是在批量加载的哪些步骤可以并行,通过阅读串行代码我们可知,构建B+树 的串行过程大致为:读取数据并在内存中创建一个节点对象存放数据,然后再通过系统函数 fwrite() 将该节点中的数据写入到磁盘中。
我们考虑到,节点数据写到文件的过程由于文件指针的限制无法并行,同一时刻只能有一个线程写 磁盘,因此可以创建内存缓冲区缓冲节点数据,在获取写文件权限后将内存中节点写出。
由于写磁盘的速度低于写内存,BulkLoading 运行速度的瓶颈可能在于写硬盘,因此要充分提高
CPU和磁盘IO的并行度,来减少IO造成的护航效应。
引入互斥锁:由于写硬盘不能并行,因此需要一个互斥锁保证同一时刻只有一个线程能够将已经加 载到内存的节点数据写入硬盘。那么对于其余抢不到锁的线程,我们有三种处理思路:
一个线程在内存中写完一个节点就等待直到获取锁(堵塞);
一个线程在内存中写完所有待处理的节点再争取锁,等待直到获取锁(堵塞);
一个节点每向内存写一个节点就尝试获取锁,本文转载自http://www.biyezuopin.vip/onews.asp?id=16715如果获取失败就将块加入一个队列,如果获取成功就 磁盘写队列内所有的节点(非堵塞)。
我们采取的是类似于第三种思路的并行思路:一个节点每向内存写完 MIN_BLOCK 个节点就尝试获取锁,如果获取失败就将块加入一个长为 MAX_BLOCK 的队列,如果获取成功就磁盘写队列内所有的节点(非堵塞),如果此时队列已满就进入堵塞状态。其中 MIN_BLOCK 和 MAX_BLOCK 是两个可调的参数。
最后,在我们的并行 BulkLoading 过程中,主线程会为每一层创建若干个线程,并且只有一层的所有线程执行完工作后,返回记录线程产生的一批节点键值的数组,才会开始创建上一层的线程。
#include <iostream>
#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <string>
#include <set>
#include <fstream>
#include <sstream>
#include "def.h"
#include "util.h"
#include "random.h"
#include "pri_queue.h"
#include "b_node.h"
#include "b_tree.h"
#include "evaluate.h"
using namespace std;
// print B_Tree
void print_B_Tree(BTree *trees_, char *filename)
{
FILE *fp;
fp = fopen(filename, "w");
if (fp == NULL)
{
printf("File can not open!");
exit(0);
}
int start_block = trees_->root_;
int end_block = trees_->root_;
int newly_startblock;
int newly_endblock;
BIndexNode *index_child = NULL;
// read root node
char indexnode_level;
int indexnode_num_entries;
BIndexNode *indexnode_left_sibling;
BIndexNode *indexnode_right_sibling;
fprintf(fp, "root: block %d\n", start_block);
index_child = new BIndexNode();
index_child->init_restore(trees_, start_block);
indexnode_level = index_child->get_level();
indexnode_num_entries = index_child->get_num_entries();
indexnode_left_sibling = index_child->get_left_sibling();
indexnode_right_sibling = index_child->get_right_sibling();
fprintf(fp, "\tlevel: %d \tnum_entries: %d\n", indexnode_level, indexnode_num_entries);
for (int j = 0; j < indexnode_num_entries; ++j)
{
fprintf(fp, "\t\tkey: %f\tson: %d\n", index_child->get_key(j), index_child->get_son(j));
}
start_block = index_child->get_son(0);
end_block = index_child->get_son(indexnode_num_entries - 1);
delete index_child;
index_child = NULL;
// index node
// from root to the leaf layer to layer
while (start_block > 1)
{
for (int k = start_block; k <= end_block; k++)
{
fprintf(fp, "index: block %d\n", k);
index_child = new BIndexNode();
index_child->init_restore(trees_, k);
indexnode_level = index_child->get_level();
indexnode_num_entries = index_child->get_num_entries();
indexnode_left_sibling = index_child->get_left_sibling();
indexnode_right_sibling = index_child->get_right_sibling();
fprintf(fp, "\tlevel: %d \tnum_entries: %d\n", indexnode_level, indexnode_num_entries);
for (int j = 0; j < indexnode_num_entries; ++j)
{
fprintf(fp, "\t\tkey: %f\tson: %d\n", index_child->get_key(j), index_child->get_son(j));
}
if (k == start_block)
{
newly_startblock = index_child->get_son(0);
}
if (k == end_block)
{
newly_endblock = index_child->get_son(indexnode_num_entries - 1);
}
delete index_child;
index_child = NULL;
} // end for loop
start_block = newly_startblock;
end_block = newly_endblock;
} // end while
// leaf node variable
BLeafNode *leaf_child = NULL;
// read root node
char leafnode_level;
int leafnode_num_entries;
int leafnode_num_keys;
BLeafNode *leafnode_left_sibling;
BLeafNode *leafnode_right_sibling;
// print leaf node
for (int k = start_block; k <= end_block; k++)
{
fprintf(fp, "leaf: block %d\n", k);
leaf_child = new BLeafNode();
leaf_child->init_restore(trees_, k);
leafnode_level = leaf_child->get_level();
leafnode_num_entries = leaf_child->get_num_entries();
leafnode_left_sibling = leaf_child->get_left_sibling();
leafnode_right_sibling = leaf_child->get_right_sibling();
leafnode_num_keys = leaf_child->get_num_keys();
fprintf(fp, "\tlevel: %d \tnum_entries: %d\tnum_keys: %d\n", leafnode_level, leafnode_num_entries, leafnode_num_keys);
for (int j = 0; j < leafnode_num_keys; ++j)
{
int count_entries = 0;
fprintf(fp, "\t\tkey: %f\n", leaf_child->get_key(j));
for (int w = count_entries; w < std::min(count_entries + 16, leafnode_num_entries); w++)
{
fprintf(fp, "\t\t\tid: %d\n", leaf_child->get_entry_id(w));
}
count_entries += 16;
}
delete leaf_child;
leaf_child = NULL;
}
fclose(fp);
}
// -----------------------------------------------------------------------------
int main(int nargs, char **args)
{
char data_file[200];
char tree_file_ser[200];
char tree_file_par[200];
int B_ = 512; // node size
int n_pts_ = 20000000;
strncpy(data_file, "./data/dataset.csv", sizeof(data_file));
strncpy(tree_file_ser, "./result/B_tree_ser", sizeof(tree_file_ser));
strncpy(tree_file_par, "./result/B_tree_par", sizeof(tree_file_par));
printf("data_file = %s\n", data_file);
printf("tree_file_ser = %s\n", tree_file_ser);
printf("tree_file_par = %s\n", tree_file_par);
Result *table = new Result[n_pts_];
ifstream fp(data_file);
string line;
int i = 0;
while (getline(fp, line) && i <= n_pts_ - 1)
{
string number;
istringstream readstr(line);
getline(readstr, number, ',');
table[i].key_ = atof(number.c_str());
getline(readstr, number, ',');
table[i].id_ = atoi(number.c_str());
i++;
}
fp.close();
timeval start_t;
timeval end_t;
printf("开始运行,并行线程数:20,内存缓冲大小:500,数据量:%d\n", n_pts_);
BTree *trees_ = new BTree();
trees_->init(B_, tree_file_ser);
gettimeofday(&start_t, NULL);
if (trees_->bulkload(n_pts_, table))
return 1;
gettimeofday(&end_t, NULL);
float run_t1 = end_t.tv_sec - start_t.tv_sec +
(end_t.tv_usec - start_t.tv_usec) / 1000000.0f;
printf("串行运行时间: %f s\n", run_t1);
print_B_Tree(trees_, "./result/ser_res");
delete trees_;
trees_ = new BTree();
trees_->init(B_, tree_file_par);
gettimeofday(&start_t, NULL);
if (trees_->parallelBulkLoad(n_pts_, table, 20, 500))
return 1;
gettimeofday(&end_t, NULL);
run_t1 = end_t.tv_sec - start_t.tv_sec +
(end_t.tv_usec - start_t.tv_usec) / 1000000.0f;
printf("并行运行时间: %f s\n", run_t1);
print_B_Tree(trees_, "./result/par_res");
if (table != NULL)
{
delete[] table;
table = NULL;
}
char systemCmd[1024];
printf("进行结果正确性验证\n");
strncpy(systemCmd, "diff ", 1024);
strcat(systemCmd, "./result/par_res ");
strcat(systemCmd, "./result/ser_res");
printf("%s\n", systemCmd);
system(systemCmd);
printf("开始进行结果测试...\n");
// 参数测试
// evaluate();
return 0;
}