实验目标
掌握raft的leader选举算法
结果:通过 src/raft/test_test.go 中名称带有 2A 的全部测试函数
test1
// TestInitialElection2A brings up a three-server cluster and checks, in
// order: that a leader gets elected, that cfg.checkTerms reports a term
// shortly afterwards, that the term is still the same after two election
// timeouts without any induced failure (a mismatch only prints a warning,
// it does not fail the test), and that a leader still exists at the end.
// NOTE(review): the third make_config argument is presumably the
// "unreliable network" flag — confirm in config.go.
func TestInitialElection2A(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2A): initial election")
// is a leader elected?
cfg.checkOneLeader()
// sleep a bit to avoid racing with followers learning of the
// election, then check that all peers agree on the term.
time.Sleep(50 * time.Millisecond)
term1 := cfg.checkTerms()
// does the leader+term stay the same if there is no network failure?
time.Sleep(2 * RaftElectionTimeout)
term2 := cfg.checkTerms()
if term1 != term2 {
// Only a warning: a term change without failures is suspicious but
// is not treated as a test failure here.
fmt.Printf("warning: term changed even though there were no failures")
}
// there should still be a leader.
cfg.checkOneLeader()
cfg.end()
}
- 初始选举检查
- Term 一致性检查
- Leader 稳定性检查
- 最终 Leader 检查
test 2
// TestReElection2A exercises re-election on a three-server cluster:
//  1. disconnect the current leader and expect a new one;
//  2. reconnect the old leader and expect exactly one leader still;
//  3. disconnect two of the three servers, wait two election timeouts and
//     expect no leader (no quorum);
//  4. reconnect one server and expect a leader again;
//  5. reconnect the last server and expect a leader to remain.
//
// The value returned by cfg.checkOneLeader is used as a server index for
// cfg.disconnect / cfg.connect.
func TestReElection2A(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2A): election after network failure")
leader1 := cfg.checkOneLeader()
// if the leader disconnects, a new one should be elected.
cfg.disconnect(leader1)
cfg.checkOneLeader()
// if the old leader rejoins, that shouldn't
// disturb the new leader.
cfg.connect(leader1)
leader2 := cfg.checkOneLeader()
// if there's no quorum, no leader should
// be elected.
// Two of three servers are cut off, so only one stays connected.
cfg.disconnect(leader2)
cfg.disconnect((leader2 + 1) % servers)
time.Sleep(2 * RaftElectionTimeout)
cfg.checkNoLeader()
// if a quorum arises, it should elect a leader.
cfg.connect((leader2 + 1) % servers)
cfg.checkOneLeader()
// re-join of last node shouldn't prevent leader from existing.
cfg.connect(leader2)
cfg.checkOneLeader()
cfg.end()
}
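测试流程:先选出一个 leader;断开该 leader 后,剩余节点应选出新的 leader;旧 leader 重新连接后不应干扰新 leader(同一时刻集群中只能有一个 leader);当断开的节点过多、无法形成多数派(quorum)时,不应选出任何 leader;恢复多数派后应重新选出 leader,最后一个节点重新加入也不应影响 leader 的存在。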
大纲
需要完成的部分:
- raft的初始化
- 维护自身的定时器,在超时时做出对应的动作
- 投票的服务和发送
- 心跳的服务和发送
raft的初始化
// Raft holds the state of a single Raft peer. Fields below the
// "Your data here" marker follow the state listed in Figure 2 of the Raft
// paper, plus the bookkeeping this implementation uses for elections.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int // current term of this peer
votedFor int // peer this server voted for; Make initializes it to -1 (no vote)
logs []LogEntry // log entries
commitIndex int // index of the last committed log entry; Make initializes it to -1
lastApplied int // index of the last applied log entry; Make initializes it to -1
nextIndex []int // per peer: index of the next log entry for that peer
// per peer: the original note says "log index already committed";
// NOTE(review): Raft Figure 2 defines matchIndex as the highest index
// known to be replicated on that peer — confirm the intended meaning.
matchIndex []int
status int // current role: Follower, Candidate or Leader
timer Timer // owns the ticker that HandleTimeOut waits on
voteAgreeCnt int // votes collected in the current election, counting this peer's own vote
}
// Make creates a Raft peer. peers holds the RPC end points of every server
// (including this one), me is this peer's index into peers, and persister
// stores the peer's persisted state. applyCh is accepted for the lab
// interface but is not used by the code in this function.
//
// The peer starts as a Follower in term 0 with no vote cast, an empty log,
// and a ticker set to a random period in [150ms, 350ms). State saved before
// a crash is restored through readPersist, and the timeout-handling
// goroutine is started before returning.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	// Randomized so that peers do not all time out at the same moment.
	electionTimeout := time.Duration(150+rand.Intn(200)) * time.Millisecond

	rf := &Raft{
		peers:       peers,
		persister:   persister,
		me:          me,
		currentTerm: 0,
		votedFor:    -1, // -1 means "has not voted"
		logs:        []LogEntry{},
		commitIndex: -1,
		lastApplied: -1,
		nextIndex:   make([]int, len(peers)),
		matchIndex:  make([]int, len(peers)),
		status:      Follower,
		timer:       Timer{ticker: time.NewTicker(electionTimeout)},
	}

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	go rf.HandleTimeOut()
	return rf
}
其中启动了一个rf.HandleTimeOut协程来处理超时情况
// HandleTimeOut runs forever, reacting to each tick of rf.timer.ticker
// according to the peer's current role:
//
//   - Follower or Candidate: the election timeout expired, so the peer
//     becomes (or stays) a Candidate, starts a new term, votes for itself
//     and asks every other peer for its vote.
//   - Leader: the heartbeat interval expired, so an AppendEntries RPC
//     carrying only Term and LeaderId is sent to every other peer.
//
// RPCs are issued from separate goroutines so rf.mu is never held while
// waiting on the network.
func (rf *Raft) HandleTimeOut() {
	for {
		// The channel is looked up on every iteration (not ranged over) in
		// case the Timer reset methods swap in a new ticker.
		<-rf.timer.ticker.C

		rf.mu.Lock()
		switch rf.status {
		case Follower, Candidate:
			rf.status = Candidate
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.timer.ResetTimeOut()
			rf.voteAgreeCnt = 1 // our own vote

			// LastLogIndex must describe the last entry in the log, not the
			// commit index; with an empty log it is -1, matching the -1
			// convention Make uses for commitIndex/lastApplied.
			lastIndex := len(rf.logs) - 1
			args := RequestVoteArgs{
				Term:         rf.currentTerm,
				CandidateId:  rf.me,
				LastLogIndex: lastIndex,
			}
			if lastIndex >= 0 {
				args.LastLogTerm = rf.logs[lastIndex].Term
			}

			for i := range rf.peers {
				if i == rf.me {
					continue
				}
				// args is shared read-only; each RPC gets its own reply.
				go rf.sendRequestVote(i, &args, &RequestVoteReply{})
			}

		case Leader:
			rf.timer.ResetHeartTime()
			args := AppendEntriesArgs{
				Term:     rf.currentTerm,
				LeaderId: rf.me,
			}
			for i := range rf.peers {
				if i == rf.me {
					continue
				}
				go rf.sendAppendEntries(i, args, &AppendEntriesResp{})
			}
		}
		rf.mu.Unlock()
	}
}
该协程专门监听超时管道的消息,处理情况按照当前所处的状态分为三种:
- 当状态为follower时,发生超时切换为候选者,执行和候选者相同的动作
- 当状态为候选者时,开始拉票:设置自身状态,然后向其他节点发送投票的rpc请求,当超过一半的节点同意时,自动晋升为leader,重新设置超时时间为心跳时间
- 当状态为leader时直接发送心跳到其他节点,其他节点收到之后重置超时时间
投票
// RequestVoteArgs is the argument of the RequestVote RPC, built by a
// candidate in HandleTimeOut. The RequestVote handler shown in this file
// reads only Term and CandidateId.
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // candidate's term
CandidateId int // index of the candidate requesting the vote
LastLogIndex int // intended to be the index of the candidate's last log entry
LastLogTerm int // term of the candidate's last log entry; left at 0 when its log is empty
}
// RequestVoteReply is the reply of the RequestVote RPC.
// field names must start with capital letters!
type RequestVoteReply struct {
// Your data here (2A).
Term int // term of the responding peer as read at the start of the handler
Ok bool // true when the vote was granted
}
// RequestVote is the RPC handler a candidate calls to ask for this peer's
// vote. reply.Term is always the term this peer held when the call
// arrived; reply.Ok reports whether the vote was granted.
//
// The decision depends on how args.Term compares with the local term:
//   - lower:  the request is stale and is refused;
//   - higher: the local term is advanced and the vote is granted;
//   - equal:  the vote is granted only if this peer has not voted yet or
//     has already voted for the same candidate.
//
// Granting a vote turns the peer into a Follower and restarts its election
// timeout.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	reply.Ok = false

	switch {
	case args.Term < rf.currentTerm:
		// Stale candidate.
		return
	case args.Term > rf.currentTerm:
		// Newer term: adopt it, then fall out of the switch to grant.
		rf.currentTerm = args.Term
	case rf.votedFor != -1 && rf.votedFor != args.CandidateId:
		// Same term, but the vote already went to someone else.
		return
	}

	rf.status = Follower
	rf.votedFor = args.CandidateId
	rf.timer.ResetTimeOut()
	reply.Ok = true
}
// sendRequestVote sends one RequestVote RPC to peers[server] and folds the
// answer into the running election. It returns true only when the peer
// granted its vote for the term the request was sent in.
//
// The RPC is attempted once: if Call reports failure the function returns
// false and the next election timeout will start a fresh round with
// up-to-date arguments, instead of re-sending stale ones forever.
//
// All Raft state is inspected under rf.mu. A reply carrying a higher term
// makes this peer step down to Follower. A granted vote is counted only
// while this peer is still a Candidate in the same term; once strictly more
// than half of the peers (including itself) agree, it becomes Leader and
// switches its ticker to the heartbeat interval.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	if !rf.peers[server].Call("Raft.RequestVote", args, reply) {
		return false
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if reply.Term > rf.currentTerm {
		// Someone is ahead of us: abandon the election (or leadership).
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		rf.status = Follower
		rf.timer.ResetTimeOut()
		return false
	}
	if args.Term != rf.currentTerm || !reply.Ok {
		// Answer to an old election, or the vote was refused.
		return false
	}
	if rf.status != Candidate {
		// Already elected (or stepped down) in this term; nothing to count.
		return true
	}

	rf.voteAgreeCnt++
	// Strict majority; also correct for clusters with an even number of peers.
	if rf.voteAgreeCnt > len(rf.peers)/2 {
		rf.status = Leader
		rf.timer.ResetHeartTime()
	}
	return true
}
关于是否投票,首先比较 term:请求的 term 小于自身 term 时直接拒绝;大于自身 term 时更新自身 term 并投票;term 相同时,再根据 votedFor 分为三种情况:
如果当前节点 尚未投票(votedFor 为空),则可以继续检查日志新旧。
如果已经投票给 其他候选者,则直接拒绝(即使日志更新也不行)。
如果已经投票给 同一个候选者(例如重复收到请求),可以再次同意(但实际实现中通常忽略重复请求)。
心跳
// AppendEntriesArgs is the argument of the AppendEntries RPC. The heartbeat
// built in HandleTimeOut fills in only Term and LeaderId; the remaining
// fields are left at their zero values by the code shown in this file.
type AppendEntriesArgs struct {
Term int // leader's term
LeaderId int // index of the leader sending the RPC
// NOTE(review): "Perv" is presumably a misspelling of "Prev" (previous
// log index / term, as in Raft Figure 2) — renaming would change the
// RPC's field names, so it is left as is.
PervLogIndex int
PervLogTerm int
LogEntries []LogEntry // entries to append; not set for heartbeats
}
// AppendEntriesResp is the reply of the AppendEntries RPC.
type AppendEntriesResp struct {
Term int // presumably the responder's term, by analogy with RequestVoteReply — confirm against the handler
Ok bool // presumably true when the request was accepted — confirm against the handler
}
// LogEntry is one entry of the Raft log.
type LogEntry struct {
Cmd interface{} // the command carried by this entry
Term int // term associated with the entry; read as the "last log term" when requesting votes
}
// sendAppendEntries sends one AppendEntries RPC to peers[server].
//
// The RPC is attempted once. A failed Call is simply dropped: the leader
// sends a fresh heartbeat on every heartbeat tick, so retrying here would
// only pile up goroutines re-sending stale arguments to an unreachable
// peer.
//
// If the reply carries a term higher than ours, this peer is out of date:
// it adopts that term, clears its vote, becomes a Follower and restarts its
// election timeout.
func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, resp *AppendEntriesResp) {
	if !rf.peers[server].Call("Raft.AppendEntries", args, resp) {
		return
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if resp.Term > rf.currentTerm {
		rf.currentTerm = resp.Term
		rf.votedFor = -1
		rf.status = Follower
		rf.timer.ResetTimeOut()
	}
}
// AppendEntries is the RPC handler a leader calls to send heartbeats (and,
// later, log entries). Only the term handling needed for leader election is
// implemented; the log-related fields of args are not examined.
//
// resp.Term is set to this peer's term so the caller can detect that it is
// out of date, and resp.Ok reports whether the request was accepted:
//   - args.Term lower than the local term: rejected, state untouched;
//   - otherwise: the sender is accepted as leader, this peer becomes a
//     Follower and restarts its election timeout. If args.Term is newer the
//     local term is advanced and the vote is cleared, because a vote is
//     only valid for the term it was cast in.
//
// All state, including the initial term comparison, is accessed under
// rf.mu.
func (rf *Raft) AppendEntries(args AppendEntriesArgs, resp *AppendEntriesResp) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	resp.Ok = false
	resp.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		resp.Term = rf.currentTerm
	}
	rf.status = Follower
	rf.timer.ResetTimeOut()
	resp.Ok = true
}