当前位置: 首页 > news >正文

mit6.824 2024spring Lab1 MapReduce

master节点

package mrimport ("log""net""net/http""net/rpc""os""sync""time"
)// coordinator -> master节点// 1处理worker节点的rpc请求
// 2维护任务状态
// 3. Detect timed-out tasks: a task that has been running for too long is abandoned and reassigned.

// Task is the coordinator's bookkeeping record for a single map or reduce task.
type Task struct {
	FileName  string    // input file name; MakeCoordinator sets it for map tasks only
	Status    string    // one of Idle, InProgress or Completed
	StartTime time.Time // when the task was last handed to a worker; used for timeout detection
	TaskID    int
}

// Coordinator holds the state of one MapReduce job and serves the worker RPCs.
type Coordinator struct {
	// Your definitions here.
	mu          sync.Mutex // workers send RPCs concurrently, so the fields below are guarded by this lock
	mapTasks    []Task
	reduceTasks []Task
	nReduce     int  // number of reduce tasks, taken from the MakeCoordinator argument
	mapFinished bool // once the map phase is finished the reduce phase begins
	allFinished bool
	files       []string
	nextTaskID  int
}

// Your code here -- RPC handlers for the worker to call.

// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {reply.Y = args.X + 1return nil
}// c是方法的接收者,类似于其他原因的this或者self
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {c.mu.Lock()defer c.mu.Unlock()// 检查是否存在超时任务c.checkTimeOut()// 如果所有任务完成,通知worker退出if c.allFinished {reply.TaskType = ExitTaskreturn nil}// 如果map任务还没执行完,给worker分配map任务if !c.mapFinished {for i, task := range c.mapTasks {if task.Status == Idle {reply.TaskID = task.TaskIDreply.TaskType = MapTaskreply.FileName = task.FileNamereply.NReduce = c.nReduce //map后会分到n个桶// 更新任务状态c.mapTasks[i].Status = InProgressc.mapTasks[i].StartTime = time.Now()return nil}}// map没有全部完成,但是找不到一个空闲的map任务reply.TaskType = WaitTaskreturn nil}//所有map都执行完,则分配reduce任务for i, task := range c.reduceTasks {if task.Status == Idle {reply.TaskID = task.TaskIDreply.TaskType = ReduceTaskreply.ReduceTaskNum = i            //第几个reduce任务(编号)reply.MapTaskNum = len(c.mapTasks) //有多少个map任务//更新任务状态c.reduceTasks[i].Status = InProgressc.reduceTasks[i].StartTime = time.Now()return nil}}// 没有空闲的reduce任务reply.TaskType = WaitTaskreturn nil
}func (c *Coordinator) checkTimeOut() {// 超时时间10stimeOut := 10 * time.Secondnow := time.Now()if !c.mapFinished {allCompleted := truefor i, task := range c.mapTasks {if task.Status == InProgress && now.Sub(task.StartTime) > timeOut {// 任务已经超时了c.mapTasks[i].Status = Idle}if task.Status != Completed {allCompleted = false}}c.mapFinished = allCompleted}allCompleted := truefor i, task := range c.reduceTasks {if task.Status == InProgress && now.Sub(task.StartTime) > timeOut {c.reduceTasks[i].Status = Idle}if task.Status != Completed {allCompleted = false}}c.allFinished = allCompleted
}func (c *Coordinator) ReportTaskDone(args *ReportTaskArgs, reply *ReportTaskReply) error {c.mu.Lock()defer c.mu.Unlock()if args.TaskType == MapTask {for i, task := range c.mapTasks {if task.TaskID == args.TaskID && task.Status == InProgress {c.mapTasks[i].Status = CompletedallCompleted := truefor _, task := range c.mapTasks {if task.Status != Completed {allCompleted = falsebreak}}c.allFinished = allCompletedreply.OK = truereturn nil}}} else if args.TaskType == ReduceTask {for i, task := range c.reduceTasks {if task.TaskID == args.TaskID && task.Status == InProgress {c.reduceTasks[i].Status = CompletedallCompleted := truefor _, task := range c.reduceTasks {if task.Status != Completed {allCompleted = falsebreak}}c.allFinished = allCompletedreply.OK = truereturn nil}}}reply.OK = falsereturn nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
	// allFinished is written by the RPC handlers while holding c.mu, so it
	// must be read under the same lock to avoid a data race.
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.allFinished
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// Your code here.
	c := &Coordinator{
		files:       files,
		nReduce:     nReduce,
		mapTasks:    make([]Task, len(files)),
		reduceTasks: make([]Task, nReduce),
		mapFinished: false,
		allFinished: false,
		nextTaskID:  0,
		// mu needs no initialization: the zero Mutex is ready to use.
	}

	// One map task per input file.
	for i, file := range files {
		c.mapTasks[i] = Task{
			TaskID:   i,
			FileName: file,
			Status:   Idle,
		}
	}

	// Reduce tasks read intermediate files that do not exist yet, so their
	// FileName is left empty.
	for i := range c.reduceTasks {
		c.reduceTasks[i] = Task{
			TaskID: i,
			Status: Idle,
		}
	}

	c.server()
	return c
}

worker节点

package mrimport ("encoding/json""fmt""hash/fnv""io""log""net/rpc""os""sort""time"
)/**
* 请求任务
* 执行map或reduce任务
* 处理文件输入输出
* 汇报任务状态*/// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey orders a slice of KeyValue by Key; it provides the Len, Swap and
// Less methods required by sort.Sort.
type ByKey []KeyValue

// Len returns the number of elements.
func (a ByKey) Len() int { return len(a) }

// Swap exchanges the elements at positions i and j.
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether the key at i sorts before the key at j.
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Clear the top bit so the result is non-negative.
	return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {workerID := os.Getpid()for {task := getTask(workerID)switch task.TaskType {case MapTask:doMap(task, mapf, workerID)case ReduceTask:doReduce(task, reducef, workerID)case WaitTask:time.Sleep(500 * time.Millisecond)continuecase ExitTask:return}}
}func doMap(task GetTaskReply, mapf func(string, string) []KeyValue, workerID int) {filename := task.FileNamefile, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := io.ReadAll(file)if err != nil {log.Fatalf("connot read %v", filename)}file.Close()// 调用用户自定义的map函数,得到键值对kva := mapf(filename, string(content))// 将中间结果分成nReduce个桶intermediate := make([][]KeyValue, task.NReduce)for _, kv := range kva {bucketNum := ihash(kv.Key) % task.NReduceintermediate[bucketNum] = append(intermediate[bucketNum], kv)}// 将每个桶放到对应的临时文件中for i := 0; i < task.NReduce; i++ {tempFile, err := os.CreateTemp("", "mr-tmp-*")if err != nil {log.Fatalf("connot create tmp file")}enc := json.NewEncoder(tempFile)for _, kv := range intermediate {err := enc.Encode(&kv)if err != nil {log.Fatalf("connot encode %v", kv)}}tempFile.Close()// 将临时文件重命名 mr-map任务编号-reduce桶编号os.Rename(tempFile.Name(), fmt.Sprintf("mr-%d-%d", task.TaskID, i))}reportTaskDone(task.TaskType, task.TaskID, workerID)
}
func doReduce(task GetTaskReply, reducef func(string, []string) string, workerID int) {reduceTaskNum := task.ReduceTaskNummapTaskNum := task.MapTaskNumintermediate := []KeyValue{}// 读取该reduce负责的中间文件for i := 0; i < mapTaskNum; i++ {filename := fmt.Sprintf("mr-%d-%d", i, reduceTaskNum)file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}intermediate = append(intermediate, kv)}file.Close()}sort.Sort(ByKey(intermediate))// 创建输出的临时文件tempfile, err := os.CreateTemp("", "mr-out-tmp-*")if err != nil {log.Fatalf("connot create temp file")}// 对每一个key调一下reduce函数i := 0for i < len(intermediate) {j := i + 1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, intermediate[k].Value)}output := reducef(intermediate[i].Key, values)fmt.Fprintf(tempfile, "%v %v\n", intermediate[i].Key, output)i = j}tempfile.Close()os.Rename(tempfile.Name(), fmt.Sprintf("mr-out-%d", reduceTaskNum))reportTaskDone(ReduceTask, task.TaskID, workerID)
}
func getTask(workerID int) GetTaskReply {args := GetTaskArgs{WorkerID: workerID}reply := GetTaskReply{}call("Coordinator.GetTask", &args, &reply)return reply
}
func reportTaskDone(taskType string, taskID int, workerID int) {args := ReportTaskArgs{TaskType: taskType, TaskID: taskID, WorkerID: workerID, Completed: true}reply := ReportTaskReply{}call("Coordinator.ReportTaskDone", &args, &reply)
}// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
func CallExample() {// declare an argument structure.args := ExampleArgs{}// fill in the argument(s).args.X = 99// declare a reply structure.reply := ExampleReply{}// send the RPC request, wait for the reply.// the "Coordinator.Example" tells the// receiving server that we'd like to call// the Example() method of struct Coordinator.ok := call("Coordinator.Example", &args, &reply)if ok {// reply.Y should be 100.fmt.Printf("reply.Y %v\n", reply.Y)} else {fmt.Printf("call failed!\n")}
}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}

rpc.go

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("os""strconv"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//

// ExampleArgs is the argument of the Coordinator.Example RPC.
type ExampleArgs struct {
	X int
}

// ExampleReply is the reply of the Coordinator.Example RPC.
type ExampleReply struct {
	Y int
}

// Add your RPC definitions here.

// Task type constants.
const (
	MapTask    = "map"
	ReduceTask = "reduce"
	ExitTask   = "exit"
	WaitTask   = "wait"
)

// Task status constants.
const (
	Idle       = "idle"
	InProgress = "in-progress"
	Completed  = "completed"
)

// Task request.
type GetTaskArgs struct {
	WorkerID int
}

// GetTaskReply is the coordinator's answer to a task request.
type GetTaskReply struct {
	TaskID        int
	TaskType      string
	FileName      string
	MapTaskNum    int // total number of map tasks
	ReduceTaskNum int // index of this reduce task, i.e. the partition it is responsible for
	// Map output is split into NReduce buckets; a reduce task reads its own
	// partition from the intermediate files of every map task.
	NReduce int // total number of reduce tasks
}

// Task status report.
type ReportTaskArgs struct {
	TaskType  string
	WorkerID  int
	TaskID    int
	Completed bool
}

// ReportTaskReply is the coordinator's answer to a task status report.
type ReportTaskReply struct {
	OK bool
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	// The numeric user ID is appended to the fixed prefix.
	return "/var/tmp/5840-mr-" + strconv.Itoa(os.Getuid())
}
http://www.dtcms.com/a/334773.html

相关文章:

  • 衡石使用指南嵌入式场景实践之仪表盘嵌入
  • 3 统一建模语言(UML)(上)
  • 力扣 hot100 Day75
  • 动手学深度学习(pytorch版):第三章节—线性神经网络(6) softmax回归的从零开始实现
  • 基于深度学习的老照片修复系统
  • 嵌入式硬件篇---电源电路
  • SpringBoot自动配置原理(二)
  • 智能客服、AI工作流、语音、聊天模板
  • MySQL的下载安装(MSI和ZIP版本都有)
  • 【Kubernetes系列】Kubernetes 中 Pod 层参数与 Deployment 层 Env 参数的区别与级别分析
  • WSL中占用磁盘空间大问题解决
  • 自适应阈值二值化参数详解 ,计算机视觉,图片处理 邻域大小 调整常数(C=3)和可视化调节参数的应用程序
  • 区块链技术原理(14)-以太坊数据结构
  • ubuntu更新chrome版本
  • 我的世界Java版1.21.4的Fabric模组开发教程(十九)自定义生物群系
  • 力扣(LeetCode) ——622. 设计循环队列(C语言)
  • 《C语言程序设计》笔记p10
  • 如何拿捏unittest自动化测试框架?
  • 代码随想录算法训练营四十三天|图论part01
  • 同创物流学习记录2·电车
  • 【手撕JAVA多线程】1.从设计初衷去看JAVA的线程操作
  • 【C++】STL 容器—list 底层剖析
  • Java应届生求职八股(5)---并发编程篇
  • JCTools 无锁并发队列基础:ConcurrentCircularArrayQueue
  • 【论文阅读笔记】--Eurosys--HCache
  • 安全审计-firewall防火墙
  • 探索粒子世界:从基础理论到前沿应用与未来展望
  • 基于动捕实现Epuck2的轨迹跟踪
  • 每日算法刷题Day62:8.16:leetcode 堆8道题,用时2h30min
  • 【Java基础面试题】数据类型