Skip to content
Self-Knowing

MapReduce

约 3093 个字 36 行代码 预计阅读时间 11 分钟

论文

MapReduce is a programming model and an associated implementation for processing and generating large data sets.

MapReduce编程模型 中,数据处理分为两个阶段:map阶段reduce阶段

  • Map: 读入 initial kv pairs,生成一系列 intermediate kv pairs.
  • Reduce:对每个 intermediate kv pairs.,执行reduce函数,将所有关联的值进行汇总。

Implementation

Map: 通过将输入数据自动分成 M个分片(splits) 来分布在多台机器上并行 处理。

Reduce:通过用分区函数(e.g., hash(Key) mod R) 将中间键空间划分为 R 份。

image-20241019210314347

  1. User Program 上的 The MapReduce library 先将 input 分为 M 个分区(大小由用户配置决定),然后,在集群机器上启动多个 program copies。

  2. 有一个程序副本很特殊:Master。Master 负责分配 map/reduce task。

  3. 分配到 task 的 map worker 会读取对应的分片内容,解析出 kv pairs,并将 kv pairs 传递给 Map function。Map function 读取这些键值对,产生缓存在内存中的 intermediate kv pairs。

  4. 缓存的中间键值对 会被周期性的写入本地磁盘(local disk)中,并分局 partitioning function 划分为 R 个区域。中间键在 local disk 的地址会告诉给 The Master。

  5. Master 将中间键值对 的地址告诉 reduce worker,worker 使用 RPC(remote procedure call)读取中间键值对。读取完所有的中间键值对 之后,会进行 sort ,排序所有的 中间键(intermediate key)

  6. reduce worker 迭代排序后的 ,将中间键 和他对应的所有 中间值 传递给 Reduce function。Reduce function 读取这些键值对,产生最后的输出。

  7. 所有的 map/reduce task 完成之后,master 会唤醒 the User Program ,至此 MapReduce 执行完毕。

image-20241023192645390

Master 维护了多个数据结构。对于每个Map任务Reduce任务,它会存储其状态(空闲进行中已完成)以及执行该任务的工作节点的身份(对于非空闲任务)。

Master 是中间文件区域位置信息从Map任务传递到Reduce任务的传输通道。因此,对于每个已完成的Map任务,主节点会存储R个中间文件区域的位置和大小信息。当Map任务完成时,这些位置和大小信息会被更新,并逐步推送给那些有进行中Reduce任务的工作节点。

Lab 1

Feel free to borrow code from mrsequential.go. You should also have a look at mrapps/wc.go to see what MapReduce application code looks like.

可以从 mrsequential.go 借鉴代码。

mrapps/wc.go 中有 MapReduce 的应用代码。

The workers will talk to the coordinator via RPC. Each worker process will ask the coordinator for a task, read the task's input from one or more files, execute the task, and write the task's output to one or more files. The coordinator should notice if a worker hasn't completed its task in a reasonable amount of time (for this lab, use ten seconds), and give the same task to a different worker.

Worker 通过 RPC 与 Coordinator 通信。

每个 Worker 进程从 Coordinator 获取一个 task,从一个或多个文件中读取 input,执行任务,写 output 到一个或多个文件。

如果 task 在 Worker 上超时的话,Coordinator 会重发该 task 给其他 Worker。

We have given you a little code to start you off. The "main" routines for the coordinator and worker are in main/mrcoordinator.go and main/mrworker.go; don't change these files. You should put your implementation in mr/coordinator.go, mr/worker.go, and mr/rpc.go.

mr/coordinator.go, mr/worker.go, and mr/rpc.go 实现对应的业务逻辑。

ou'll also see some errors from the Go RPC package that look like

2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three

Ignore these messages; registering the coordinator as an RPC server checks if all its methods are suitable for RPCs (have 3 inputs); we know that Done is not called via RPC.

启动

Here's how to run your code on the word-count MapReduce application.

First, make sure the word-count plugin is freshly built:

go build -race -buildmode=plugin ../mrapps/wc.go

In the main directory, run the coordinator.

rm mr-out*
go run -race mrcoordinator.go pg-*.txt

The pg-*.txt arguments to mrcoordinator.go are the input files; each file corresponds to one "split", and is the input to one Map task. The -race flags runs go with its race detector.

pg-*.txt 是 input 文件,每个文件对应一个 “split”(一个 Map task 的输入)。

In one or more other windows, run some workers:

go run -race mrworker.go wc.so

When the workers and coordinator have finished, look at the output in mr-out-*. When you've completed the lab, the sorted union of the output files should match the sequential output, like this:

$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8
...

当 Workers 和 Coordinator 的任务都完成之后,查看输出 mr-out-*

当做完 lab 之后,output 的顺序应该是排好序的。

We supply you with a test script in main/test-mr.sh. The tests check that the wc and indexer MapReduce applications produce the correct output when given the pg-xxx.txt files as input. The tests also check that your implementation runs the Map and Reduce tasks in parallel, and that your implementation recovers from workers that crash while running tasks.

测试 lab 的完成情况:main/test-mr.sh

If you run the test script now, it will hang because the coordinator never finishes:

$ cd ~/6.824/src/main
$ bash test-mr.sh
*** Starting wc test.

如果你现在执行测试脚本,只会出现上述代码,因为 Coordinator 一只被卡在这

image-20241023202101005

The test script expects to see output in files named mr-out-X, one for each reduce task. The empty implementations of mr/coordinator.go and mr/worker.go don't produce those files (or do much of anything else), so the test fails.

输出的命名格式为:mr-out-X , 每个 reduce task 对应一个文件。

When you've finished, the test script output should look like this:

$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$

Rules

The map phase should divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the number of reduce tasks -- argument that main/mrcoordinator.go passes to MakeCoordinator(). So, each mapper needs to create nReduce intermediate files for consumption by the reduce tasks.

map 阶段:每个 mapper 将中间键划分到 nReduce 个桶(文件)中。

  • nReduce 是 reduce task 的数量,也是 main/mrcoordinator.go 传给 MakeCoordinator()的参数。
  • 每个 mapper 需要产生 nReduce 个中间文件。

The worker implementation should put the output of the X'th reduce task in the file mr-out-X.

第 X 个 reduce task 输出文件的命名格式:mr-out-X

A mr-out-X file should contain one line per Reduce function output. The line should be generated with the Go "%v %v" format, called with the key and value. Have a look in main/mrsequential.go for the line commented "this is the correct format". The test script will fail if your implementation deviates too much from this format.

mr-out-X 文件中应包含 nReduce 行数据,每个reducer只读取哈希值相同的数据(即在 map 阶段,每个 mapper 产生的第 X 个中间文件)。

image-20241023212127831

格式要正确:"%v %v" format, called with the key and value.

You can modify mr/worker.go, mr/coordinator.go, and mr/rpc.go. You can temporarily modify other files for testing, but make sure your code works with the original versions; we'll test with the original versions.

The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.

中间文件放在当前目录中。

main/mrcoordinator.go expects mr/coordinator.go to implement a Done() method that returns true when the MapReduce job is completely finished; at that point, mrcoordinator.go will exit.

mr/coordinator.go 实现 Done() 方法,表明 Map Reduce 完成。

When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the coordinator, it can assume that the coordinator has exited because the job is done, and so the worker can terminate too. Depending on your design, you might also find it helpful to have a "please exit" pseudo-task that the coordinator can give to workers.

当任务完全完成时,worker 进程应退出。实现这一功能的一个简单方法是利用 call() 的返回值:如果 worker 无法联系到 coordinator,可以假设 coordinator 已经退出,因为任务已经完成,因此 worker 也可以终止。

Hint

One way to get started is to modify mr/worker.go's Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

从修改 mr/worker.go 中的 Worker() 开始:

  • 给 Coordinator 发送 RPC 请求一个任务

然后修改 mr/coordinator.go

  • 使其响应 worker 的任务请求,并返回一个尚未开始的 Map 任务的文件名。

然后修改 mr/worker.go

  • 接收到文件后,读取文件,并调用 Map 函数。(类似于 mrsequential.go 中的实现)

The application Map and Reduce functions are loaded at run-time using the Go plugin package, from files whose names end in .so.

应用程序的 Map 和 Reduce 函数是在运行时通过 Go 的插件包加载的,这些文件的名称以 .so 结尾。

If you change anything in the mr/ directory, you will probably have to re-build any MapReduce plugins you use, with something like go build -race -buildmode=plugin ../mrapps/wc.go

修改之后,重新跑一下插件的加载。

This lab relies on the workers sharing a file system. That's straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.

A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

中间文件的命名格式:mr-X-Y

  • X 是 Map task number
  • Y 是 Reduce task number

The worker's map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go's encoding/json package. To write key/value pairs in JSON format to an open file:

enc := json.NewEncoder(file)
  for _, kv := ... {
    err := enc.Encode(&kv)

and to read such a file back:

dec := json.NewDecoder(file)
for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
      break
    }
    kva = append(kva, kv)
}

map task:将 intermediated kv pairs 以 JSON 格式写入中间文件。

The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

worker 中的 map 部分可以使用 ihash(key) 函数(在 worker.go 中)为给定的键选择对应的 reduce 任务。

You can steal some code from mrsequential.go for reading Map input files, for sorting intermedate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

可以从 mrsequential.go 中借鉴代码

  • Map task 的读入文件
  • 排序中间键值对
  • 存储 Reduce 的输出文件

The master, as an RPC server, will be concurrent; don't forget to lock shared data.

Master 作为一个 RPC 服务器,是并发的,要对共享数据进行枷锁。

Use Go's race detector, with go build -race and go run -race. test-mr.sh has a comment that shows you how to enable the race detector for the tests.

使用 Go 的竞争检测器,可以通过 go build -race 和 go run -race 来启用。在 test-mr.sh 中有一个注释,展示了如何为测试启用竞争检测器。

Workers will sometimes need to wait, e.g. reduces can't start until the last map has finished. One possibility is for workers to periodically ask the master for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the master to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won't prevent the master from processing other RPCs.

Worker 向 Coordinator 不断轮询,在每次请求之间加一个 Sleep()。

Go 为每个 RPC 处理程序分配独立的线程运行,因此即使一个处理程序处于等待状态,也不会阻止 Coordinator 处理其他 RPC 请求。

The master can't reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the master wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the master wait for ten seconds; after that the master should assume the worker has died (of course, it might not have).

如果 Coordinator 等待十秒,Worker 仍没有 response,则认为该 Worker 已经失效。

To test crash recovery, you can use the mrapps/crash.go application plugin. It randomly exits in the Map and Reduce functions.

要测试崩溃恢复,可以使用 mrapps/crash.go 应用程序插件。它会在 Map 和 Reduce 函数中随机退出。

To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

为了确保在发生崩溃时没有人会观察到部分写入的文件,MapReduce 论文提到了一种技巧:使用临时文件,并在文件完全写入后进行原子重命名。你可以使用 ioutil.TempFile 来创建临时文件,并使用 os.Rename 进行原子重命名。

test-mr.sh runs all the processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there.

test-mr.sh 在子目录 mr-tmp 中运行所有进程,因此如果出现问题并且你想查看中间文件或输出文件,可以在该目录中查找。

过程记录

image-20241024184720249

终于一步步看懂论文,慢慢分析代码逻辑,逐渐尝试添加功能。

image-20241024195308936

map 部分完成!

image-20241024212756452

将读入文件的逻辑放在 Coordinator 层,每个 task 分配一个文件。

  • 在中件文件中,key 是无序的,要在 reduce task 中进行排序
  • 通过 MapTaskId 来控制当前读到了第几个文件,全部 mapper 完之后,report 一下当前的状态,进入 reduce 阶段。

image-20241024214612913

reduce 阶段完成!

image-20241024220604794

现在的情况是 状态一致在 reduce task 中循环跑。

改好之后,出现同时访问共享资源的情况!

image-20241026115854254

  • 写操作:在 GetReportTask() 函数中,某个 goroutine 正在写入地址 0x00c00013f100。
  • 该函数位于 /Volumes/kioxia/Repo/Distribution/distributed-system/src/mr/coordinator.go:72。

要解决数据竞态问题,可以使用互斥锁(sync.Mutex)或读写锁(sync.RWMutex)来保护对共享数据的并发访问。

为了实现超时任务重新分配的问题,要重新添加一个数据结构,同时建立一个队列:存储当前的 task。

不是哥们?真的会有人把输入和输出写反吗?

  • Now you can see
  • 4f0dfeddc7e01f95623598419f6fabef
  • f7c918856e409e8b5d7b596511875804

心跳检测启动!go c.heartbeat()

test 成功:

image-20241028091831794


Created: June 27, 2025
Last update: June 27, 2025

Discussion