总结
MapReduce
MapReduce是一种用于处理和生成大规模数据的一种算法模型。对于输入数据, 用户指定一种 MapFunc 则可以生成一个个键值对(称作 intermediate) , 这一过程被叫做 Map。对于Intermediate, 用户指定一种 Reduce Function,则可以根据Intermediate 的 key 对Intermediate 进行合并归类, 这一过程被叫做 Reduce
注意点:
每个map任务会输出多个文件,每个文件中的intermediate经过hash函数取余后值是一样的。由于Reduce任务需要对所有key一样的intermediate执行Reduce Function,所以必须在coordinator中对所有map woker上报的文件进行处理,确保分配给一个reduce woker的所有文件其对应的hash取余是一样的
所有map任务执行结束后reduce任务才能执行
实现思路
woker
woker比较简单,一直从coordinator获取任务,根据任务类型进行不同的操作,任务完成后把消息上报
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
task := getTask()
switch task.TaskType {
case TaskNone:
myLog("receive done signal, worker exist")
return
case TaskSleep:
time.Sleep(WorkerSleepDuration)
case TaskMap:
doMapTask(task, mapf)
taskReport(task)
case TaskReduce:
doReduceTask(task, reducef)
taskReport(task)
}
}
}
coordinator
从时间顺序看,coordinator需要经历如下过程:
- 根据输入文件名和nReduce初始化
- 分配Task任务
- 等待Task任务全部完成
- shffuix、初始化Reduce任务
- 等待reduce任务完成
要求
实现内容
- 一个coordinator线程和多个woker线程并发
- woker通过RPC和coordinator交流
- woker线程循环询问coordinator获取任务,读取任务的输入(可能是多个输入文件),执行任务,输出文件(可能是多个),然后再次询问新任务
- 如果woker在10秒内还没完成任务,coordinator需要把这个任务重新分配给别的woker线程,每个mapper应该创建n个中间变量
具体要求
map函数需要把中间值的key映射为n个值,n由
main/mrcoordinator.go
文件的MakeCoordinator()
函数提供worker需要把输出文件写到
mr-out-X
中,X代表这是第几个woker输出文件的格式:
// this is the correct format for each line of Reduce output. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
临时文件写到同级目录即可
mr/coordinator.go
文件中如果MapReduce任务结束了,Done()
函数需要返回true,之后mrcoordinator.go
才会退出任务完成时,woker协程需要退出,可以通过使用
call()
函数的返回值实现,如果woker和mrcoordinator的通信中断,也可以关闭woker协程,因为此时任务已经完成了
提示
开始任务的方式:
- 修改
mr/worker.go
文件的Worker()
函数,发送rpc给coordinator,获取任务 - 修改coordinator回复一个未开始map任务的文件名
- 修改
mrsequential.go
读取文件、调用map方法
- 修改
map、reduce方法在运行时被加载,实现在wc.so中
修改了mr目录下任何文件后,都需要重新编译plugin
临时文件需要被命名为
mr-X-Y
,X是map任务的编号,Y是reduce任务的编号woker的map任务需要把kv对保存到文件中,reduce任务再来读取,可以使用json
// wirte enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv) } // read dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
worker可以使用
ihash(key)
函数去选择执行任务的reduce协程可以从
mrsequential.go
借鉴有关文件读取、中间值kv排序、输出值排序的代码coordinator作为RPC服务器是并发的,记得给公用变量加锁
使用
go run -race
开启go race detector,test-mr.sh
文件在最开始注释了如何通过-race运行woker有时需要等待,比如reduce需要等到map结束后才能进行。可以让woker休眠+轮训coordinator,也可以让coordinator中循环等待,等待可以使用
time.Sleep()
orsync.Cond
使用
mrapps/crash.go
plugin来测试crash恢复为了保证没有人读取到crash程序写入的不完整文件,可以使用临时文件,在完成后自动重命名。可以使用e
ioutil.TempFile
或os.CreateTemp
创建临时文件,使用os.Rename
重命名mr-tmp
目录输出了测试结果test-mr-many.sh
会执行很多次test-mr.sh
,这可以帮忙测试出小概率bug,他需要接受参数指定测试次数。注意,不能并发运行test-mr.sh
RPC只会发送可导出类型的结构体,子结构体变量名也要大写字母开头
当调用RPC的
call()
函数时,应该将回复结构体(reply struct)初始化为其类型的默认值。这意味着在进行RPC调用之前,不应该设置回复结构体的任何字段,如果在进行RPC调用之前,传递的回复结构体含有非默认值的字段,RPC系统可能会在不发出任何警告的情况下返回错误的值这两个文件不要动
main/mrcoordinator.go
andmain/mrworker.go
实现代码放在
mr/coordinator.go
,mr/worker.go
,mr/rpc.go
写完后运行程序
rm mr-out* go run mrcoordinator.go pg-*.txt go run mrworker.go wc.so 在mr-out-*查看输出 cat mr-out-* | sort | more A 509 ABOUT 2 ACT 8
测试脚本在
main/test-mr.sh
用于检查- 输出是否正确
- map、reduce是否并发
- woker崩溃时是否可以恢复
debug和运行的编译参数不一样
- debug
go build -buildmode=plugin -gcflags="all=-N -l" ../mrapps/wc.go
- 运行
go build -buildmode=plugin ../mrapps/wc.go
- debug
代码
RPC
RPC规则:
- 方法可导出
- 方法只能有两个可序列化参数,第二个需要是指针,方法需要返回err
服务端
// 服务端 type HelloService struct{} func (h *HelloService) Hello(request string, responce *string) error { *responce = "hello:" + request return nil } func main() { err := rpc.RegisterName("HelloService", new(HelloService)) if err != nil { return } listener, err := net.Listen("tcp", "1234") if err != nil { return } conn, err := listener.Accept() if err != nil { return } rpc.ServeConn(conn) }
客户端
func main() { dial, err := rpc.Dial("tcp", "localhost:1234") if err != nil { panic(err) } var reply string err = dial.Call("HelloService.Hello", "hello test", &reply) if err != nil { panic(err) } fmt.Println(reply) }
参考
https://juejin.cn/post/7260123819476926501