MIT 6.5840 lab1 前言 论文为 MapReduce,中文版:MapReduce:在大型集群上简化数据处理 ,英文版:rfeet.qrk (mit.edu)
视频课看的翻译(p1-p8):simviso-开源分享,传播知识 (simtoco.com)
代码实现仓库地址:Hardews/6.5840: MIT 6.5840(原 6.824)lab (github.com)
我认为需要的前置知识
Golang 基础(这个网上很多教程
论文阅读 / 视频:要知道 MapReduce 到底是个什么东西
rpc:Go RPC开发简介 - 官方RPC库
要求:6.5840 lab 1:MapReduce (mit.edu)
实现 Coordinator 数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type Coordinator struct { Mu sync.Mutex Exp time.Duration NReduce int InputFiles []string DistributeSeqChan chan int DistributeChan chan JobInfo JobAlive []chan bool JobInfo map [int ]JobInfo IsAllJobDone, IsMapJobDone, IsReduceReady bool MapWorkerNum, ReduceWorkerNum int } type JobInfo struct { Filename string Content string }
初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func MakeCoordinator (files []string , nReduce int ) *Coordinator { c := Coordinator{} c.NReduce = nReduce c.InputFiles = files c.DistributeChan = make (chan JobInfo, nReduce) go c.contentDistribute() c.DistributeSeqChan = make (chan int , nReduce) for i := 0 ; i < nReduce; i++ { c.DistributeSeqChan <- i } c.JobInfo = make (map [int ]JobInfo) c.JobAlive = make ([]chan bool , c.NReduce) c.Exp = MaxTimeLimit * time.Second c.server() return &c }
并没有做 coordinator 的崩溃恢复(论文里给的解决方案是定时备份 master 的数据结构,然后 make 的时候进行恢复)
这里的初始化的关键是内容分发协程(实际上就是起一个协程,读取文件中的内容,发送到一个管道中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (c *Coordinator) contentDistribute() { for len (c.InputFiles) != 0 { filename := c.InputFiles[0 ] file, err := os.Open(filename) if err != nil { log.Fatalf("open: %s, err: %s" , filename, err.Error()) } content, err := io.ReadAll(file) if err != nil { log.Fatalf("read: %s, err: %s" , filename, err.Error()) } c.DistributeChan <- JobInfo{ Filename: filename, Content: string (content), } c.InputFiles = c.InputFiles[1 :] } }
Worker 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func Worker (mapf func (string , string ) []KeyValue, reducef func (string , []string ) string ) { var isDone bool for { if !isDone && !DealMap(mapf) { var isMapDone DoneReply call(IsMapDoneRpcName, &NullReply{}, &isMapDone) isDone = isMapDone.IsDone } else if isDone && !DealReduce(reducef) { var Done DoneReply call(IsReduceDoneRpcName, &NullReply{}, &Done) if Done.IsDone { return } } time.Sleep(time.Second / 2 ) } }
主要是通过一个 for 循环来获取任务分配
开始前会有个 isDone,这是用来判断 Map Job 是否已经结束。如果结束了,那么直接去请求是否可以执行 reduce 任务。
所以,这个初始化的逻辑就很简单了,就是循环去尝试是否能分配到任务,分配到执行完后,或者没分配到。我们都会让其睡眠一下,这是为了防止 map parallelism 阶段起的所谓并行程序太少(因为其实一个 map 或者一个 reduce 任务执行的速度是很快的),导致过不了。
并且,这里说一下两个 rpc 方法。用来给 worker 通知 map Job 是否都结束了,并且 reduce Job 是否也结束了。如果两者都结束了,worker 可以退出。
1 2 3 4 5 6 7 8 9 func (c *Coordinator) IsMapDone(args *NullArgs, reply *DoneReply) error { reply.IsDone = c.IsMapJobDone return nil } func (c *Coordinator) IsReduceDone(args *NullArgs, reply *DoneReply) error { reply.IsDone = c.IsAllJobDone return nil }
可以看到主要是通过 coordinator 的两个字段来返回和判断。
RPC Setting 对于一些 rpc 的参数定义,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type NullArgs struct {}type NullReply struct {}type DisArgs struct { JobType int } type DisReply struct { JobSeq int NReduce int Filename string Content string } type DoneReply struct { IsDone bool } type SeqArgs struct { JobSeq int }
Job 处理 Job 的分发 去掉了一些打印信息(用于 debug 的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 func (c *Coordinator) JobDistribute(args *DisArgs, reply *DisReply) error { reply.JobSeq = -1 if len (c.DistributeSeqChan) == 0 || c.IsAllJobDone { return nil } if args.JobType == MapJob && c.IsMapJobDone { return nil } select { case seq := <-c.DistributeSeqChan: if !c.IsMapJobDone { jobInfo, ok := <-c.DistributeChan if !ok { c.DistributeSeqChan <- seq return nil } reply.Filename = jobInfo.Filename reply.Content = jobInfo.Content c.Mu.Lock() c.JobInfo[seq] = jobInfo c.MapWorkerNum++ c.Mu.Unlock() } else if c.IsReduceReady { c.Mu.Lock() c.ReduceWorkerNum++ c.JobInfo[seq] = JobInfo{} c.Mu.Unlock() } else { c.DistributeSeqChan <- seq return nil } reply.JobSeq = seq reply.NReduce = c.NReduce go c.handleAlive(seq, c.Exp) default : } return nil }
主要分为两个部分:
第一部分是分发前的校验:如果没有可用的 seq,或者说此时任务已经全部处理完了,那么直接返回即可。如果是获取 Map Job 的请求,但是此时 Map Job 已经全部完成了,也全部返回。
第二部分则是 seq 的处理,如果从 channel 中拿到了 seq,那么就设置相关参数,并返回。
其实文件的内容可以不用 rpc 传,直接传一个文件名就好了。但这里是为了采取如果遇到大文件,就分开读取发送这样的策略,所以没这么做(虽然测试用例没有大文件,实际上也没有将文件分成一份一份的。
Map Job 而 worker 中,执行 map 任务的代码是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 func DealMap (mapf func (string , string ) []KeyValue) (isTakeJob bool ) { isTakeJob = true var disReply DisReply ok := call(JobDistributionRpcName, &DisArgs{MapJob}, &disReply) if !ok || disReply.JobSeq == -1 || disReply.Filename == "" { return false } nReduce := disReply.NReduce seq, filename, content := disReply.JobSeq, disReply.Filename, disReply.Content var ctx, cancel = context.WithCancel(context.Background()) go KeepAlive(ctx, seq) var recordKF = make (map [string ][]KeyValue) kvs := mapf(filename, content) for _, KV := range kvs { subzone := ihash(KV.Key) % nReduce fn := fmt.Sprintf("mr-%d-%d.txt" , seq, subzone) if _, isExist := recordKF[fn]; isExist { recordKF[fn] = append (recordKF[fn], KV) } else { recordKF[fn] = []KeyValue{KV} } } for iFilename, in := range recordKF { res, err := json.Marshal(&in) if err != nil { log.Printf("open: %s, err: %s" , iFilename, err.Error()) call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) cancel() return } file, err := os.OpenFile(iFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm) if err != nil { log.Printf("open: %s, err: %s" , iFilename, err.Error()) call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) cancel() return } file.Write(res) file.Close() } call(MapJobFinishRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) cancel() return }
在注释中,我将代码分成了三部分,其中:
第一部分,从 coordinator 中获取 seq 号和分配的 Job 内容。如果没有拿到,那么此函数返回。
第二部分,是 map 的主要处理逻辑:
参考 mrsequential.go,先将 filename 和 content 的内容传入 mapf,得到一个 KeyValue 数组。
之后,遍历这个数组,对于每一组 key,根据 ihash 函数的结果 mod nReduce 得到它应该的分区号。指导中写到,一个中间文件的合理命名是 mr-X-Y
,X 是 map 的序号(我这里是 seq),而 Y 是分区号。需要提的一点是,我的中间文件命名是 mr-X-Y.txt
,根据这个后缀进行清理(没有使用 tempFile)。
对于这些计算出来的分区号,组合成它应该写入的中间文件文件名,先使用一个 map 记录,key 为中间文件的名字,value 则为 KeyValue 数组。
遍历上述说到的 map,将该文件的 KeyValue 数组序列化为 json 字符串,然后写入。
至此,我们算是完成了 map Job。
第三部分,告诉 coordinator 我们完成了这个 map Job,并退出心跳处理程序。
对于 Map 任务何时完成,由上面知道,由 worker 主动通过 rpc 调用来报告。coordinator 对于 map 任务完成的处理如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (c *Coordinator) MapJobFinish(args *SeqArgs, reply *NullReply) error { c.Mu.Lock() delete (c.JobInfo, args.JobSeq) c.MapWorkerNum-- c.Mu.Unlock() if !c.IsMapJobDone && c.MapWorkerNum == 0 && len (c.DistributeChan) == 0 && len (c.InputFiles) == 0 { c.reduceJobStart() c.IsMapJobDone = true } return nil }
这里有一点是,已完成的 seq 并不会放回到原来的分发队列中。也就是 seq 不会被复用,至于为什么在后面对于 crash 的处理会提到。
我们可以看到,当所有的 map Job 完成后,会执行一个 reduceJobStart 方法。这个主要是 reduce Job 的一些初始化。之后,将 IsMapJobDone 设为 true。
Reduce Job 在 map Job 完成后,会进行 reduce Job 的初始化:
1 2 3 4 5 6 7 8 9 10 func (c *Coordinator) reduceJobStart() { log.Println(strings.Repeat("-" , c.NReduce), "start reduce job" , strings.Repeat("-" , c.NReduce)) c.DistributeSeqChan = make (chan int , c.NReduce) for i := 0 ; i < c.NReduce; i++ { c.DistributeSeqChan <- i } c.JobInfo = make (map [int ]JobInfo) c.JobAlive = make ([]chan bool , c.NReduce) c.IsReduceReady = true }
在初始化完成后,将 IsReduceReady 设为 true。seq 分发程序就可以对 worker 进行 seq 的分发。
worker 执行 reduce 任务的流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 func DealReduce (reducef func (string , []string ) string ) bool { var disReply DisReply ok := call(JobDistributionRpcName, &DisArgs{ReduceJob}, &disReply) if !ok || disReply.JobSeq == -1 { return false } seq := disReply.JobSeq ctx, cancel := context.WithCancel(context.Background()) go KeepAlive(ctx, seq) pattern := fmt.Sprintf("mr-*-%d.txt" , seq) files, err := filepath.Glob(pattern) if err != nil { log.Printf("glob file, err: %s" , err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } var intermediate []KeyValue for _, middleFilename := range files { middleFile, err := os.OpenFile(middleFilename, os.O_RDONLY, os.ModePerm) if err != nil { log.Printf("open: %s, err: %s" , middleFilename, err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } content, err := io.ReadAll(middleFile) if err != nil { log.Printf("read: %s, err: %s" , middleFilename, err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } var i []KeyValue err = json.Unmarshal(content, &i) if err != nil { log.Printf("read: %s, err: %s" , middleFilename, err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } intermediate = append (intermediate, i...) } sort.Sort(ByKey(intermediate)) output := fmt.Sprintf("%s-%d" , outputFilename, seq) os.Remove(output) file, err := os.OpenFile(output, os.O_CREATE|os.O_RDWR, os.ModePerm) if err != nil { log.Printf("open: %s, err: %s" , output, err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } var outputRes string i := 0 for i < len (intermediate) { j := i + 1 for j < len (intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } values := []string {} for k := i; k < j; k++ { values = append (values, intermediate[k].Value) } outputStr := reducef(intermediate[i].Key, values) outputRes += fmt.Sprintf("%v %v\n" , intermediate[i].Key, outputStr) i = j } _, err = file.Write([]byte (outputRes)) if err != nil { log.Printf("write: %s, err: %s" , output, err.Error()) cancel() call(HandleErrorRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) return true } file.Close() call(ReduceJobFinishRpcName, &SeqArgs{JobSeq: seq}, &NullReply{}) cancel() return true }
reduce 代码我也分成了三部分,第一部分和第三部分和 map 一样。所以这里将 reduce 的处理逻辑:
我们需要获取该分区下的所有文件,那么就需要匹配 mr-*-seq.txt 的文件就 ok 了,seq 这是这个 Job 的序号,也是它应该处理的分区
遍历所有匹配到的文件名,读取这些文件,将其内容反序列化到一个 intermediate 数组中
这里和 mrsequentis.go 中的后半部分一样,排序,将这些和文件名一起传入到 reducef 函数中,将输出追加到 outputRes 中
将 outputRes 写入输出文件
至此,完成 reduce 的处理逻辑
而 coordinator 处理 reduce Job 完成的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (c *Coordinator) ReduceJobFinish(args *SeqArgs, reply *NullReply) error { c.Mu.Lock() delete (c.JobInfo, args.JobSeq) c.ReduceWorkerNum-- c.Mu.Unlock() pattern := fmt.Sprintf("mr-*-%d.txt" , args.JobSeq) files, err := filepath.Glob(pattern) if err != nil { log.Println("delete temp file failed, err:" , err) } for _, filename := range files { err = os.Remove(filename) if err != nil { log.Printf("clean: %s, err: %s" , filename, err.Error()) } } if len (c.DistributeSeqChan) == 0 && c.ReduceWorkerNum == 0 && c.IsMapJobDone { time.Sleep(c.Exp / 2 ) if len (c.DistributeSeqChan) == 0 && c.ReduceWorkerNum == 0 && c.IsMapJobDone { c.IsAllJobDone = true } } return nil }
和 map 处理差不多。唯一有区别的是,当我们判断到所有 Job 都完成时,等待一个过期时间(这里为了快点结束,除了 2),然后,再进行一次判断。如果成立,则将 IsAllJobDone 设为 true。此时,所有的 worker 都应该退出。
当所有的 Job 完成后,主程序调用 Done 方法会得到一个 true 的返回值。至此,coordinator 退出。
1 2 3 func (c *Coordinator) Done() bool { return c.IsAllJobDone }
容错 再提一下,并没有做 coordinator 的崩溃恢复(论文里给的解决方案是定时备份 master 的数据结构,然后 make 的时候进行恢复)。
心跳机制 关于判断当前 worker 是否存活我采用的是心跳机制。
对于 coordinator ,在每一次分发一个 Job 时,会为这个 Job 特有的 seq 启动一个 handleAlive 进程。也就是:
1 go c.handleAlive(seq, c.Exp)
其中,seq 是 Job 的 seq, Exp 则是初始化时定义的一个过期时间(lab 提到是 10s,但是为了提高测试的速度,我都缩短了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (c *Coordinator) handleAlive(workerSeq int , exp time.Duration) { c.JobAlive[workerSeq] = make (chan bool , 1 ) c.JobAlive[workerSeq] <- true t := time.NewTicker(exp) for { select { case <-c.JobAlive[workerSeq]: if _, ok := c.JobInfo[workerSeq]; !ok { return } t.Reset(exp) case <-t.C: if _, ok := c.JobInfo[workerSeq]; !ok { return } log.Printf("job seq: %d, time limit done" , workerSeq) c.giveJobBack(workerSeq) return default : } } }
JobAlive 是一个类型为 chan bool 的数组,每一个 seq 对应数组中的一个 channel。因为我们的 seq 是固定的,也就是 seq 就是 0-nReduce。所以,用数组可以加快访问速度,且没有 map 那样的并发问题(同时读写)。
这个函数的大体逻辑就是,当对应的 seq 管道收到一个数据时,重置定时器的时间。那么当这个管道在超时时间里没有接收到数据时,就会执行释放资源的操作。关于将任务退回的逻辑,下面会讲。那么,这就构成了一个很经典的心跳监测机制。
有三行出现在每个 case 里:
1 2 3 4 if _, ok := c.JobInfo[workerSeq]; !ok { return }
正如注释所言,对于一个 Job,如果完成了在 JobInfo 这个 map 里就会删掉,直接退出这个函数即可。其实可以使用 context,使用 cancel 方法将这个进程退出,但为了实现方便,就这么做了。
并且,它需要给 worker 提供一个 rpc 方法是 Ping:
1 2 3 4 5 func (c *Coordinator) Ping(args *SeqArgs, reply *NullReply) error { c.JobAlive[args.JobSeq] <- true return nil }
这个函数在 worker 调用这个方法时,往对应的管道发送一个数据。异不异步都可以,这里是因为当时搞得没缓存的 channel 导致阻塞了。
对于每一个 worker,则时不时要调用一个 Ping 方法来告诉 coordinator 你还活着。
当 worker 分配到一个 Job 时,则起一个协程运行 KeepAlive 函数,调用 Ping ,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func KeepAlive (ctx context.Context, workSeq int ) { var t = time.NewTicker(time.Second / 2 ) for { select { case <-ctx.Done(): return case <-t.C: ok := call(PingRpcName, &SeqArgs{workSeq}, &NullReply{}) if !ok { return } default : continue } } }
当处理结束或遇到意外的错误时,使用 context 退出 KeepAlive 程序。
giveJobBack 这个函数是用于将挂掉的,或者主动挂掉的 worker 正在处理的任务给放回分发队列中,使其任务可以再次被执行。
具体逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (c *Coordinator) giveJobBack(seq int ) { if !c.IsMapJobDone && !c.IsReduceReady && !c.IsAllJobDone { pattern := fmt.Sprintf("mr-%d-*.txt" , seq) files, err := filepath.Glob(pattern) if err != nil { log.Printf("glob: %v, err: %v" , pattern, err) } for _, filename := range files { err = os.Remove(filename) if err != nil { log.Printf("remove temp file: %v, err: %v" , pattern, err) } } c.Mu.Lock() wi := c.JobInfo[seq] delete (c.JobInfo, seq) c.MapWorkerNum-- c.Mu.Unlock() c.DistributeChan <- wi c.DistributeSeqChan <- seq return } c.Mu.Lock() c.ReduceWorkerNum-- delete (c.JobInfo, seq) c.Mu.Unlock() c.DistributeSeqChan <- seq return }
第一个 if 语句是用来区分是 map Job 重新执行还是 reduce Job 重新执行。
对于 map Job 重新执行,需要将挂掉的 worker 执行时生成的临时文件全部删掉,然后删除 JobInfo 的内容,并将 MapWorkerNum 减 1。这里用了锁去防止并发问题。然后是将该 seq 送回分发队列,并将处理的内容送回到内容分发队列。
对于 reduce Job,与 map Job 的区别是,不需要将临时文件删除。其余的都一致。
Handle Error 在进入到 Job 处理部分之前,先说一下对一些 error 的处理(虽然在测试的时候这些 error 都没出现过。
在 worker 部分,我们在处理时可能会遇到一种情况是:if err != nil
这个表达式为 true,那么此时我们需要做的就不仅仅是打印错误(仅代表个人观点,保命),退出当前的处理。应该主动去通知 coordinator 将这次处理给取消掉。对于这个方法,应该是由 coordinator 提供的 rpc 方法,如下:
1 2 3 4 5 func (c *Coordinator) HandleError(args *SeqArgs, reply *NullReply) error { c.giveJobBack(args.JobSeq) return nil }
总结 一些杂谈
是 worker 去请求 coordinator 分配资源(通过 rpc
Map 和 Reduce 不是异步操作。换句话说,worker 执行完 mapf 后,再去请求执行 reducef 或者说继续执行 mapf 操作。
rpc 并没有改成 tcp 通信,这是因为在测试脚本运行到 crash 阶段时,采用 tcp 通信大多数情况下只会起一个 worker。
debug 时遇到的 bug 以及一些思路解释
一开始按照自己的想法来,没有参考 mrsequential.go 的代码,导致输出文件与结果不一致(不认真看指导导致的
之前说到 map Job 的 seq 是不复用的,这是因为在将挂掉的 worker 的 job 重新执行时,我会将它产生的所有临时文件删除。那么,可以预见的一种情况是:一个 worker 执行完了一个 map Job,将这个 seq 退回。此时,有一个 worker 再一次拿到了这个 seq,当这个 worker 挂了后,coordinator 会将上一个 worker 执行完的临时文件也删掉。就算第二个 worker 没挂掉,那么在处理中间文件的时候,如果遇到了已存在的文件,就需要将其读取出来,和当前的结果一起序列化成一个 json 字符串,写入这个中间文件。
为什么加上 IsReduceReady 这个字段呢。这是因为当时 worker 的请求是不带 wType 的,那么有可能在 reduce 初始化没完成的时候,就将 seq 拿走,但是却是用于 map,虽然不会执行 map Job,但是这个 seq 也是会消失了。后面还是加上了 wType,算是做了双重验证。
在一些特殊情况(跟第三条差不多),会遇到拿了 seq,此时是 reduce Job 阶段,但是它请求的是 map Job,发现了但是 seq 没有放回去,导致 reduce Job 的分区有一些没处理。呃这个问题也是双重验证解决。
map 的同时读写,是前面我很头疼的一个问题,这是因为我之前在记录 Job 信息的时候使用了 map 记录存活状态,当收到 Ping 的时候,我会修改对应 seq 的状态。后面,我加上了互斥锁,但是我发现性能会降低不少。所以我采用了一个管道数组的方案。因为使用这种心跳机制,状态的修改会很频繁。后面,其实可以不用状态,直接使用一个管道即可。
而对于一个 bug,是测试时,一个 worker 完成了 Job,但是 coordinator 为它运行的 handleAlive 进程还没退出。后面判断超时的时候,又将这个 seq 送了回去。这就导致了重复处理,并且分区文件被删除了,得到的输出文件就为空了。
还有一些忘了,大多数 bug 都是并发导致的。
最后,所有代码均为独立完成。