//
// RPC definitions.
//
// remember to capitalize all names.
//
import"os" import"strconv"
// Add your RPC definitions here.

// WorkerArgs is the request a worker sends to the coordinator,
// used both to ask for work and to report a finished task.
type WorkerArgs struct {
	TaskType int // which kind of task is being reported: 1 = map, 2 = reduce (see FinishedInform)
	TaskID   int // index of the task within its kind
}
// WorkerReply is the coordinator's answer to a worker's task request.
// TaskType selects which of the remaining fields are meaningful
// (1 = map, 2 = reduce, 3 = no task right now, 4 = exit — see the worker loop).
type WorkerReply struct {
	TaskType int

	MapTaskCnt    int // total number of map tasks in the job
	ReduceTaskCnt int // total number of reduce tasks in the job

	// Map task field
	MapTaskID   int    // which map task to run
	Mapfilename string // input file for that map task

	// Reduce task field
	ReduceTaskID int // which reduce task to run
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
//
// The name embeds the calling user's UID so that several users on the
// same machine don't collide.
func coordinatorSock() string {
	return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}
// if TaskType == 1 {
// 	fmt.Printf("Finished Inform from TaskType=%v, TaskID=%v, whose name is %v\n", TaskType, TaskID, c.files[TaskID])
// }

// Tail of Coordinator.FinishedInform: mark the reported task as done and
// bump the matching finished counter. NOTE(review): the function signature
// and presumably a c.lk.Lock() precede this chunk — the Unlock below implies
// the lock is held on entry; verify against the missing head of the function.
if TaskType == 1 {
	c.MapFinished += 1
	c.MapStatus[TaskID].Status = 2 // 2 presumably means "completed" — confirm against the status enum
} else if TaskType == 2 {
	c.ReduceFinished += 1
	c.ReduceStatus[TaskID].Status = 2
} else {
	// Unknown task type is a programming error on the worker side.
	log.Fatalf("FinishedInform - received undefined TaskType %v", TaskType)
}
c.lk.Unlock()
return nil
}
// // start a thread that listens for RPCs from worker.go // func(c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil) }
// // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. // func(c *Coordinator) Done() bool { // Your code here. c.lk.Lock() ret := c.ReduceFinished == c.ReduceTaskCnt c.lk.Unlock() return ret }
//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// Start from a zero-value coordinator; the remaining initialization
	// (task counts, status slices, server start) lies beyond this chunk.
	c := Coordinator{}
	// Seed per-task bookkeeping: one status record per map task and one per
	// reduce task. NOTE(review): cur_form is defined in a part of
	// MakeCoordinator not visible here — presumably a fresh/initial status
	// record; confirm before relying on its contents.
	for i := 0; i < c.MapTaskCnt; i += 1 {
		c.MapStatus = append(c.MapStatus, cur_form)
	}
	for i := 0; i < c.ReduceTaskCnt; i += 1 {
		c.ReduceStatus = append(c.ReduceStatus, cur_form)
	}
//
// Map functions return a slice of KeyValue.
//
// KeyValue is a single intermediate key/value pair emitted by a map
// function and consumed (grouped by Key) by a reduce function.
type KeyValue struct {
	Key   string
	Value string
}
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	// Mask off the sign bit so the result is always a non-negative int.
	return int(sum & 0x7fffffff)
}
// Your worker implementation here. // uncomment to send the Example RPC to the coordinator. // CallExample() for { args := WorkerArgs{} reply := WorkerReply{} ok := call("Coordinator.ApplyTask", &args, &reply) if !ok || reply.TaskType == 4 { break } if reply.TaskType == 1 { // This is a Mapper intermediate := []KeyValue{} MapTaskID := reply.MapTaskID filename := reply.Mapfilename MapTaskCnt := reply.MapTaskCnt ReduceTaskCnt := reply.ReduceTaskCnt // Do the map as in sequential example file, err := os.Open(filename) if err != nil { log.Fatalf("Mapper - cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("Mapper - cannot read %v", filename) } file.Close() kva := mapf(filename, string(content)) intermediate = append(intermediate, kva...)
// hash the result to bucket buckets := make([][]KeyValue, ReduceTaskCnt) // buckets[i] is a slice storing all kvpairs with hashID i for _, kvpair := range intermediate { HashedID := ihash(kvpair.Key) % MapTaskCnt buckets[HashedID] = append(buckets[HashedID], kvpair) }
// write to temporary file for ReduceTaskID := range buckets { storename := "mr-" + strconv.Itoa(MapTaskID) + "-" + strconv.Itoa(ReduceTaskID) tmpfile, err := ioutil.TempFile("./", storename + "-tmp") if err != nil { fmt.Println(err) } enc := json.NewEncoder(tmpfile) for _, kv := range buckets[ReduceTaskID] { enc.Encode(&kv) } // Rename to mr-XX-XX tmpfile.Close() if err := os.Rename(tmpfile.Name(), storename); err != nil { fmt.Println("Error renaming file:", err) } }
			// Inform Coordinator that this Map has finished.
			// Note: args/reply here shadow the loop-level variables, which is
			// intentional — this is a fresh request. The RPC result is
			// best-effort; a lost notification is handled by the
			// coordinator's reassignment logic (not visible in this chunk).
			args := WorkerArgs{}
			reply := WorkerReply{}
			args.TaskType = 1
			args.TaskID = MapTaskID
			call("Coordinator.FinishedInform", &args, &reply)
} elseif reply.TaskType == 2 { // This is a Reducer intermediate := []KeyValue{} ReduceTaskID := reply.ReduceTaskID MapTaskCnt := reply.MapTaskCnt // ReduceTaskCnt := reply.ReduceTaskCnt // Read from file to intermediate for MapTaskID := 0; MapTaskID < MapTaskCnt; MapTaskID += 1 { storename := "mr-" + strconv.Itoa(MapTaskID) + "-" + strconv.Itoa(ReduceTaskID) storefile, err := os.Open(storename) if err != nil { log.Fatalf("Reducer - cannot open %v", storename) } dec := json.NewDecoder(storefile) for { var kvpair KeyValue if err := dec.Decode(&kvpair); err != nil { break } intermediate = append(intermediate, kvpair) } storefile.Close() } // Reduce and Save to temporary file, then rename to mr-out-XX sort.Sort(ByKey(intermediate)) oname := "mr-out-" + strconv.Itoa(ReduceTaskID) ofile, _ := ioutil.TempFile("./", oname + "-tmp")
i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } values := []string{} for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j } os.Rename(ofile.Name(), oname) ofile.Close()
// Inform Coordinator that this Reduce has finished args := WorkerArgs{} reply := WorkerReply{} args.TaskType = 2 args.TaskID = ReduceTaskID call("Coordinator.FinishedInform", &args, &reply) } elseif reply.TaskType == 3 { // Currently no task to do time.Sleep(time.Second) } else { log.Fatalf("Worker - received unknown type %v\n", reply.TaskType) time.Sleep(time.Second) } } }
//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		// A dial failure is fatal here; workers that outlive the
		// coordinator instead rely on c.Call returning an error below.
		log.Fatal("dialing:", err)
	}
	defer c.Close()
	// (The rest of this function — the c.Call invocation and its error
	// handling — lies beyond this chunk.)