MIT6.824-lab2D-日志压缩(log compaction)

发布于:2022-08-09 ⋅ 阅读:(312) ⋅ 点赞:(0)

2D(日志压缩)

我2D部分最后一个测试没有通过,找了很久,始终没发现问题,不想搞了,暂时这样吧。大体的思路我觉得没有问题。?

做到显示为止,我们重启的服务器会重放完整的 Raft 日志以恢复其状态。但是,长期运行的服务永远记住完整的 Raft 日志是不切实际的。因此,我们可以做一个快照,快照存储了在某一个index之前的所有日志的一个状态, Raft 就可以看丢弃快照之前的日志条目。结果是更少量的持久数据和更快的重启。

任务

主要是实现四个API:

  • sendInstallSnapshotToPeer(server int) :向指定的节点发送快照;
  • InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply):节点接收到快照;
  • Snapshot(index int, snapshot []byte):index是快照所包含的最后一个日志的index,snapshot是已经处理好的快照字节流,用来生成一次快照;
  • CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool:当follower收到leader的快照并判断接收后,要生成一个快照命令,应用该快照命令时会调用CondInstallSnapshot()函数进行处理;

这里有一个容易搞错的点:

  • log日志中0位置存储的是快照,其实0位置真正存储的是一个空命令,可以理解为是快照的一个标识,lastSnapshotTerm和lastSnapshotIndex分别表示当前快照的term和index,也就是快照所包含的最后一个日志的term和index;
  • 快照命令并不算做日志的一项,该命令和日志所记录的命令是分开进行处理的。当follower判断接收了一个快照后,就会生成一个快照命令并发送到applyChan来进行处理。

任务须知

如果系统设置了快照,那么raft会使用applierSnap来处理我们要应用的命令:

func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
    ...
    applier := cfg.applier
    if snapshot {
        //如果开启了快照,则使用这个applier
        applier = cfg.applierSnap
    }
    // create a full set of Rafts.
    for i := 0; i < cfg.n; i++ {
        cfg.logs[i] = map[int]interface{}{}
        cfg.start1(i, applier)  //启动一个raft节点
    }
    ...
}

func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
    ...
    go applier(i, applyCh) //创建一个协成不断运行
    ...
}

具体的运行函数applierSnap:

func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
    cfg.mu.Lock()
    rf := cfg.rafts[i]
    cfg.mu.Unlock()
    if rf == nil {
        return // ???
    }

    for m := range applyCh {
        err_msg := ""
        //如果是一个快照命令
        if m.SnapshotValid {
            //如果是一个快照命令,代表着接收了leader发来的新快照,调用CondInstallSnapshot()进行处理
            if rf.CondInstallSnapshot(m.SnapshotTerm, m.SnapshotIndex, m.Snapshot) {
                cfg.mu.Lock()
                err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
                cfg.mu.Unlock()
            }
            //如果是一个普通命令(也就是日志存储的命令)
        } else if m.CommandValid {
            ...
            //每有10条日志,就调用Snapshot()进行一次快照生成(其实是9条,因为0位置存储的是快照)
            if (m.CommandIndex+1)%SnapShotInterval == 0 {
                w := new(bytes.Buffer)
                e := labgob.NewEncoder(w)
                e.Encode(m.CommandIndex)
                var xlog []interface{}
                for j := 0; j <= m.CommandIndex; j++ {
                    xlog = append(xlog, cfg.logs[i][j])
                }
                e.Encode(xlog)
                rf.Snapshot(m.CommandIndex, w.Bytes())
            }
        } else {
            // Ignore other types of ApplyMsg.
        }
        if err_msg != "" {
            log.Fatalf("apply error: %v", err_msg)
            cfg.applyErr[i] = err_msg
        }
    }
}

根据以上的源码,我们可以知道:

  • raft会根据日志的数量,调用Snapshot()来进行一次快照生成;
  • 当follower收到leader的快照并判断接收后,要生成一个快照命令,应用该快照命令时会调用CondInstallSnapshot()函数进行处理;

代码

rpc参数和回复

type InstallSnapshotArgs struct {
    Term              int
    LeaderId          int
    LastIncludedIndex int
    LastIncludedTerm  int
    //Offset            int
    Data []byte
    //Done bool
}

type InstallSnapshotReply struct {
    Term int
}

sendInstallSnapshotToPeer

//向指定节点发送快照
func (rf *Raft) sendInstallSnapshotToPeer(server int) {
    rf.mu.Lock()
    args := InstallSnapshotArgs{
        Term:              rf.currentTerm,
        LeaderId:          rf.me,
        LastIncludedIndex: rf.lastSnapshotIndex,
        LastIncludedTerm:  rf.lastSnapshotTerm,
        Data:              rf.persister.ReadSnapshot(),
    }
    rf.mu.Unlock()

    //用于调用超时
    timer := time.NewTimer(RPCTimeout)
    defer timer.Stop()
    DPrintf("%v role: %v, send snapshot  to peer,%v,args = %+v,reply = %+v", rf.me, rf.role, server, args)

    //发送rpc
    for {
        timer.Stop()
        timer.Reset(RPCTimeout)

        ch := make(chan bool, 1)
        reply := &InstallSnapshotReply{}
        go func() {
            ok := rf.peers[server].Call("Raft.InstallSnapshot", &args, reply)
            if !ok {
                time.Sleep(time.Millisecond * 10)
            }
            ch <- ok
        }()

        select {
            case <-rf.stopCh:
            return
            case <-timer.C:
            DPrintf("%v role: %v, send snapshot to peer %v TIME OUT!!!", rf.me, rf.role, server)
            continue
            case ok := <-ch:
            if !ok {
                continue
            }
        }

        //对结果进行处理
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.role != Role_Leader || args.Term != rf.currentTerm {
            return
        }
        if reply.Term > rf.currentTerm {
            rf.changeRole(Role_Follower)
            rf.currentTerm = reply.Term
            rf.resetElectionTimer()
            rf.persist()
            return
        }
	
        if args.LastIncludedIndex > rf.matchIndex[server] {
            rf.matchIndex[server] = args.LastIncludedIndex
        }
        if args.LastIncludedIndex+1 > rf.nextIndex[server] {
            rf.nextIndex[server] = args.LastIncludedIndex + 1
        }
        return
    }
}

大体流程分为三步:

  • 根据当前节点的状态生成InstallSnapshotArgs;
  • 在rpc超时时间以内,不断地向指定节点发送rpc,直到发送成功;
  • 根据发送获取的结果,进行判断,更新相应的数据结构,特别是matchIndex和nextindex。

InstallSnapshot

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm
    if rf.currentTerm > args.Term {
        return
    }

    if args.Term > rf.currentTerm || rf.role != Role_Follower {
        rf.changeRole(Role_Follower)
        rf.votedFor = -1
        rf.currentTerm = args.Term
        rf.resetElectionTimer()
        rf.persist()
    }

    //如果自身快照包含的最后一个日志>=leader快照包含的最后一个日志,就没必要接受了
    if rf.lastSnapshotIndex >= args.LastIncludedIndex {
        return
    }

    //接收发来的快照,并提交一个命令处理
    rf.applyCh <- ApplyMsg{
        SnapshotValid: true,
        Snapshot:      args.Data,
        SnapshotTerm:  args.LastIncludedTerm,
        SnapshotIndex: args.LastIncludedIndex,
    }
}

大概分为三步:

  • 当节点收到快照后,如果自身的term大于发送方的term,就拒绝;否者更新自身的term和role等信息;
  • 如果自身快照包含的最后一个日志>=leader快照包含的最后一个日志,就没必要接受了;
  • 否则,接收发来的快照,并提交一个命令处理。

Snapshot

//生成一次快照,实现很简单,删除掉对应已经被压缩的 raft log 即可
//index是当前要压缩到的index,snapshot是已经帮我们压缩好的数据
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    // Your code here (2D).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    snapshotIndex := rf.lastSnapshotIndex
    //如果当前快照包含的最后一条log的index≥参数中快照的index,则没必要进行压缩
    if snapshotIndex >= index {
        DPrintf("{Node %v} rejects replacing log with snapshotIndex %v as current snapshotIndex %v is larger in term %v", rf.me, index, snapshotIndex, rf.currentTerm)
        return
    }
    oldLastSnapshotIndex := rf.lastSnapshotIndex
    rf.lastSnapshotTerm = rf.logs[rf.getStoreIndexByLogIndex(index)].Term
    rf.lastSnapshotIndex = index
    //删掉index前的所有日志
    rf.logs = rf.logs[index-oldLastSnapshotIndex:]
    //0位置就是快照命令
    rf.logs[0].Term = rf.lastSnapshotTerm
    rf.logs[0].Command = nil
    rf.persister.SaveStateAndSnapshot(rf.getPersistData(), snapshot)
    DPrintf("{Node %v}'s state is {role %v,term %v,commitIndex %v,lastApplied %v} after replacing log with snapshotIndex %v as old snapshotIndex %v is smaller", rf.me, rf.role, rf.currentTerm, rf.commitIndex, rf.lastApplied, index, snapshotIndex)
}

CondInstallSnapshot

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // Your code here (2D).
    //判断接收快照后,logs中还有没有多余的日志,并进行不同的处理
    _, lastIndex := rf.getLastLogTermAndIndex()
    if lastIncludedIndex > lastIndex {
        rf.logs = make([]LogEntry, 1)
    } else {
        installLen := lastIncludedIndex - rf.lastSnapshotIndex
        rf.logs = rf.logs[installLen:]
        rf.logs[0].Command = nil
    }
    //0处是空日志,代表了快照日志的标记
    rf.logs[0].Term = lastIncludedTerm

    //其实接下来可以读入快照的数据进行同步,这里可以不写

    rf.lastSnapshotIndex, rf.lastSnapshotTerm = lastIncludedIndex, lastIncludedTerm
    rf.lastApplied, rf.commitIndex = lastIncludedIndex, lastIncludedIndex
    //保存快照和状态
    rf.persister.SaveStateAndSnapshot(rf.getPersistData(), snapshot)
    return true
}

测试结果

在这里插入图片描述

最后一个测试没有通过,找了很久,始终没发现问题,不想搞了,暂时这样吧。

本文含有隐藏内容,请 开通VIP 后查看

网站公告


今日签到

点亮在社区的每一天
去签到