mit6.5840-lab4C-Snapshot-25Summer
文章目录
- 一、前言
- 二、实验内容
- 三、实验思路
- 1.实现Snapshot和Restore方法
- 2.何时调用Snapshot和Restore
- 3.如何实现线性一致性
- 四、实验中出现的问题
- 五、后言
一、前言
这一部分的代码也是写了两天,主要的原因我觉得是因为对这部分的逻辑理解的不够,闷着头写的,导致后面会有小错误,重写之后经常出现大多数测试可以通过,而某一个测试过不去。当大家没有思路的时候,建议去反复的阅读实验要求,经常可以发现新的东西。
二、实验内容
-
目前,你的键/值服务器不会调用 Raft 库的Snapshot()方法,因此重启的服务器必须重放完整的持久化 Raft 日志才能恢复其状态。现在,你将修改 kvserver 和rsm,使其与 Raft 配合使用,以节省日志空间并减少重启时间,并使用实验 3D 中 Raft 的Snapshot()方法。
-
测试程序将maxraftstate传递给 StartKVServer(),StartKVServer() 再将其传递给rsm 。maxraftstate表示持久化 Raft 状态的最大允许大小(以字节为单位)(包含日志,但不包括快照)。您应该将 maxraftstate 与 rf.PersistBytes() 进行比较。每当 您的rsm检测 到 Raft 状态大小接近此阈值时,它应该通过调用 Raft 的Snapshot来保存快照。rsm 可以通过调用StateMachine接口的Snapshot方法 来获取 kvserver 的快照来创建此快照。如果maxraftstate为 -1,则无需创建快照。maxraftstate的限制适用于 Raft 作为persister.Save()第一个参数传递的 GOB 编码字节 。
-
实现kvraft1/server.go 中的 Snapshot()和Restore() 方法,供rsm调用。修改rsm以处理包含快照的 applyCh 消息。
三、实验思路
1.实现Snapshot和Restore方法
这个地方实现起来可以参考raft中的序列化和反序列化,写起来还是比较简单的,这里多序列化了几个变量(会放在后面去说)
ps:反序列化变量的顺序应该与序列化时相同
func (kv *KVServer) Snapshot() []byte {kv.mu.Lock()defer kv.mu.Unlock()w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(kv.Kvs)e.Encode(kv.lastRequestId)e.Encode(kv.lastGetResult)e.Encode(kv.lastPutResult)return w.Bytes()
}func (kv *KVServer) Restore(data []byte) {kv.mu.Lock()defer kv.mu.Unlock()// Your code hered := labgob.NewDecoder(bytes.NewBuffer(data))Kvs := make(map[string]KeyValueStore)lastRequestId := make(map[int64]int64)lastGetResult := make(map[int64]*rpc.GetReply)lastPutResult := make(map[int64]*rpc.PutReply)if d.Decode(&Kvs) != nil || d.Decode(&lastRequestId) != nil ||d.Decode(&lastGetResult) != nil || d.Decode(&lastPutResult) != nil {return}kv.Kvs = Kvskv.lastRequestId = lastRequestIdkv.lastGetResult = lastGetResultkv.lastPutResult = lastPutResult
}
2.何时调用Snapshot和Restore
当snapshot的大小超过给定大小maxraftstate时,要去执行snapshot。
这个检测过程可以让在reader中实现,每操作一条指令,就去检测一下。
而当RSM重启后发现snapshot大小不为0,或者是reader中接收到SnapshotValid的日志的时候就调用restore
func (rsm *RSM) reader() {for {msg, ok := <-rsm.applyChif !ok {rsm.mu.Lock()DPrintf("rsm[%v] applyCh closed", rsm.me)// 说明applyCh已经关闭,当applyCh关闭时,reader只是退出循环,但没有通知正在等待的Submit操作。这导致Submit操作永远等待,造成超时// 通知所有等待的操作,让他们返回ErrWrongLeaderfor _, op := range rsm.pendingOps {op.Ch <- rpc.ErrWrongLeader}rsm.pendingOps = make(map[int]Op)rsm.mu.Unlock()break}if msg.CommandValid {result := rsm.sm.DoOp(msg.Command)if rsm.maxraftstate != -1 && rsm.rf.PersistBytes() > rsm.maxraftstate) {snapshot := rsm.sm.Snapshot()rsm.rf.Snapshot(msg.CommandIndex, snapshot)}rsm.mu.Lock()if op, ok := rsm.pendingOps[msg.CommandIndex]; ok {currentTerm, _ := rsm.rf.GetState()// 使用!reflect.DeepEqual(op.command, msg.Command) 这样可以兼容结构体/指针等各种命令类型的比较。if op.Term != currentTerm || !reflect.DeepEqual(op.Req, msg.Command) {// 当你在reader()里发现某个index被apply时,发现term或command不一致(即被覆盖),// 说明这个index及其之后的所有pendingOps都不可能被commit。for i, op := range rsm.pendingOps {if i >= msg.CommandIndex {op.Ch <- rpc.ErrWrongLeaderdelete(rsm.pendingOps, i)}}rsm.pendingOps = make(map[int]Op)rsm.mu.Unlock()continue}op.Ch <- resultdelete(rsm.pendingOps, msg.CommandIndex)}rsm.mu.Unlock()} else if msg.SnapshotValid {rsm.sm.Restore(msg.Snapshot)}}
}
3.如何实现线性一致性
对于重复请求,只会执行一次。具体来说,kvserver需要保存客户端的id和本次请求的id,对于raft返回来要执行的请求,需要判断是否已经执行过。我们只需要保存该client上一次执行的结果和id即可,因为本实验测试不会考察“返回上上次结果”的场景,只要求你能正确处理“最新 requestId 的重复请求”。
lastRequestId map[int64]int64 // 记录每个clientId的最后一个requestIdlastGetResult map[int64]*rpc.GetReply // 记录每个clientId的最后一个GetResultlastPutResult map[int64]*rpc.PutReply // 记录每个clientId的最后一个PutResultfunc (kv *KVServer) DoOp(req any) any {// 得到requestId 和 clientIdvar requestId int64var clientId int64switch args := req.(type) {case *rpc.PutArgs:requestId = args.RequestIdclientId = args.ClientIdcase *rpc.GetArgs:requestId = args.RequestIdclientId = args.ClientId}// log.Printf("DoOp: clientId=%d, requestId=%d", clientId%10000, requestId)// 幂等性判断kv.mu.Lock()if lastRequestId, ok := kv.lastRequestId[clientId]; ok && lastRequestId >= requestId {if lastRequestId > requestId {// 此请求已过期switch req.(type) {case *rpc.PutArgs:return &rpc.PutReply{Err: rpc.ErrWrongLeader}case *rpc.GetArgs:return &rpc.GetReply{Err: rpc.ErrWrongLeader}}}// 说明这个请求已经做过了,直接返回结果var result anyswitch req.(type) {case *rpc.PutArgs:result = kv.lastPutResult[clientId]case *rpc.GetArgs:result = kv.lastGetResult[clientId]}kv.mu.Unlock()// log.Printf("DoOp: 幂等性检查通过,返回缓存结果 clientId=%d, requestId=%d", clientId%10000, requestId)// 确保返回值不为nilif result == nil {// 如果结果为nil,说明之前没有成功执行过,返回错误// log.Printf("DoOp: 缓存结果为nil,返回错误 clientId=%d, requestId=%d", clientId%10000, requestId)return &rpc.PutReply{Err: rpc.ErrWrongLeader}}return result}// log.Printf("DoOp: 幂等性检查未通过,执行新请求 clientId=%d, requestId=%d", clientId%10000, requestId)kv.mu.Unlock()// 执行操作 - 直接在这里实现,避免调用需要锁的方法var result anyswitch args := req.(type) {case *rpc.PutArgs:// log.Printf("DoOp: 执行Put操作 key=%s, value=%s, version=%d", args.Key, args.Value, args.Version)kv.mu.Lock()result = kv.doPutInternal(args)kv.mu.Unlock()case *rpc.GetArgs:// log.Printf("DoOp: 执行Get操作 key=%s", args.Key)kv.mu.Lock()result = kv.doGetInternal(args)kv.mu.Unlock()default:rsm.DPrintf("DoOp receive error type: %v", reflect.TypeOf(req))result = &rpc.PutReply{Err: rpc.ErrWrongLeader}}// 保存结果 - 重新获取锁kv.mu.Lock()kv.lastRequestId[clientId] = requestIdswitch req.(type) {case *rpc.PutArgs:kv.lastPutResult[clientId] = result.(*rpc.PutReply)case *rpc.GetArgs:kv.lastGetResult[clientId] = result.(*rpc.GetReply)}// log.Printf("DoOp: 保存结果 clientId=%s, requestId=%d", fmt.Sprintf("%04d", clientId%10000), requestId)kv.mu.Unlock()return result
}
四、实验中出现的问题
问题1 :序列化有的时候会出错,导致反序列化出来的结果是nil
解决 :你保存的是 map[string]any,但反序列化时Go的gob对any(interface{})类型支持很差,容易失败。
建议把 lastRequestResult 的类型改成 map[string]*rpc.PutReply 或 map[string]*rpc.GetReply,并且只存储Put和Get的结果,不要用any。
问题2 :TestSnapshotRecoverManyClients4C测试会一直进行下去
解决2 :可以选择延长Submit的超时时间,并且在Raft中,关闭applyCh后,将rf.applyCh置为nil
五、后言
至此mit6.5840的前四个lab都已经做好了,还是蛮有成就感的,接下来努力去冲刺lab5。