MIT 6.5840-分布式系统 Lab1

发布于:2024-03-17 ⋅ 阅读:(93) ⋅ 点赞:(0)

总结

MapReduce
  • MapReduce是一种用于处理和生成大规模数据的一种算法模型。对于输入数据, 用户指定一种 MapFunc 则可以生成一个个键值对(称作 intermediate) , 这一过程被叫做 Map。对于Intermediate, 用户指定一种 Reduce Function,则可以根据Intermediate 的 key 对Intermediate 进行合并归类, 这一过程被叫做 Reduce

  • 注意点:

    • 每个map任务会输出多个文件,每个文件中的intermediate经过hash函数取余后值是一样的。由于Reduce任务需要对所有key一样的intermediate执行Reduce Function,所以必须在coordinator中对所有map woker上报的文件进行处理,确保分配给一个reduce woker的所有文件其对应的hash取余是一样的

    • 所有map任务执行结束后reduce任务才能执行

实现思路
woker

woker比较简单,一直从coordinator获取任务,根据任务类型进行不同的操作,任务完成后把消息上报

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		task := getTask()
		switch task.TaskType {
		case TaskNone:
			myLog("receive done signal, worker exist")
			return
		case TaskSleep:
			time.Sleep(WorkerSleepDuration)
		case TaskMap:
			doMapTask(task, mapf)
			taskReport(task)
		case TaskReduce:
			doReduceTask(task, reducef)
			taskReport(task)
		}
	}
}
coordinator

从时间顺序看,coordinator需要经历如下过程:

  • 根据输入文件名和nReduce初始化
  • 分配Task任务
  • 等待Task任务全部完成
  • shffuix、初始化Reduce任务
  • 等待reduce任务完成

要求

  • 实现内容

    • 一个coordinator线程和多个woker线程并发
    • woker通过RPC和coordinator交流
    • woker线程循环询问coordinator获取任务,读取任务的输入(可能是多个输入文件),执行任务,输出文件(可能是多个),然后再次询问新任务
    • 如果woker在10秒内还没完成任务,coordinator需要把这个任务重新分配给别的woker线程,每个mapper应该创建n个中间变量
  • 具体要求

    • map函数需要把中间值的key映射为n个值,n由main/mrcoordinator.go 文件的 MakeCoordinator()函数提供

    • worker需要把输出文件写到mr-out-X中,X代表这是第几个woker

    • 输出文件的格式:

      // this is the correct format for each line of Reduce output.
      fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
      
    • 临时文件写到同级目录即可

    • mr/coordinator.go 文件中如果MapReduce任务结束了, Done()函数需要返回true,之后mrcoordinator.go才会退出

    • 任务完成时,woker协程需要退出,可以通过使用 call()函数的返回值实现,如果woker和mrcoordinator的通信中断,也可以关闭woker协程,因为此时任务已经完成了

  • 提示

    • 开始任务的方式:

      • 修改mr/worker.go文件的Worker() 函数,发送rpc给coordinator,获取任务
      • 修改coordinator回复一个未开始map任务的文件名
      • 修改mrsequential.go读取文件、调用map方法
    • map、reduce方法在运行时被加载,实现在wc.so中

    • 修改了mr目录下任何文件后,都需要重新编译plugin

    • 临时文件需要被命名为 mr-X-Y,X是map任务的编号,Y是reduce任务的编号

    • woker的map任务需要把kv对保存到文件中,reduce任务再来读取,可以使用json

        // wirte
        enc := json.NewEncoder(file)
        for _, kv := ... {
          err := enc.Encode(&kv)
        }
        
        // read
        dec := json.NewDecoder(file)
        for {
          var kv KeyValue
          if err := dec.Decode(&kv); err != nil {
            break
          }
          kva = append(kva, kv)
        } 
      
    • worker可以使用 ihash(key) 函数去选择执行任务的reduce协程

    • 可以从 mrsequential.go 借鉴有关文件读取、中间值kv排序、输出值排序的代码

    • coordinator作为RPC服务器是并发的,记得给公用变量加锁

    • 使用 go run -race开启go race detector,test-mr.sh 文件在最开始注释了如何通过-race运行

    • woker有时需要等待,比如reduce需要等到map结束后才能进行。可以让woker休眠+轮训coordinator,也可以让coordinator中循环等待,等待可以使用 time.Sleep() or sync.Cond

    • 使用 mrapps/crash.go plugin来测试crash恢复

    • 为了保证没有人读取到crash程序写入的不完整文件,可以使用临时文件,在完成后自动重命名。可以使用e ioutil.TempFileos.CreateTemp 创建临时文件,使用 os.Rename 重命名

    • mr-tmp目录输出了测试结果

    • test-mr-many.sh会执行很多次 test-mr.sh,这可以帮忙测试出小概率bug,他需要接受参数指定测试次数。注意,不能并发运行 test-mr.sh

    • RPC只会发送可导出类型的结构体,子结构体变量名也要大写字母开头

    • 当调用RPC的 call() 函数时,应该将回复结构体(reply struct)初始化为其类型的默认值。这意味着在进行RPC调用之前,不应该设置回复结构体的任何字段,如果在进行RPC调用之前,传递的回复结构体含有非默认值的字段,RPC系统可能会在不发出任何警告的情况下返回错误的值

    • 这两个文件不要动main/mrcoordinator.go and main/mrworker.go

    • 实现代码放在 mr/coordinator.go, mr/worker.go ,mr/rpc.go

  • 写完后运行程序

    rm mr-out*
    go run mrcoordinator.go pg-*.txt
    go run mrworker.go wc.so
    在mr-out-*查看输出
    cat mr-out-* | sort | more
    A 509
    ABOUT 2
    ACT 8
    
  • 测试脚本在 main/test-mr.sh 用于检查

    • 输出是否正确
    • map、reduce是否并发
    • woker崩溃时是否可以恢复
  • debug和运行的编译参数不一样

    • debug go build -buildmode=plugin -gcflags="all=-N -l" ../mrapps/wc.go
    • 运行 go build -buildmode=plugin ../mrapps/wc.go

代码

RPC

  • RPC规则:

    • 方法可导出
    • 方法只能有两个可序列化参数,第二个需要是指针,方法需要返回err
  • 服务端

    // 服务端
    type HelloService struct{}
    
    func (h *HelloService) Hello(request string, responce *string) error {
    	*responce = "hello:" + request
    	return nil
    }
    
    func main() {
    	err := rpc.RegisterName("HelloService", new(HelloService))
    	if err != nil {
    		return
    	}
    	listener, err := net.Listen("tcp", "1234")
    	if err != nil {
    		return
    	}
    	conn, err := listener.Accept()
    	if err != nil {
    		return
    	}
    	
    	rpc.ServeConn(conn)
    }
    
  • 客户端

    func main() {
    	dial, err := rpc.Dial("tcp", "localhost:1234")
    	if err != nil {
    		panic(err)
    	}
    
    	var reply string
    	err = dial.Call("HelloService.Hello", "hello test", &reply)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(reply)
    }
    

参考

https://juejin.cn/post/7260123819476926501

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到