Lab Description

Link: http://nil.csail.mit.edu/6.824/2021/labs/lab-mr.html

Task: implement a MapReduce framework in Go.

Results and Reflections

Result: running ./mr-test-many.sh 50 locally passes all tests.

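The framework and the algorithm are not hard; most of the work is getting familiar with Go's basic syntax. Here is a brief summary of what I see as Go's strengths and weaknesses:

  • Native channels and goroutines make Go very friendly to concurrent programs. To some extent this encourages concurrent and asynchronous programming; some classic algorithms can even be rewritten in a fully concurrent form (for example, starting a goroutine at every node while traversing a binary tree).
  • RPC support is solid (net/rpc is in the standard library), so communication and calls between worker processes are easy.
  • The toolchain ships with a sanitizer (the race detector, enabled with -race).
  • Object orientation is built on interfaces, which makes code more uniform (-> becomes .). Personally I dislike it, because it makes my code confusing (I already strongly dislike the implicit choice between copy and reference in Python).
  • Error handling follows the C style and uses return values. However, return values are not expressive enough: err is passed along implicitly by convention, which makes it easy to get confused about types. Names also carry compile-time meaning (the blank identifier _, and members that must start with an uppercase letter to be exported), so the naming system feels incomplete and inconsistent.
  • Memory allocation resembles Python; it is hard to control as precisely as in C.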
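
To make the first point concrete, here is a minimal sketch of a binary tree traversal that starts one goroutine per node. The `Node` type and `ParallelSum` function are hypothetical names invented for this illustration; they are not part of the lab code, and spawning a goroutine per node is shown for its shape rather than its performance.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Node is a hypothetical binary tree node used only for this illustration.
type Node struct {
	Val         int64
	Left, Right *Node
}

// ParallelSum visits every node in its own goroutine and returns the sum
// of all values in the tree.
func ParallelSum(root *Node) int64 {
	var total int64
	var wg sync.WaitGroup

	var visit func(n *Node)
	visit = func(n *Node) {
		defer wg.Done()
		if n == nil {
			return
		}
		atomic.AddInt64(&total, n.Val)
		// Register both children before this goroutine calls Done,
		// so the counter cannot drop to zero too early.
		wg.Add(2)
		go visit(n.Left)
		go visit(n.Right)
	}

	wg.Add(1)
	go visit(root)
	wg.Wait()
	return total
}

func main() {
	root := &Node{Val: 1,
		Left:  &Node{Val: 2, Left: &Node{Val: 4}},
		Right: &Node{Val: 3},
	}
	fmt.Println(ParallelSum(root)) // 1 + 2 + 4 + 3 = 10
}
```

The `sync.WaitGroup` replaces the call stack of the recursive version: the caller blocks until every spawned goroutine has reported completion.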

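As for the lab itself, it felt like an oversized version of the M labs from JYYOS. Because the code and the design are fairly simple here, I did not run into any really nasty bugs, so I still have no experience debugging Go.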

Lab Design

coordinator

The coordinator's structs are defined as follows:

type ApplyForm struct {
    ApplyID int
    ApplyTime time.Time
    Status int // 0: unapplied, 1: running, 2: finished
}

type Coordinator struct {
    MapTaskCnt int
    MapFinished int
    MapStatus []ApplyForm

    ReduceTaskCnt int
    ReduceFinished int
    ReduceStatus []ApplyForm

    files []string
    lk sync.Mutex
}

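Every task gets an application form that records the task ID, the task start time, and the task status. All coordinator state belongs to the critical section and is protected by lk sync.Mutex.

When a worker calls ApplyTask to claim a task, one of three cases applies:

  • Some Map tasks are not finished: assign a pending Map task and fill in its application form; if no task is pending, send a wait signal.
  • All Map tasks are finished but some Reduce tasks are not: assign a pending Reduce task and fill in its application form; if no task is pending, send a wait signal.
  • All Reduce tasks are finished: send a termination signal so the worker exits.

While searching for a pending task, if a task is in the "running" state but has been running longer than the time limit (10s in this lab), its worker is assumed to have crashed and the task becomes eligible for assignment again.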

worker

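The worker only needs to poll for tasks in a loop. There is not much to say about the implementation: the computation follows the sequential example code, and the code that saves the results is already clear.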

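A memorable bug

I ran the lab on WSL, so the file system behaved differently from a real Linux machine. Trying to os.Rename a file that lived in tmpfs produced the error: Error renaming file: invalid cross-device link (a rename cannot move a file across file systems). I therefore moved the temporary files to ./mr-tmp. Because that directory was a Windows directory mounted under /mnt, everything ran noticeably slower and the job count test failed (my guess is that the run was slow enough to hit the timeout, so the coordinator reassigned the task). After moving the folder under ~, the tests ran without problems.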

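A note on the design

In this lab I handled timeouts in a C-like style: record the start time, and check whether the task has timed out the next time it is visited. In Go, timeouts can just as well be handled asynchronously with goroutines. Here are two approaches I have seen:

  1. Just before ApplyTask returns, start a timer goroutine, as shown below. If the task is still in the "running" state when the timer fires, assume the worker has crashed and reset the task's status.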
    go func() {
        time.Sleep(time.Duration(10) * time.Second) // wait 10 seconds
        m.mu.Lock()
        if m.maptasklog[allocate] == 1 {
            // still waiting, assume the map worker has died
            m.maptasklog[allocate] = 0
        }
        m.mu.Unlock()
    }()
  2. The coordinator starts a global heartbeat goroutine that periodically sends a heartbeat to every worker. A worker that does not respond is considered crashed.

Code

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// Add your RPC definitions here.
type WorkerArgs struct {
    TaskType int // 1: Map, 2: Reduce
    TaskID   int
}

type WorkerReply struct {
    TaskType int // 1: Map, 2: Reduce, 3: wait, 4: exit

    MapTaskCnt    int
    ReduceTaskCnt int

    // Map task field
    MapTaskID   int
    Mapfilename string

    // Reduce task field
    ReduceTaskID int
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
    s := "/var/tmp/824-mr-"
    s += strconv.Itoa(os.Getuid())
    return s
}

coordinator.go

package mr

import (
    // "fmt"
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "sync"
    "time"
)

type ApplyForm struct {
    ApplyID   int
    ApplyTime time.Time
    Status    int // 0: unapplied, 1: running, 2: finished
}

type Coordinator struct {
    // Your definitions here.
    MapTaskCnt  int
    MapFinished int
    MapStatus   []ApplyForm

    ReduceTaskCnt  int
    ReduceFinished int
    ReduceStatus   []ApplyForm

    files []string
    lk    sync.Mutex
}

// Your code here -- RPC handlers for the worker to call.

func (c *Coordinator) ApplyTask(args *WorkerArgs, reply *WorkerReply) error {
    c.lk.Lock()
    defer c.lk.Unlock()

    if c.MapFinished < c.MapTaskCnt {
        curtime := time.Now()
        for i := 0; i < c.MapTaskCnt; i += 1 {
            // Assignable: never handed out, or running for 10s or more (worker presumed crashed)
            if c.MapStatus[i].Status == 0 ||
                (c.MapStatus[i].Status == 1 && curtime.Sub(c.MapStatus[i].ApplyTime).Seconds() >= 10) {
                reply.TaskType = 1
                reply.MapTaskCnt = c.MapTaskCnt
                reply.ReduceTaskCnt = c.ReduceTaskCnt
                reply.MapTaskID = i
                reply.Mapfilename = c.files[i]
                c.MapStatus[i].Status = 1
                c.MapStatus[i].ApplyID = i
                c.MapStatus[i].ApplyTime = time.Now()
                return nil
            }
        }
        // Every unfinished Map task is currently running: ask the worker to wait
        reply.TaskType = 3
    } else if c.ReduceFinished < c.ReduceTaskCnt {
        curtime := time.Now()
        for i := 0; i < c.ReduceTaskCnt; i += 1 {
            if c.ReduceStatus[i].Status == 0 ||
                (c.ReduceStatus[i].Status == 1 && curtime.Sub(c.ReduceStatus[i].ApplyTime).Seconds() >= 10) {
                reply.TaskType = 2
                reply.MapTaskCnt = c.MapTaskCnt
                reply.ReduceTaskCnt = c.ReduceTaskCnt
                reply.ReduceTaskID = i
                c.ReduceStatus[i].Status = 1
                c.ReduceStatus[i].ApplyID = i
                c.ReduceStatus[i].ApplyTime = time.Now()
                return nil
            }
        }
        // Every unfinished Reduce task is currently running: ask the worker to wait
        reply.TaskType = 3
    } else {
        // All tasks are finished: tell the worker to exit
        reply.TaskType = 4
    }
    return nil
}

func (c *Coordinator) FinishedInform(args *WorkerArgs, reply *WorkerReply) error {
    c.lk.Lock()
    defer c.lk.Unlock()
    TaskType, TaskID := args.TaskType, args.TaskID

    // if TaskType == 1 {
    //     fmt.Printf("Finished Inform from TaskType=%v, TaskID=%v, whose name is %v\n", TaskType, TaskID, c.files[TaskID])
    // }

    // Count a task only the first time it is reported: a task that timed out
    // and was reassigned may be reported as finished by two workers.
    if TaskType == 1 {
        if c.MapStatus[TaskID].Status != 2 {
            c.MapStatus[TaskID].Status = 2
            c.MapFinished += 1
        }
    } else if TaskType == 2 {
        if c.ReduceStatus[TaskID].Status != 2 {
            c.ReduceStatus[TaskID].Status = 2
            c.ReduceFinished += 1
        }
    } else {
        log.Fatalf("FinishedInform - received undefined TaskType %v", TaskType)
    }
    return nil
}

//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}

//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
    // Your code here.
    c.lk.Lock()
    ret := c.ReduceFinished == c.ReduceTaskCnt
    c.lk.Unlock()
    return ret
}

//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}

    // Your code here.
    c.files = files
    c.MapTaskCnt = len(files)
    c.ReduceTaskCnt = nReduce
    c.MapFinished = 0
    c.ReduceFinished = 0

    cur_form := ApplyForm{}
    cur_form.ApplyID = -1
    cur_form.Status = 0

    for i := 0; i < c.MapTaskCnt; i += 1 {
        c.MapStatus = append(c.MapStatus, cur_form)
    }
    for i := 0; i < c.ReduceTaskCnt; i += 1 {
        c.ReduceStatus = append(c.ReduceStatus, cur_form)
    }

    c.server()
    return &c
}

worker.go

package mr

import (
    "encoding/json"
    "fmt"
    "hash/fnv"
    "io/ioutil"
    "log"
    "net/rpc"
    "os"
    "sort"
    "strconv"
    "time"
)

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
    Key   string
    Value string
}

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    // Your worker implementation here.

    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
    for {
        args := WorkerArgs{}
        reply := WorkerReply{}
        ok := call("Coordinator.ApplyTask", &args, &reply)
        if !ok || reply.TaskType == 4 {
            break
        }
        if reply.TaskType == 1 {
            // This is a Mapper
            intermediate := []KeyValue{}
            MapTaskID := reply.MapTaskID
            filename := reply.Mapfilename
            ReduceTaskCnt := reply.ReduceTaskCnt

            // Do the map as in sequential example
            file, err := os.Open(filename)
            if err != nil {
                log.Fatalf("Mapper - cannot open %v", filename)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("Mapper - cannot read %v", filename)
            }
            file.Close()
            kva := mapf(filename, string(content))
            intermediate = append(intermediate, kva...)

            // hash the result to bucket
            buckets := make([][]KeyValue, ReduceTaskCnt) // buckets[i] is a slice storing all kvpairs with hashID i
            for _, kvpair := range intermediate {
                // Partition by the number of Reduce tasks, which is also len(buckets)
                HashedID := ihash(kvpair.Key) % ReduceTaskCnt
                buckets[HashedID] = append(buckets[HashedID], kvpair)
            }

            // write to temporary file
            for ReduceTaskID := range buckets {
                storename := "mr-" + strconv.Itoa(MapTaskID) + "-" + strconv.Itoa(ReduceTaskID)
                tmpfile, err := ioutil.TempFile("./", storename+"-tmp")
                if err != nil {
                    log.Fatalf("Mapper - cannot create temp file for %v: %v", storename, err)
                }
                enc := json.NewEncoder(tmpfile)
                for _, kv := range buckets[ReduceTaskID] {
                    if err := enc.Encode(&kv); err != nil {
                        log.Fatalf("Mapper - cannot encode to %v: %v", tmpfile.Name(), err)
                    }
                }
                // Rename to mr-XX-XX
                tmpfile.Close()
                if err := os.Rename(tmpfile.Name(), storename); err != nil {
                    fmt.Println("Error renaming file:", err)
                }
            }

            // Inform Coordinator that this Map has finished
            args := WorkerArgs{}
            reply := WorkerReply{}
            args.TaskType = 1
            args.TaskID = MapTaskID
            call("Coordinator.FinishedInform", &args, &reply)

        } else if reply.TaskType == 2 {
            // This is a Reducer
            intermediate := []KeyValue{}
            ReduceTaskID := reply.ReduceTaskID
            MapTaskCnt := reply.MapTaskCnt
            // ReduceTaskCnt := reply.ReduceTaskCnt

            // Read from file to intermediate
            for MapTaskID := 0; MapTaskID < MapTaskCnt; MapTaskID += 1 {
                storename := "mr-" + strconv.Itoa(MapTaskID) + "-" + strconv.Itoa(ReduceTaskID)
                storefile, err := os.Open(storename)
                if err != nil {
                    log.Fatalf("Reducer - cannot open %v", storename)
                }
                dec := json.NewDecoder(storefile)
                for {
                    var kvpair KeyValue
                    if err := dec.Decode(&kvpair); err != nil {
                        break
                    }
                    intermediate = append(intermediate, kvpair)
                }
                storefile.Close()
            }

            // Reduce and Save to temporary file, then rename to mr-out-XX
            sort.Sort(ByKey(intermediate))
            oname := "mr-out-" + strconv.Itoa(ReduceTaskID)
            ofile, err := ioutil.TempFile("./", oname+"-tmp")
            if err != nil {
                log.Fatalf("Reducer - cannot create temp file for %v: %v", oname, err)
            }

            i := 0
            for i < len(intermediate) {
                j := i + 1
                for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, intermediate[k].Value)
                }
                output := reducef(intermediate[i].Key, values)

                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

                i = j
            }
            // Close the temp file before renaming it into place
            ofile.Close()
            if err := os.Rename(ofile.Name(), oname); err != nil {
                fmt.Println("Error renaming file:", err)
            }

            // Inform Coordinator that this Reduce has finished
            args := WorkerArgs{}
            reply := WorkerReply{}
            args.TaskType = 2
            args.TaskID = ReduceTaskID
            call("Coordinator.FinishedInform", &args, &reply)
        } else if reply.TaskType == 3 {
            // Currently no task to do
            time.Sleep(time.Second)
        } else {
            log.Fatalf("Worker - received unknown type %v\n", reply.TaskType)
        }
    }
}

//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
    // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    sockname := coordinatorSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()

    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}