Hadoop詳解 - HDFS - MapReduce - YARN - HA

為什么要有Hadoop?

????? 從計(jì)算機(jī)誕生到現(xiàn)今,積累了海量的數(shù)據(jù),這些海量的數(shù)據(jù)有結(jié)構(gòu)化、半結(jié)構(gòu)化、非

結(jié)構(gòu)的數(shù)據(jù),并且這些海量的數(shù)據(jù)存儲(chǔ)和檢索就成為了一大問題。

????? 我們都知道大數(shù)據(jù)技術(shù)難題在于一個(gè)數(shù)據(jù)復(fù)雜性、數(shù)據(jù)量、大規(guī)模的數(shù)據(jù)計(jì)算。

Hadoop就是為了解決這些問題而出現(xiàn)的。


Hadoop的誕生

????? DougCutting是Lucene的作者,當(dāng)時(shí)Lucene面臨和谷歌同樣的問題,就是海量的數(shù)據(jù)存儲(chǔ)和檢索,于是就誕生了Nutch。

????? 在這之后,谷歌的大牛就為解決這個(gè)問題發(fā)了三篇論文(GFS、Map-Reduce、BigTable),這三篇論文總體表達(dá)的意思就是部署多臺(tái)廉價(jià)的服務(wù)器集群,通過分布式的方式將海量數(shù)據(jù)存儲(chǔ)在這個(gè)集群上,然后利用集群上的所有機(jī)器進(jìn)行數(shù)據(jù)計(jì)算,這樣谷歌就不用買很多很貴的服務(wù)器,只需要把普通的機(jī)器組合在一起。

Doug Cutting等人就去研究這三篇論文,發(fā)現(xiàn)價(jià)值巨大,于是Doug Cutting等人在Nutch上實(shí)現(xiàn)了GFS和Map-Reduce,使得Nutch的性能飆升。

????? 于是Doug Cutting等人就把這兩部分納入到Hadoop項(xiàng)目中,主要還是為了將Hadoop項(xiàng)目作為一個(gè)大數(shù)據(jù)整體化的解決方案。

????? 所以為什么后面就出現(xiàn)了Hadoop而不是在Nutch上去做整體化大數(shù)據(jù)解決方案。

????? 這三篇論文對(duì)應(yīng)Hadoop的組件:

????? GFS-> HDFS?????????????????? 文件系統(tǒng)

????? Map-Reduce-> MR??????? ?????計(jì)算框架

????? BigTable-> Hbase?????????????? 數(shù)據(jù)庫系統(tǒng)



什么是Hadoop?

????? Hadoop是Apache下的一個(gè)分布式系統(tǒng)基礎(chǔ)架構(gòu),主要是為了解決海量數(shù)據(jù)存儲(chǔ)和海量的數(shù)據(jù)計(jì)算問題。

????? 在這個(gè)基礎(chǔ)之上發(fā)展出了的更多的技術(shù),使得Hadoop稱為大數(shù)據(jù)技術(shù)生態(tài)圈之一。


Hadoop發(fā)行版本

1、Apache版本最原始的版本

2、Clodera版本,在大型互聯(lián)網(wǎng)企業(yè)中用的比較多,軟件免費(fèi),通過服務(wù)收費(fèi)。

3、Hortonworks文檔比較好



特點(diǎn)

????? 高可靠:維護(hù)多個(gè)副本,假設(shè)計(jì)算元素和存儲(chǔ)出現(xiàn)故障時(shí),可以對(duì)失敗節(jié)點(diǎn)重新分布處理

????? 高擴(kuò)展:在集群間分配任務(wù)數(shù)據(jù),可方便的擴(kuò)展數(shù)以千計(jì)的節(jié)點(diǎn)

????? 高效性:并行工作

????? 高容錯(cuò):自動(dòng)保存多個(gè)副本,并且能夠?qū)κ∪蝿?wù)重新分配


Hadoop組成

HDFS:一個(gè)高可靠高吞吐量的分布式文件系統(tǒng)

?? NameNode(nn):存儲(chǔ)文件的元數(shù)據(jù),如:文件名、文件目錄結(jié)構(gòu)等信息

?? DataNode(dn)在文件系統(tǒng)存儲(chǔ)文件塊數(shù)據(jù),以及數(shù)據(jù)的校驗(yàn)和,也就是真正存儲(chǔ)文件內(nèi)容的,只是文件大的時(shí)候會(huì)切割成一小塊一小塊的。

?? SecondayNameNode(2nn)用于監(jiān)控HDFS狀態(tài)的輔助后臺(tái)程序,每隔一段時(shí)間就獲取HDFS的快照,就是備份和監(jiān)控狀態(tài)



Yarn:作業(yè)調(diào)度與集群資源管理框架。(Hadoop2.0加入)

?? ResourceManager(rm)處理客戶端請(qǐng)求、啟動(dòng)和監(jiān)控MRAppMaster、監(jiān)控NodeManager,以及資源分配和調(diào)度。

?? NodeManager(nn)單個(gè)節(jié)點(diǎn)上的資源管理、處理來自ResourceManager的命令,處理來自MRAppMaster的命令。

?? MRAppMaster數(shù)據(jù)切分、為應(yīng)用程序申請(qǐng)資源,并分配內(nèi)部任務(wù)、任務(wù)監(jiān)控和容錯(cuò)。

?? Container對(duì)任務(wù)運(yùn)行環(huán)境的抽象,封裝了CPU、內(nèi)存等多維資源以及環(huán)境變量、啟動(dòng)命令等任務(wù)運(yùn)行相關(guān)信息(hadoop內(nèi)部文件操作命令和Liunx差不多)



MapReduce:分布式離線并行計(jì)算框架。

?? Map階段:并行處理數(shù)據(jù)

?? Reduce階段:對(duì)Map階段處理的結(jié)果數(shù)據(jù)進(jìn)行匯總

?Common:支持其他模塊的工具模塊。


理解Hadoop組成

????? 有一個(gè)建筑工地的建造時(shí)間很緊急,設(shè)立了一個(gè)支持小組,支援各個(gè)小分隊(duì)(Common),首先1000包水泥,這些水泥要進(jìn)行存儲(chǔ)(HDFS),假設(shè)這些水泥有防水的和不防水的,防水的水泥存到倉(cāng)庫1(HDFS-dn),不防水的存儲(chǔ)到倉(cāng)庫2(HDFS-dn),那么就要進(jìn)行記錄,哪些水泥存放到哪里了(HDFS-nn),因?yàn)橼s工期擔(dān)心水泥可能會(huì)因?yàn)槌睗衲切﹩栴},出現(xiàn)不可用,所以又準(zhǔn)備了1000包水泥,并且每天都要對(duì)這些水泥進(jìn)行檢查(HDFS-2nn)。

????? 如果一個(gè)小分隊(duì)要領(lǐng)取水泥就要和工地倉(cāng)儲(chǔ)管理人員申請(qǐng),倉(cāng)儲(chǔ)管理人員同意了,就要向公司申請(qǐng)人員來搬水泥(Yarn-MRAppMaster),開始調(diào)動(dòng)這些人員搬運(yùn)水泥(Yarn-rm),小分隊(duì)領(lǐng)取到了水泥之后,開始決定給修外墻的多少包水泥(Yarn-nm)。

????? 修外墻小組就開始拿著水泥干活了(MapReduce-Map),直到整棟樓的外墻修好了(MapReduce-Reduce),第N棟也是如此(MapReduce-Map)。



Hadoop內(nèi)為什么要如此劃分?

數(shù)據(jù)存放在Hadoop,那么Hadoop必然需要對(duì)數(shù)據(jù)進(jìn)行管理,如果沒有一個(gè)專門管理數(shù)據(jù)存儲(chǔ)的組件或數(shù)據(jù)運(yùn)算的組件,全部都融合在一個(gè)東西里面就會(huì)顯得很臃腫,并且組件之間只需要通過接口進(jìn)行溝通,那么各自的組件就可以僅僅自身的需求做優(yōu)化等,那么就不會(huì)影響到其他的組件。

各自的組件只需要做好自己的事情,對(duì)外提供接口接收相應(yīng)的數(shù)據(jù)及返回?cái)?shù)據(jù),只要符合我組件規(guī)范的就運(yùn)行,不符合就不運(yùn)行,而不需要關(guān)心其他,專心做自己的事情,也可以使得組件之間可以單獨(dú)的運(yùn)行。



Hadoop目錄

????? bin程序級(jí)命令(hdfs、Yarn等)

????? etc配置文件

????? include類庫等文件

????? lib類庫等文件

????? libexec類庫等文件

????? sbinhadoop系統(tǒng)命令(關(guān)閉、啟動(dòng)等)

????? share官方提供的案例等



Hadoop運(yùn)行模式

????? 本地模式:不需要啟動(dòng)單獨(dú)進(jìn)程,直接運(yùn)行,一般測(cè)試和開發(fā)使用,一臺(tái)機(jī)器就可以運(yùn)行,如果是在Liunx,跑的是本地,可以直接通過命令運(yùn)行相應(yīng)的jar包。


偽分布式模式:等同于分布式,但只有一個(gè)節(jié)點(diǎn),具有集群的配置信息和運(yùn)行,由于偽分布式只有一臺(tái)機(jī)器,可以不啟動(dòng)Yarn,那么也就算是Hadoop的HDFS啟動(dòng)了,直接運(yùn)行MapReduce程序的話,結(jié)果都在HDFS上,不在是在本地,如果需要交由YARN上進(jìn)行資源調(diào)度和分配任務(wù),則需要配置Yarn地址,以及指定數(shù)據(jù)獲取方式。


????? 完全分布式模式:多個(gè)節(jié)點(diǎn)一起運(yùn)行,可以指定不同節(jié)點(diǎn)干不同的活,比如機(jī)器1干NameNode的活,機(jī)器2干ResourceManger的活。


注意:啟動(dòng)NameNode時(shí),DataNode會(huì)記錄NameNode信息(id),當(dāng)緩存的NameNode記錄刪除了,這個(gè)時(shí)候啟動(dòng)就會(huì)報(bào)錯(cuò),這個(gè)時(shí)候就需要將NameNode格式化(刪除所有數(shù)據(jù)),之后在重新啟動(dòng)。


HDFS

HDFS是什么?

????? HDFS就是一個(gè)分布式文件存儲(chǔ)系統(tǒng),通過目錄樹來定位文件,由于分布式特點(diǎn)那么集群中的服務(wù)器就有各自的角色。



特點(diǎn)

????? 低成本:由于是眾多服務(wù)器組成的,那么在某服務(wù)器掛了,只需要付出一臺(tái)廉價(jià)的服務(wù)器。

????? 高容錯(cuò)性:HDFS是由眾多服務(wù)器實(shí)現(xiàn)的分布式存儲(chǔ),每個(gè)文件都會(huì)有冗余備份,那么如果存儲(chǔ)數(shù)據(jù)的某個(gè)服務(wù)器掛了,那么還有備份的數(shù)據(jù),允許服務(wù)器出現(xiàn)故障。

????? 高吞吐量:HDFS是一次寫多次讀的訪問模型,不允許修改文件,并簡(jiǎn)化了數(shù)據(jù)的一致性問題。

????? 就近原則:在數(shù)據(jù)附近執(zhí)行程序,也體現(xiàn)出來移動(dòng)計(jì)算比移動(dòng)數(shù)據(jù)效率高。

????? 可移植性:HDFS可以實(shí)現(xiàn)不同平臺(tái)之間的移植。



應(yīng)用場(chǎng)景

????? 一次寫入,多次讀取,且不支持文件的修改。

????? 適合數(shù)據(jù)分析場(chǎng)景,不適合網(wǎng)盤應(yīng)用。



HDFS數(shù)據(jù)塊

????? HDFS的文件在物理上是分塊存儲(chǔ)的,1.x版本的數(shù)據(jù)塊默認(rèn)大小是64MB,2.x版本的數(shù)據(jù)塊默認(rèn)塊大小是128MB,這個(gè)值是可以通過配置參數(shù)(dfs.blocksize)進(jìn)行調(diào)整的。

????? HDFS的塊比磁盤的塊大,目的就在于要減少尋址的開銷(標(biāo)準(zhǔn):尋址時(shí)間只占傳輸時(shí)間的1%),如果塊設(shè)置的夠大,從磁盤傳輸數(shù)據(jù)的時(shí)間明顯就大于定位這個(gè)塊開始位置所需要的文件,因此傳輸一個(gè)由多個(gè)塊組成的文件的時(shí)間取決于磁盤傳輸速率。



HDFS常用命令(和Liunx差不多)

基本命令:hadoop fs

查看幫助:hadoop fs 或 hadoop fs -help(詳情)

創(chuàng)建目錄:hadoop fs -mkdir /usr

查看目錄信息:hadoop fs -ls /usr

本地剪切,粘貼到集群:hadoop fs -moveFromLocal test.txt /usr/

追加一個(gè)文件到已存在文件的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt

顯示文件內(nèi)容:hadoop fs -cat /usr/test.txt

顯示一個(gè)文件末尾:hadoop fs -tail /usr/ test.txt

以字符形式打印一個(gè)文件內(nèi)容:hadoop fs -text /usr/test.txt

修改文件所屬權(quán)限(-chgrp、-chomd、chown)(liunx一樣用法):hadoop fs -chmod777 /usr/test.txt

從本地復(fù)制到hdfs:hadoopfs -copyFormLocal text.txt /usr/test

hdfs復(fù)制到本地:hadoop fs -copyToLocal /usr/ text.txt ./

從hdfs路徑拷貝到hdfs另一個(gè)路徑:hadoop fs -cp /usr/dir1 /usr/dir2

在hdfs目錄中移動(dòng)文件:hadoop fs -mv /usr/test.txt /usr/dir

從hdfs下載文件到本地:hadoop fs -get /usr/test.txt ./

合并下載多個(gè)文件:hadoop fs -getmerge /usr /*.txt ./result.txt

上傳文件等于copyFormLocal:hadoop fs -put test.txt /usr

刪除文件或文件夾:hadoop fs -rmr /usr/test.txt

刪除空目錄:hadoop fs -rmdir /usr/test3

統(tǒng)計(jì)文件系統(tǒng)可用空間信息(-h格式化信息):hadoop fs -df -h

統(tǒng)計(jì)文件夾大小信息:hadoop fs -du -h /

統(tǒng)計(jì)制定目錄下的文件節(jié)點(diǎn)數(shù)據(jù)量(嵌套級(jí),當(dāng)前文件個(gè)數(shù),大小):hadoop fs -count -h /usr

設(shè)置文件的副本數(shù):hadoop fs -setrep 3 /usr/test.txt


NameNode

NameNode和SecondaryNameNode工作機(jī)制


第一階段:NameNode的工作

????? 1、第一次啟動(dòng)namenode格式化后,創(chuàng)建fsimage和edits文件,如果不是第一次啟動(dòng),直接加載編輯日志和鏡像文件到內(nèi)存。

????? 2、客戶端對(duì)元數(shù)據(jù)進(jìn)行操作請(qǐng)求

????? 3、NameNode記錄操作日志,更新滾動(dòng)日志。

????? 4、NameNode在內(nèi)存中對(duì)數(shù)據(jù)進(jìn)行操作


第二階段:Secondary NameNode的工作

????? 1、Secondary NameNode詢問NameNode是否需要checkpoint,直接帶回NameNode檢查結(jié)果。

????? 2、Secondary NameNode請(qǐng)求執(zhí)行checkpoint

????? 3、NameNode滾動(dòng)正在寫的eits日志

????? 4、將滾動(dòng)前的編輯日志和鏡像文件拷貝到Secondary NameNode

????? 5、Secondary NameNode加載編輯日志和鏡像文件到內(nèi)存并且合并

????? 6、生成新的鏡像文件fsimage.chkpoint

????? 7、拷貝fsimage.chkpoint到NameNode

????? 8、NameNode將fsimage.chkpoint重命名為fsimage



說明

Fsimage文件:HDFS文件系統(tǒng)元數(shù)據(jù)的一個(gè)永久檢查點(diǎn),其中包含HDFS文件系統(tǒng)所有目錄和文件,以及node序列化信息。


Edits文件:存放HDFS文件系統(tǒng)的所有更新操作,文件系統(tǒng)客戶端執(zhí)行的所有寫操作日志都會(huì)記錄到edits文件。


Secondary

NameNode在主NameNode掛了,可以從Secondary NameNode中恢復(fù)數(shù)據(jù),但是由于同步的條件限制,會(huì)出現(xiàn)數(shù)據(jù)不一致。



DataNode

工作機(jī)制


集群安全模式

????? NameNode啟動(dòng)時(shí),受限將鏡像文件加載進(jìn)去內(nèi)存,并編輯日志文件中的各項(xiàng)操作,一旦內(nèi)存中成功建立文件系統(tǒng)元數(shù)據(jù)鏡像,則創(chuàng)建一個(gè)新的fsimage文件和一個(gè)空的編輯日志。

????? 此時(shí)的NameNode開始監(jiān)聽DataNode請(qǐng)求,但此刻,NameNode是運(yùn)行在安全模式,則此時(shí)NameNode文件系統(tǒng)對(duì)于客戶端來說是只可讀

????? 系統(tǒng)中數(shù)據(jù)塊文件并不是由NameNode維護(hù)的,而是以塊列表的形式存儲(chǔ)在DataNode,在系統(tǒng)正常操作期間,NameNode會(huì)在內(nèi)存中保留所有塊位置影像信息。

????? 在安全模式下,各個(gè)DataNode會(huì)向NameNode發(fā)送最新的塊列表信息,NameNode了解到足夠多的塊信息之后,即可高效運(yùn)行文件系統(tǒng)。

????? 如果滿足最小復(fù)本條件,NameNode會(huì)在30秒后就退出安全模式,最小復(fù)本條件指的是整個(gè)文件系統(tǒng)中99%的塊都滿足最小復(fù)本級(jí)別,在啟動(dòng)一個(gè)剛剛格式化的HDFS集群時(shí),因?yàn)橄到y(tǒng)中還沒有塊,所以NameNode不會(huì)進(jìn)入安全模式。

????? 集群?jiǎn)?dòng)完成后自動(dòng)退出安全模式。


安全模式的應(yīng)用場(chǎng)景

????? 銀行對(duì)賬、維護(hù)。



Java操作HDFS

Demo

public static void main(String[] args) throws IllegalArgumentException, IOException,

InterruptedException, URISyntaxException {


??????? //配置信息

??????? Configurationconfiguration = new Configuration();


??????? //獲取文件系統(tǒng)

??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");


??????? //拷貝本地文件到集群

??????? fileSystem.copyFromLocalFile(new Path("e:/hdfs/test.txt"), new Path("/usr/hdfs/test.txt"));


??????? //關(guān)閉

??????? fileSystem.close();

}


HDFS數(shù)據(jù)流

IO流寫流程


IO流方式上傳文件 (Java)

public void fileUpload() throws IOException, InterruptedException,

URISyntaxException {

??????? //配置

??????? Configurationconfiguration = new Configuration();


??????? //文件系統(tǒng)

??????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"),configuration,"levi");


??????? //獲取輸出流(上傳到服務(wù)器) - 服務(wù)器

??????? FSDataOutputStreamfsDataOutputStream = fileSystem.create(new Path("/usr/hdfs/test03.txt"));


??????? //文件輸入流(本地上傳)

??????? FileInputStreamfileInputStream = new java.io.FileInputStream(new File("E:/hdfs/test03.txt"));


??????? //流對(duì)接

??????? IOUtils.copyBytes(fileInputStream, fsDataOutputStream, configuration);


??????? fsDataOutputStream.hflush();

??????? IOUtils.closeStream(fileInputStream);

??????? IOUtils.closeStream(fsDataOutputStream);


??????? //關(guān)閉

??????? fileSystem.close();


??? }


IO流讀流程



IO流方式下載文件 (Java)

public void readFile() throws IOException, InterruptedException,

URISyntaxException {

?????? //配置

?????? Configurationconfiguration = new Configuration();


?????? //文件系統(tǒng)

?????? FileSystemfileSystem= FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "levi");


?????? //輸入流(下載) 服務(wù)器

?????? FSDataInputStreamfsDataInputStream = fileSystem.open(new Path("/usr/hdfs/hadoop-2.7.2.tar.gz"));


?????? //輸出(本地)

?????? FileOutputStreamfileOutputStream = new FileOutputStream(new File("E:/hdfs/block.txt"));


?????? //流對(duì)接

?????? //--- 第一塊

?????? /*byte[]buf= newbyte[1024];

?????? for (inti = 0; i <1024*128; i++) {

?????????? fsDataInputStream.read(buf);

?????????? fileOutputStream.write(buf);

?????? }

?????? //關(guān)閉

?????? fsDataInputStream.close();

?????? fileOutputStream.close();*/


?????? //--- 第二塊

?????? fsDataInputStream.seek(1024 * 1024 * 128);//定位這個(gè)位置開始讀

?????? IOUtils.copyBytes(fsDataInputStream, fileOutputStream, 1024);

?????? IOUtils.closeStream(fileOutputStream);

?????? IOUtils.closeStream(fsDataInputStream);


?????? fileSystem.close();

??? }


副本節(jié)點(diǎn)選擇

????? 在海量數(shù)據(jù)的處理中,節(jié)點(diǎn)之間的數(shù)據(jù)傳輸速率是很重要,特別是在帶寬很稀缺的情況下,而節(jié)點(diǎn)和節(jié)點(diǎn)之間的距離越遠(yuǎn),那么必然會(huì)影響數(shù)據(jù)的傳輸。

????? 在成千的服務(wù)器集群中,Hadoop是怎么選擇副本節(jié)點(diǎn)呢?


低版本Hadoop

第一個(gè)副本在客戶端所處的節(jié)點(diǎn)上,但是如果客戶端是在集群外,隨機(jī)選取一個(gè)節(jié)點(diǎn)

第二個(gè)副本和第一個(gè)副本位于不同機(jī)架的隨機(jī)節(jié)點(diǎn)上,也就是不和第一個(gè)副本在相同機(jī)架。

第三個(gè)副本和第二個(gè)副本位于相同機(jī)架,節(jié)點(diǎn)隨機(jī)


Hadoop2.5版本以上

????? 第一個(gè)副本在客戶端所處節(jié)點(diǎn)上。如果客戶端在集群外,隨機(jī)選一個(gè)

????? 第二個(gè)副本和第一個(gè)副本位于相同機(jī)架,隨機(jī)節(jié)點(diǎn)

????? 第三個(gè)副本位于不同機(jī)架,節(jié)點(diǎn)隨機(jī)


HDFS誤區(qū)

小文件存儲(chǔ)

每個(gè)文件均按照塊存儲(chǔ),每個(gè)塊的元數(shù)據(jù)存儲(chǔ)在NamNode的內(nèi)存中(一個(gè)文件/目錄/文件塊一般占有150字節(jié)的元數(shù)據(jù)內(nèi)存空間),因此Hadoop存儲(chǔ)小文件會(huì)非常低效,因?yàn)榇罅啃∥募?huì)耗盡NameNode中大部分內(nèi)存,但存儲(chǔ)小文件所需要的磁盤容量和存儲(chǔ)這些文件原始內(nèi)容所需要的磁盤空間相比也不會(huì)增多。


例如:上傳一個(gè)文件1MB,那么這個(gè)文件會(huì)在HDFS中的一個(gè)塊存儲(chǔ)著,這個(gè)塊默認(rèn)是128MB,那么是不是占用了128MB的磁盤空間呢?

????? 每一個(gè)塊128MB只是HDFS的邏輯上的劃分,所以在磁盤占用空間還是1MB,只有當(dāng)一個(gè)或多個(gè)文件在一個(gè)塊內(nèi)超過128MB,之后將這個(gè)文件進(jìn)行切割。




副節(jié)點(diǎn)處理

HDFS是先把當(dāng)前這個(gè)節(jié)點(diǎn)處理完,在去處理副本節(jié)點(diǎn)的。



回收站

????? 回收站默認(rèn)是不啟用的,在core-site.xml文件中的配置fs.trash.interval默認(rèn)是為0.


HDFS全過程


MapReduce

MapReduce是什么?

????? MapReduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)基于Hadoop的數(shù)據(jù)分析應(yīng)用的核心框架。

????? MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)的運(yùn)行在一個(gè)Hadoop集群上。


作用

????? 由于硬件資源限制,海量數(shù)據(jù)無法在單機(jī)上處理,單機(jī)版程序擴(kuò)展到集群進(jìn)行分布式運(yùn)算,增加程序的復(fù)雜度和開發(fā)難度。

????? MapReduce框架就是要使得開發(fā)人員開源將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,而分布式運(yùn)算的復(fù)雜性交由MapReduce來處理。



特點(diǎn)

????? 適合數(shù)據(jù)復(fù)雜度運(yùn)算

????? 不適合算法復(fù)雜度運(yùn)算

????? 不適合實(shí)時(shí)計(jì)算、流式計(jì)算


核心思想


分布式的運(yùn)算程序最少需要分成兩個(gè)階段:

第一個(gè)階段:MapTask并發(fā)實(shí)例,完全并行運(yùn)行,互不相干

第二個(gè)階段:ReduceTask并發(fā)實(shí)例,互不相干,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有MapTask并發(fā)實(shí)例的輸出


MapReduce編程模型只能包含一個(gè)Map階段和Reduce階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個(gè)MapReduce程序,串行運(yùn)行。

?

總結(jié)

????? Map并行處理任務(wù)(運(yùn)算)。

????? Reduce:等待相關(guān)的所有Map處理完任務(wù),在將任務(wù)數(shù)據(jù)匯總輸出。

????? MRAppMaster負(fù)責(zé)整個(gè)程序的過程調(diào)度和狀態(tài)協(xié)調(diào)。



MapReduce進(jìn)程

????? 一個(gè)完整的MapReduce程序在分布式允許時(shí)有三類實(shí)例進(jìn)程:

????? MRAppMaster負(fù)責(zé)整個(gè)程序的過程調(diào)度和狀態(tài)協(xié)調(diào)。

????? MapTask負(fù)責(zé)Map階段的整個(gè)數(shù)據(jù)處理流程。

????? ReduceTask負(fù)責(zé)Reduce階段的整個(gè)數(shù)據(jù)處理流程。



序列化

????? 序列化就是把內(nèi)存中的對(duì)象轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議),以便于存儲(chǔ)(持久化)和網(wǎng)絡(luò)傳輸。

????? 而序列化就是Map到Reducer的橋梁。


????? Java序列化是一個(gè)重量級(jí)的序列化框架(Serializable),使用這個(gè)框架進(jìn)行序列化后會(huì)附帶很多額外信息(各種校驗(yàn)信息、header等),不便于網(wǎng)絡(luò)傳輸,所以Hadoop自己開發(fā)了一套序列化機(jī)制(Writable),精確、高效。


Java類型Hadoop

? Writable類型

booleanBooleanWritable

byteByteWritable

intIntWritable

floatFloatWritable

longLongWritable

doubleDoubleWritable

stringText

mapMapWritable

arrayArrayWritable


備注:自定義的反序列類中的write方法和read方法中DataOutput和DataInput這兩個(gè)類所提供的方法中,對(duì)應(yīng)Java類型String的方法,分別是writeUTF()和readUTF()。



實(shí)例(統(tǒng)計(jì)單詞)

public class WordCountMapper extends Mapper<LongWritable, Text, Text,

IntWritable> {


??? private Text key = new Text();


??? @Override

??? protected void map(LongWritable key, Text value, Context context)

?????????? throws IOException, InterruptedException {

?????? //讀取每一行

?????? Stringline = value.toString();


?????? //切割出每一個(gè)單詞

?????? String []words = line.split("\t");


?????? //將讀取到的每一個(gè)單詞都寫出,并且值都為1,因?yàn)槭窃趍ap計(jì)算完后到reduce進(jìn)行匯總,形成Key 多個(gè)Value

?????? for (String word : words) {

?????????? //每次文件內(nèi)的讀取一行都調(diào)用一次map,那樣就形成了調(diào)用多次map,那樣的話就不用創(chuàng)建多個(gè)key對(duì)象了

?????????? this.key.set(word);

?????????? context.write(this.key, new IntWritable(1));

?????? }

??? }

}


public class WorkCountReducer extends Reducer<Text, IntWritable, Text,

IntWritable>{


??? @Override

??? protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

?????? //這里就形成 多個(gè)值的匯總結(jié)果,那么將這個(gè)值多個(gè)進(jìn)行匯總后,統(tǒng)一歸并到一個(gè)key,就形成了一個(gè)key對(duì)應(yīng)多個(gè)value

?????? int count = 0;

?????? for (IntWritable value : values) {

?????????? count += value.get();

?????? }

?????? context.write(key, new IntWritable(count));


??? }

}


public class WordCountDriver {

??? public static void main(String[] args) throws Exception {

?????? //配置

?????? Configurationconfiguration = new Configuration();


?????? //任務(wù)運(yùn)行

?????? Jobjob= Job.getInstance(configuration);

?????? job.setJarByClass(WordCountDriver.class);


?????? //運(yùn)算類和匯總類

?????? job.setMapperClass(WordCountMapper.class);

?????? job.setReducerClass(WorkCountReducer.class);


?????? //運(yùn)算和匯總輸入和輸出

?????? job.setMapOutputKeyClass(Text.class);

?????? job.setMapOutputValueClass(IntWritable.class);


?????? //最終輸出

?????? job.setOutputKeyClass(Text.class);

?????? job.setOutputValueClass(IntWritable.class);


?????? //運(yùn)算文件的輸入和結(jié)果輸出

?????? FileInputFormat.setInputPaths(job, new Path("E:/hadooptest/mapreduce/input"));

?????? FileOutputFormat.setOutputPath(job, new Path("E:/hadooptest/mapreduce/output"));


?????? //提交

?????? job.submit();


?????? //等待

?????? boolean result = job.waitForCompletion(true);

?????? System.exit(result ? 0 : 1);

??? }

}


程序流程分析


1、MapReduce程序讀取輸入目錄存放的相應(yīng)文件。

2、客戶端在submit方法執(zhí)行之前,獲取到待處理的數(shù)據(jù)信息,讓后根據(jù)急群眾參數(shù)配置形成一個(gè)任務(wù)分配規(guī)劃。

????? 1、建立連接

????? 2、創(chuàng)建提交任務(wù)的代理(本地:LocalRunner、遠(yuǎn)程:YarnRunner)

????? 3、創(chuàng)建給集群提交數(shù)據(jù)的stag路徑

????? 4、獲取到任務(wù)id,并創(chuàng)建任務(wù)路徑

????? 5、獲取到任務(wù)jar包,拷貝jar包到集群(這個(gè)jar就是程序運(yùn)行的業(yè)務(wù)代碼)

????? 6、計(jì)算切片,生成切片規(guī)劃文件

computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB

????? 7、提交任務(wù),返回提交狀態(tài)


3、客戶端提交job.split、jar包、job.xml等文件給Yarn,Yarn中的resourcemanager啟動(dòng)MRAppMater。

4、MRAppMater啟動(dòng)后根據(jù)job的描述信息,計(jì)算出需要的MapTask實(shí)例數(shù)量,然后向集群申請(qǐng)機(jī)器,啟動(dòng)相應(yīng)數(shù)量的Map Task進(jìn)程。

5、MapTask利用客戶指定的InputFormat來讀取數(shù)據(jù),形成KV對(duì)。

6、MapTask將輸入KV對(duì)傳遞給客戶定義的map()方法,做邏輯運(yùn)算。

7、map()運(yùn)算完畢后將運(yùn)算結(jié)果的KV對(duì),手機(jī)到MapTask緩存。

8、MapTask緩存中的KV對(duì)按照K分區(qū)排序后不斷寫到磁盤文件。

9、MRAppMaster監(jiān)控到所有MapTask進(jìn)程任務(wù)完成后,會(huì)根據(jù)用戶指定的參數(shù)啟動(dòng)相應(yīng)數(shù)量的ReduceTask進(jìn)程,并告知ReduceTask進(jìn)程要處理的數(shù)據(jù)分區(qū)。

10、ReduceTask進(jìn)程啟動(dòng)后,根據(jù)MRAppMaster告知待處理數(shù)據(jù)所在位置,從N臺(tái)MapTask運(yùn)行所在的機(jī)器上獲取到N個(gè)MapTask輸出結(jié)果文件,并在本地運(yùn)行重新歸并排序,按照相同Key的KV為一個(gè)組,調(diào)用客戶定義的reduce()方法進(jìn)行邏輯運(yùn)算。

11、ReduceTask運(yùn)算完畢后,調(diào)用客戶指定的OuputFormat將結(jié)果數(shù)據(jù)輸出(文件)到外部存儲(chǔ)。


說明:

????? 切片是邏輯上的切片

????? 規(guī)劃文件就是里面描述了切多少個(gè)片,每個(gè)片是怎么樣的。



數(shù)據(jù)切片

????? MapTask的并行任務(wù)是否越多越好?并行度是如何決定的?MapTask到底開多少個(gè)合適?


????? 1、一個(gè)job的map()階段并行度(MapTask開幾個(gè)),由客戶端在提交job時(shí)決定。

????? 2、每一個(gè)Split切片分配一個(gè)MapTask并行實(shí)例處理。

????? 3、默認(rèn)情況下切片大小=塊大小(blocksize)

????? 4、切片時(shí)不考慮數(shù)據(jù)集整體,而是針對(duì)每一個(gè)文件單獨(dú)切片(這個(gè)是邏輯上的劃分)



切片流程

????? 1、獲取到數(shù)據(jù)存儲(chǔ)目錄

????? 2、找到要便利處理目錄下的每一個(gè)文件

????? 3、讀取第一個(gè)文件test.txt(257MB)

??????????? 1、獲取文件大小

??????????? 2、計(jì)算分片大小,每次切片時(shí),都要判斷剩下的部分是否大于塊大小的1.1倍,大于就在劃分一個(gè)塊切片

??????????? 切片:

第一塊:128MB

????????????????? 第二塊:129MB / 128MB = 1.0078125

????????????????? 1.0078125< 1.1 =不在切片,反之繼續(xù)切

????????????????? 源碼:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));

??????????? 3、將切片信息寫到一個(gè)切片規(guī)劃文件(說明文件)中

??????????? 4、整個(gè)切片的核心過程在于getSplit()方法(看submit()源碼)中完成,數(shù)據(jù)切片只是邏輯上對(duì)輸入數(shù)據(jù)進(jìn)行切片,并不會(huì)在磁盤上,將文件切分進(jìn)行存儲(chǔ)。

??????????? InputSplit只是記錄了分片的元數(shù)據(jù)信息。比如:起始位置、長(zhǎng)度、所在的節(jié)點(diǎn)列表等。


注意:塊是HDFS上物理存儲(chǔ)的數(shù)據(jù),切片只是邏輯上的劃分。

??????????? 5、提交切片規(guī)劃文件(說明文件)到Y(jié)arn上,Yarn上的MrAppMaster就根據(jù)切片規(guī)劃文件(說明文件)計(jì)算開啟的MapTask個(gè)數(shù)(多少個(gè)切片就多少個(gè)MapTask)。



FileInputFormat中默認(rèn)的切片機(jī)制

????? 1、簡(jiǎn)單按照文件內(nèi)容長(zhǎng)度切片

????? 2、切片大小,默認(rèn)是塊大小

????? 3、切片時(shí)不考慮數(shù)據(jù)集整體性,而是逐個(gè)文件的單獨(dú)切片,循環(huán)遍歷每一個(gè)文件。


????? MaxSize(切片最大值):如果比塊大小還小,則會(huì)讓切片變小。

MinSize(切片最小值):如果比塊大小還大,則會(huì)讓切片變得比塊還大。


假設(shè):塊大小128MB

?????????????????????? MaxSize設(shè)為100MB

?????????????????????? 切片后的存儲(chǔ)占?jí)K大小100MB



小文件切片處理

????? 如果有大量的小文件,而每一個(gè)文件都是一個(gè)單獨(dú)的切片,都會(huì)各自交給一個(gè)MapTask處理,那么需要開啟大量的MapTask,則會(huì)產(chǎn)生大量的MapTask,導(dǎo)致處理效率低下。


解決方案

????? 1、在數(shù)據(jù)處理前端,先把小文件合并成大文件,在上傳到HDFS做后續(xù)分析

????? 2、如果已經(jīng)有大量的小文件存在HDFS,使用CombineFileInputFormat進(jìn)行處理,CombineFileInputFormat的切片邏輯跟TextFileInputFormat不同,他可以將多個(gè)小文件邏輯上規(guī)劃到一個(gè)切片中,這樣多個(gè)小文件就可以交給一個(gè)MapTask。

????? 3、優(yōu)先滿足最小切片大小,不超過最大切片大小的前提下。




文件合并

//-------- 使用提供的自定義類,指定切片大小

job.setInputFormatClass(CombineTextInputFormat.class);

//最大輸入切片大小,一個(gè)文件的大小是4M就開始切,算法是1.1倍

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

//最小輸入切片大小,多個(gè)文件合并到了一起,到了2M就切,算法是1.1倍,優(yōu)先滿足最小切片大小

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);


備注:在運(yùn)行日志中查找number of就可以看到了


Shuffle機(jī)制

1、在MapReduce中,Map階段處理的數(shù)據(jù)如何傳遞給Reduce階段額,是MapReduce框架中關(guān)機(jī)的一個(gè)流程,這個(gè)流程就叫Shuffle。

????? 2、Shuffle(洗牌、發(fā)牌):核心就是數(shù)據(jù)分區(qū)、排序、緩存

????? 3、MapTaks輸出處理結(jié)果數(shù)據(jù),分發(fā)給ReduceTask并在分發(fā)過程中對(duì)數(shù)據(jù)按照Key進(jìn)行分區(qū)和排序。



Shuffle機(jī)制緩存流程圖

????? Shuffle是MapReduce處理流程中一個(gè)過程,每一個(gè)步驟都是分散在各個(gè)MapTask和ReduceTask節(jié)點(diǎn)上。


MapReduce詳細(xì)運(yùn)行流程



總結(jié)

????? MapReduce詳細(xì)運(yùn)行流程圖就是一個(gè)流水線一般的作業(yè),從左向右過去,而在開發(fā)的過程中,需要使用到什么組件,這些組件會(huì)起到什么作用,在哪一個(gè)時(shí)間起作用,都可以在這個(gè)圖中詳細(xì)的描述


分區(qū)

自定義分區(qū)

public classMyPartitionerextends Partitioner<Text, FlowBean>{

??? @Override

??? public int getPartition(Text key, FlowBean value, int numPartitions) {

?????? //拿到手機(jī)號(hào)碼前三位

?????? StringphoneNum = key.toString().substring(0, 3);


?????? //建立5個(gè)分區(qū),從0開始

?????? int partition = 4;


?????? //判斷

?????? if("135".equals(phoneNum)) {

?????????? partition = 0;


?????? }else if("136".equals(phoneNum)) {

?????????? partition = 1;


?????? }else if("137".equals(phoneNum)) {

?????????? partition = 2;


?????? }else if("138".equals(phoneNum)) {

?????????? partition = 3;

?????? }


?????? return partition;

??? }

}


?//設(shè)置分區(qū)類

?????? //如果沒有設(shè)置分區(qū),那么則會(huì)按照塊大小去計(jì)算什么時(shí)候進(jìn)行分區(qū)

?????? job.setPartitionerClass(MyPartitioner.class);


?????? //設(shè)置ReduceTask數(shù)量

?????? job.setNumReduceTasks(5);


總結(jié)

reduce數(shù)量小于分區(qū)數(shù)量就會(huì)報(bào)錯(cuò)。

reduce數(shù)量是1,那么則所有結(jié)果輸出到一個(gè)文件內(nèi),即便配置了分區(qū)也不會(huì)去跑分區(qū)的代碼(執(zhí)行分區(qū))

reduce數(shù)量大于分區(qū)數(shù)量,輸出的其他文件為空

分區(qū)數(shù)量 = reduce數(shù)量,按照分區(qū)數(shù)量輸出結(jié)果文件數(shù)量


分區(qū)就是對(duì)map的結(jié)果數(shù)據(jù)進(jìn)行二次處理,從而再去決定是否影響輸出的reduce結(jié)果輸出。


排序

????? MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)(按Key排序)進(jìn)行排序,這個(gè)操作屬于Hadoop默認(rèn)行為,任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要。

????? 對(duì)于MapTask,他會(huì)把處理的結(jié)果暫時(shí)放到一個(gè)緩沖區(qū),當(dāng)緩沖區(qū)使用率達(dá)到了閾值就對(duì)緩沖區(qū)的數(shù)據(jù)進(jìn)行排序,并將這些有序的數(shù)據(jù)寫到磁盤上,而當(dāng)數(shù)據(jù)處理完后,他會(huì)對(duì)磁盤上所有文件進(jìn)行一次合并,將這些文件合并成一個(gè)有序的文件。

????? 對(duì)于ReduceTask,他從每個(gè)MapTask遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次合并,生成一個(gè)更大的文件,如果內(nèi)存文件大小或數(shù)目超過閾值,則進(jìn)行合并后將數(shù)據(jù)寫出到磁盤上,當(dāng)所有的數(shù)據(jù)拷貝完畢后,再統(tǒng)一的對(duì)內(nèi)存核磁盤上的所有文件進(jìn)行一次合并。


自定義排序

public class FlowBean implements WritableComparable<FlowBean>

{


??? private Long sum;


??? public FlowBean() {

?????? super();

??? }


??? @Override

??? public void write(DataOutput dataOutput) throws IOException {

?????? dataOutput.writeLong(sum);

??? }


??? @Override

??? public void readFields(DataInput dataInput) throws IOException {

?????? sum = dataInput.readLong();

??? }


??? public Long getSum() {

?????? return sum;

??? }


??? public void setSum(Long sum) {

?????? this.sum = sum;

??? }


??? @Override

??? public String toString() {

?????? return sum.toString();

??? }


??? @Override

??? public int compareTo(FlowBean o) {

?????? return this.sum > o.getSum() ? -1 : 1;

??? }

}


總結(jié)

Shullt規(guī)定Key是要進(jìn)行排序的,如果作為Key是必須要實(shí)現(xiàn)WritableComparable接口的。

Combiner合并

ReducrTask是接收總的MapTask結(jié)果,Combiner在每一個(gè)MapTask運(yùn)行的,對(duì)每每個(gè)MapTask的結(jié)果匯總(局部匯總),將MapTask匯總后之后進(jìn)行壓縮傳輸,可以減少網(wǎng)絡(luò)傳輸量。

????? 但是Combiner的前提是不能影響到最終的業(yè)務(wù)邏輯,如果是累加求和是沒有問題的,如果是求平均值就有問題的。

如:

1、在每一個(gè)MapTask進(jìn)行求平均值之后在ReduceTask再求一次平均值,結(jié)果是不一樣的。

????? 2、將MapTask的數(shù)據(jù)全部匯總到ReduceTask之后再求平均值。


這兩種結(jié)果不一樣的。



自定義Combiner合并

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,

IntWritable>{

??? @Override

??? protected void reduce(Text key, Iterable<IntWritable> values,

?????????? Contextcontext)throws IOException, InterruptedException {

?????? int count = 0;

?????? for (IntWritable intWritable : values) {

?????????? count += intWritable.get();

?????? }

?????? context.write(key, new IntWritable(count));

??? }

}

//reduce是接收總的MapTask匯總,combiner在每一個(gè)maptask運(yùn)行的,對(duì)每一個(gè)maptask匯總

//如:每一個(gè)maptask都進(jìn)行匯總,之后進(jìn)行壓縮傳輸

job.setCombinerClass(WordCountCombiner.class);


分組

????? 就是對(duì)分區(qū)排序好的數(shù)據(jù),在進(jìn)行一次合并分類開來,再一次合并的話,就有個(gè)比較標(biāo)識(shí),如果兩個(gè)數(shù)據(jù)標(biāo)識(shí)是一樣的,就認(rèn)為是一組數(shù)據(jù),最后過濾去重,最終得到有哪些組。


自定義分組

public classOrderGoupingComparatorextends WritableComparator {


??? protected OrderGoupingComparator() {

?????? super(OrderBean.class, true);

??? }

??? @Override

??? public intcompare(WritableComparablea,WritableComparableb) {

?????? OrderBeanabean = (OrderBean) a;

?????? OrderBeanbbean = (OrderBean) b;

?????? // 將orderId相同的bean都視為一組

?????? return abean.getOrderId().compareTo(bbean.getOrderId());


??? }

}

//設(shè)置Reduce端分組

job.setGroupingComparatorClass(OrderGoupingComparator.class);


//分區(qū)

job.setPartitionerClass(OrderPartition.class);

job.setNumReduceTasks(3);


自定義InputFormat

????? 對(duì)小文件的輸入進(jìn)行合并處理。


1、設(shè)置文件不可切割

2、讀取到整個(gè)文件,并且整個(gè)文件的數(shù)據(jù)作為value輸出給MapTask(將分片傳進(jìn)去去讀取后,將讀取到的所有分片數(shù)據(jù)合并給到MapTask)

3、MapTask在對(duì)合并后的數(shù)據(jù)做操作

public class DistriutedCacheMapper extends Mapper<LongWritable, Text, Text,

NullWritable>{


??? private Map<String, String> map = new HashMap<>();


??? private Text key = new Text();


??? @Override

??? protected void setup(Mapper<LongWritable, Text,

Text, NullWritable>.Context context)

?????????? throws IOException, InterruptedException {

?????? //獲取緩存文件,這個(gè)文件給加載進(jìn)了hadoop系統(tǒng)了,在緩存中的根,可以直接通過名字調(diào)用

?????? BufferedReaderbufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("pd.txt"))));

?????? Stringline;

?????? while(StringUtils.isNotEmpty(line = bufferedReader.readLine())) {

?????????? //數(shù)據(jù)處理

?????????? String []strings = line.split("\t");


?????????? //將數(shù)據(jù)放到緩存集合中

?????????? map.put(strings[0], strings[1]);

?????? }

?????? bufferedReader.close();

??? }


??? @Override

??? protected void map(LongWritable key, Text value, Context context)

?????????? throws IOException, InterruptedException {

?????? //讀取到order的每一行

?????? String[]strings = value.toString().split("\t");


?????? StringorderId = strings[0];


?????? Stringname = map.get(orderId);


?????? this.key.set(value.toString() + "\t" + name);

?????? context.write(this.key, NullWritable.get());

??? }

}


自定義OutputFormat

????? 獲取到ReduceTask的運(yùn)行結(jié)果,自定義要輸出的結(jié)果數(shù)據(jù)和文件

public class FilterRecordWriter extends RecordWriter<Text,

NullWritable>{


??? private FileSystem fileSystem;

??? private FSDataOutputStream aCreate;

??? private FSDataOutputStream oCreate;


??? publicFilterRecordWriter(TaskAttemptContext job) {

?????? try {

?????????? fileSystem= FileSystem.get(job.getConfiguration());

?????????? //創(chuàng)建輸出文件路徑

?????????? PathaPath = new Path("E:\\hadooptest\\mapreduce\\a.log");

?????????? PathoPath = new Path("E:\\hadooptest\\mapreduce\\o.log");


?????????? //創(chuàng)建輸出流

?????????? aCreate = fileSystem.create(aPath);

?????????? oCreate = fileSystem.create(oPath);

?????? }catch (IOException e) {

?????????? e.printStackTrace();

?????? }

??? }


??? @Override

??? public void write(Text key, NullWritable value) throws IOException, InterruptedException {

?????? if(key.toString().contains("levi")) {

?????????? aCreate.write(key.toString().getBytes());

?????? }else {

?????????? oCreate.write(key.toString().getBytes());

?????? }

??? }


??? @Override

??? public void close(TaskAttemptContext context) throws IOException, InterruptedException {

?????? if(null != aCreate) {

?????????? aCreate.close();

?????? }

?????? if(null != oCreate) {

?????????? oCreate.close();

?????? }

??? }

}


//設(shè)置輸出類為自定義輸出類

job.setOutputFormatClass(FliterOutputFormat.class);

//雖然自定義了一個(gè)輸出,但是還是要輸出,因?yàn)橛袀€(gè)成功狀態(tài)標(biāo)識(shí)文件要輸出,不然會(huì)報(bào)錯(cuò)

FileOutputFormat.setOutputPath(job, new Path("E:\\hadooptest\\mapreduce\\output"));


計(jì)數(shù)器

Hadoop為每一個(gè)作業(yè)維護(hù)了若干個(gè)內(nèi)置計(jì)算器,以描述多項(xiàng)指標(biāo),例如:某些計(jì)數(shù)器記錄已處理的字節(jié)數(shù)等。


計(jì)數(shù)器的使用

context.getCounter("counterGroup組名","countera變量"),increment(1);

說明:計(jì)數(shù)器的結(jié)果在程序運(yùn)行后的控制臺(tái)日志中可查看



總結(jié)

????? HDFS根據(jù)配置在各個(gè)節(jié)點(diǎn)存儲(chǔ)數(shù)據(jù),并且存儲(chǔ)相應(yīng)的副本數(shù)據(jù)。

????? MapReduce就是在需要執(zhí)行無論是MapTask或ReduceTask的時(shí)候,會(huì)先去ResouceManager去詢問,任務(wù)要在哪里運(yùn)行,其實(shí)ResourceManger就是看要運(yùn)行這個(gè)任務(wù)的輸入數(shù)據(jù)在哪個(gè)節(jié)點(diǎn),從而去告知這個(gè)節(jié)點(diǎn)執(zhí)行任務(wù),那么就形成了直接移動(dòng)計(jì)算,而不是移動(dòng)數(shù)據(jù)的方式。

因?yàn)閿?shù)據(jù)可能存儲(chǔ)在服務(wù)器1或服務(wù)器2…服務(wù)器,那么不需要移動(dòng)數(shù)據(jù),負(fù)責(zé)執(zhí)行任務(wù)的服務(wù)器,到指定的路徑,下載要運(yùn)算的任務(wù)jar包,直接在本地運(yùn)行,那么當(dāng)數(shù)據(jù)非常大的時(shí)候就不用去移動(dòng)數(shù)據(jù)。


YARN

Yarn是什么?

????? Yarn是一個(gè)資源調(diào)度平臺(tái),負(fù)責(zé)為運(yùn)算程序提供服務(wù)器運(yùn)算資源,相當(dāng)于一個(gè)分布式的操作系統(tǒng)平臺(tái),在之前說了,可以配置MapReduce在Yarn之上運(yùn)行,所以MapReduce等運(yùn)算程序則相當(dāng)于運(yùn)行于操作系統(tǒng)之上的應(yīng)用程序。


Yarn機(jī)制

????? 1、不需要知道用戶提交的程序運(yùn)行機(jī)制,只要符合Yarn規(guī)范的資源請(qǐng)求機(jī)制即可使用Yarn,Spark、Storm等運(yùn)算框架都可以整合在Yarn上運(yùn)行,意味著與用戶程序完全解耦。

????? 2、只提供運(yùn)算資源的調(diào)度,程序向Yarn申請(qǐng)資源,Yarn負(fù)責(zé)分配資源

????? 3、Yarn總的資源調(diào)度是ResourceManager,提供運(yùn)算資源的角色叫NodeManager。

????? 4、Yarn作為一個(gè)通用的資源調(diào)度平臺(tái),企業(yè)以前存在的各種運(yùn)算集群都可以整合在一個(gè)物理集群上,提高資源利用率,方便數(shù)據(jù)共享。


Yarn作業(yè)流程



1、客戶端將MapReduce程序提交到客戶端所在的節(jié)點(diǎn)。

2、YarnRunner就向RsourceManager申請(qǐng)一個(gè)Application。

3、RsourceManager內(nèi)部運(yùn)行一下,看看哪個(gè)節(jié)點(diǎn)離提交申請(qǐng)節(jié)點(diǎn)近,以及系統(tǒng)資源等,內(nèi)部運(yùn)行完了,就將應(yīng)用程序資源路徑返回給YarnRunner。

4、程序就將運(yùn)行程序所需要的資源提交到HDFS上。

5、程序資源提交完后,申請(qǐng)運(yùn)行MRAppMaster。

6、RsourceManager將用戶請(qǐng)求轉(zhuǎn)化為一個(gè)task(任務(wù)),并尋找最適合的NodeManager,并將任務(wù)分配給這個(gè)NodeManager。

7、NodeManager領(lǐng)取到任務(wù)后,創(chuàng)建容器(Container),并產(chǎn)生MRAppMaster。

8、MRAppMaster向RsourceManager申請(qǐng)運(yùn)行N個(gè)MapTask容器(切片文件中有說明)。

9、RsourceManager又尋找了一下,將MapTask分配給另外兩個(gè)NodeManager,這兩個(gè)NodeManager領(lǐng)取到任務(wù),并且創(chuàng)建容器(Container)。

10、RsourceManager告知申請(qǐng)運(yùn)行MapTask容器的NodeManger,向那兩個(gè)接受到任務(wù)的NodeManager發(fā)送程序啟動(dòng)腳本,這兩個(gè)NodeManger就分別啟動(dòng)MapTask,MapTask對(duì)數(shù)據(jù)進(jìn)行分區(qū)排序。

11、MRAppMaster看程序都跑完了,趕緊申請(qǐng)2個(gè)容器,運(yùn)行ReduceTask。

12、ReduceTask的容器向MapTask容器獲取相應(yīng)分區(qū)的數(shù)據(jù),并執(zhí)行任務(wù)。

13、程序運(yùn)行完畢后,MapResource會(huì)向RsourceManager注銷自己。


Hadoop – HelloWorld

準(zhǔn)備

1、三臺(tái)機(jī)器

2、ssh

3、防火墻



配置

JAVA_HOME

hadoop-env.sh

yarn-env.sh

mapred-env.sh



core-site.xml

<!-- 指定HDFS中NameNode的地址-->

fs.defaultFS

hdfs://hadoop-senior00-levi.com:8082


<!-- 指定hadoop運(yùn)行時(shí)產(chǎn)生文件的存儲(chǔ)目錄-->

hadoop.tmp.dir

/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp



yarn-site.xml


<!-- reducer獲取數(shù)據(jù)的方式-->

yarn.nodemanager.aux-services

mapreduce_shuffle


<!-- 指定YARN的ResourceManager的地址-->

yarn.resourcemanager.hostname

hadoop-senior01-levi.com


<!-- 任務(wù)歷史服務(wù)-->

?????

??????????? yarn.log.server.url

??????????? http://hadoop-senior00-levi.com:19888/jobhistory/logs/



hdfs-site.xml

<!-- 指定seconddaryNameNode地址,主要這個(gè)是避免NameNode掛了? -->

dfs.namenode.secondary.http-address

hadoop-senior02-levi.com:50090


<!-- 指定name.dir,默認(rèn)就是,但是避免未啟用,設(shè)置一下-->

?dfs.namenode.name.dir

?/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp/name



mapred-site.xml

<!-- 指定mr運(yùn)行在yarn上-->

mapreduce.framework.name

yarn


<!-- 配置 MapReduce JobHistory Server 地址 ,默認(rèn)端口10020-->

????? mapreduce.jobhistory.address

????? hadoop-senior00-levi.com:10020


<!-- 配置 MapReduce JobHistory Server web ui 地址, 默認(rèn)端口19888-->

????? mapreduce.jobhistory.webapp.address

????? hadoop-senior00-levi.com:19888




slaves

hadoop-senior00-levi.com

hadoop-senior01-levi.com

hadoop-senior02-levi.com



Hadoop-HA

Hadoop為什么要有HA?

????? 我們都知道NameNode是存儲(chǔ)了所有數(shù)據(jù)的路徑,在Hadoop第一個(gè)版本是沒有HA,單臺(tái)的NameNode節(jié)點(diǎn)掛了,那么整個(gè)數(shù)據(jù)就沒辦法訪問了,

????? 那個(gè)的工程師就自己寫一個(gè)腳本去解決這個(gè)問題,定時(shí)的拷貝NameNode的fsimage和edits到別的服務(wù)器,但是數(shù)據(jù)量大的時(shí)候,拷貝就很慢了,而且工程師半夜正在和周公下棋的時(shí)候,NameNode掛了,那就很尷尬了。

????? 雖然可以到第二天早上來恢復(fù),但是數(shù)據(jù)量那么大的時(shí)候,太慢了,滿足不了需求。

????? 所以Hadoop為了解決這個(gè)問題,在后面的版本繼承了HA(高可用)。


Hadoop-HA是什么?

????? Hadoop-HA(高可用)就是在一臺(tái)服務(wù)器掛了,第二臺(tái)服務(wù)器可以馬上頂上去。

????? 兩個(gè)基本問題:

1、第一臺(tái)服務(wù)器和第二臺(tái)服務(wù)器的數(shù)據(jù)必須要同步。

Hadoop-HA通過edits-log的變化,來將數(shù)據(jù)寫入到JournalNode節(jié)點(diǎn)里面去,以分享給其他的NameNode。

2、要解決第一臺(tái)和第二臺(tái)服務(wù)器同時(shí)啟用的情況,在這種情況下,子節(jié)點(diǎn)怎么提交數(shù)據(jù),會(huì)提交到兩臺(tái)服務(wù)器,但是又會(huì)出現(xiàn)搶占資源的情況,(給一個(gè)人送東西和給兩個(gè)人送東西所耗費(fèi)的體力是不言而喻的),

這個(gè)問題在Hadoop-HA中稱為腦裂,借助第三方框架(Zookeeper)實(shí)現(xiàn)隔離機(jī)制來解決腦裂這個(gè)問題。


Hadoop–HA 實(shí)現(xiàn)

NameNode

hdfs-site.xml


???

???????dfs.replication

???? 3




????

????????? dfs.namenode.secondary.http-address

????????? hadoop104:50090




????

???? ?dfs.namenode.checkpoint.period

???? ?120




????

???? ?dfs.namenode.name.dir

???? ?/opt/module/hadoop-2.7.2/data/tmp/dfs/name




????

???? ? ?? dfs.hosts

???? ???? /opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts




????

???? ?dfs.hosts.exclude

???? ?/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude




???

????????? dfs.nameservices

????????? mycluster



???

????????? dfs.ha.namenodes.mycluster

????????? nn1,nn2



???

????????? dfs.namenode.rpc-address.mycluster.nn1

????????? hadoop102:8020



???

????????? dfs.namenode.rpc-address.mycluster.nn2

????????? hadoop103:8020



???

????????? dfs.namenode.http-address.mycluster.nn1

????????? hadoop102:50070



???

????????? dfs.namenode.http-address.mycluster.nn2

????????? hadoop103:50070



???

????????? dfs.namenode.shared.edits.dir

????????? qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster



???

????????? dfs.ha.fencing.methods

????????? sshfence



???

????????? dfs.ha.fencing.ssh.private-key-files

????????? /home/levi/.ssh/id_rsa



???

????????? dfs.journalnode.edits.dir

????????? /opt/module/hadoop/data/jn



???

????????? dfs.permissions.enable

????????? false




???

????????? dfs.client.failover.proxy.provider.mycluster

???? ???? org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider



????

????????? dfs.ha.automatic-failover.enabled

????????? true


core-site.xml



????????? fs.defaultFS

???? ??????hdfs://hadoop102:8020

???? -->



????

????????? fs.defaultFS

????????? hdfs://mycluster




????

????????? hadoop.tmp.dir

????????? /opt/module/hadoop-2.7.2/data/tmp




????

???? ?fs.trash.interval

???? ?1



???? ?

???? ?hadoop.http.staticuser.user

???? ?levi




????

????????? ha.zookeeper.quorum

????????? hadoop102:2181,hadoop103:2181,hadoop104:2181


ResourceManager

hdfs-site.xml



????????? yarn.resourcemanager.hostname

????????? hadoop103

???? -->



???

???????yarn.nodemanager.aux-services

???????mapreduce_shuffle




???

???????yarn.log-aggregation-enable

??????? true



???

???????yarn.log.server.url

???????http://hadoop103:19888/jobhistory/logs/



???

???????yarn.log-aggregation.retain-seconds

??????? 86400




???

???????yarn.resourcemanager.ha.enabled

??????? true




???

???????yarn.resourcemanager.cluster-id

??????? cluster-yarn1



???

???????yarn.resourcemanager.ha.rm-ids

??????? rm1,rm2




???

???????yarn.resourcemanager.hostname.rm1

?????? ?hadoop103




???

???????yarn.resourcemanager.hostname.rm2

??????? hadoop104




???

???????yarn.resourcemanager.zk-address

???????hadoop102:2181,hadoop103:2181,hadoop104:2181




???

???????yarn.resourcemanager.recovery.enabled

??????? true




???

???????yarn.resourcemanager.store.class????

????????? org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 首先,我們?cè)谑褂们跋瓤纯碒DFS是什麼?這將有助于我們是以后的運(yùn)維使用和故障排除思路的獲得。 HDFS采用mast...
    W_Bousquet閱讀 4,456評(píng)論 0 2
  • HDFS入門 hadoop架構(gòu) Hadoop 1.0中的資源管理方案 Hadoop 1.0指的是版本為Apache...
    依天立業(yè)閱讀 1,262評(píng)論 0 1
  • 一、系統(tǒng)參數(shù)配置優(yōu)化 1、系統(tǒng)內(nèi)核參數(shù)優(yōu)化配置 修改文件/etc/sysctl.conf,添加如下配置,然后執(zhí)行s...
    張偉科閱讀 3,923評(píng)論 0 14
  • 翻譯: https://www.cloudera.com/documentation/enterprise/lat...
    金剛_30bf閱讀 2,833評(píng)論 1 1
  • 《父愛如山》 作者:你的父親覺空 仁慈出義子, 愛在厲嚴(yán)生, 如存心疑惑, 山水共月明。
    管雪彤閱讀 107評(píng)論 0 0

友情鏈接更多精彩內(nèi)容