MapReduce概述
- 源自于Google的MapReduce論文,發(fā)表于2004年12月
- Hadoop MapReduce是Google MapReduce的克隆版
- MapReduce優(yōu)點(diǎn):海量數(shù)據(jù)的離線處理、易開發(fā)、易運(yùn)行。
所謂海量數(shù)據(jù),說明MapReduce可以處理的數(shù)據(jù)量非常大,離線處理說明MapReduce跟實(shí)時響應(yīng)不同,用戶將作業(yè)提交,系統(tǒng)按批次進(jìn)行處理,由于數(shù)據(jù)量大,自然非常耗時。
所謂易開發(fā),如果我們自己要實(shí)現(xiàn)這樣的分布式應(yīng)用程序的話需要考慮很多東西,比如如何對文件進(jìn)行拆分、如何處理節(jié)點(diǎn)故障問題,MapReduce框架在內(nèi)部已經(jīng)封裝好了這些問題,我們只要把重心關(guān)注在應(yīng)用邏輯的處理上就行了。
所謂易運(yùn)行,當(dāng)然這個易運(yùn)行是相對的。
- MapReduce缺點(diǎn):無法滿足實(shí)時流式計算
MapReduce的作業(yè)是非常耗時的,不可能實(shí)時處理數(shù)據(jù);流式計算也是無法滿足的,我們之前看過MapReduce的處理圖,它的各個流程是由依賴關(guān)系的,也就是說如果map沒做完,那么reduce也做不了。
Hadoop官網(wǎng)是這么介紹MapReduce框架的:
Overview
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
Hadoop Mapreduce是一個易于編程并且能在大型集群(上千節(jié)點(diǎn))快速地并行得處理大量數(shù)據(jù)的軟件框架,以可靠,容錯的方式部署在商用機(jī)器上。
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.
MapReduce Job通常將輸入的數(shù)據(jù)集切分成獨(dú)立的切片,以完全并行(completely parallel)的方式在map任務(wù)中處理。該框架對maps輸出的數(shù)據(jù)進(jìn)行排序,這些排序后的數(shù)據(jù)將做為reduce任務(wù)的輸入,一般Job的輸入輸出都是存儲在文件系統(tǒng)中。該框架可以調(diào)度任務(wù)、監(jiān)控任務(wù)和重啟失效的任務(wù)。
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
一般來說計算節(jié)點(diǎn)和存儲節(jié)點(diǎn)都是同樣的設(shè)置,MapReduce框架和HDFS運(yùn)行在同組節(jié)點(diǎn)。這樣的設(shè)定使得MapReduce框架能夠在有數(shù)據(jù)的節(jié)點(diǎn)上有效地調(diào)度任務(wù),這樣可以在集群上實(shí)現(xiàn)高聚合(aggregate)的帶寬(bandwidth)。
The MapReduce framework consists of a single master
ResourceManager, one workerNodeManagerper cluster-node, andMRAppMasterper application.
MapReduce 框架包含一個主ResourceManager,每個集群節(jié)點(diǎn)都有一個從NodeManager并且每個應(yīng)用都有一個MRAppMaster。
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.
應(yīng)用最少必須指定輸入和輸出的路徑并且通過實(shí)現(xiàn)合適的接口或者抽象類來提供map和reduce功能。前面這部分內(nèi)容和其他Job參數(shù)構(gòu)成了Job的配置。
The Hadoop job client then submits the job (jar/executable etc.) and configuration to the
ResourceManagerwhich then assumes the responsibility of distributing the software/configuration to the workers, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
Hadoop 客戶端提交Job和配置信息給ResourceManger,它將負(fù)責(zé)把配置信息分配給從屬節(jié)點(diǎn),調(diào)度任務(wù)并且監(jiān)控它們,把狀態(tài)信息和診斷信息傳輸給客戶端。
MapReduce編程模型
MapReduce執(zhí)行步驟
在之前說明統(tǒng)計詞頻的例子中我們曾用到過這幅圖:
可以很清楚的看到一個MapReduce作業(yè)會被拆成map階段和reduce階段,分別交給map task和reduce task進(jìn)行處理。整個MapReduce的執(zhí)行步驟從這張圖中已經(jīng)看得非常清晰了。接下來我們來看看官網(wǎng)文檔中關(guān)于這部分的說明。
Inputs and Outputs
The MapReduce framework operates exclusively on
<key, value>pairs, that is, the framework views the input to the job as a set of<key, value>pairs and produces a set of<key, value>pairs as the output of the job, conceivably of different types.
MapReduce 框架只操作鍵值對,MapReduce 將job的不同類型輸入當(dāng)做鍵值對來處理并且生成一組鍵值對作為輸出。
The
keyandvalueclasses have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
Key和Value類必須通過實(shí)現(xiàn)Writable接口來實(shí)現(xiàn)序列化。此外,Key類必須實(shí)現(xiàn)WritableComparable 來使得排序更簡單。
至于什么是Writable接口和WritabelComparable接口,我們等會再說。
Input and Output types of a MapReduce job:
(input)
<k1, v1> ->map-> <k2, v2> ->combine-> <k2, v2> ->reduce-> <k3, v3>(output)
可以看出,在整個MapReduce階段,整個過程中都是以鍵值對的方式進(jìn)行數(shù)據(jù)傳遞的。
關(guān)于輸入輸出的介紹就看到這里,回到我們剛才提到的,什么是Writable接口?我們通過官網(wǎng)的鏈接點(diǎn)進(jìn)去:
A serializable object which implements a simple, efficient, serialization protocol, based on DataInput and DataOutput.
這句話說,Writable接口是一個基于數(shù)據(jù)輸入輸出的實(shí)現(xiàn)的一個簡單高效的序列化協(xié)議的接口。
這個接口中一共有兩個方法:
void write(DataOutput out) throws IOException;
void readFields(DataInput input) throws IOException;
一個是寫方法,將對象的字段序列化到out中,比如一個用戶的字段可以有賬號、密碼、年齡等等。
另一個是讀方法,把對象中的屬性字段從in中反序列化出來。
官網(wǎng)還給了個例子:
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; // Default constructor to allow (de)serialization MyWritable() { } public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }
Writable接口看完了我們再來看看什么是WritableComparable接口。
WritableComparable接口是繼承自Writable和Comparable接口的,Writable我們說過了,Comparable接口是干嘛的呢?其實(shí)這個接口主要用在排序中,比如定義了一個Student類,里面有一些姓名、年齡、成績的屬性,如果我要按照年齡給學(xué)生排序怎么辦?解決辦法就是讓Student實(shí)現(xiàn)Comparable接口,而Comparable接口中有個compareTo方法,在Student中實(shí)現(xiàn)這個方法,將按年齡排序的邏輯寫在這個方法中就可以對學(xué)生按照年齡進(jìn)行排序了。
Writabel接口的描述是這樣的:
WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.
因?yàn)閷?shí)現(xiàn)了Comparable接口,所以WriteComparable是可以相互比較的,一般是通過Comparators比較器進(jìn)行比較。在MapReduce中會用到鍵值對的組合,所有的鍵(key)的都必須要實(shí)現(xiàn)這個接口,以便MapReduce可以根據(jù)鍵對鍵值對(key-value pairs)進(jìn)行排序(后面運(yùn)行過程的圖中我們會看到有個sort的步驟)。
同樣官網(wǎng)給我們提供了一個例子:
public class MyWritableComparable implements WritableComparable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public int compareTo(MyWritableComparable o) { int thisValue = this.value; int thatValue = o.value; return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } public int hashCode() { final int prime = 31; int result = 1; result = prime * result + counter; result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); return result } }
MapReduce的運(yùn)行過程
下面這張圖展示了MapReduce在機(jī)器上的運(yùn)行過程:
首先將文件從本地或者HDFS中讀取出來,使用InputFormat將文件拆分成若干個splits,這個InputFormat又是個什么玩意兒?我們通過他的一個實(shí)現(xiàn)類TextInputFormat來更好的理解他,這個類的主要作用是處理文本型的文件。
TextInputFormat這個類中有這樣的兩個方法:
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
這個方法的作用從命名就能看出來,他可以把輸入文件拆分成一個InputSplit類型的數(shù)組,至于拆成幾份可以用參數(shù)numSplits控制。
RecorderReader<K,V> getRecorderReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
這個方法可以得到一個RR(記錄讀取器),用于讀取拆分后每個split的記錄內(nèi)容。
InputFormat將文件拆分成split后,再由RR把每個split上的數(shù)據(jù)讀取出來,每讀一行就交給一個map task進(jìn)行處理,對于wordcount例子來說,經(jīng)過map處理后得到的是每個單詞出現(xiàn)的記錄,下一步是將相同單詞的記錄扔到一臺機(jī)器上去進(jìn)行統(tǒng)計,這就是Partitioner要做的事情了。有可能會發(fā)到node2上,也有可能在本機(jī)上(node1)處理。排序后最終交由reduce處理,處理的結(jié)果由outputformat輸出到本地文件系統(tǒng)或者HDFS上。
說到這里,我們會發(fā)現(xiàn),上面提到的split是不是跟之前討論的時候說到的block很相似,這兩者是一樣的東西么?
其實(shí),split是交由MapReduce作業(yè)來處理的數(shù)據(jù)塊,是MapReduce中最小的計算單元。而block的是HDFS中最小的存儲單。默認(rèn)情況下這兩者是一一對應(yīng)的,也就是說split的大小默認(rèn)和hdfs的block塊大小一致,當(dāng)然也可以手動設(shè)置兩者的關(guān)系,但是并不建議這么做。
如果設(shè)置過兩者的對應(yīng)關(guān)系,那么可能就會像下面這張圖一樣:
上面這張圖就是一個block對應(yīng)兩個split,每個split交由一個mapper task進(jìn)行處理。如果我們假設(shè)K1是文件的偏移量,那么V1就是每個偏移量對應(yīng)的文件內(nèi)容,比如K1取每行行首的偏移量,V1就是一行的文件內(nèi)容。經(jīng)過map階段后,K2就是每個單詞,V2就是這個單詞出現(xiàn)的一條條記錄,比如可能是["hello",{1,1,1,1,1}]這種形式。把這樣的數(shù)據(jù)交給reduce處理,得出來的就是hello出現(xiàn)的次數(shù)了。reduce處理完了后就會寫到HDFS上去,因?yàn)橛?個reduce task,因此會寫三個文件。
MapReduce架構(gòu)1.x
MapReduce1.x的框架的這張圖我們在上一篇YARN的文章中已經(jīng)見過了,現(xiàn)在我們來分析一下每個組件的具體功能。
-
JobTracker(JT)
- 是作業(yè)的管理者。
- 將作業(yè)分解成一堆的任務(wù)(task),其中任務(wù)又分為map task和reduce task。
- 將任務(wù)分派給TaskTracker運(yùn)行。
- 作業(yè)的監(jiān)控、容錯處理(task作業(yè)掛了,重啟task的機(jī)制)。
- 在一定的時間間隔內(nèi),JT沒有收到TT的心跳信息,那么說明TT很有可能是掛了,TT上運(yùn)行的任務(wù)會被指派到其他TT上去執(zhí)行。
-
TaskTracker(TT)
- 任務(wù)的執(zhí)行者。
- 在TT上運(yùn)行task。
- 會與JT進(jìn)行交互:執(zhí)行/啟動/作業(yè),發(fā)送心跳信息給JT。
-
map task
- 自己開發(fā)的map任務(wù)交由task進(jìn)行處理。
- 解析每條記錄的數(shù)據(jù),交給自己的map方法處理。
- 將map的輸出結(jié)果寫到本地磁盤。
-
redece task
- 將map task輸出的數(shù)據(jù)進(jìn)行讀取。
- 按照數(shù)據(jù)進(jìn)行分組傳給編寫的reduce方法處理。
- 輸出結(jié)果寫到HDFS。
MapReduce架構(gòu)2.x
下面是MapReduce2.x的架構(gòu)圖,可以跟我們之前說YARN畫的那張圖做個比較,大體結(jié)構(gòu)是一樣的,只是把App Mstr換成了MapReduce App Mstr了,這兒也體現(xiàn)了我們之前說的YARN框架的通用性,只需要換個模塊就可以直接運(yùn)行其他的計算框架。
2.x的架構(gòu)去掉了JobTracker和TaskTracker,用ResourceManager和NodeManager替代。Client的差別不大,Client先提交個請求給RM,RM啟動一個NM,并在NM上啟動MapReduce Application Master,然后App Mstr向RM提交資源申請,在獲取到資源后給相應(yīng)的NM發(fā)送請求并在相應(yīng)的NM上啟動map task和reduce task。