Distributed System and MapReduce 02 | 分布式系统和MapReduce 02

Albert Wang / 2022-06-15 / 700 Words/has been Read   Times


这篇博客主要是关于 MIT 的公开课 6.824 分布式系统 的 lab 1相关内容,lab 1 要求我们实现一个分布式的mapreduce系统,并且已经为我们提供了一些基础的代码,lab 的地址请点击这里 。下面让我们从文件的架构开始一点点去了解我们所要做的事情。下面是一个精简了的代码结构图,它可以帮助我们理解 lab 1的内容。

单机mapreduce任务 #

首先我们需要明确的是main目录下的文件是我们的程序入口处, 如果我们执行mrsequential.go 文件将会启动一个单机的mapreduce任务,执行的命令如下

go run -race mrsequential.go wc.so pg*.txt

可以看到在执行的时候还传了两个参数,这两个参数的作用是什么呢?我们从代码中来观察一下,

if len(os.Args) < 3 {
    fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
    os.Exit(1)
}

mapf, reducef := loadPlugin(os.Args[1])

可以看到 wc.so 是用来加载 map 和 reduce 函数的, 而 wc.so 文件则是由下面这个命令生成的,通过-buildmode=plugin可以以插件的形式将wc.go 文件里的函数封装起来,然后通过loadPlugin 函数重新加载出来。

 go build -race -buildmode=plugin ../mrapps/wc.go

loadPlugin 函数是作者自己编写的,具体是封装了plugin.Open 和 p.Lookup两个函数的功能,最终的结果是获取到map 和 reduce 两个函数。

func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)

	return mapf, reducef
}

第二个参数的作用则是需要我们统计的文件的名称,我们可以在main/ 目录下看到这些文件。

我们说这是一个单机的mapreduce任务,它的单机就表现在下面的代码中。

intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("cannot open %v", filename)
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", filename)
    }
    file.Close()
    kva := mapf(filename, string(content))
    intermediate = append(intermediate, kva...)
}

这里我们通过一个循环来遍历所有的文件,然后对每一个文件执行map函数,具体的操作就是将文件的每一个单词进行拆分,然后变成<key, value>的形式,最后将这个map返回。可以看到这里的文件名乘这个参数是没有用的,而且每一个文件其实都是由一台机器处理的。我们要做的事就是把这些文件分给不同的机器去处理。

在单机过程中,对map 过程得到的中间数据是通过sort来合并的

sort.Sort(ByKey(intermediate))

下面就是reduce的过程了,这个过程中只需要先将相同key 的元素都放入一个切片中,然后调用reduce函数统计切片的长度,并将最终的结果写入文件中就可以了。

oname := "mr-out-0"
ofile, _ := os.Create(oname)
i := 0
for i < len(intermediate) {
    j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }
    output := reducef(intermediate[i].Key, values)

    // this is the correct format for each line of Reduce output.
    fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

    i = j
}

ofile.Close()

通过上面的任务我们可以发现 map函数和 reduce 函数其实是属于被一直重复调用的代码块,我们可以选择让不同的机器去调用它而不是让一台机器去重复执行这个过程。下面的改进主要就来自于此。

分布式mapreduce任务 #

在分布式任务中我们首先需要确定一个 Coordinator 和多个worker ,Coordinator 就相当于领导,它不干具体的事物,只负责统计有哪些任务需要完成,还有哪些任务尚未完成,哪些任务正在做,要给 worker 分配哪些任务等等。而 worker 节点要做的事就是在自己没有工作要做的时候向 Coordinator 申请任务,然后执行任务。

它们之间以 rpc 的方式进行通信,rpc 会实现商定通信的一些规范和准则,可以使整个过程更加高效。我们要做的所有任务都在mr/ 这个目录下。我们先从main/ 目录下的mrcoordinator.go 入手来看具体需要我们做什么。

go run mrcoordinator.go pg*.txt

首先执行上面的命令可以启动一个主节点的服务,参数 pg*.txt 表示待处理的文件名称。它里面执行的代码也很简单,就是调用了 mr/ 包下的 MakeCoordinator 函数,第一个参数是文件名,第二个参数表示reduce任务使用的数量。

if len(os.Args) < 2 {
    fmt.Fprintf(os.Stderr, "Usage: mrcoordinator inputfiles...\n")
    os.Exit(1)
}

m := mr.MakeCoordinator(os.Args[1:], 10)
for m.Done() == false {
    time.Sleep(time.Second)
}

time.Sleep(time.Second)

这个函数目前提供给我们的只有下面这几行代码,其中 Coordinator 是一个结构体,它里面的变量需要我们自己去定义。

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.


	c.server()
	return &c
}

然后我们可以发现程序调用了 Coordinator 结构体的 server() 方法,它所做的事就是开启了 rpc 的监听,等待 worker 节点来申请任务。

func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

到目前为止我们似乎只能确定我们需要定义一个结构体 Coordinator, 然后需要定义 rpc 的一些规范,其他的尚不明确。不过不用着急,我们可以再从 mrworker.go 出发看能不能得到更加有用的信息。执行下面的命令就可以执行这个文件。

go run mrworker.go wc.so

这段代码也很简洁,前几行代码我们已经很熟悉了,它的作用就是加载map 和 reduce 函数,然后再执行 mr 包下的 Worker 函数。

if len(os.Args) != 2 {
    fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
    os.Exit(1)
}

mapf, reducef := loadPlugin(os.Args[1])

mr.Worker(mapf, reducef)

但是 Worker 函数里的内容完全需要我们自己去实现。

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the coordinator.
	// CallExample()

}

到目前为止我们已经明确了要做的事。从头梳理一遍要求,我们会发现目前有两件事比较迫切,第一是 rpc 里需要我们约定的内容,第二是 coordinator 里 Coordinator 结构体里的东西。

我们先来解决第一件事, Worker 和 Coordinator 要通信的唯一目的就是任务(Task),所以我们应该先对 Task进行抽象,本文对 Task 抽象为 Task 的 id,这个 Task 需要处理的文件名称(在本实验中一个 MAP 任务只需要处理一个文件,但在真实的场景中可能需要处理多个不同的块),还有该任务的当前状态。当前状态是用 Mapreduce 论文里的方式来设定的,分为 IDE,IN_PROGRESS和COMPLETED三种状态。

type TaskStatus int32

const (
	IDE         TaskStatus = 1
	IN_PROGRESS TaskStatus = 2
	COMPLETED   TaskStatus = 3
)

type Task struct {
	id       int
	fileName []string
	status   TaskStatus
}

Worker 向 Coordinator 请求任务的时候,Coordinator 返回的结果应该包括该任务的类型,考虑到这个任务类型只在 rpc过程中会有应用,所以并没有把它放入 Task 中,类型可以分为下面这四种

type TaskTypes int32 // 任务类型在 rpc 过程中会用到

const (
	MAP      TaskTypes = 1
	REDUCE   TaskTypes = 2
	WAITTING TaskTypes = 3
	FINISHED TaskTypes = 4
)

同样的,因为在 Worker 的 任务中只需要知道要处理的文件名称和任务编号,并不需要知道当前任务的状态,所以也没有将 Task 整体假如 CoordinatorResponse,只是选择了有用的类型。

type CoordinatorResponse struct {
	TaskNumber     int
	ReduceTasks    int // number of reduce tasks
	TaskType       TaskTypes
	FilesToProcess []string // currently single file for Map Task
}

Worker 完成任务之后要将中间文件的信息返回给 Coordinator ,传入的参数可以封装如下。

type WorkerRequest struct {
	TaskNumber                 int
	TaskType                   TaskTypes
	CompletedIntermediateFiles []string
}

至此,我们就完成了 RPC 部分的工作。然后我们可以看一下 Coordinator 结构体的内容

type Coordinator struct {
	// Your definitions here.
	mu          sync.Mutex
	mapTasks    map[int]Task
	reduceTasks map[int]Task

	finishedMapTaskNum int
	finishedReduceNum  int

	mapTasksCount    int // map 任务总数
	reduceTasksCount int // reduce 任务总数
}

这个结构体里保存了 map 任务和 reduce 任务的 map 类型数据结构,同时记录了 map 和 reduce 任务的总数,

finishedMapTaskNum 和 finishedReduceNum 则表示已经完成的两类任务总数。这是一种实现方式,我们也可以只用一个 Task 队列来存储全部的任务。队列里先存储 map 的任务,然后再存储 reduce 的任务,因为队列是先进先出的,所以也可以保证先把所有的 map 任务做完,然后再去执行 reduce 任务。

完成之后执行下面的命令就可以测试代码是否能通过检测

bash test-mr.sh

最终结果如下图所示,如果出现 PASS ALL TESTS 表示通过了全部的测试。

Last modified on 2022-06-15