MIT6.824 Lab 1 - MapReduce

前言

MIT6.824 是一门久负盛名的分布式系统课程,今天开始我将会将课程中的系统作业、学习感悟写在专栏 MIT6.824 Labs 中,与大家一起学习这门非常有趣的课程

完整代码(含答案)见 github

MapReduce

MapReduce 的具体架构及思想 paper 中已经讲的很详细了,在此不再赘述,仅贴出架构图供大家复习:

习题讲解

Part I: Map/Reduce input and output

要求完成 doMap 和 doReduce 函数

doMap

doMap 函数执行一个 map 任务,它的运行流程如下:

  1. 读取输入文件 (inFile)
  2. 调用用户编写的 map 函数 (mapF) 处理输入文件
  3. 然后将 map 函数的输出分片到 r 个缓存文件中(其中 r 为 reduce task 的个数)

Tips:

  1. map 函数的输出为键值对列表
  2. 使用 ihash 确定输出文件索引值的目的是为了使输出均匀分布,且对于不同的 worker,同一个 key 会落在 reduce index 相同的输出文件
  3. 空间复杂度为 O(n),其中 n 为 key 的个数
  4. 读取文件使用 ioutil#ReadFile
  5. 将 KeyValue 写入文件可以用 encoding/json#Encoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
// Read content of inFile
data, err := ioutil.ReadFile(inFile)
if err != nil {
log.Printf("Error: %v", err)
}

// Execute map function
keyValuePairs := mapF(inFile, string(data[:len(data)]))

// Split the keyValuePairs into a multidimensional array whose size is the
// number of reduce tasks
divided := make([][]KeyValue, nReduce)
for _, keyValue := range keyValuePairs {
index := ihash(keyValue.Key) % nReduce
divided[index] = append(divided[index], keyValue)
}
debug("%#v", divided)

// Write divided keyValuePairs to R files, where R is the number of reduce
// tasks
for i, subKeyValuePairs := range divided {
// Get filename for output
filePath := reduceName(jobName, mapTask, i)

// Create output file
f, err := os.Create(filePath)
if err != nil {
log.Printf("Error: %v", err)
}
enc := json.NewEncoder(f)
for _, kv := range subKeyValuePairs {
err := enc.Encode(&kv)
if err != nil {
log.Printf("Error: %v", err)
}
}

f.Close()
}
}

func ihash(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32() & 0x7fffffff)
}

doReduce

doReduce 函数运行一个 reduce task,它的运行流程:

  1. 读取 m 个缓存文件(m 为 map task 的个数)
  2. 将相同 key 对应的 value 合并为一个列表
  3. 处理每个 key:将其对应的 value 列表传入 reduce 函数 (reduceF),得到最终的 value
  4. 将结果写入到磁盘

Tips:

  1. 按行读取文件可以用 bufio#Scanner
  2. JSON 解序列化可以用 json#Unmarshaler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
keyValuesMap := make(map[string][]string)

// Read intermediate files
for i := 0; i < nMap; i++ {
filename := reduceName(jobName, i, reduceTask)
file, err := os.Open(filename)
if err != nil {
log.Printf("Error: %v", err)
}

var tempKeyValuePairs []KeyValue
var keyValue KeyValue
fileScanner := bufio.NewScanner(file)
for fileScanner.Scan() {
json.Unmarshal([]byte(fileScanner.Text()), &keyValue)
tempKeyValuePairs = append(tempKeyValuePairs, keyValue)
}
debug("KeyValues: %v", tempKeyValuePairs)

for _, keyValue := range tempKeyValuePairs {
key := keyValue.Key
if values, ok := keyValuesMap[key]; ok {
keyValuesMap[key] = append(values, keyValue.Value)
} else {
keyValuesMap[key] = []string{keyValue.Value}
}
}
}

file, err := os.Create(outFile)
if err != nil {
log.Printf("Error: %v", err)
}
enc := json.NewEncoder(file)
for k, v := range keyValuesMap {
enc.Encode(KeyValue{k, reduceF(k, v)})
}
file.Close()
}

Part II: Single-worker word count

mapF

每读入一个输入文件,map 函数就会被执行一次,第一个参数是文件名,第二个参数是文件内容。由于我们这里的目标是做 word count,所以 key 是 word,value 是 word 在该文件中出现的次数

Tips:

  1. 单词分隔可以用 strings#FieldsFunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
notWord := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
words := strings.FieldsFunc(contents, notWord)
wordCountMap := make(map[string]int)
for _, word := range words {
if _, ok := wordCountMap[word]; ok {
wordCountMap[word]++
} else {
wordCountMap[word] = 1
}
}
var wordCount []mapreduce.KeyValue
for k, v := range wordCountMap {
wordCount = append(wordCount, mapreduce.KeyValue{Key: k, Value: strconv.Itoa(v)})
}
return wordCount
}

reduceF

对于每一个 key,reduce 函数会被执行一次,第一个参数是 key,第二个参数是其对应的所有值组成的列表

1
2
3
4
5
6
7
8
9
10
11
12
13
func reduceF(key string, values []string) string {
// Your code here (Part II).
sum := 0
for _, v := range values {
v, err := strconv.Atoi(v)
if err != nil {
fmt.Print("Error: %v, value: %v", err, v)
} else {
sum += v
}
}
return strconv.Itoa(sum)
}

Part III: Distributing MapReduce tasks

在这里我们将会完成 schedule 函数。schedule 函数负责调度工作,它在程序的整个生命周期中会运行两次:map 阶段一次,reduce 阶段一次。

这里我们在 goroutine 中使用指针访问外部数据是非常错误的写法,一旦 doTaskArgs 的值被外部程序修改,就会导致不可预料的结果。而这里我们的程序正常运行只是由于每次循环都会重新声明 doTaskArgs。

正确的做法是将 goroutine 所需的外部变量作为参数传入 goroutine。

同时,将任务分配完毕后直接跳出分配循环的做法也是不正确的,在 Part IV 中我们将会看到更正确、更优雅的写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

finishedTasks := 0
for i := 0; i < ntasks; i++ {
workerAddress := <-registerChan
fmt.Printf("Assigning #%d task to %s\n", i, workerAddress)
inputFile := mapFiles[i]
doTaskArgs := DoTaskArgs{
JobName: jobName,
File: inputFile,
Phase: phase,
TaskNumber: i,
NumOtherPhase: n_other,
}
go func() {
success := call(workerAddress, "Worker.DoTask", &doTaskArgs, nil)
if success {
fmt.Printf("Task success\n")
registerChan <- workerAddress
finishedTasks++
} else {
fmt.Printf("Task failed\n")
}
}()
}

unfinishedTasks := ntasks - finishedTasks
for i := 0; i < unfinishedTasks; i++ {
<-registerChan
}

fmt.Printf("Schedule: %v done\n", phase)
}

Part IV: Handling worker failures

之前完成的 schedule 函数将任务分配完之后就开始等待所有任务完成,无法处理 worker failures。

在这里我们对它进行改进,主要:

  1. 将 channel receive 改为 unblock
  2. 失败的 task 重新入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

finishedTasks := 0
tasksChan := make(chan int, ntasks)
var taskIndex int
for taskIndex := 0; taskIndex < ntasks; taskIndex++ {
tasksChan <- taskIndex
}
var inputFile string
for finishedTasks < ntasks {
select {
case taskIndex = <-tasksChan:
workerAddress := <-registerChan
fmt.Printf("Assigning #%d task to %s\n", taskIndex, workerAddress)
inputFile = mapFiles[taskIndex]
doTaskArgs := DoTaskArgs{
JobName: jobName,
File: inputFile,
Phase: phase,
TaskNumber: taskIndex,
NumOtherPhase: n_other,
}
go func() {
success := call(workerAddress, "Worker.DoTask", &doTaskArgs, nil)
if success {
fmt.Printf("Task #%d success\n", doTaskArgs.TaskNumber)
finishedTasks++
registerChan <- workerAddress
} else {
fmt.Printf("Task #%d failed\n", doTaskArgs.TaskNumber)
tasksChan <- doTaskArgs.TaskNumber
}
}()
default:
}
}

fmt.Printf("Schedule: %v done\n", phase)
}

Part V: Inverted index generation

在这里我们将会实现 inverted index, 与之前的 word count 正好相反,现在 word 作为 key,而 document 作为 value

doMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
notWord := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
words := strings.FieldsFunc(value, notWord)
wordDoc := make(map[string]string)
for _, word := range words {
if _, ok := wordDoc[word]; !ok {
wordDoc[word] = document
}
}
for k, v := range wordDoc {
res = append(res, mapreduce.KeyValue{Key: k, Value: v})
}
return res
}

doReduce

由于 values 不存在冲突的情况,所以直接 join 起来就可以了。

1
2
3
4
5
6
7
8
9
10
func reduceF(key string, values []string) string {
// Your code here (Part V).
var documents []string
for _, v := range values {
documents = append(documents, v)
}
res := strings.Join(documents, ",")
res = strconv.Itoa(len(documents)) + " " + res
return res
}