当前位置: 首页 > news >正文

MIT6.824 Raft算法Lab2A实验

实验目标

掌握raft的leader选举算法
结果：通过 src/raft/test_test.go 中所有名称带有 2A 的测试函数

test1

func TestInitialElection2A(t *testing.T) {servers := 3cfg := make_config(t, servers, false)defer cfg.cleanup()cfg.begin("Test (2A): initial election")// is a leader elected?cfg.checkOneLeader()// sleep a bit to avoid racing with followers learning of the// election, then check that all peers agree on the term.time.Sleep(50 * time.Millisecond)term1 := cfg.checkTerms()// does the leader+term stay the same if there is no network failure?time.Sleep(2 * RaftElectionTimeout)term2 := cfg.checkTerms()if term1 != term2 {fmt.Printf("warning: term changed even though there were no failures")}// there should still be a leader.cfg.checkOneLeader()cfg.end()
}
  • 初始选举检查
  • Term 一致性检查
  • Leader 稳定性检查
  • 最终 Leader 检查

test 2

func TestReElection2A(t *testing.T) {servers := 3cfg := make_config(t, servers, false)defer cfg.cleanup()cfg.begin("Test (2A): election after network failure")leader1 := cfg.checkOneLeader()// if the leader disconnects, a new one should be elected.cfg.disconnect(leader1)cfg.checkOneLeader()// if the old leader rejoins, that shouldn't// disturb the new leader.cfg.connect(leader1)leader2 := cfg.checkOneLeader()// if there's no quorum, no leader should// be elected.cfg.disconnect(leader2)cfg.disconnect((leader2 + 1) % servers)time.Sleep(2 * RaftElectionTimeout)cfg.checkNoLeader()// if a quorum arises, it should elect a leader.cfg.connect((leader2 + 1) % servers)cfg.checkOneLeader()// re-join of last node shouldn't prevent leader from existing.cfg.connect(leader2)cfg.checkOneLeader()cfg.end()
}

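测试流程：
1. 先选出一个 leader。
2. 断开该 leader 后，剩余节点应选出新的 leader。
3. 旧 leader 重新连接后不应干扰新 leader（集群中不能同时存在两个 leader）。
4. 再断开多数节点，此时不应选出任何 leader。
5. 恢复多数后应重新选出 leader。
6. 最后一个节点重新加入后，集群仍应有 leader。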

大纲

需要完成的部分:

  • raft的初始化
  • 维护自身的定时器,在超时时做出对应的动作
  • 投票的服务和发送
  • 心跳的服务和发送

raft的初始化

type Raft struct {mu        sync.Mutex          // Lock to protect shared access to this peer's statepeers     []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister          // Object to hold this peer's persisted stateme        int                 // this peer's index into peers[]// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.currentTerm int        //当前的任期votedFor    int        // 投给了谁logs        []LogEntry // 日志条目commitIndex int        //已经提交的日志下标lastApplied int        //最后应用的日志下标nextIndex   []int      //server:每个peer的下一条日志索引matchIndex  []int      //server:已经提交的日志索引status       inttimer        TimervoteAgreeCnt int
}
func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = me// Your initialization code here (2A, 2B, 2C).rf.currentTerm = 0rf.votedFor = -1rf.logs = make([]LogEntry, 0)rf.commitIndex = -1rf.lastApplied = -1rf.nextIndex = make([]int, len(peers))rf.matchIndex = make([]int, len(peers))rf.status = Followerrf.timer = Timer{ticker: time.NewTicker(time.Duration(150+rand.Intn(200)) * time.Millisecond)}// initialize from state persisted before a crashrf.readPersist(persister.ReadRaftState())go rf.HandleTimeOut()return rf
}

其中启动了一个rf.HandleTimeOut协程来处理超时情况

func (rf *Raft) HandleTimeOut() {for {select {case <-rf.timer.ticker.C:rf.mu.Lock()if rf.status == Follower || rf.status == Candidate {if rf.status == Follower {rf.status = Candidate}rf.currentTerm++rf.votedFor = rf.merf.timer.ResetTimeOut()rf.voteAgreeCnt = 1args := RequestVoteArgs{Term:         rf.currentTerm,CandidateId:  rf.me,LastLogIndex: rf.commitIndex,}if len(rf.logs) > 0 {args.LastLogTerm = rf.logs[len(rf.logs)-1].Term}for i, _ := range rf.peers {if i == rf.me {continue}resp := RequestVoteReply{}go rf.sendRequestVote(i, &args, &resp)}} else if rf.status == Leader {rf.timer.ResetHeartTime()args := AppendEntriesArgs{Term:     rf.currentTerm,LeaderId: rf.me,}for i := 0; i < len(rf.peers); i++ {if i == rf.me {continue}resp := AppendEntriesResp{}go rf.sendAppendEntries(i, args, &resp)}}rf.mu.Unlock()}}
}

该协程专门监听超时管道的消息,处理情况按照当前所处的状态分为三种:

  • 当状态为follower时,发生超时切换为候选者,执行和候选者相同的动作
  • 当状态为候选者时,开始拉票:设置自身状态,然后向其他节点发送投票的rpc请求,当超过一半的节点同意时,自动晋升为leader,重新设置超时时间为心跳时间
  • 当状态为leader时直接发送心跳到其他节点,其他节点收到之后重置超时时间

投票

// RequestVoteArgs is the argument of the RequestVote RPC (Raft paper, Figure 2).
// Field names must start with capital letters (be exported) so the RPC layer
// can encode them.
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int // candidate's term
	CandidateId  int // candidate's index in peers[]
	LastLogIndex int // index of the candidate's last log entry (logs are 0-indexed)
	LastLogTerm  int // term of the candidate's last log entry
}
// RequestVoteReply is the reply of the RequestVote RPC (Raft paper, Figure 2).
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (2A).
	Term int  // voter's currentTerm, so a stale candidate can update itself
	Ok   bool // true if the vote was granted ("voteGranted" in the paper)
}
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (2A, 2B).rf.mu.Lock()defer rf.mu.Unlock()reply.Ok = falsereply.Term = rf.currentTermif args.Term < rf.currentTerm {return} else if args.Term > rf.currentTerm {reply.Ok = truerf.status = Followerrf.votedFor = args.CandidateIdrf.timer.ResetTimeOut()rf.currentTerm = args.Termreturn} else {if rf.votedFor == -1 || rf.votedFor == args.CandidateId {rf.status = Followerrf.votedFor = args.CandidateIdreply.Ok = truerf.timer.ResetTimeOut()}}
}func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {if ok := rf.peers[server].Call("Raft.RequestVote", args, reply); !ok {for !ok {ok = rf.peers[server].Call("Raft.RequestVote", args, reply)}}if args.Term < rf.currentTerm {return false}if reply.Ok {rf.mu.Lock()defer rf.mu.Unlock()rf.voteAgreeCnt++if rf.voteAgreeCnt >= (len(rf.peers)+1)/2 {rf.status = Leaderrf.timer.ResetHeartTime()}}return reply.Ok
}

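关于是否投票，先比较 term：
- 若请求中的 term 小于自身 currentTerm，直接拒绝。
- 若更大，先更新 currentTerm、转为 follower，并清空 votedFor。

在 term 相同的前提下，再根据 votedFor 分为三种情况：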

如果当前节点尚未投票（代码中 votedFor 为 -1），则继续检查候选者的日志是否至少和自己一样新；满足则同意。

如果已经投票给其他候选者，则直接拒绝（即使对方日志更新也不行）。

如果已经投票给同一个候选者（例如因 RPC 重传而重复收到请求），应再次同意。Figure 2 的投票条件是“votedFor 为空或等于 candidateId”，重复同意不会在同一任期内投出两张票；若忽略重复请求，候选者可能因第一次回复丢失而少算一票。

心跳

type AppendEntriesArgs struct {Term         intLeaderId     intPervLogIndex intPervLogTerm  intLogEntries   []LogEntry
}
// AppendEntriesResp is the reply of the AppendEntries RPC (Raft paper, Figure 2).
type AppendEntriesResp struct {
	Term int  // responder's currentTerm, so a stale leader can update itself
	Ok   bool // true if the responder accepted the request ("success" in the paper)
}
type LogEntry struct {Cmd  interface{}Term int
}func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, resp *AppendEntriesResp) {if ok := rf.peers[server].Call("Raft.AppendEntries", args, resp); !ok {for !ok {ok = rf.peers[server].Call("Raft.AppendEntries", args, resp)}}
}func (rf *Raft) AppendEntries(args AppendEntriesArgs, resp *AppendEntriesResp) {if args.Term < rf.currentTerm {return}rf.mu.Lock()defer rf.mu.Unlock()rf.status = Followerrf.timer.ResetTimeOut()rf.currentTerm = args.Term
}
http://www.dtcms.com/a/294400.html

相关文章:

  • 基于阿里云平台的文章评价模型训练与应用全流程指南
  • CASAIM自动蓝光检测供应商三维测量系统近线检测汽车变速箱尺寸
  • Java SE:类与对象的认识
  • 【kubernetes】-3 pod基础和yaml文件
  • 记录一本设计模式的书
  • vue3与ue5通信-工具类
  • [C/C++内存安全]_[中级]_[安全处理字符串]
  • ctfshow pwn40
  • 保护板测试仪:守护电池安全的“幕后卫士”
  • 关于SPring基础和Vue的学习
  • Docker 容器中的 HEAD 请求缺失 header?从 Content-MD5 缺失聊起
  • 超声原始数据重构成B扫成像的MATLAB实现
  • 【AI News | 20250722】每日AI进展
  • now能减少mysql的压力吗
  • 【Android】用 ViewPager2 + Fragment + TabLayout 实现标签页切换
  • linux性能调整和故障排查
  • LeetCode热题100--24. 两两交换链表中的节点--中等
  • Linux文件——Ext2文件系统(3)_软硬链接
  • Ubuntu 1804 编译ffmpeg qsv MediaSDK libva 遇到的问题记录
  • #Linux内存管理# 详细介绍madvise函数的工作原理
  • Elasticsearch(ES)安装
  • 分布式电商系统:缓存策略、负载均衡与容灾方案
  • 解决 Electron 中 window.open 打开新窗口的各种“坑”
  • Python 程序设计讲义(6):Python 的基本用法——运算符与表达式
  • API 汇总:ONLYOFFICE 文档最近更新
  • 背包DP之0/1背包
  • 11-1 浅层神经网络及计算前向传播
  • 局部重要性注意力LIA,通过区域重要性图与门控机制实现高阶信息交互,自适应增强有用特征、抑制冗余信息,平衡模型性能与效率。
  • VR-Doh: 革新3D建模的虚拟现实体验
  • DPVR亮相青岛品牌日,崂山科创力量引领AI眼镜新浪潮