MIT6.824 Raft算法Lab2A实验
实验目标
掌握raft的leader选举算法
结果:通过 src/raft/test_test.go 中名称带有 2A 的测试函数
test 1
// TestInitialElection2A starts a 3-peer cluster (make_config(t, 3, false))
// and checks that a leader is elected. It then waits 50ms and checks that all
// peers agree on the term. After a further 2*RaftElectionTimeout with no
// injected failures it checks the term again and checks that there is still
// a leader. A changed term only prints a warning; it does not fail the test.
func TestInitialElection2A(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	cfg.begin("Test (2A): initial election")

	// is a leader elected?
	cfg.checkOneLeader()

	// sleep a bit to avoid racing with followers learning of the
	// election, then check that all peers agree on the term.
	time.Sleep(50 * time.Millisecond)
	term1 := cfg.checkTerms()

	// does the leader+term stay the same if there is no network failure?
	time.Sleep(2 * RaftElectionTimeout)
	term2 := cfg.checkTerms()
	if term1 != term2 {
		fmt.Printf("warning: term changed even though there were no failures")
	}

	// there should still be a leader.
	cfg.checkOneLeader()

	cfg.end()
}
- 初始选举检查
- Term 一致性检查
- Leader 稳定性检查
- 最终 Leader 检查
test 2
// TestReElection2A exercises leader election across network failures in a
// 3-peer cluster, in this order:
//  1. disconnecting the leader must lead to a new leader being elected;
//  2. reconnecting the old leader must not disturb the new leader;
//  3. with the leader and one more peer disconnected (no quorum), after
//     2*RaftElectionTimeout there must be no leader;
//  4. reconnecting one of them restores a quorum, so a leader must appear;
//  5. reconnecting the last peer must still leave a leader in place.
//
// The sequence of connect/disconnect calls is the test; do not reorder it.
func TestReElection2A(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	cfg.begin("Test (2A): election after network failure")

	leader1 := cfg.checkOneLeader()

	// if the leader disconnects, a new one should be elected.
	cfg.disconnect(leader1)
	cfg.checkOneLeader()

	// if the old leader rejoins, that shouldn't
	// disturb the new leader.
	cfg.connect(leader1)
	leader2 := cfg.checkOneLeader()

	// if there's no quorum, no leader should
	// be elected.
	cfg.disconnect(leader2)
	cfg.disconnect((leader2 + 1) % servers)
	time.Sleep(2 * RaftElectionTimeout)
	cfg.checkNoLeader()

	// if a quorum arises, it should elect a leader.
	cfg.connect((leader2 + 1) % servers)
	cfg.checkOneLeader()

	// re-join of last node shouldn't prevent leader from existing.
	cfg.connect(leader2)
	cfg.checkOneLeader()

	cfg.end()
}
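先选出一个 leader;断开该 leader 后应选出新的 leader;旧 leader 重新连接后不应干扰新 leader(集群中不能同时出现两个 leader);再断开两个节点使集群失去多数派,此时不应有 leader;恢复其中一个节点重新形成多数派后应选出 leader;最后一个节点重新加入后仍应存在 leader。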
大纲
需要完成的部分:
- raft的初始化
- 维护自身的定时器,在超时时做出对应的动作
- 投票的服务和发送
- 心跳的服务和发送
raft的初始化
type Raft struct {mu sync.Mutex // Lock to protect shared access to this peer's statepeers []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister // Object to hold this peer's persisted stateme int // this peer's index into peers[]// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.currentTerm int //当前的任期votedFor int // 投给了谁logs []LogEntry // 日志条目commitIndex int //已经提交的日志下标lastApplied int //最后应用的日志下标nextIndex []int //server:每个peer的下一条日志索引matchIndex []int //server:已经提交的日志索引status inttimer TimervoteAgreeCnt int
}
func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = me// Your initialization code here (2A, 2B, 2C).rf.currentTerm = 0rf.votedFor = -1rf.logs = make([]LogEntry, 0)rf.commitIndex = -1rf.lastApplied = -1rf.nextIndex = make([]int, len(peers))rf.matchIndex = make([]int, len(peers))rf.status = Followerrf.timer = Timer{ticker: time.NewTicker(time.Duration(150+rand.Intn(200)) * time.Millisecond)}// initialize from state persisted before a crashrf.readPersist(persister.ReadRaftState())go rf.HandleTimeOut()return rf
}
其中启动了一个rf.HandleTimeOut协程来处理超时情况
func (rf *Raft) HandleTimeOut() {for {select {case <-rf.timer.ticker.C:rf.mu.Lock()if rf.status == Follower || rf.status == Candidate {if rf.status == Follower {rf.status = Candidate}rf.currentTerm++rf.votedFor = rf.merf.timer.ResetTimeOut()rf.voteAgreeCnt = 1args := RequestVoteArgs{Term: rf.currentTerm,CandidateId: rf.me,LastLogIndex: rf.commitIndex,}if len(rf.logs) > 0 {args.LastLogTerm = rf.logs[len(rf.logs)-1].Term}for i, _ := range rf.peers {if i == rf.me {continue}resp := RequestVoteReply{}go rf.sendRequestVote(i, &args, &resp)}} else if rf.status == Leader {rf.timer.ResetHeartTime()args := AppendEntriesArgs{Term: rf.currentTerm,LeaderId: rf.me,}for i := 0; i < len(rf.peers); i++ {if i == rf.me {continue}resp := AppendEntriesResp{}go rf.sendAppendEntries(i, args, &resp)}}rf.mu.Unlock()}}
}
该协程专门监听超时管道的消息,处理情况按照当前所处的状态分为三种:
- 当状态为follower时,发生超时切换为候选者,执行和候选者相同的动作
- 当状态为候选者时,开始拉票:将 currentTerm 加一、投票给自己并重置选举超时,然后并发地向其他节点发送 RequestVote RPC;当获得超过半数节点(包括自己)的同意时,晋升为 leader,并把定时器改为心跳间隔
- 当状态为leader时直接发送心跳到其他节点,其他节点收到之后重置超时时间
投票
// RequestVoteArgs is the argument of the RequestVote RPC, sent by a
// candidate to ask another peer for its vote. Field names must start with
// capital letters: only exported fields are encoded by the RPC layer.
type RequestVoteArgs struct {
	// Term is the candidate's term; CandidateId is its index into peers[].
	Term, CandidateId int
	// LastLogIndex and LastLogTerm describe the candidate's log. They are
	// intended for the voter's "is the candidate's log up to date" check.
	LastLogIndex, LastLogTerm int
}

// RequestVoteReply is the result of the RequestVote RPC.
type RequestVoteReply struct {
	Term int  // the responder's currentTerm
	Ok   bool // true if the vote was granted
}
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (2A, 2B).rf.mu.Lock()defer rf.mu.Unlock()reply.Ok = falsereply.Term = rf.currentTermif args.Term < rf.currentTerm {return} else if args.Term > rf.currentTerm {reply.Ok = truerf.status = Followerrf.votedFor = args.CandidateIdrf.timer.ResetTimeOut()rf.currentTerm = args.Termreturn} else {if rf.votedFor == -1 || rf.votedFor == args.CandidateId {rf.status = Followerrf.votedFor = args.CandidateIdreply.Ok = truerf.timer.ResetTimeOut()}}
}func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {if ok := rf.peers[server].Call("Raft.RequestVote", args, reply); !ok {for !ok {ok = rf.peers[server].Call("Raft.RequestVote", args, reply)}}if args.Term < rf.currentTerm {return false}if reply.Ok {rf.mu.Lock()defer rf.mu.Unlock()rf.voteAgreeCnt++if rf.voteAgreeCnt >= (len(rf.peers)+1)/2 {rf.status = Leaderrf.timer.ResetHeartTime()}}return reply.Ok
}
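关于是否投票:若候选者的 term 小于自身的 currentTerm,直接拒绝;若更大,先把 currentTerm 更新为该 term、转为 follower 并清空 votedFor。之后在同一个 term 内,根据 votedFor 分为三种情况: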
如果当前节点尚未投票(votedFor 为 -1),则继续检查日志新旧:只有当候选者的日志至少和自己的一样新时(最后一条日志的 term 更大,或 term 相同且索引不小于自己的),才投出选票。
如果已经投票给其他候选者,则直接拒绝(即使对方日志更新也不行)。
如果已经投票给同一个候选者(例如因 RPC 重传而重复收到请求),应再次同意:之前的回复可能已经丢失,而重复同意也不会在同一 term 内产生第二张选票。本文代码也是这样处理的。
心跳
type AppendEntriesArgs struct {Term intLeaderId intPervLogIndex intPervLogTerm intLogEntries []LogEntry
}
// AppendEntriesResp is the reply of the AppendEntries RPC. Term is meant to
// carry the responder's currentTerm, so that a stale leader can notice a
// newer term. Ok reports whether the request was accepted.
type AppendEntriesResp struct {
	Term int
	Ok   bool
}
type LogEntry struct {Cmd interface{}Term int
}func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, resp *AppendEntriesResp) {if ok := rf.peers[server].Call("Raft.AppendEntries", args, resp); !ok {for !ok {ok = rf.peers[server].Call("Raft.AppendEntries", args, resp)}}
}func (rf *Raft) AppendEntries(args AppendEntriesArgs, resp *AppendEntriesResp) {if args.Term < rf.currentTerm {return}rf.mu.Lock()defer rf.mu.Unlock()rf.status = Followerrf.timer.ResetTimeOut()rf.currentTerm = args.Term
}