当前位置: 首页 > news >正文

mit6.5840-lab4C-Snapshot-25Summer

文章目录

  • 一、前言
  • 二、实验内容
  • 三、实验思路
    • 1.实现Snapshot和Restore方法
    • 2.何时调用Snapshot和Restore
    • 3.如何实现线性一致性
  • 四、实验中出现的问题
  • 五、后言

一、前言

这一部分的代码也是写了两天,主要的原因我觉得是因为对这部分的逻辑理解的不够,闷着头写的,导致后面会有小错误,重写之后经常出现大多数测试可以通过,而某一个测试过不去。当大家没有思路的时候,建议去反复的阅读实验要求,经常可以发现新的东西。
请添加图片描述


二、实验内容

  • 目前,你的键/值服务器不会调用 Raft 库的Snapshot()方法,因此重启的服务器必须重放完整的持久化 Raft 日志才能恢复其状态。现在,你将修改 kvserver 和rsm,使其与 Raft 配合使用,以节省日志空间并减少重启时间,并使用实验 3D 中 Raft 的Snapshot()方法。

  • 测试程序将maxraftstate传递给 StartKVServer(),StartKVServer() 再将其传递给rsm 。maxraftstate表示持久化 Raft 状态的最大允许大小(以字节为单位)(包含日志,但不包括快照)。您应该将 maxraftstate 与 rf.PersistBytes() 进行比较。每当 您的rsm检测 到 Raft 状态大小接近此阈值时,它应该通过调用 Raft 的Snapshot来保存快照。rsm 可以通过调用StateMachine接口的Snapshot方法 来获取 kvserver 的快照来创建此快照。如果maxraftstate为 -1,则无需创建快照。maxraftstate的限制适用于 Raft 作为persister.Save()第一个参数传递的 GOB 编码字节 。

  • 实现kvraft1/server.go 中的 Snapshot()和Restore() 方法,供rsm调用。修改rsm以处理包含快照的 applyCh 消息。


三、实验思路

1.实现Snapshot和Restore方法

这个地方实现起来可以参考raft中的序列化和反序列化,写起来还是比较简单的,这里多序列化了几个变量(会放在后面去说)
ps:反序列化变量的顺序应该与序列化时相同

func (kv *KVServer) Snapshot() []byte {kv.mu.Lock()defer kv.mu.Unlock()w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(kv.Kvs)e.Encode(kv.lastRequestId)e.Encode(kv.lastGetResult)e.Encode(kv.lastPutResult)return w.Bytes()
}func (kv *KVServer) Restore(data []byte) {kv.mu.Lock()defer kv.mu.Unlock()// Your code hered := labgob.NewDecoder(bytes.NewBuffer(data))Kvs := make(map[string]KeyValueStore)lastRequestId := make(map[int64]int64)lastGetResult := make(map[int64]*rpc.GetReply)lastPutResult := make(map[int64]*rpc.PutReply)if d.Decode(&Kvs) != nil || d.Decode(&lastRequestId) != nil ||d.Decode(&lastGetResult) != nil || d.Decode(&lastPutResult) != nil {return}kv.Kvs = Kvskv.lastRequestId = lastRequestIdkv.lastGetResult = lastGetResultkv.lastPutResult = lastPutResult
}

2.何时调用Snapshot和Restore

当snapshot的大小超过给定大小maxraftstate时,要去执行snapshot。
这个检测过程可以让在reader中实现,每操作一条指令,就去检测一下。
而当RSM重启后发现snapshot大小不为0,或者是reader中接收到SnapshotValid的日志的时候就调用restore

func (rsm *RSM) reader() {for {msg, ok := <-rsm.applyChif !ok {rsm.mu.Lock()DPrintf("rsm[%v] applyCh closed", rsm.me)// 说明applyCh已经关闭,当applyCh关闭时,reader只是退出循环,但没有通知正在等待的Submit操作。这导致Submit操作永远等待,造成超时// 通知所有等待的操作,让他们返回ErrWrongLeaderfor _, op := range rsm.pendingOps {op.Ch <- rpc.ErrWrongLeader}rsm.pendingOps = make(map[int]Op)rsm.mu.Unlock()break}if msg.CommandValid {result := rsm.sm.DoOp(msg.Command)if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate) {snapshot := rsm.sm.Snapshot()rsm.rf.Snapshot(msg.CommandIndex, snapshot)}rsm.mu.Lock()if op, ok := rsm.pendingOps[msg.CommandIndex]; ok {currentTerm, _ := rsm.rf.GetState()// 使用!reflect.DeepEqual(op.command, msg.Command) 这样可以兼容结构体/指针等各种命令类型的比较。if op.Term != currentTerm || !reflect.DeepEqual(op.Req, msg.Command) {// 当你在reader()里发现某个index被apply时,发现term或command不一致(即被覆盖),// 说明这个index及其之后的所有pendingOps都不可能被commit。for i, op := range rsm.pendingOps {if i >= msg.CommandIndex {op.Ch <- rpc.ErrWrongLeaderdelete(rsm.pendingOps, i)}}rsm.pendingOps = make(map[int]Op)rsm.mu.Unlock()continue}op.Ch <- resultdelete(rsm.pendingOps, msg.CommandIndex)}rsm.mu.Unlock()} else if msg.SnapshotValid {rsm.sm.Restore(msg.Snapshot)}}
}

3.如何实现线性一致性

对于重复请求,只会执行一次。具体来说,kvserver需要保存客户端的id和本次请求的id,对于raft返回来要执行的请求,需要判断是否已经执行过。我们只需要保存该client上一次执行的结果和id即可,因为本实验测试不会考察“返回上上次结果”的场景,只要求你能正确处理“最新 requestId 的重复请求”。

	lastRequestId map[int64]int64         // 记录每个clientId的最后一个requestIdlastGetResult map[int64]*rpc.GetReply // 记录每个clientId的最后一个GetResultlastPutResult map[int64]*rpc.PutReply // 记录每个clientId的最后一个PutResultfunc (kv *KVServer) DoOp(req any) any {// 得到requestId 和 clientIdvar requestId int64var clientId int64switch args := req.(type) {case *rpc.PutArgs:requestId = args.RequestIdclientId = args.ClientIdcase *rpc.GetArgs:requestId = args.RequestIdclientId = args.ClientId}// log.Printf("DoOp: clientId=%d, requestId=%d", clientId%10000, requestId)// 幂等性判断kv.mu.Lock()if lastRequestId, ok := kv.lastRequestId[clientId]; ok && lastRequestId >= requestId {if lastRequestId > requestId {// 此请求已过期switch req.(type) {case *rpc.PutArgs:return &rpc.PutReply{Err: rpc.ErrWrongLeader}case *rpc.GetArgs:return &rpc.GetReply{Err: rpc.ErrWrongLeader}}}// 说明这个请求已经做过了,直接返回结果var result anyswitch req.(type) {case *rpc.PutArgs:result = kv.lastPutResult[clientId]case *rpc.GetArgs:result = kv.lastGetResult[clientId]}kv.mu.Unlock()// log.Printf("DoOp: 幂等性检查通过,返回缓存结果 clientId=%d, requestId=%d", clientId%10000, requestId)// 确保返回值不为nilif result == nil {// 如果结果为nil,说明之前没有成功执行过,返回错误// log.Printf("DoOp: 缓存结果为nil,返回错误 clientId=%d, requestId=%d", clientId%10000, requestId)return &rpc.PutReply{Err: rpc.ErrWrongLeader}}return result}// log.Printf("DoOp: 幂等性检查未通过,执行新请求 clientId=%d, requestId=%d", clientId%10000, requestId)kv.mu.Unlock()// 执行操作 - 直接在这里实现,避免调用需要锁的方法var result anyswitch args := req.(type) {case *rpc.PutArgs:// log.Printf("DoOp: 执行Put操作 key=%s, value=%s, version=%d", args.Key, args.Value, args.Version)kv.mu.Lock()result = kv.doPutInternal(args)kv.mu.Unlock()case *rpc.GetArgs:// log.Printf("DoOp: 执行Get操作 key=%s", args.Key)kv.mu.Lock()result = kv.doGetInternal(args)kv.mu.Unlock()default:rsm.DPrintf("DoOp receive error type: %v", reflect.TypeOf(req))result = &rpc.PutReply{Err: rpc.ErrWrongLeader}}// 保存结果 - 重新获取锁kv.mu.Lock()kv.lastRequestId[clientId] = requestIdswitch req.(type) {case *rpc.PutArgs:kv.lastPutResult[clientId] = result.(*rpc.PutReply)case *rpc.GetArgs:kv.lastGetResult[clientId] = result.(*rpc.GetReply)}// log.Printf("DoOp: 保存结果 clientId=%s, requestId=%d", fmt.Sprintf("%04d", clientId%10000), requestId)kv.mu.Unlock()return result
}

四、实验中出现的问题

问题1 :序列化有的时候会出错,导致反序列化出来的结果是nil
解决 :你保存的是 map[string]any,但反序列化时Go的gob对any(interface{})类型支持很差,容易失败。
建议把 lastRequestResult 的类型改成 map[string]*rpc.PutReply 或 map[string]*rpc.GetReply,并且只存储Put和Get的结果,不要用any。


问题2 :TestSnapshotRecoverManyClients4C测试会一直进行下去
解决2 :可以选择延长Submit的超时时间,并且在Raft中,关闭applyCh后,将rf.applyCh置为nil


五、后言

至此mit6.5840的前四个lab都已经做好了,还是蛮有成就感的,接下来努力去冲刺lab5。

http://www.dtcms.com/a/279406.html

相关文章:

  • Java Stream流详解
  • 文心一言 4.5 开源深度剖析:中文霸主登场,开源引擎重塑大模型生态
  • C++11 std::is_permutation:从用法到原理的深度解析
  • 什么是延迟双删
  • 算法训练营day18 530.二叉搜索树的最小绝对差、501.二叉搜索树中的众数、236. 二叉树的最近公共祖先
  • 通过 ip a 查看网络接口名
  • 【算法】贪心算法:摆动序列C++
  • 2025js——面试题(8)-http
  • Linux 系统下的 Sangfor VDI 客户端安装与登录完全攻略 (CentOS、Ubuntu、麒麟全线通用)
  • 程序跑飞是什么?
  • 核电概念盘中异动,中核科技涨停引领板块热度
  • 物联网技术促进能量收集创新应用落地
  • 第一章编辑器开发基础第一节绘制编辑器元素_4输入字段(4/7)
  • 【一维 前缀和+差分】
  • 互斥锁与同步锁
  • IIS错误:Service Unavailable HTTP Error 503. The service is unavailable.
  • Unity Shader 预热与缓存优化
  • Unity中HumanBodyBones骨骼对照
  • 卡在“pycharm正在创建帮助程序目录”
  • 笔试——Day6
  • 达梦国产数据库安装
  • React Hook 详解:原理、执行顺序与 useEffect 的执行机制
  • 切比雪夫多项式
  • leetcode 1290. 二进制链表转整数 简单
  • C++类模版与友元
  • 进程、线程、协程
  • windows内核研究(进程与线程-进程结构体EPROCESS)
  • Django基础(一)———创建与启动
  • 【反转链表专题】【LeetCode206.反转链表】【LeetCode25.K个一组翻转链表】【LeetCode234.回文链表】
  • Spring Boot 自带的 JavaMail 集成