Hadoop
Hadoop 是一個提供分布式存儲和分布式計算的框架,為大量數(shù)據(jù)的存儲和計算提供了一個可靠的平臺支持?,F(xiàn)在 Hadoop 和其它相關(guān)的衍生產(chǎn)品構(gòu)成了大數(shù)據(jù)生態(tài)系統(tǒng)。

HDFS
HDFS 是 Hadoop 提供的一個分布式存儲的文件系統(tǒng),基本思想就是分而存之,讓多臺計算機分別存儲一個大文件的一部分,這樣就解決了大文件無法在單臺計算機上存儲和無法在單臺計算機上快速計算的問題。

與磁盤一樣,HDFS 也有塊的概念,將一個大文件進行拆分,每一部分就稱為一個塊 (block)。在 Hadoop 2.x 中,默認一個塊大小為 128M。當有一個 1M 的數(shù)據(jù)存放到 HDFS 中時并不會占用一個 128M 的數(shù)據(jù)空間,而是占有 1M,這是和磁盤塊不同的地方。
一個塊大小為 128M 主要原因是為了讓尋道時間小于傳輸時間的 1%,假設(shè)傳輸速率為 100m/s,尋道時間為 10ms,則 (10ms / 1000) / (1%) * 100m/s = 100M。
HDFS 默認情況下,會為每個數(shù)據(jù)塊存儲 3 份,并存儲在不同的計算機上,這就是其備份機制。HDFS 的可靠性就體現(xiàn)在這里。
這些數(shù)據(jù)塊分別由多臺不同的計算機存儲,而記錄這些塊在那臺機器上存儲、塊大小是多少、屬于哪個文件的信息稱為元數(shù)據(jù),也可以稱為是描述數(shù)據(jù)的數(shù)據(jù)。
元數(shù)據(jù)由一個單獨的進程來維護,這個進程稱為 NameNode。一般會由一臺單獨的計算機作為 NameNode 節(jié)點,也就是在那臺計算機上啟動一個 NameNode 進程。
管理這些數(shù)據(jù)塊的工作也是由一個單獨的進程來完成,這個進程為 DataNode。數(shù)據(jù)塊則是由多臺計算機協(xié)同存儲的,在每臺計算機上都會啟動一個 DataNode 進程,并時刻與 NameNode 進行通信。
-
NameNode
NameNode 用于維護著元數(shù)據(jù)信息,為了提高查找效率,NameNode 會將元數(shù)據(jù)信息存放在內(nèi)存中。所以,NameNode 節(jié)點必須是大內(nèi)存的,這也是 HDFS 的一個瓶頸。
內(nèi)存是不可靠的,所以元數(shù)據(jù)信息也會持久化到磁盤一份,這個稍后再說。
由于每個數(shù)據(jù)塊都會產(chǎn)生一條元數(shù)據(jù)信息,如果 HDFS 中存放大量小文件,就會產(chǎn)生大量的元數(shù)據(jù)信息,這樣 NameNode 的內(nèi)存就會很快就會撐爆。(最簡單的解決辦法就是將文件進行合并,然后上傳到 HDFS 中)
-
DataNode
DataNode 是一個維護其所在計算機的數(shù)據(jù)塊的進程,主要工作可以分為讀和寫兩部分,也就是檢索和存儲功能。同時 DataNode 也會定時向 NameNode 報告其健康狀況和其所維護的數(shù)據(jù)塊列表。
NameNode 是 HDFS 的命門,如果 NameNode 掛掉之后 HDFS 就徹底的無法提供服務了,并且存儲在內(nèi)存中的元數(shù)據(jù)信息也會丟失,那樣就永遠無法提供服務了。所以,為防止元數(shù)據(jù)丟失問題,HDFS 有了就 SecondaryNameNode 來幫助 NameNode 進行元數(shù)據(jù)的持久化,有高可用機制來進行 NameNode 的主備切換工作。
-
SecondaryNameNode
SecondaryNameNode 可以說是 NameNode 的輔助節(jié)點,在一定程度上也可以起到備份節(jié)點的作用。在 NameNode 中會有一個名為 FSimage 的舊的元數(shù)據(jù)持久化文件和一個名為 Edits Log 的預寫日志。
SecondaryNameNode 會定期詢問 NameNode 是否需要將 FSimage 和 Edits Log 進行合并 (稱為檢查點),通過設(shè)置間隔時間和 Edits Log 的文件大小閾值來限定是否需要合并。
SecondaryNameNode 從 NameNode 拉取過來 FSimage 和 Edits Log 后,會根據(jù)預寫日志進行重演,然后合并到 FSimage 中,最后將合并后的 FSimage 發(fā)給 NameNode,并且自身也會存儲一份。
當 NameNode 重啟的時候,就會讀取 FSimage 中的持久化文件進行元數(shù)據(jù)的恢復。當 NameNode 節(jié)點磁盤也壞的時候,SecondaryNameNode 保存的 FSimage 也可以一定程度上進行元數(shù)據(jù)的恢復,但是會丟失一部分數(shù)據(jù)數(shù)據(jù)。
基本流程
在介紹完 HDFS 基本組成后,我們再看看 HDFS 讀寫操作流程:
-
寫操作
當客戶端上傳文件的時候,會先向 NameNode 發(fā)送一個寫請求,NameNode 會先對身份和時候已經(jīng)存在這個文件進行一個校驗。校驗通過后,就會給客戶端一個確認消息,告訴它可以上傳。
客戶端收到確認消息后,就會對文件進行分塊,分塊操作是在客戶端進行的。分塊完成后,就會向 NameNode 詢問第一個數(shù)據(jù)塊的存放地址,NameNode 會根據(jù)動態(tài)感知機制,為這個數(shù)據(jù)塊找到一個合適的存儲位置,然后將 DataNode 的地址返回給客戶端。
客戶端在收到上傳地址后,就會與 DataNode 進行通信并上傳,由于有備份機制, DataNode 在收到數(shù)據(jù)后,會發(fā)送給備份 DataNode。
數(shù)據(jù)塊傳輸完成后,就會給客戶端發(fā)送一個確認消息。然后客戶端告訴 NameNode 上傳完成,再向 NameNode 發(fā)送上傳第二個數(shù)據(jù)塊的請求,以此類推。
由此可見 HDFS 的寫操作是串行進行的。
寫操作流程 -
讀操作
客戶端會向 NameNode 發(fā)送一個讀操作請求,NameNode 會返回最近通信的存儲請求文件的 DataNode 的節(jié)點地址。
客戶端收到地址后,就會從 DataNode 中并行讀取,然后將讀取到的數(shù)據(jù)在客戶端進行合并。
如果,讀取失敗后,客戶端就會想 NameNode 重新發(fā)送請求,NameNode 會從其它備份節(jié)點中選擇一個,返回給客戶端。
讀操作流程
HDFS 就簡單說到這里。
MapReduce
MapReduce 是 Hadoop 提供的一個分布式計算框架,基本思想就是分而算之和移動程序不移動數(shù)據(jù),也就是針對每個數(shù)據(jù)塊進行運算 (MapTask),最后將每個節(jié)點的運算結(jié)果進行匯總 (ReduceTask)。

MapReduce 的工作可以基本分為讀取、Shuffle和輸出三步:
-
分片
分片是 MR 程序?qū)斎胛募指畹囊粋€文件集的引用。不同于 HDFS 中的數(shù)據(jù)塊,一個分片就代表著一個 MapTask 輸入數(shù)據(jù)的引用。
MapReduce 程序會為每個分片都會啟動一個 MapTask 進程,讓其專門處理這個分片所引用的數(shù)據(jù)。
默認情況下,一個數(shù)據(jù)塊就對應一個分片,這樣主要是為了避免數(shù)據(jù)在網(wǎng)絡上傳輸,只需要將 MapTask 程序發(fā)送到數(shù)據(jù)塊所在的節(jié)點就行了,這就是數(shù)據(jù)不動程序動。
這樣就會產(chǎn)生數(shù)據(jù)塊過大和數(shù)據(jù)塊過小兩種情況:
- 如果數(shù)據(jù)塊過小(大量小文件),這樣每個數(shù)據(jù)塊作為一個分片,就會啟動大量的 MapTask。而每個 MapTask 都是一個進程,這樣就把大量的時間花費在了創(chuàng)建線程銷毀線程上了。MapReduce 提供了 CombineFileInputFormat 類,將所有數(shù)據(jù)塊作為一個分片,也就是只啟動一個 MapTask。
- 如果數(shù)據(jù)塊過大,那就降低了并行度,無法發(fā)揮分布式計算的優(yōu)勢??梢愿鶕?jù)具體的業(yè)務,將數(shù)據(jù)塊大小調(diào)整為合適的尺寸。
-
輸入 / InputFormat
InputFormat 用來為 MR 程序提供計算分片和獲取對應分片的 Reader 服務:
public abstract class InputFormat<K, V> { // 計算分片 public abstract List<InputSplit> getSplits(JobContext context); // 根據(jù) split 獲取數(shù)據(jù) Reader public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context); }getSplits() 方法是由客戶端進行調(diào)用的,然后將分片信息存放到 HDFS 中。ApplicationMaster 會從 HDFS 中進行拉取,并根據(jù)分片信息,選擇最優(yōu)的位置在 Worker 上啟動 MapTask。
-
MapTask
我們寫 MR 程序的時候,Map 端都會繼承 Mapper:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } // 初始化 protected void setup(Context context) { } // 我們一般都會重寫這個方法 // 默認是,將 key-value 原樣輸出 protected void map(KEYIN key, VALUEIN value, Context context){ context.write((KEYOUT) key, (VALUEOUT) value); } // 清理工作 protected void cleanup(Context context) { } // Map 任務啟動的時候,就會調(diào)用這個方法 public void run(Context context) throws IOException, InterruptedException { setup(context); try { // 內(nèi)部就是調(diào)用 RecordReader 的 nextKeyValue() 方法 while (context.nextKeyValue()) { // RecordReader 的 getCurrentKey() 和 getCurrentValue() 方法 map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }這里只是簡單的看一下工作流程,具體的細節(jié)在剖析源碼的時候再看。
Mapper 將從 RecordReader 中獲取到的 key-value 交給 map() 方法去做具體的運算工作,最后我們會調(diào)用 context.write() 方法將處理后的 key-value 寫出。
這樣就進行到了 Map 端的 Shuffle 操作。
-
Shuffle
Shuffle 操作可以分為 Map 端和 Reduce 端兩部分,總的來說,Shuffle 就做了分區(qū)、聚合和排序三件事。
Map 端調(diào)用完 context.write() 方法后,就會通過 RecordWriter 將 key-value 按 key 進行分區(qū),并寫入到環(huán)形緩沖區(qū)中,并會在環(huán)形緩沖區(qū)中進行一次快排操作。
緩沖區(qū)大小默認為 100M,閾值為 80%,也就是緩沖區(qū)寫滿 80% 的時候就會發(fā)生溢寫操作,將緩沖區(qū)的數(shù)據(jù)溢寫到磁盤,每次溢寫都會產(chǎn)生一個新的文件??梢詫⒕彌_區(qū)大小設(shè)的更大一些,盡量避免溢寫的發(fā)生。
當 Map 端將數(shù)據(jù)寫完后,會將溢寫文件進行合并,然后按在進行一次歸并排序,這樣就產(chǎn)生了分區(qū)且排序后的 key-value,來等待 Reduce 端的拉取。
如果設(shè)置了 Combiner 的話,就會在溢寫的時候執(zhí)行和最后合并數(shù)據(jù)的時候執(zhí)行,并不是只執(zhí)行一次。
分區(qū)數(shù)是由 ReduceTasK 的數(shù)量來決定的,默認使用 HashPartiton 進行分區(qū)操作,當然,也可以根據(jù)業(yè)務需求進行自定義分區(qū):
// 自定義 partition public class PhonePartition extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTask) { String phoneNumber = key.toString(); if(phoneNumber.startsWith("137")){ return 0; } if(phoneNumber.startsWith("138")){ return 1; } if(phoneNumber.startsWith("139")){ return 2; } if(phoneNumber.startsWith("135")){ return 3; } if(phoneNumber.startsWith("136")){ return 4; } return 5; } }排序操作默認使用的是字典排序,也可以自定義排序器:
// 方式一 public class MyPair implements WritableComparable<SortPair> { private String first; private int second; // ... @Override public int compareTo(SortPair sortPair) { String anoFirst = sortPair.getFirst(); int firstComp = first.compareTo(anoFirst); if(firstComp != 0){ return firstComp; } else { int anoSecond = sortPair.getSecond(); return second - anoSecond; } } // ... } // 方式二 public class MyPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text text, Text text2, int i) { return text.compareTo(text2); } } // Job job.setPartitionerClass(MyPartitioner.class);Reduce 端 Shuffle 就是對從 Map 端拉取過來的數(shù)據(jù)進行一次聚合操作,將相同 key 的 value 方法一起,并暴露給 ReduceTask 一個 value 迭代器。
-
輸出
輸出就比較簡單了,將數(shù)據(jù)輸出到指定介質(zhì)中。
YARN
Yarn 是一個獨立的資源調(diào)度框架,由 ResourceManger 和 NodeManager 兩部分構(gòu)成:
-
ResourceManger
ResourceManger 用來處理用戶提交的任務請求,并維護 NodeManager 的節(jié)點信息。
-
NodeManager
NodeManager 主要用于資源管理、任務管理和 Container (容器) 管理、

-
ApplicationMaster
ApplicationMaster 是 ResourceManger 在 NodeManager 上啟動個一個負責管理特定任務的進程,ResourceManger 只負責分派任務,不復制管理任務,這個工作就是由 ApplicationMaster 來完成的。
-
Container
資源管理單位,ApplicationMaster 向 ResourceManager 請求任務所需要的資源的時候,ResourceManager 分配給它的資源就可以理解為一個 Container。



