当前位置: 首页 > news >正文

mit6.824 2024spring Lab3A Raft

基本状态

const (Follower = iotaCandidateLeader
)

节点的字段

type Raft struct {mu        sync.Mutex          // 保护共享状态的锁peers     []*labrpc.ClientEnd // 所有节点的RPC端点persister *Persister          // 持久化存储me        int                 // 本节点索引// Raft状态state       int        // Follower/Candidate/LeadercurrentTerm int        // 当前任期votedFor    int        // 当前任期投给谁log         []Entry    // 日志条目// 选举相关voteCount int        // 当前获得的票数timeStamp time.Time  // 上次收到消息的时间// Leader专用nextIndex  []int // 每个follower的下一个日志索引matchIndex []int // 每个follower已复制的最高日志索引// 提交和应用commitIndex int // 已提交的最高日志索引lastApplied int // 已应用的最高日志索引
}

状态转换:

  • Follower → Candidate:选举超时
  • Candidate → Leader:获得多数票
  • Candidate → Follower:发现更高任期
  • Leader → Follower:发现更高任期

几个核心方法

Elect 选举过程

func (rf *Raft) Elect() {rf.mu.Lock()// 自增term,成为候选人,给自己投票rf.currentTerm += 1rf.state = Candidaterf.votedFor = rf.merf.voteCount = 1rf.timeStamp = time.Now() // 重置超时计时器args := &RequestVoteArgs{Term:         rf.currentTerm,CandidateId:  rf.me,LastLogIndex: len(rf.log) - 1,LastLogTerm:  rf.log[len(rf.log)-1].Term,}rf.mu.Unlock()// 向所有其他节点请求投票for i := 0; i < len(rf.peers); i++ {if i == rf.me { continue }go rf.collectVote(i, args) // 并发收集投票}
}

收集选票

collectVote

func (rf *Raft) collectVote(serverTo int, args *RequestVoteArgs) {voteAnswer := rf.GetVoteAnswer(serverTo, args)if !voteAnswer { return }rf.muVote.Lock()defer rf.muVote.Unlock()// 检查是否已获得多数票if rf.voteCount > len(rf.peers)/2 {return // 已经是Leader或已被否决}rf.voteCount += 1if rf.voteCount > len(rf.peers)/2 {rf.mu.Lock()if rf.state == Follower {// 已被其他协程转为Followerrf.mu.Unlock()return}rf.state = Leader // 成为Leaderrf.mu.Unlock()go rf.SendHeartBeats() // 开始发送心跳}
}

GetVoteAnswer

func (rf *Raft) GetVoteAnswer(server int, args *RequestVoteArgs) bool {sendArgs := *argsreply := RequestVoteReply{}ok := rf.sendRequestVote(server, &sendArgs, &reply) // RPC调用if !ok {return false}rf.mu.Lock()defer rf.mu.Unlock()// 检查发送请求时的任期是否与当前任期一致。如果不一致,说明在此期间节点已更新到更高任期,此响应已过时。if sendArgs.Term != rf.currentTerm {return false}// 处理更高任期的响应if reply.Term > rf.currentTerm {rf.currentTerm = reply.Term // 更新到更高任期rf.votedFor = -1            // 重置投票记录rf.state = Follower         // 退化为跟随者}return reply.VoteGranted
}

SendHeartBeats 心跳

func (rf *Raft) SendHeartBeats() {for !rf.killed() {rf.mu.Lock()if rf.state != Leader {rf.mu.Unlock()return // 不再是Leader,停止发送}args := &AppendEntriesArgs{Term:         rf.currentTerm,LeaderId:     rf.me,PrevLogIndex: 0,    // 简化实现PrevLogTerm:  0,    // 简化实现Entries:      nil,  // 空 entries 表示心跳LeaderCommit: rf.commitIndex,}rf.mu.Unlock()// 向所有follower发送心跳for i := 0; i < len(rf.peers); i++ {if i == rf.me { continue }go rf.handleHeartBeat(i, args)}time.Sleep(time.Duration(HeartBeatTimeOut) * time.Millisecond)}
}

RequestVote RPC 处理投票请求

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {rf.mu.Lock()defer rf.mu.Unlock()// 拒绝旧任期的请求if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.VoteGranted = falsereturn}// 发现更高任期,转为Followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1rf.state = Follower}// 检查日志是否至少一样新lastLogIndex := len(rf.log) - 1lastLogTerm := rf.log[lastLogIndex].TermlogOk := (args.LastLogTerm > lastLogTerm) ||(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)// 检查是否可以投票voteOk := (rf.votedFor == -1 || rf.votedFor == args.CandidateId)if voteOk && logOk {rf.votedFor = args.CandidateIdrf.timeStamp = time.Now() // 重置超时计时器reply.VoteGranted = true} else {reply.VoteGranted = false}reply.Term = rf.currentTerm
}

AppendEntries RPC 处理心跳

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()// 拒绝旧任期的请求if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.Success = falsereturn}// 发现更高任期,转为Followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.votedFor = -1rf.state = Follower}// 重置超时计时器rf.timeStamp = time.Now()// 检查日志一致性(简化实现)if args.Entries != nil && // 如果是日志条目而非心跳(args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm) {reply.Success = falsereturn}// 更新提交索引if args.LeaderCommit > rf.commitIndex {rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)}reply.Success = truereply.Term = rf.currentTerm
}

tricker 超时检测

func (rf *Raft) ticker() {rd := rand.New(rand.NewSource(int64(rf.me)))for !rf.killed() {// 随机选举超时时间(防止多个节点同时发起选举)rdTimeOut := ElectTimeOutBase + rd.Intn(300)rf.mu.Lock()// 检查是否超时且不是Leaderif rf.state != Leader && time.Since(rf.timeStamp) >= time.Duration(rdTimeOut)*time.Millisecond {go rf.Elect() // 发起选举}rf.mu.Unlock()time.Sleep(ElectTimeOutCheckInterval)}
}
http://www.dtcms.com/a/359159.html

相关文章:

  • 简说DDPM
  • C语言---零碎语法知识补充(队列、函数指针、左移右移、任务标识符)
  • 机器人控制器开发(底层模块)——rk3588s 的 CAN 配置
  • 码农特供版《消费者权益保护法》逆向工程指北——附源码级注释与异常处理方案
  • 人工智能训练师复习题目实操题2.2.1 - 2.2.5
  • 手表--带屏幕音响-时间制切换12/24小时
  • PS学习笔记
  • 【15】VisionMaster入门到精通——--通信--TCP通信、UDP通信、串口通信、PLC通信、ModBus通信
  • 计算机算术7-浮点基础知识
  • 面经分享--小米Java一面
  • 青年教师发展(中科院软件所-田丰)
  • Dify 从入门到精通(第 65/100 篇):Dify 的自动化测试(进阶篇)
  • MCP与A2A的应用
  • LightGBM(Light Gradient Boosting Machine,轻量级梯度提升机)梳理总结
  • 【AI工具】在 VSCode中安装使用Excalidraw
  • 【69页PPT】智慧工厂数字化工厂蓝图规划建设方案(附下载方式)
  • 基于 Kubernetes 的 Ollama DeepSeek-R1 模型部署
  • 内存管理(智能指针,内存对齐,野指针,悬空指针)
  • Java中Integer转String
  • 为什么企业需要项目管理
  • 安卓编程 之 线性布局
  • 树莓派4B 安装中文输入法
  • AtCoder Beginner Contest 421
  • Mysql 学习day 2 深入理解Mysql索引底层数据结构
  • 【开题答辩全过程】以 基于WEB的茶文化科普系统的设计与实现为例,包含答辩的问题和答案
  • 用简单仿真链路产生 WiFi CSI(不依赖专用工具箱,matlab实现)
  • 面试tips--MyBatis--<where> where 1=1 的区别
  • 如何查看Linux系统中文件夹或文件的大小
  • 【LeetCode - 每日1题】有效的数独
  • SQLSugar 快速入门:从基础到实战查询与使用指南