MIT 6.824（2024 Spring）Lab 3A：Raft 领导者选举
基本状态
// Roles a Raft peer can be in; stored in Raft.state.
const (
	Follower  = iota // passive: answers RPCs and waits for heartbeats
	Candidate        // running an election for currentTerm
	Leader           // won the election for currentTerm; sends heartbeats
)
节点的字段
type Raft struct {mu sync.Mutex // 保护共享状态的锁peers []*labrpc.ClientEnd // 所有节点的RPC端点persister *Persister // 持久化存储me int // 本节点索引// Raft状态state int // Follower/Candidate/LeadercurrentTerm int // 当前任期votedFor int // 当前任期投给谁log []Entry // 日志条目// 选举相关voteCount int // 当前获得的票数timeStamp time.Time // 上次收到消息的时间// Leader专用nextIndex []int // 每个follower的下一个日志索引matchIndex []int // 每个follower已复制的最高日志索引// 提交和应用commitIndex int // 已提交的最高日志索引lastApplied int // 已应用的最高日志索引
}
状态转换:
- Follower → Candidate：选举超时（Candidate 选举超时仍未分出胜负时，也会自增任期、重新发起选举）
- Candidate → Leader:获得多数票
- Candidate → Follower：发现更高任期，或收到当前任期 Leader 发来的 AppendEntries（心跳）
- Leader → Follower:发现更高任期
几个核心方法
Elect 选举过程
// Elect starts a new election.
//
// It moves this peer into the next term as a Candidate, votes for itself and
// resets the election timer. It then asks every other peer for its vote
// concurrently via collectVote.
//
// NOTE(review): reads rf.log[len(rf.log)-1], so it assumes the log always
// holds at least one (sentinel) entry — confirm against Make().
func (rf *Raft) Elect() {
	rf.mu.Lock()
	// The ticker's timeout check ran under a lock this goroutine did not
	// inherit. If this peer won an earlier election in the meantime, keep
	// leadership instead of demoting itself to Candidate.
	if rf.state == Leader {
		rf.mu.Unlock()
		return
	}
	rf.currentTerm++
	rf.state = Candidate
	rf.votedFor = rf.me // vote for ourselves
	rf.voteCount = 1
	rf.timeStamp = time.Now() // restart the election timer
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: len(rf.log) - 1,
		LastLogTerm:  rf.log[len(rf.log)-1].Term,
	}
	rf.mu.Unlock()

	// Ask every other peer for its vote in parallel.
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		go rf.collectVote(i, args)
	}
}
收集选票
collectVote
func (rf *Raft) collectVote(serverTo int, args *RequestVoteArgs) {voteAnswer := rf.GetVoteAnswer(serverTo, args)if !voteAnswer { return }rf.muVote.Lock()defer rf.muVote.Unlock()// 检查是否已获得多数票if rf.voteCount > len(rf.peers)/2 {return // 已经是Leader或已被否决}rf.voteCount += 1if rf.voteCount > len(rf.peers)/2 {rf.mu.Lock()if rf.state == Follower {// 已被其他协程转为Followerrf.mu.Unlock()return}rf.state = Leader // 成为Leaderrf.mu.Unlock()go rf.SendHeartBeats() // 开始发送心跳}
}
GetVoteAnswer
func (rf *Raft) GetVoteAnswer(server int, args *RequestVoteArgs) bool {sendArgs := *argsreply := RequestVoteReply{}ok := rf.sendRequestVote(server, &sendArgs, &reply) // RPC调用if !ok {return false}rf.mu.Lock()defer rf.mu.Unlock()// 检查发送请求时的任期是否与当前任期一致。如果不一致,说明在此期间节点已更新到更高任期,此响应已过时。if sendArgs.Term != rf.currentTerm {return false}// 处理更高任期的响应if reply.Term > rf.currentTerm {rf.currentTerm = reply.Term // 更新到更高任期rf.votedFor = -1 // 重置投票记录rf.state = Follower // 退化为跟随者}return reply.VoteGranted
}
SendHeartBeats 心跳
func (rf *Raft) SendHeartBeats() {for !rf.killed() {rf.mu.Lock()if rf.state != Leader {rf.mu.Unlock()return // 不再是Leader,停止发送}args := &AppendEntriesArgs{Term: rf.currentTerm,LeaderId: rf.me,PrevLogIndex: 0, // 简化实现PrevLogTerm: 0, // 简化实现Entries: nil, // 空 entries 表示心跳LeaderCommit: rf.commitIndex,}rf.mu.Unlock()// 向所有follower发送心跳for i := 0; i < len(rf.peers); i++ {if i == rf.me { continue }go rf.handleHeartBeat(i, args)}time.Sleep(time.Duration(HeartBeatTimeOut) * time.Millisecond)}
}
RequestVote RPC 处理投票请求
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {rf.mu.Lock()defer rf.mu.Unlock()// 拒绝旧任期的请求if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.VoteGranted = falsereturn}// 发现更高任期,转为Followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1rf.state = Follower}// 检查日志是否至少一样新lastLogIndex := len(rf.log) - 1lastLogTerm := rf.log[lastLogIndex].TermlogOk := (args.LastLogTerm > lastLogTerm) ||(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)// 检查是否可以投票voteOk := (rf.votedFor == -1 || rf.votedFor == args.CandidateId)if voteOk && logOk {rf.votedFor = args.CandidateIdrf.timeStamp = time.Now() // 重置超时计时器reply.VoteGranted = true} else {reply.VoteGranted = false}reply.Term = rf.currentTerm
}
AppendEntries RPC 处理心跳
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()// 拒绝旧任期的请求if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.Success = falsereturn}// 发现更高任期,转为Followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1rf.state = Follower}// 重置超时计时器rf.timeStamp = time.Now()// 检查日志一致性(简化实现)if args.Entries != nil && // 如果是日志条目而非心跳(args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm) {reply.Success = falsereturn}// 更新提交索引if args.LeaderCommit > rf.commitIndex {rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)}reply.Success = truereply.Term = rf.currentTerm
}
ticker 选举超时检测
func (rf *Raft) ticker() {rd := rand.New(rand.NewSource(int64(rf.me)))for !rf.killed() {// 随机选举超时时间(防止多个节点同时发起选举)rdTimeOut := ElectTimeOutBase + rd.Intn(300)rf.mu.Lock()// 检查是否超时且不是Leaderif rf.state != Leader && time.Since(rf.timeStamp) >= time.Duration(rdTimeOut)*time.Millisecond {go rf.Elect() // 发起选举}rf.mu.Unlock()time.Sleep(ElectTimeOutCheckInterval)}
}