大家好,說到做到,昨天分享了Yarn的資源調(diào)度的流程,我也畫了一個(gè)圖來說明,今天分享一下Yarn資源調(diào)度的源碼分析
MR程序什么時(shí)候提價(jià)給集群呢?也就是什么時(shí)候提交給Yarn,讓Yarn來調(diào)度呢?
就是代碼里的這一句話:

點(diǎn)進(jìn)入看看Job的這個(gè)方法,如下圖

進(jìn)入到sumbit方法

進(jìn)入connect看看做了什么事情

為cluster賦值,那么cluster是什么呢?

看著UserGroupInformation這個(gè)了嗎?這個(gè)就是對吧當(dāng)前的用戶和hadoop的用戶是不是一個(gè)用戶,就在做本地代碼鏈接hadoop集群的時(shí)候報(bào)過一個(gè)用戶不對的異常,就是在這里報(bào)出來的,有興趣的可以進(jìn)去看看,很簡單,這里不再贅述,繼續(xù),看最重要的
this.initialize(jobTrackAddr, conf);
這一句話

這里遍歷拿到clientProtocol是Yarn,因?yàn)樵陧?xiàng)目下有site.xml配置文件,看代碼是怎么實(shí)現(xiàn)的
由于上傳圖片出現(xiàn)問題,直接粘貼代碼看
public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get("mapreduce.framework.name", "local");
if(!"local".equals(framework)) {
return null;
} else {
conf.setInt("mapreduce.job.maps", 1);
return new LocalJobRunner(conf);
}
}
這里寫的很清楚,就是從配置文件中獲取mapreduce.framework.name,配置文件中配置了這個(gè)是yarn,這個(gè)cluster就是yarn的RPC的客戶端,通過cluster遠(yuǎn)程調(diào)用yarn的server的相關(guān)方法。hadoop中大量使用了RPC這也是分布式中必須要使用到的
繼續(xù),接下來就要和yarn通訊了,拿到上傳jar包的上傳路徑,這個(gè)上次分享的時(shí)候說過的,就是
submitter.submitJobInternal(Job.this, Job.this.cluster);
這句話就完成了與yarn通訊
進(jìn)去,這么一句話,很熟悉
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
在提交的時(shí)候的那個(gè)路徑stage就是從這里獲取的
接下來就是copy到HDFS上面
this.copyAndConfigureFiles(job, submitJobDir);
這就話就是將job的相關(guān)信息,拷貝到那個(gè)上傳的路徑下,并且默認(rèn)寫10個(gè)副本,為了從本地獲取job信息,為了不走網(wǎng)絡(luò),執(zhí)行快,這里就是一個(gè)優(yōu)化點(diǎn),如果集群比較大,可是將mapreduce.client.submit.file.replication設(shè)置的大一點(diǎn),為了從本地直接讀取job的信息,這一點(diǎn)很重要,來來來,劃重點(diǎn),記住
short replication = (short)conf.getInt("mapreduce.client.submit.file.replication", 10);
提交上去之后到HDFS下看一下文件
[songlj@my01 ~]$ hadoop fs -ls /tmp/hadoop-yarn/staging/songlj/.staging/job_1524231875809_0046
-rw-r--r-- 10 songlj supergroup 109 2018-04-20 11:38 /tmp/hadoop-yarn/staging/songlj/.staging/job_1524231875809_0046/job.split
-rw-r--r-- 3 songlj supergroup 18 2018-04-20 11:38 /tmp/hadoop-yarn/staging/songlj/.staging/job_1524231875809_0046/job.splitmetainfo
-rw-r--r-- 3 songlj supergroup 77459 2018-04-20 11:38 /tmp/hadoop-yarn/staging/songlj/.staging/job_1524231875809_0046/job.xml
那么生成幾個(gè)map是誰控制的呢?確切的說是MR程序,因?yàn)橄旅孢@句話在MapReduce包下,還沒有在Yarn的相關(guān)包里,這個(gè)大家記住,來來來,劃重點(diǎn)了,這個(gè)也是大部分大數(shù)據(jù)的開發(fā)人員,忽略的地方,自然而然的認(rèn)為是ResourceManager控制的,其實(shí)不然,雖然知識點(diǎn)很小,但是這就是檢驗(yàn)是否看沒有看過源碼的驗(yàn)金石。
int maps = this.writeSplits(job, submitJobDir);
這個(gè)就是根據(jù)split(切片)來確定生成map的數(shù)量,這個(gè)split生成是有講究的,大致是一個(gè)block就是一個(gè)split,這是一般情況,還要看你的設(shè)置,默認(rèn)的情況下就是一個(gè)block就是一個(gè)split,有興趣可以進(jìn)去看一下,這里就不在贅述
接下來就是很重要的,就是shuffle,那么根據(jù)昨天畫的圖接下來應(yīng)該是第5步,將job加入隊(duì)列,接下來是由NodeManger領(lǐng)取任務(wù),接下來就是有Resource生成MRAppMaster,Mapper和Resource的調(diào)度其實(shí)是由這個(gè)master來協(xié)調(diào)的,這里就有一個(gè)重要概念就是shuffle,盜用一個(gè)很經(jīng)典的圖

這個(gè)圖如果理解透徹了,對理解其他運(yùn)算框架storm,spark都有很大的幫助,言歸正傳,解釋一下這個(gè)圖
1,圖中的bugger in memory顧名思義就是內(nèi)存中的一個(gè)緩沖區(qū),其默認(rèn)的100M,這里還是一個(gè)優(yōu)化點(diǎn),就是如果服務(wù)器的內(nèi)存比較大,這個(gè)值可以設(shè)置的大一點(diǎn),能內(nèi)存存儲運(yùn)行盡量內(nèi)存,寫入disk需要時(shí)間,浪費(fèi)cpu資源,io等;默認(rèn)閾值是0.8,就是說如果默認(rèn)的話,就是當(dāng)memory達(dá)到80M的時(shí)候就會觸發(fā)寫入disk操作
2,圖中的partition sort and spill disk,這個(gè)就是磁盤上的文件,先要分區(qū),排序,分組后再寫入磁盤,這里disk上也是一個(gè)個(gè)的小文件,放一個(gè)文件寫滿了,就會往另一個(gè)文件中寫
3,圖中的merge on disk就是合并后寫入disk,也是有分區(qū)和排序的。從這里就可以看出MR中有好多磁盤的io,分組排序,這樣就會占用很大的資源,這也注定了hadoop做事實(shí)的分析,流計(jì)算的能力就不是很好,沒有spark和storm強(qiáng),但是hadoop對海量的離線數(shù)據(jù)的分析還是很厲害的,這也是hadoop沒有被淘汰的一個(gè)很重要的原因,鼻祖能被淘汰嗎?我想到了諾基亞,開個(gè)玩笑
4,圖中綠色的箭頭other maps這個(gè)說明這張圖的意思是有4個(gè)map同時(shí)執(zhí)行的,并且每一個(gè)都分了三個(gè)partions,那么就會生成3個(gè)reduces,reduces的數(shù)量可以在代碼中繼承修改的。這個(gè)以后分享說明,這里不再贅述,知道就行了
5,問題來了,map執(zhí)行完了,reduce task怎么知道他要從哪里才能獲取到第一個(gè)partions的數(shù)據(jù)呢?這個(gè)是由MRAppMaster來調(diào)度的,前面也有說明,master會收到map執(zhí)行完的反饋信息后,將map執(zhí)行結(jié)果以及map的host都會告訴reduce,reduce通過網(wǎng)絡(luò)將數(shù)據(jù)拷貝到reduce所在的服務(wù)器上
6,圖中reduce Task中進(jìn)行了好多merge合并,這期間也產(chǎn)生了好多磁盤的io,最終output出去,可以使HDFS,也可以是數(shù)據(jù)庫,也可以是本地文件等等
好了,基本上也差不多了
望指正,不吝賜教!