MIT6.824 Raft算法Lab2A实验

发布于:2025-07-24 ⋅ 阅读:(33) ⋅ 点赞:(0)

实验目标

掌握raft的leader选举算法
结果:完成src/raft/test_test.go中带有2A名称的测试函数

test 1

// TestInitialElection2A checks that a freshly started three-peer cluster
// elects a leader, that all peers agree on the term, and that the leader
// survives a quiet period with no network failures.
func TestInitialElection2A(t *testing.T) {
	nPeers := 3
	cluster := make_config(t, nPeers, false)
	defer cluster.cleanup()

	cluster.begin("Test (2A): initial election")

	// A leader must emerge from the initial election.
	cluster.checkOneLeader()

	// Give followers a moment to learn about the election before
	// sampling the term every peer reports.
	time.Sleep(50 * time.Millisecond)
	termBefore := cluster.checkTerms()

	// With a healthy network the term is expected to stay put; a change is
	// only reported as a warning, not a failure.
	time.Sleep(2 * RaftElectionTimeout)
	if termAfter := cluster.checkTerms(); termAfter != termBefore {
		fmt.Printf("warning: term changed even though there were no failures")
	}

	// The cluster must still have a leader at the end.
	cluster.checkOneLeader()

	cluster.end()
}
  • 初始选举检查
  • Term 一致性检查
  • Leader 稳定性检查
  • 最终 Leader 检查

test 2

// TestReElection2A walks a three-peer cluster through a series of network
// partitions and checks that a leader exists exactly when a quorum is
// connected.
func TestReElection2A(t *testing.T) {
	nPeers := 3
	cluster := make_config(t, nPeers, false)
	defer cluster.cleanup()

	cluster.begin("Test (2A): election after network failure")

	firstLeader := cluster.checkOneLeader()

	// Cutting off the leader must trigger the election of a replacement.
	cluster.disconnect(firstLeader)
	cluster.checkOneLeader()

	// The old leader coming back must not disturb the new one.
	cluster.connect(firstLeader)
	secondLeader := cluster.checkOneLeader()

	// Without a quorum nobody may become leader.
	neighbor := (secondLeader + 1) % nPeers
	cluster.disconnect(secondLeader)
	cluster.disconnect(neighbor)
	time.Sleep(2 * RaftElectionTimeout)
	cluster.checkNoLeader()

	// Once a quorum is reachable again a leader must be elected.
	cluster.connect(neighbor)
	cluster.checkOneLeader()

	// Re-joining the last peer must not leave the cluster leaderless.
	cluster.connect(secondLeader)
	cluster.checkOneLeader()

	cluster.end()
}

选出一个leader,断开这个leader,再重新连接这个leader,验证集群不会同时出现两个leader;随后断开多数节点,验证没有法定人数时无法选出leader,恢复法定人数后应当重新选出leader。

大纲

需要完成的部分:

  • raft的初始化
  • 维护自身的定时器,在超时时做出对应的动作
  • 投票的服务和发送
  • 心跳的服务和发送

raft的初始化

// Raft holds the state of a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int        // current term of this peer
	votedFor    int        // peer this node voted for; Make initializes it to -1 (no vote)
	logs        []LogEntry // log entries
	commitIndex int        // index of the last committed log entry; Make initializes it to -1
	lastApplied int        // index of the last applied log entry; Make initializes it to -1
	nextIndex   []int      // leader side: next log index for each peer
	matchIndex  []int      // leader side: original note says "committed log index" per peer; NOTE(review): Figure 2 defines this as the highest index known to be replicated on each peer — confirm

	status       int   // compared against Follower, Candidate and Leader in HandleTimeOut
	timer        Timer // its ticker channel drives HandleTimeOut
	voteAgreeCnt int   // votes collected in the current election; starts at 1 for the candidate's own vote
}
// Make builds a Raft peer that starts out as a follower in term 0 with an
// empty log, restores whatever state the persister holds, and launches the
// background timer loop before returning.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	// Randomized first timeout in [150ms, 350ms).
	firstTimeout := time.Duration(150+rand.Intn(200)) * time.Millisecond

	rf := &Raft{
		peers:       peers,
		persister:   persister,
		me:          me,
		currentTerm: 0,
		votedFor:    -1, // -1 means "no vote cast"
		logs:        make([]LogEntry, 0),
		commitIndex: -1,
		lastApplied: -1,
		nextIndex:   make([]int, len(peers)),
		matchIndex:  make([]int, len(peers)),
		status:      Follower,
		timer:       Timer{ticker: time.NewTicker(firstTimeout)},
	}

	// Restore state that was persisted before a crash.
	rf.readPersist(persister.ReadRaftState())

	// Background loop that reacts to timer ticks.
	go rf.HandleTimeOut()
	return rf
}

其中启动了一个rf.HandleTimeOut协程来处理超时情况

// HandleTimeOut is the background loop started by Make. Every time the timer
// fires it either starts a new election (follower or candidate) or sends a
// round of heartbeats (leader). It never returns.
func (rf *Raft) HandleTimeOut() {
	for {
		// The channel expression is evaluated on every pass, so the loop keeps
		// working even if the Timer reset helpers replace the ticker (their
		// bodies are not visible here).
		<-rf.timer.ticker.C

		rf.mu.Lock()
		switch rf.status {
		case Follower, Candidate:
			rf.startElection()
		case Leader:
			rf.broadcastHeartbeat()
		}
		rf.mu.Unlock()
	}
}

// startElection turns this peer into a candidate for a new term and asks every
// other peer for its vote. The caller must hold rf.mu.
func (rf *Raft) startElection() {
	rf.status = Candidate
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.voteAgreeCnt = 1 // the candidate's own vote
	rf.timer.ResetTimeOut()

	// LastLogIndex must describe the candidate's last log entry, not its
	// commit index; with an empty log it is -1, matching the -1 sentinels
	// used by Make.
	lastIndex := len(rf.logs) - 1
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: lastIndex,
	}
	if lastIndex >= 0 {
		args.LastLogTerm = rf.logs[lastIndex].Term
	}

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		// args is shared read-only; every request gets its own reply.
		go rf.sendRequestVote(peer, args, &RequestVoteReply{})
	}
}

// broadcastHeartbeat sends an entry-less AppendEntries to every other peer and
// re-arms the heartbeat timer. The caller must hold rf.mu.
func (rf *Raft) broadcastHeartbeat() {
	rf.timer.ResetHeartTime()
	args := AppendEntriesArgs{
		Term:     rf.currentTerm,
		LeaderId: rf.me,
	}
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go rf.sendAppendEntries(peer, args, &AppendEntriesResp{})
	}
}

该协程专门监听超时管道的消息,处理情况按照当前所处的状态分为三种:

  • 当状态为follower时,发生超时切换为候选者,执行和候选者相同的动作
  • 当状态为候选者时,开始拉票:设置自身状态,然后向其他节点发送投票的rpc请求,当超过一半的节点同意时,自动晋升为leader,重新设置超时时间为心跳时间
  • 当状态为leader时直接发送心跳到其他节点,其他节点收到之后重置超时时间

投票

// RequestVoteArgs is the argument of the RequestVote RPC that a candidate
// sends to every other peer when it starts an election.
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int // candidate's term
	CandidateId  int // index of the candidate in peers[]
	LastLogIndex int // intended to be the index of the candidate's last log entry
	LastLogTerm  int // term of the candidate's last log entry; left at 0 when its log is empty
}

// RequestVoteReply is the reply of the RequestVote RPC.
// Field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (2A).
	Term int  // term reported by the peer that handled the request
	Ok   bool // true when the peer granted its vote to the candidate
}

// RequestVote is the RPC handler a peer runs when a candidate asks for its
// vote.
//
// The request is rejected when the candidate's term is older than ours. A
// newer term is adopted first (which also clears any vote cast in the old
// term). The vote is then granted if this peer has not voted in the term, or
// has already voted for the same candidate. Granting a vote makes this peer a
// follower and restarts its election timeout.
//
// reply.Term always carries this peer's term after the request has been
// processed, so the candidate can detect that it is stale.
//
// NOTE(review): the log up-to-date check from the Raft paper (section 5.4.1)
// is not performed here; args.LastLogIndex and args.LastLogTerm are ignored.
// That is sufficient while logs stay empty (2A) but must be added for 2B.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Ok = false
	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm {
		return
	}

	if args.Term > rf.currentTerm {
		// A newer term invalidates whatever we voted for before.
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.status = Follower
		reply.Term = rf.currentTerm
	}

	if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
		// Already voted for somebody else in this term.
		return
	}

	rf.status = Follower
	rf.votedFor = args.CandidateId
	rf.timer.ResetTimeOut()
	reply.Ok = true
}

// sendRequestVote delivers one RequestVote RPC to the given peer and folds the
// answer into the running election.
//
// A failed delivery is retried, but only while this peer is still a candidate
// in the term the request was built for; once the election is over the
// goroutine gives up instead of retrying forever. All shared state is read and
// written under rf.mu.
//
// It returns true only when the peer granted its vote for the current term.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	for !rf.peers[server].Call("Raft.RequestVote", args, reply) {
		rf.mu.Lock()
		abandoned := rf.status != Candidate || rf.currentTerm != args.Term
		rf.mu.Unlock()
		if abandoned {
			return false
		}
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if reply.Term > rf.currentTerm {
		// Somebody is ahead of us: adopt the term and step down.
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		rf.status = Follower
		rf.timer.ResetTimeOut()
		return false
	}
	if args.Term != rf.currentTerm {
		// Answer to an election we have already moved past.
		return false
	}
	if !reply.Ok {
		return false
	}
	if rf.status != Candidate {
		// Already elected (or stepped down); the extra vote changes nothing.
		return true
	}

	rf.voteAgreeCnt++
	// A strict majority is required, which also holds for even cluster sizes.
	if rf.voteAgreeCnt > len(rf.peers)/2 {
		rf.status = Leader
		rf.timer.ResetHeartTime()
	}
	return true
}

关于是否投票,先比较term:请求的term小于当前term时直接拒绝,大于当前term时更新term并投票;当term相同时,根据votedFor可以分为三种情况:

如果当前节点 尚未投票(votedFor 为空),则可以继续检查日志新旧。

如果已经投票给 其他候选者,则直接拒绝(即使日志更新也不行)。

如果已经投票给 同一个候选者(例如重复收到请求),可以再次同意(但实际实现中通常忽略重复请求)。

心跳

// AppendEntriesArgs is the argument of the AppendEntries RPC. In the code
// shown here the leader sends it as a heartbeat with only Term and LeaderId
// filled in.
type AppendEntriesArgs struct {
	Term         int        // leader's term
	LeaderId     int        // index of the leader in peers[]
	PervLogIndex int        // NOTE(review): presumably "PrevLogIndex" (index of the entry preceding the new ones); spelling kept because the field is part of the RPC interface
	PervLogTerm  int        // NOTE(review): presumably "PrevLogTerm" (term of that preceding entry) — confirm
	LogEntries   []LogEntry // entries to append; not populated for heartbeats
}
// AppendEntriesResp is the reply of the AppendEntries RPC.
type AppendEntriesResp struct {
	Term int  // intended to carry the responder's term back to the leader
	Ok   bool // intended to report whether the responder accepted the request
}
// LogEntry is a single entry of the Raft log.
type LogEntry struct {
	Cmd  interface{} // command payload carried by the entry
	Term int         // term stored with the entry; the last entry's Term is sent as LastLogTerm when requesting votes
}

// sendAppendEntries delivers one AppendEntries RPC (a heartbeat in the code
// shown here) to the given peer.
//
// The call is attempted once. Heartbeats are re-sent on every timer tick, so
// retrying a lost one would only pile up goroutines for unreachable peers.
// When the peer answers with a newer term, this peer adopts it and steps down
// to follower.
func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, resp *AppendEntriesResp) {
	if !rf.peers[server].Call("Raft.AppendEntries", args, resp) {
		// Lost or timed out; the next heartbeat round supersedes this one.
		return
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if resp.Term > rf.currentTerm {
		rf.currentTerm = resp.Term
		rf.votedFor = -1
		rf.status = Follower
		rf.timer.ResetTimeOut()
	}
}

// AppendEntries is the RPC handler a peer runs when a leader sends it a
// heartbeat.
//
// A request from an older term is rejected. Otherwise the sender is accepted
// as leader: this peer adopts the sender's term, becomes (or stays) a follower
// and restarts its election timeout. resp.Term carries this peer's term after
// processing and resp.Ok reports whether the request was accepted.
func (rf *Raft) AppendEntries(args AppendEntriesArgs, resp *AppendEntriesResp) {
	// Take the lock before looking at any shared state.
	rf.mu.Lock()
	defer rf.mu.Unlock()

	resp.Ok = false
	resp.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		return
	}

	if args.Term > rf.currentTerm {
		// A vote cast in an older term must not carry over into the new one,
		// otherwise it could be granted again in a term that already has a leader.
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}
	rf.status = Follower
	rf.timer.ResetTimeOut()

	resp.Term = rf.currentTerm
	resp.Ok = true
}

网站公告

今日签到

点亮在社区的每一天
去签到