MIT 6.5840 (formerly 6.824), Spring 2024 — Lab 1: MapReduce

发布于:2025-08-17 ⋅ 阅读:(21) ⋅ 点赞:(0)

master节点

package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"sync"
	"time"
)

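// Coordinator: the master node. Its responsibilities:
//
// 1. Handle RPC requests from worker nodes.
// 2. Track the state of every task.
// 3. Detect timed-out tasks: a task that has been running too long is marked
//    idle again and handed to another worker (the original worker is not
//    actually stopped).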

// Task is the coordinator's bookkeeping record for one map or reduce task.
type Task struct {
	// FileName is the input file of a map task; it is left empty for reduce tasks.
	FileName  string
	// Status is one of Idle, InProgress, Completed.
	Status    string
	// StartTime is when the task was last handed out; checkTimeOut compares it
	// against the current time to detect stuck tasks.
	StartTime time.Time
	// TaskID is the task number; MakeCoordinator sets it to the task's index
	// in mapTasks / reduceTasks.
	TaskID    int
}
// Coordinator holds the master node's state. Workers call its RPC handlers
// concurrently; the handlers (GetTask, ReportTaskDone) lock mu before
// touching the fields below.
type Coordinator struct {
	// Your definitions here.
	// mu guards the fields below against concurrent worker RPCs.
	mu          sync.Mutex
	// mapTasks has one entry per input file; index == TaskID.
	mapTasks    []Task
	// reduceTasks has one entry per reduce partition; index == TaskID.
	reduceTasks []Task
	// nReduce is the number of reduce tasks, as passed to MakeCoordinator.
	nReduce     int
	// mapFinished becomes true once every map task is Completed; reduce
	// tasks are only handed out after that.
	mapFinished bool
	// allFinished becomes true once every reduce task is Completed; GetTask
	// then tells workers to exit.
	allFinished bool
	// files are the input files, one per map task.
	files       []string
	// NOTE(review): nextTaskID is initialized but not read anywhere in the visible code.
	nextTaskID  int
}

// Your code here -- RPC handlers for the worker to call.

// Example is the sample RPC handler from the lab skeleton: it replies with
// the argument incremented by one.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	incremented := args.X + 1
	reply.Y = incremented
	return nil
}

// GetTask is the RPC handler workers call to ask for work. (The receiver c
// plays the role that this/self play in other languages.)
//
// It first reclaims timed-out tasks via checkTimeOut, then replies with:
//   - ExitTask once every task is done;
//   - the first idle map task while the map phase is unfinished;
//   - otherwise the first idle reduce task;
//   - WaitTask when the current phase has no idle task left.
//
// The chosen task is marked InProgress and its StartTime is recorded.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.checkTimeOut()

	switch {
	case c.allFinished:
		reply.TaskType = ExitTask
		return nil

	case !c.mapFinished:
		for idx := range c.mapTasks {
			m := &c.mapTasks[idx]
			if m.Status != Idle {
				continue
			}
			m.Status = InProgress
			m.StartTime = time.Now()
			reply.TaskType = MapTask
			reply.TaskID = m.TaskID
			reply.FileName = m.FileName
			// The map output is split into this many buckets.
			reply.NReduce = c.nReduce
			return nil
		}
		// Map phase still open, but every map task is running or done.
		reply.TaskType = WaitTask
		return nil
	}

	// Map phase is over: hand out reduce work.
	for idx := range c.reduceTasks {
		r := &c.reduceTasks[idx]
		if r.Status != Idle {
			continue
		}
		r.Status = InProgress
		r.StartTime = time.Now()
		reply.TaskType = ReduceTask
		reply.TaskID = r.TaskID
		reply.ReduceTaskNum = idx          // which partition to reduce
		reply.MapTaskNum = len(c.mapTasks) // how many map outputs to read
		return nil
	}
	reply.TaskType = WaitTask
	return nil
}

// checkTimeOut re-queues stuck tasks and refreshes the phase flags. Callers
// hold c.mu (GetTask invokes it under the lock).
//
// Any InProgress task handed out more than 10 seconds ago is set back to
// Idle so it can be reassigned; the worker running it is not stopped.
// mapFinished is recomputed from the map tasks while the map phase is still
// open, and allFinished is always recomputed from the reduce tasks.
func (c *Coordinator) checkTimeOut() {
	const limit = 10 * time.Second
	now := time.Now()

	// requeue resets expired InProgress tasks to Idle and reports whether
	// every task in the slice is Completed.
	requeue := func(tasks []Task) bool {
		done := true
		for i := range tasks {
			t := &tasks[i]
			if t.Status == InProgress && now.Sub(t.StartTime) > limit {
				t.Status = Idle
			}
			if t.Status != Completed {
				done = false
			}
		}
		return done
	}

	if !c.mapFinished {
		c.mapFinished = requeue(c.mapTasks)
	}
	c.allFinished = requeue(c.reduceTasks)
}

// ReportTaskDone is the RPC handler workers call after finishing a task.
//
// A report is accepted (reply.OK = true) only if a task of args.TaskType with
// ID args.TaskID is currently InProgress; that task is then marked Completed.
// Reports for unknown task types or IDs, or for tasks that are already
// Completed or were re-queued as Idle after a timeout, are ignored and get
// reply.OK = false.
//
// Completing the last map task sets mapFinished (the map phase is over);
// completing the last reduce task sets allFinished (the whole job is over).
func (c *Coordinator) ReportTaskDone(args *ReportTaskArgs, reply *ReportTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	var tasks []Task
	switch args.TaskType {
	case MapTask:
		tasks = c.mapTasks
	case ReduceTask:
		tasks = c.reduceTasks
	default:
		reply.OK = false
		return nil
	}

	for i := range tasks {
		if tasks[i].TaskID != args.TaskID || tasks[i].Status != InProgress {
			continue
		}
		// tasks shares its backing array with c.mapTasks / c.reduceTasks,
		// so this updates the coordinator's table in place.
		tasks[i].Status = Completed

		allCompleted := true
		for _, t := range tasks {
			if t.Status != Completed {
				allCompleted = false
				break
			}
		}
		if args.TaskType == MapTask {
			// Only the map phase is complete here; the job as a whole is not
			// done until every reduce task has also completed.
			c.mapFinished = allCompleted
		} else {
			c.allFinished = allCompleted
		}
		reply.OK = true
		return nil
	}

	reply.OK = false
	return nil
}

// server registers c with net/rpc and serves RPC-over-HTTP on the UNIX-domain
// socket named by coordinatorSock, handling worker (worker.go) requests in a
// background goroutine. It terminates the process via log.Fatal if the socket
// cannot be opened.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()

	// To serve over TCP instead: net.Listen("tcp", ":1234")
	sock := coordinatorSock()
	// Remove any existing file at the socket path so Listen can bind to it.
	os.Remove(sock)
	listener, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
// Done reports true only when every map task and every reduce task is
// Completed. It holds c.mu because it runs on a different goroutine than the
// RPC handlers, which are served by http.Serve. It derives the answer from
// the task table itself rather than a cached flag, so it does not depend on
// how other handlers maintain that flag.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, t := range c.mapTasks {
		if t.Status != Completed {
			return false
		}
	}
	for _, t := range c.reduceTasks {
		if t.Status != Completed {
			return false
		}
	}
	return true
}

// MakeCoordinator creates a Coordinator and starts its RPC server.
// main/mrcoordinator.go calls this function.
//
// files lists the map inputs: one map task per file, with task ID equal to
// the file's index. nReduce is the number of reduce tasks to use (IDs
// 0..nReduce-1). Reduce tasks get no FileName because their inputs are the
// intermediate files the map tasks have not produced yet.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		files:   files,
		nReduce: nReduce,
		// mu, mapFinished, allFinished and nextTaskID start at their zero values.
	}

	c.mapTasks = make([]Task, 0, len(files))
	for id, name := range files {
		c.mapTasks = append(c.mapTasks, Task{TaskID: id, FileName: name, Status: Idle})
	}

	c.reduceTasks = make([]Task, 0, nReduce)
	for id := 0; id < nReduce; id++ {
		c.reduceTasks = append(c.reduceTasks, Task{TaskID: id, Status: Idle})
	}

	c.server()
	return c
}

worker节点

package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
	"log"
	"net/rpc"
	"os"
	"sort"
	"time"
)

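/**
* Worker responsibilities:
* - request tasks from the coordinator
* - run map or reduce tasks
* - read input files and write output files
* - report task status back to the coordinator
 */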

// KeyValue is a single key/value pair. Map functions return a slice of
// KeyValue; map tasks JSON-encode these pairs into intermediate files and
// reduce tasks decode them back.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface over a slice of KeyValue, ordering by Key.
// The reduce side sorts with it so equal keys become adjacent.
type ByKey []KeyValue

// Len reports how many pairs are in the slice.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less orders pairs by ascending Key (byte-wise string comparison).
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

// ihash returns a non-negative 31-bit hash of key (32-bit FNV-1a with the
// top bit cleared). Use ihash(key) % NReduce to choose the reduce task
// number for each KeyValue emitted by Map.
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	const mask = 0x7fffffff
	return int(sum & mask)
}

// main/mrworker.go calls this function.
//
// Worker repeatedly asks the coordinator for a task and runs it, identifying
// itself by its process ID. It sleeps briefly when told to wait, or when the
// reply carries an unrecognized task type, and returns when told to exit.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	const backoff = 500 * time.Millisecond
	workerID := os.Getpid()

	for {
		task := getTask(workerID)
		switch task.TaskType {
		case MapTask:
			doMap(task, mapf, workerID)
		case ReduceTask:
			doReduce(task, reducef, workerID)
		case WaitTask:
			time.Sleep(backoff)
		case ExitTask:
			return
		default:
			// An unrecognized or empty TaskType (e.g. the zero-valued reply
			// of a failed RPC): back off instead of re-polling the
			// coordinator in a tight loop.
			time.Sleep(backoff)
		}
	}
}

// doMap runs one map task and then reports it done. It:
//   - reads task.FileName and applies mapf to the whole file contents;
//   - partitions the resulting pairs into task.NReduce buckets by
//     ihash(key) % NReduce;
//   - writes bucket i as a stream of JSON-encoded KeyValue values to
//     "mr-<TaskID>-<i>".
//
// Each output file is first written to a temporary file in the current
// directory and then renamed into place, so readers never see a partially
// written file. The temp file must sit next to the target because os.Rename
// cannot move files across filesystems (e.g. out of a tmpfs /tmp).
//
// Any I/O error terminates the worker via log.Fatalf.
func doMap(task GetTaskReply, mapf func(string, string) []KeyValue, workerID int) {
	filename := task.FileName
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	// Run the user-supplied map function.
	kva := mapf(filename, string(content))

	// Partition into NReduce buckets; bucket b is consumed by reduce task b.
	buckets := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		b := ihash(kv.Key) % task.NReduce
		buckets[b] = append(buckets[b], kv)
	}

	for i, bucket := range buckets {
		tempFile, err := os.CreateTemp(".", "mr-tmp-*")
		if err != nil {
			log.Fatalf("cannot create temp file: %v", err)
		}

		enc := json.NewEncoder(tempFile)
		// Only this bucket's pairs go into this file: reduce task i decodes
		// it one KeyValue at a time.
		for _, kv := range bucket {
			if err := enc.Encode(&kv); err != nil {
				log.Fatalf("cannot encode %v: %v", kv, err)
			}
		}
		if err := tempFile.Close(); err != nil {
			log.Fatalf("cannot close %v: %v", tempFile.Name(), err)
		}

		// Atomically publish as mr-<map task id>-<reduce bucket id>.
		target := fmt.Sprintf("mr-%d-%d", task.TaskID, i)
		if err := os.Rename(tempFile.Name(), target); err != nil {
			log.Fatalf("cannot rename %v to %v: %v", tempFile.Name(), target, err)
		}
	}
	reportTaskDone(task.TaskType, task.TaskID, workerID)
}
// doReduce runs one reduce task and then reports it done. It:
//   - reads partition task.ReduceTaskNum from each intermediate file
//     "mr-<m>-<ReduceTaskNum>", for m in [0, task.MapTaskNum);
//   - sorts all pairs by key;
//   - calls reducef once per distinct key;
//   - writes "key value" lines to "mr-out-<ReduceTaskNum>".
//
// Output goes to a temp file in the current directory that is renamed into
// place. Keeping it in the same directory lets os.Rename succeed (it cannot
// cross filesystems). Its name deliberately does not start with "mr-out", so
// a leftover temp file from a crashed worker cannot be mistaken for final
// output by an "mr-out*" glob.
//
// Any I/O or decode error terminates the worker via log.Fatalf.
func doReduce(task GetTaskReply, reducef func(string, []string) string, workerID int) {
	reduceTaskNum := task.ReduceTaskNum
	mapTaskNum := task.MapTaskNum

	intermediate := []KeyValue{}

	// Gather this partition's pairs from every map task's output.
	for m := 0; m < mapTaskNum; m++ {
		filename := fmt.Sprintf("mr-%d-%d", m, reduceTaskNum)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v: %v", filename, err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				// io.EOF is the normal end of the stream; anything else is a
				// corrupt intermediate file and must not be silently dropped.
				if err != io.EOF {
					log.Fatalf("cannot decode %v: %v", filename, err)
				}
				break
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}
	sort.Sort(ByKey(intermediate))

	tempFile, err := os.CreateTemp(".", "mr-tmp-out-*")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}

	// Call reducef once for each run of equal keys.
	for i := 0; i < len(intermediate); {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}

	if err := tempFile.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tempFile.Name(), err)
	}
	target := fmt.Sprintf("mr-out-%d", reduceTaskNum)
	if err := os.Rename(tempFile.Name(), target); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tempFile.Name(), target, err)
	}
	reportTaskDone(ReduceTask, task.TaskID, workerID)
}
// getTask asks the coordinator for the next task via the Coordinator.GetTask
// RPC, identifying the caller by workerID.
//
// call() already terminates the process if it cannot dial the coordinator.
// If the RPC itself fails, this returns an ExitTask reply instead of a
// zero-valued reply with an empty TaskType. The failure presumably means the
// coordinator has exited because the job finished.
func getTask(workerID int) GetTaskReply {
	args := GetTaskArgs{WorkerID: workerID}
	reply := GetTaskReply{}
	if !call("Coordinator.GetTask", &args, &reply) {
		return GetTaskReply{TaskType: ExitTask}
	}
	return reply
}
func reportTaskDone(taskType string, taskID int, workerID int) {
	args := ReportTaskArgs{TaskType: taskType, TaskID: taskID, WorkerID: workerID, Completed: true}
	reply := ReportTaskReply{}
	call("Coordinator.ReportTaskDone", &args, &reply)
}

// CallExample shows how to make an RPC call to the coordinator: it sends
// X = 99 to Coordinator.Example and prints the returned Y (expected 100),
// or a failure message if the call fails.
//
// The RPC argument and reply types are defined in rpc.go.
func CallExample() {
	args := ExampleArgs{X: 99}
	var reply ExampleReply

	// "Coordinator.Example" selects the Example() method of struct
	// Coordinator on the receiving server.
	if !call("Coordinator.Example", &args, &reply) {
		fmt.Printf("call failed!\n")
		return
	}
	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

// call sends an RPC request to the coordinator and waits for the response.
// It dials the coordinator's UNIX-domain socket afresh for every call and
// terminates the process (log.Fatal) if dialing fails. It returns true on
// success; on an RPC error it prints the error and returns false.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer client.Close()

	if err := client.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import (
	"os"
	"strconv"
)

//
// example to show how to declare the arguments
// and reply for an RPC.
//

// ExampleArgs is the argument of the sample Coordinator.Example RPC.
type ExampleArgs struct {
	X int
}

// ExampleReply is the reply of the sample Coordinator.Example RPC; the
// handler sets Y to X + 1.
type ExampleReply struct {
	Y int
}

// Add your RPC definitions here.

// Task types, carried in GetTaskReply.TaskType and ReportTaskArgs.TaskType.
const (
	// MapTask: run the map function on one input file.
	MapTask    = "map"
	// ReduceTask: reduce one partition of the intermediate data.
	ReduceTask = "reduce"
	// ExitTask: all tasks are done; the worker should return.
	ExitTask   = "exit"
	// WaitTask: no task is available right now; ask again later.
	WaitTask   = "wait"
)

// Task states, stored in the coordinator's Task.Status.
const (
	// Idle: not assigned yet, or re-queued after a timeout.
	Idle       = "idle"
	// InProgress: handed to a worker, not yet reported done.
	InProgress = "in-progress"
	// Completed: a worker reported the task done.
	Completed  = "completed"
)

// GetTaskArgs is the argument of the Coordinator.GetTask RPC (request a task).
type GetTaskArgs struct {
	// WorkerID identifies the caller; the worker sends its process ID.
	WorkerID int
}

// GetTaskReply describes the task assigned by Coordinator.GetTask. Which
// fields are meaningful depends on TaskType.
type GetTaskReply struct {
	TaskID        int
	TaskType      string
	FileName      string
	MapTaskNum    int // total number of map tasks (set for reduce tasks)
	ReduceTaskNum int // partition number this reduce task is responsible for (set for reduce tasks)
	// Each map task splits its output into NReduce buckets; a reduce task
	// reads its partition from every map task's intermediate files.
	NReduce int // total number of reduce tasks (set for map tasks)
}

// ReportTaskArgs is the argument of the Coordinator.ReportTaskDone RPC
// (report that a task finished).
type ReportTaskArgs struct {
	TaskType  string
	WorkerID  int
	TaskID    int
	// Completed is always sent as true by the worker.
	// NOTE(review): the coordinator's handler does not read this field.
	Completed bool
}
// ReportTaskReply says whether the coordinator accepted the report.
type ReportTaskReply struct {
	OK bool
}

// coordinatorSock returns a per-user UNIX-domain socket path in /var/tmp
// for the coordinator ("/var/tmp/5840-mr-" followed by the numeric UID).
// The current directory cannot be used because Athena AFS does not support
// UNIX-domain sockets.
func coordinatorSock() string {
	const prefix = "/var/tmp/5840-mr-"
	uid := os.Getuid()
	return prefix + strconv.Itoa(uid)
}

网站公告

今日签到

点亮在社区的每一天
去签到