MIT 6.5840(原 6.824) lab1

MIT 6.5840 lab1

前言

论文为 MapReduce,中文版:MapReduce:在大型集群上简化数据处理,英文版:rfeet.qrk (mit.edu)

视频课看的翻译(p1-p8):simviso-开源分享,传播知识 (simtoco.com)

代码实现仓库地址:Hardews/6.5840: MIT 6.5840(原 6.824)lab (github.com)

我认为需要的前置知识

  1. Golang 基础(这个网上很多教程
  2. 论文阅读 / 视频:要知道 MapReduce 到底是个什么东西
  3. rpc:Go RPC开发简介 - 官方RPC库
  4. 要求:6.5840 lab 1:MapReduce (mit.edu)

实现

Coordinator

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Coordinator struct {
// base,这是一些基本的东西
Mu sync.Mutex // 互斥锁,操作一些共享变量时需要的
Exp time.Duration // 过期时间
NReduce int // reduce 的数目
InputFiles []string // 输入文件的文件名数组

// Distribute use
DistributeSeqChan chan int // 分发 Job 的 channel
DistributeChan chan JobInfo // 分发 Map Job 处理内容的 channel

// Job info
JobAlive []chan bool // Job 心跳 channel
JobInfo map[int]JobInfo // Job 的基本信息 map
IsAllJobDone, IsMapJobDone, IsReduceReady bool // 所有 Job 是否完成,Map Job 是否完成, Reduce Job 是否准备完毕
MapWorkerNum, ReduceWorkerNum int // 当前 Map Job 的 worker 数目,当前 Reduce Job 的 worker 数目
}

type JobInfo struct {
Filename string // 该 worker 正在处理的文件名
Content string // 该 worker 正在处理的内容
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
c.NReduce = nReduce

// 先运行内容分发程序
c.InputFiles = files
c.DistributeChan = make(chan JobInfo, nReduce)
go c.contentDistribute()

// seq 分发队列
c.DistributeSeqChan = make(chan int, nReduce)
for i := 0; i < nReduce; i++ {
c.DistributeSeqChan <- i
}

c.JobInfo = make(map[int]JobInfo)
c.JobAlive = make([]chan bool, c.NReduce)

c.Exp = MaxTimeLimit * time.Second

c.server()
return &c
}

并没有做 coordinator 的崩溃恢复(论文里给的解决方案是定时备份 master 的数据结构,然后 make 的时候进行恢复)

这里的初始化的关键是内容分发协程(实际上就是起一个协程,读取文件中的内容,发送到一个管道中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *Coordinator) contentDistribute() {
for len(c.InputFiles) != 0 {
filename := c.InputFiles[0]
file, err := os.Open(filename)
if err != nil {
log.Fatalf("open: %s, err: %s", filename, err.Error())
}

// 如果一个文件过大,这里可以改为向分发管道发送相应字节的 content
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("read: %s, err: %s", filename, err.Error())
}

c.DistributeChan <- JobInfo{
Filename: filename,
Content: string(content),
}

// 因为考虑不会有 coordinator 挂掉的情况,所以直接将其出队
// 出队
c.InputFiles = c.InputFiles[1:]
}
}

Worker

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
var isDone bool
for {
if !isDone && !DealMap(mapf) {
var isMapDone DoneReply
call(IsMapDoneRpcName, &NullReply{}, &isMapDone)
isDone = isMapDone.IsDone
} else if isDone && !DealReduce(reducef) {
// 没有分配到 reduce 操作
var Done DoneReply
call(IsReduceDoneRpcName, &NullReply{}, &Done)
if Done.IsDone {
// 全部处理完了,程序应该退出
return
}
}
// 休眠一下,防止它抢太快了别的抢不到
time.Sleep(time.Second / 2)
}
}

主要是通过一个 for 循环来获取任务分配

开始前会有个 isDone,这是用来判断 Map Job 是否已经结束。如果结束了,那么直接去请求是否可以执行 reduce 任务。

所以,这个初始化的逻辑就很简单了,就是循环去尝试是否能分配到任务,分配到执行完后,或者没分配到。我们都会让其睡眠一下,这是为了防止 map parallelism 阶段起的所谓并行程序太少(因为其实一个 map 或者一个 reduce 任务执行的速度是很快的),导致过不了。

并且,这里说一下两个 rpc 方法。用来给 worker 通知 map Job 是否都结束了,并且 reduce Job 是否也结束了。如果两者都结束了,worker 可以退出。

1
2
3
4
5
6
7
8
9
func (c *Coordinator) IsMapDone(args *NullArgs, reply *DoneReply) error {
reply.IsDone = c.IsMapJobDone
return nil
}

func (c *Coordinator) IsReduceDone(args *NullArgs, reply *DoneReply) error {
reply.IsDone = c.IsAllJobDone
return nil
}

可以看到主要是通过 coordinator 的两个字段来返回和判断。

RPC Setting

对于一些 rpc 的参数定义,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type NullArgs struct{}

type NullReply struct{}

type DisArgs struct {
JobType int
}

type DisReply struct {
JobSeq int
NReduce int
Filename string
Content string
}

type DoneReply struct {
IsDone bool
}

type SeqArgs struct {
JobSeq int
}

Job 处理

Job 的分发

去掉了一些打印信息(用于 debug 的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (c *Coordinator) JobDistribute(args *DisArgs, reply *DisReply) error {
reply.JobSeq = -1
if len(c.DistributeSeqChan) == 0 || c.IsAllJobDone {
return nil
}

if args.JobType == MapJob && c.IsMapJobDone {
return nil
}

select {
case seq := <-c.DistributeSeqChan:
if !c.IsMapJobDone {
// map job
jobInfo, ok := <-c.DistributeChan
if !ok {
// 已经没任务了
c.DistributeSeqChan <- seq
return nil
}

reply.Filename = jobInfo.Filename
reply.Content = jobInfo.Content

c.Mu.Lock()
c.JobInfo[seq] = jobInfo
c.MapWorkerNum++
c.Mu.Unlock()
} else if c.IsReduceReady {
// reduce job
c.Mu.Lock()
c.ReduceWorkerNum++
c.JobInfo[seq] = JobInfo{}
c.Mu.Unlock()
} else {
c.DistributeSeqChan <- seq
return nil
}

// 通用信息
reply.JobSeq = seq
reply.NReduce = c.NReduce
go c.handleAlive(seq, c.Exp)
default:
}

return nil
}

主要分为两个部分:

  • 第一部分是分发前的校验:如果没有可用的 seq,或者说此时任务已经全部处理完了,那么直接返回即可。如果是获取 Map Job 的请求,但是此时 Map Job 已经全部完成了,也全部返回。
  • 第二部分则是 seq 的处理,如果从 channel 中拿到了 seq,那么就设置相关参数,并返回。

其实文件的内容可以不用 rpc 传,直接传一个文件名就好了。但这里是为了采取如果遇到大文件,就分开读取发送这样的策略,所以没这么做(虽然测试用例没有大文件,实际上也没有将文件分成一份一份的。

Map Job

而 worker 中,执行 map 任务的代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func DealMap(mapf func(string, string) []KeyValue) (isTakeJob bool) {
/* 第一部分 */
isTakeJob = true
var disReply DisReply
ok := call(JobDistributionRpcName, &DisArgs{MapJob}, &disReply)
if !ok || disReply.JobSeq == -1 || disReply.Filename == "" {
// 没拿到资源,那么就等待
return false
}

nReduce := disReply.NReduce
seq, filename, content := disReply.JobSeq, disReply.Filename, disReply.Content

var ctx, cancel = context.WithCancel(context.Background())
go KeepAlive(ctx, seq)

/* 第二部分 */
// 记录分区文件对应的 KeyValue 对
var recordKF = make(map[string][]KeyValue)
kvs := mapf(filename, content)
for _, KV := range kvs {
subzone := ihash(KV.Key) % nReduce
// filename is mr-X-Y, X -> subzone num, Y -> worker seq
fn := fmt.Sprintf("mr-%d-%d.txt", seq, subzone)
if _, isExist := recordKF[fn]; isExist {
recordKF[fn] = append(recordKF[fn], KV)
} else {
recordKF[fn] = []KeyValue{KV}
}
}

for iFilename, in := range recordKF {
res, err := json.Marshal(&in)
if err != nil {
log.Printf("open: %s, err: %s", iFilename, err.Error())
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
cancel()
return
}

// 只写
file, err := os.OpenFile(iFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
if err != nil {
log.Printf("open: %s, err: %s", iFilename, err.Error())
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
cancel()
return
}

file.Write(res)
file.Close()
}

/* 第三部分 */
call(MapJobFinishRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
cancel()
return
}

在注释中,我将代码分成了三部分,其中:

  • 第一部分,从 coordinator 中获取 seq 号和分配的 Job 内容。如果没有拿到,那么此函数返回。
  • 第二部分,是 map 的主要处理逻辑:
    • 参考 mrsequential.go,先将 filename 和 content 的内容传入 mapf,得到一个 KeyValue 数组。
    • 之后,遍历这个数组,对于每一组 key,根据 ihash 函数的结果 mod nReduce 得到它应该的分区号。指导中写到,一个中间文件的合理命名是 mr-X-Y,X 是 map 的序号(我这里是 seq),而 Y 是分区号。需要提的一点是,我的中间文件命名是 mr-X-Y.txt,根据这个后缀进行清理(没有使用 tempFile)。
    • 对于这些计算出来的分区号,组合成它应该写入的中间文件文件名,先使用一个 map 记录,key 为中间文件的名字,value 则为 KeyValue 数组。
    • 遍历上述说到的 map,将该文件的 KeyValue 数组序列化为 json 字符串,然后写入。
    • 至此,我们算是完成了 map Job。
  • 第三部分,告诉 coordinator 我们完成了这个 map Job,并退出心跳处理程序。

对于 Map 任务何时完成,由上面知道,由 worker 主动通过 rpc 调用来报告。coordinator 对于 map 任务完成的处理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (c *Coordinator) MapJobFinish(args *SeqArgs, reply *NullReply) error {
// 删除信息
c.Mu.Lock()
delete(c.JobInfo, args.JobSeq)
// 防止 seq 被复用, 不将已完成的 seq 退回
c.MapWorkerNum--
c.Mu.Unlock()

// log.Printf("map job finish, seq: %d, now num: %d, content chan: %d", args.JobSeq, c.MapWorkerNum, len(c.DistributeChan))

if !c.IsMapJobDone && c.MapWorkerNum == 0 && len(c.DistributeChan) == 0 && len(c.InputFiles) == 0 {
// 所有的 map worker 都处理完了
c.reduceJobStart()
c.IsMapJobDone = true
}

return nil
}

这里有一点是,已完成的 seq 并不会放回到原来的分发队列中。也就是 seq 不会被复用,至于为什么在后面对于 crash 的处理会提到。

我们可以看到,当所有的 map Job 完成后,会执行一个 reduceJobStart 方法。这个主要是 reduce Job 的一些初始化。之后,将 IsMapJobDone 设为 true。

Reduce Job

在 map Job 完成后,会进行 reduce Job 的初始化:

1
2
3
4
5
6
7
8
9
10
func (c *Coordinator) reduceJobStart() {
log.Println(strings.Repeat("-", c.NReduce), "start reduce job", strings.Repeat("-", c.NReduce))
c.DistributeSeqChan = make(chan int, c.NReduce)
for i := 0; i < c.NReduce; i++ {
c.DistributeSeqChan <- i
}
c.JobInfo = make(map[int]JobInfo)
c.JobAlive = make([]chan bool, c.NReduce)
c.IsReduceReady = true
}

在初始化完成后,将 IsReduceReady 设为 true。seq 分发程序就可以对 worker 进行 seq 的分发。

worker 执行 reduce 任务的流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
func DealReduce(reducef func(string, []string) string) bool {
/* 第一部分 */
// 当前 worker 是否可以进行 reduce 的处理
var disReply DisReply
ok := call(JobDistributionRpcName, &DisArgs{ReduceJob}, &disReply)
if !ok || disReply.JobSeq == -1 {
// 没拿到
return false
}

// worker 的基本参数配置
seq := disReply.JobSeq

ctx, cancel := context.WithCancel(context.Background())
go KeepAlive(ctx, seq)

/* 第二部分 */
// 找到对应分区下的所有中间输出文件
pattern := fmt.Sprintf("mr-*-%d.txt", seq)

files, err := filepath.Glob(pattern)
if err != nil {
log.Printf("glob file, err: %s", err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

// reduce 处理核心逻辑
var intermediate []KeyValue
for _, middleFilename := range files {
// 该分区的所有文件,全是 KeyValue 的 JSON 存储
middleFile, err := os.OpenFile(middleFilename, os.O_RDONLY, os.ModePerm)
if err != nil {
log.Printf("open: %s, err: %s", middleFilename, err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

content, err := io.ReadAll(middleFile)
if err != nil {
log.Printf("read: %s, err: %s", middleFilename, err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

var i []KeyValue
err = json.Unmarshal(content, &i)
if err != nil {
log.Printf("read: %s, err: %s", middleFilename, err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

intermediate = append(intermediate, i...)
}

sort.Sort(ByKey(intermediate))

output := fmt.Sprintf("%s-%d", outputFilename, seq)
os.Remove(output)
file, err := os.OpenFile(output, os.O_CREATE|os.O_RDWR, os.ModePerm)
if err != nil {
log.Printf("open: %s, err: %s", output, err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

var outputRes string
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
outputStr := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
outputRes += fmt.Sprintf("%v %v\n", intermediate[i].Key, outputStr)

i = j
}

_, err = file.Write([]byte(outputRes))
if err != nil {
log.Printf("write: %s, err: %s", output, err.Error())
cancel()
call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
return true
}

file.Close()

/* 第三部分 */
call(ReduceJobFinishRpcName, &SeqArgs{JobSeq: seq}, &NullReply{})
cancel()
return true
}

reduce 代码我也分成了三部分,第一部分和第三部分和 map 一样。所以这里将 reduce 的处理逻辑:

  • 我们需要获取该分区下的所有文件,那么就需要匹配 mr-*-seq.txt 的文件就 ok 了,seq 这是这个 Job 的序号,也是它应该处理的分区
  • 遍历所有匹配到的文件名,读取这些文件,将其内容反序列化到一个 intermediate 数组中
  • 这里和 mrsequentis.go 中的后半部分一样,排序,将这些和文件名一起传入到 reducef 函数中,将输出追加到 outputRes 中
  • 将 outputRes 写入输出文件
  • 至此,完成 reduce 的处理逻辑

而 coordinator 处理 reduce Job 完成的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *Coordinator) ReduceJobFinish(args *SeqArgs, reply *NullReply) error {
c.Mu.Lock()
delete(c.JobInfo, args.JobSeq)
c.ReduceWorkerNum--
c.Mu.Unlock()

pattern := fmt.Sprintf("mr-*-%d.txt", args.JobSeq)
files, err := filepath.Glob(pattern)
if err != nil {
log.Println("delete temp file failed, err:", err)
}

for _, filename := range files {
err = os.Remove(filename)
if err != nil {
log.Printf("clean: %s, err: %s", filename, err.Error())
}
}

if len(c.DistributeSeqChan) == 0 && c.ReduceWorkerNum == 0 && c.IsMapJobDone {
time.Sleep(c.Exp / 2)
if len(c.DistributeSeqChan) == 0 && c.ReduceWorkerNum == 0 && c.IsMapJobDone {
c.IsAllJobDone = true
}
}

return nil
}

和 map 处理差不多。唯一有区别的是,当我们判断到所有 Job 都完成时,等待一个过期时间(这里为了快点结束,除了 2),然后,再进行一次判断。如果成立,则将 IsAllJobDone 设为 true。此时,所有的 worker 都应该退出。

当所有的 Job 完成后,主程序调用 Done 方法会得到一个 true 的返回值。至此,coordinator 退出。

1
2
3
func (c *Coordinator) Done() bool {
return c.IsAllJobDone
}

容错

再提一下,并没有做 coordinator 的崩溃恢复(论文里给的解决方案是定时备份 master 的数据结构,然后 make 的时候进行恢复)。

心跳机制

关于判断当前 worker 是否存活我采用的是心跳机制。

对于 coordinator,在每一次分发一个 Job 时,会为这个 Job 特有的 seq 启动一个 handleAlive 进程。也就是:

1
go c.handleAlive(seq, c.Exp)

其中,seq 是 Job 的 seq, Exp 则是初始化时定义的一个过期时间(lab 提到是 10s,但是为了提高测试的速度,我都缩短了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 一个 goroutine,异步处理每一个正在工作的 worker
// 当达到了超时时间,还没调用 Ping 函数,则视为 done 掉
func (c *Coordinator) handleAlive(workerSeq int, exp time.Duration) {
c.JobAlive[workerSeq] = make(chan bool, 1)
// 这里这句,因为一开始可能没 Ping 到,然后 case <-t.C 会先执行
c.JobAlive[workerSeq] <- true

t := time.NewTicker(exp)
for {
select {
case <-c.JobAlive[workerSeq]:
if _, ok := c.JobInfo[workerSeq]; !ok {
// 这个资源已经完成了
return
}
t.Reset(exp)
case <-t.C:
if _, ok := c.JobInfo[workerSeq]; !ok {
// 这个资源已经完成了
return
}
// 这个 worker 寄了,释放它处理的资源
log.Printf("job seq: %d, time limit done", workerSeq)
c.giveJobBack(workerSeq)
return
default:
}
}
}

JobAlive 是一个类型为 chan bool 的数组,每一个 seq 对应数组中的一个 channel。因为我们的 seq 是固定的,也就是 seq 就是 0-nReduce。所以,用数组可以加快访问速度,且没有 map 那样的并发问题(同时读写)。

这个函数的大体逻辑就是,当对应的 seq 管道收到一个数据时,重置定时器的时间。那么当这个管道在超时时间里没有接收到数据时,就会执行释放资源的操作。关于将任务退回的逻辑,下面会讲。那么,这就构成了一个很经典的心跳监测机制。

有三行出现在每个 case 里:

1
2
3
4
if _, ok := c.JobInfo[workerSeq]; !ok {
// 这个资源已经完成了
return
}

正如注释所言,对于一个 Job,如果完成了在 JobInfo 这个 map 里就会删掉,直接退出这个函数即可。其实可以使用 context,使用 cancel 方法将这个进程退出,但为了实现方便,就这么做了。

并且,它需要给 worker 提供一个 rpc 方法是 Ping:

1
2
3
4
5
// Ping 判断是否存活的依据
func (c *Coordinator) Ping(args *SeqArgs, reply *NullReply) error {
c.JobAlive[args.JobSeq] <- true
return nil
}

这个函数在 worker 调用这个方法时,往对应的管道发送一个数据。异不异步都可以,这里是因为当时搞得没缓存的 channel 导致阻塞了。

对于每一个 worker,则时不时要调用一个 Ping 方法来告诉 coordinator 你还活着。

当 worker 分配到一个 Job 时,则起一个协程运行 KeepAlive 函数,调用 Ping ,如下:

1
go KeepAlive(ctx, seq)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func KeepAlive(ctx context.Context, workSeq int) {
var t = time.NewTicker(time.Second / 2)
for {
select {
case <-ctx.Done():
return
case <-t.C:
ok := call(PingRpcName, &SeqArgs{workSeq}, &NullReply{})
if !ok {
return
}
default:
continue
}
}
}

当处理结束或遇到意外的错误时,使用 context 退出 KeepAlive 程序。

giveJobBack

这个函数是用于将挂掉的,或者主动挂掉的 worker 正在处理的任务给放回分发队列中,使其任务可以再次被执行。

具体逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 出错的超时的 worker 送回队列,并将其正在处理的任务送入队列
func (c *Coordinator) giveJobBack(seq int) {
if !c.IsMapJobDone && !c.IsReduceReady && !c.IsAllJobDone {
pattern := fmt.Sprintf("mr-%d-*.txt", seq)
files, err := filepath.Glob(pattern)
if err != nil {
log.Printf("glob: %v, err: %v", pattern, err)
}
for _, filename := range files {
err = os.Remove(filename)
if err != nil {
log.Printf("remove temp file: %v, err: %v", pattern, err)
}
}

c.Mu.Lock()
wi := c.JobInfo[seq]
delete(c.JobInfo, seq)
c.MapWorkerNum--
c.Mu.Unlock()

c.DistributeChan <- wi
c.DistributeSeqChan <- seq
return
}

c.Mu.Lock()
c.ReduceWorkerNum--
delete(c.JobInfo, seq)
c.Mu.Unlock()

c.DistributeSeqChan <- seq
return
}

第一个 if 语句是用来区分是 map Job 重新执行还是 reduce Job 重新执行。

对于 map Job 重新执行,需要将挂掉的 worker 执行时生成的临时文件全部删掉,然后删除 JobInfo 的内容,并将 MapWorkerNum 减 1。这里用了锁去防止并发问题。然后是将该 seq 送回分发队列,并将处理的内容送回到内容分发队列。

对于 reduce Job,与 map Job 的区别是,不需要将临时文件删除。其余的都一致。

Handle Error

在进入到 Job 处理部分之前,先说一下对一些 error 的处理(虽然在测试的时候这些 error 都没出现过。

在 worker 部分,我们在处理时可能会遇到一种情况是:if err != nil 这个表达式为 true,那么此时我们需要做的就不仅仅是打印错误(仅代表个人观点,保命),退出当前的处理。应该主动去通知 coordinator 将这次处理给取消掉。对于这个方法,应该是由 coordinator 提供的 rpc 方法,如下:

1
2
3
4
5
// HandleError worker 处理时出错,主动断开
func (c *Coordinator) HandleError(args *SeqArgs, reply *NullReply) error {
c.giveJobBack(args.JobSeq)
return nil
}

总结

一些杂谈

  1. 是 worker 去请求 coordinator 分配资源(通过 rpc
  2. Map 和 Reduce 不是异步操作。换句话说,worker 执行完 mapf 后,再去请求执行 reducef 或者说继续执行 mapf 操作。
  3. rpc 并没有改成 tcp 通信,这是因为在测试脚本运行到 crash 阶段时,采用 tcp 通信大多数情况下只会起一个 worker。

debug 时遇到的 bug 以及一些思路解释

  1. 一开始按照自己的想法来,没有参考 mrsequential.go 的代码,导致输出文件与结果不一致(不认真看指导导致的
  2. 之前说到 map Job 的 seq 是不复用的,这是因为在将挂掉的 worker 的 job 重新执行时,我会将它产生的所有临时文件删除。那么,可以预见的一种情况是:一个 worker 执行完了一个 map Job,将这个 seq 退回。此时,有一个 worker 再一次拿到了这个 seq,当这个 worker 挂了后,coordinator 会将上一个 worker 执行完的临时文件也删掉。就算第二个 worker 没挂掉,那么在处理中间文件的时候,如果遇到了已存在的文件,就需要将其读取出来,和当前的结果一起序列化成一个 json 字符串,写入这个中间文件。
  3. 为什么加上 IsReduceReady 这个字段呢。这是因为当时 worker 的请求是不带 wType 的,那么有可能在 reduce 初始化没完成的时候,就将 seq 拿走,但是却是用于 map,虽然不会执行 map Job,但是这个 seq 也是会消失了。后面还是加上了 wType,算是做了双重验证。
  4. 在一些特殊情况(跟第三条差不多),会遇到拿了 seq,此时是 reduce Job 阶段,但是它请求的是 map Job,发现了但是 seq 没有放回去,导致 reduce Job 的分区有一些没处理。呃这个问题也是双重验证解决。
  5. map 的同时读写,是前面我很头疼的一个问题,这是因为我之前在记录 Job 信息的时候使用了 map 记录存活状态,当收到 Ping 的时候,我会修改对应 seq 的状态。后面,我加上了互斥锁,但是我发现性能会降低不少。所以我采用了一个管道数组的方案。因为使用这种心跳机制,状态的修改会很频繁。后面,其实可以不用状态,直接使用一个管道即可。
  6. 而对于一个 bug,是测试时,一个 worker 完成了 Job,但是 coordinator 为它运行的 handleAlive 进程还没退出。后面判断超时的时候,又将这个 seq 送了回去。这就导致了重复处理,并且分区文件被删除了,得到的输出文件就为空了。
  7. 还有一些忘了,大多数 bug 都是并发导致的。

最后,所有代码均为独立完成。