摘要
以 2003年,Google 发表的三篇论文为标志的大数据时代,至今已过去近二十年时间,MapReduce 那篇论文虽然只有理论,并为公开底层软件实现。但这么多年过去,Hadoop,Spark 等框架早已实现论文中所描述的功能,甚至还有所改进。
本文以 MapReduce 为基础,实现了一套基于浏览器实现的分布式系统。加之如今 Chrome 对各个平台近乎完美的兼容性,实现了一次编写,处处运行的目标。同时得力于个人移动设备的普及,手机,平板,甚至是家用游戏机,智能电视。如果急需性能,还可以通过朋友圈的方式,号召朋友们使用自己的设备,在后台开启几个标签的方式,成为计算节点,加快整体计算速度。
在 BMR 系统下,用户甚至不需要学习 C++,JAVA 等传统分布式计算用到的语言;只需要会简单的 JS,即可完成分布式计算任务的开发,开发成本极低。本文对 BMR系统的设计,以及实现时做的取舍做了详细说明,对分布式计算平台的研究具有一定的指导意义。
关键词:MapReduce、分布式计算、高性能
MapReduce-based Distributed Computing System
Abstract
Intelligent In 2003, Google published three papers as a sign of the era of big data, nearly two decades have passed, MapReduce that paper, although only the theory, and for the public underlying software implementation. However, after so many years, Hadoop, Spark and other frameworks have already achieved the functions described in the paper, and even improved them.
This paper implements a browser-based distributed system based on MapReduce. With the near-perfect compatibility of Chrome with all platforms today, the goal of writing once and running everywhere is achieved. And thanks to the popularity of personal mobile devices, phones, tablets, and even home consoles and smart TVs. If performance is desperately needed, it is also possible to call on friends to use their own devices by opening several tabs in the background to become computing nodes and speed up the overall computing speed.
With the BMR system, users do not even need to learn C++, JAVA, and other languages traditionally used in distributed computing; they only need to know simple JS to develop distributed computing tasks, and the development cost is extremely low. This paper provides a detailed description of the design of the BMR system and the trade-offs made in its implementation, which is a guideline for the study of distributed computing platforms.
Keywords: MapReduce, distributed computing, high performance
目 录
摘要 I
Abstract II
目 录 III
第一章 引言 1
1.1 研究背景 1
1.2 研究现状 1
1.3 论文目标及内容 2
第二章 MapRedues相关技术介绍 4
2.1 分布式系统 4
2.1.1 什么是分布式系统 4
2.1.2 并行于分布式计算 4
2.1.3 分布式系统的架构 5
2.1.4 分布式系统的历史 6
2.1.5 分布式系统的应用 6
2.1.6 分布式系统的特点 7
2.2 函数式编程语言 7
2.2.1 函数式编程 7
2.2.2 lambda演算 8
2.2.3 函数式编程的优缺点 9
2.3 MapReduce 编程模型 11
2.3.1 介绍 11
2.3.2 运行流程 12
2.3.3 数据流 13
2.3.4 MapReduce的优缺点 14
第三章 基于MapReduce的计算框架设计和技术分析 16
3.1 开发工具链 16
3.1.1 Visual Studio Code 16
3.1.2 NPM 16
3.1.3 Chrome 16
3.1.4 系统的技术栈 16
3.2 系统的技术栈 17
3.2.1 前端技术栈 18
3.2.2 后端技术栈 19
3.3 系统的总体设计 20
3.3.1 总体架构 20
3.3.2 服务端模块划分 21
3.3.3 客户端模块划分 24
3.3.4 服务端客户端通信接口 26
3.4 服务端详细设计 30
3.4.1 执行代码组建模块 30
3.4.2 标记模块 31
3.4.3 运行状态模块 34
3.4.4 检查点模块 35
3.4.5 检测模块 36
3.5 客户端详情 37
3.5.1 JSON美化组件 37
3.5.2 Worker 38
第四章 框架的使用说明 39
4.1 运行环境 39
4.1.1 服务端 39
4.1.2 客户端 40
4.2 源码结构 41
4.2.1 服务端 41
4.2.2 客户端 42
4.3 使用流程 43
第五章 结论 45
5.1 论文总结工作 45
5.2 未来展望 46
参考文献 47
致谢 49
1.3 论文目标及内容
本课题采用 B/S 架构,基于 MapReduce 分布式计算平台,在 BMR 平台上为用户提供方便快捷的分布式计算功能;用户计算机,为集群提供高性能的 Worker 节点;并通过 VUE 和 ElementUI 等现代化的前端开发框架,为用户美观、易用的 WEB GUI(WEB 图形交互界面),以查看任务运行状态,节点工作情况等。为了实现上述系统开发目标,需重点研究如下几方面内容:
1.MapReduce 相关技术的研究
对 MapReduce 相关技术,如分布式系统,函数式编程,并行计算等技术展开分析,研究它们的原理,演变过程,优缺点,以及与 BMR 系统的关系。
2.系统的设计与技术分析
介绍 BMR 系统开发过程中所使用的工具,阐述系统中各个模块的设计目的,对关键功能实现所涉及的算法进行说明,并对框架中接口的进行简要说明。
3.基于 BMR 系统编写的计算任务的开发流程
讲解系统的部署,运行以及在该如何基于该系统,编写分布式任务,详细说明开发时需要遵循的约定。并通过几个例子讲解使用时,如何使用插件机制,提升计算效率。
本文转载自:http://www.biyezuopin.vip/onews.asp?id=16553
import axios from 'axios'
import { expose } from "threads/worker"
const TASK_TYPES = {
MAP : "MAP",
REDUCE : "REDUCE",
FINISH : "FINISH",
};
const TASK_STATUS_TYPES = {
PREPARE : "PREPARE",
RUNNING : "RUNNING",
FINISH : "FINISH",
FAIL : "FAIL"
}
function bind_emit(arr) {
return (key, value) => {
arr.push([key, value]);
}
}
const default_inputreader = (key, value, _emit) => {
value.split('\n').forEach((word, line_number) => {
if (word == '') return;
_emit(line_number, word);
});
}
const default_comparator = (key1, key2) => {
if (key1 < key2) return -1;
else if (key1 === key2) return 0;
return 1;
}
function group_by(items, key) {
return items.reduce((result, x) => {
result[x[key]] = (result[x[key]] || []);
result[x[key]].push(x);
return result;
}, {});
}
function exec_wrapper(_fun, params, is_arr=false, counter) {
const output = [];
if (is_arr) {
params.forEach(item => {
const key = item[0];
const value = item[1];
try {
_fun(key, value, bind_emit(output));
counter['finish_record'] = counter['finish_record'] ?? 0;
counter['finish_record'] += 1;
} catch (ex) {
counter['failed_record'] = counter['failed_record'] ?? 0;
counter['failed_record'] += 1;
}
});
} else {
_fun(...params, bind_emit(output));
}
return output;
}
function parse_usercode(usercode) {
const obj = JSON.parse(JSON.stringify(usercode));
['_map', '_reduce', '_combinator', '_partioning', '_comparator', '_inputreader', '_outputreader'].forEach(name => {
if (obj[name])
obj[name] = new Function(`return ${obj[name]}`)();
});
return obj;
}
async function map_executor(usercode, map_task) {
const [filename, filedata] = map_task['input'];
const inputreader = usercode['_inputreader'] ?? default_inputreader;
const comparator = usercode['_comparator'] ?? default_comparator;
// inputreader
const map_input = exec_wrapper(inputreader, [filename, filedata]);
// map
const counter = {};
let map_output = exec_wrapper(usercode['_map'], map_input, true, counter);
// partion
if (usercode['_partioning']) {
const partion = usercode['_partioning'];
map_output = map_output.map(item => {
return [partion(item[0]), item[1]];
});
}
// combinator
if (usercode['_combinator']) {
const combinator_output = [];
const combinator = usercode['_combinator'];
for (let [key, values] of Object.entries(group_by(map_output, 0))) {
combinator(key, values, bind_emit(combinator_output));
}
map_output = combinator_output;
}
map_output = map_output.sort((a, b) => comparator(a[0], b[0]));
// upload map result
const job_uid = map_task['job_uid'];
const task_uid = map_task['task_uid'];
const msg = await axios.post(`http://localhost:8080/mr/job/${job_uid}/finish_task/${task_uid}`, {
counter,
output: [`${filename}`, map_output]
}, {
maxContentLength: Infinity,
maxBodyLength: Infinity
});
console.log(msg.data);
console.log(map_output);
}
async function reduce_executor(usercode, task) {
const reduce_output = [];
// for (const [key, values] of Object.entries(filedata)) {
// usercode['_reduce'](key, values.map(item => item[1]), bind_emit(reduce_output));
// }
const [key, values] = task['input'];
usercode['_reduce'](key, values.map(item => item[1]), bind_emit(reduce_output));
// post to server
const job_uid = task['job_uid'];
const task_uid = task['task_uid'];
const msg = await axios.post(`http://localhost:8080/mr/job/${job_uid}/finish_task/${task_uid}`, {
output: reduce_output
}, {
maxContentLength: Infinity,
maxBodyLength: Infinity
});
console.log(msg.data);
console.log(reduce_output);
}
async function start(job_uid) {
const usercode = parse_usercode((await axios.get(`http://localhost:8080/mr/job/${job_uid}/code`)).data);
const task = (await axios.get(`http://localhost:8080/mr/job/${job_uid}/task`)).data;
// console.log(task['task_type']);
if (task['task_type'] === TASK_TYPES.MAP)
map_executor(usercode, task);
else if (task['task_type'] === TASK_TYPES.REDUCE)
reduce_executor(usercode, task);
const job = (await axios.get(`http://localhost:8080/mr/job/53ca9264-9722-4210-88d7-faac1ad0b9c9`)).data;
// console.log(job);
if (job['job_status'] !== TASK_STATUS_TYPES.FINISH) {
setTimeout(() => {
start(job_uid);
}, 1000);
}
}
expose(function(job_uid) {
start(job_uid);
});