首先說明,本篇文章的內(nèi)容不是原創(chuàng),因為原文質(zhì)量比較好,因此想進一步的歸納總結(jié), 原文鏈接 https://www.cnblogs.com/zackstang/p/11435039.html
hive on tez的map的數(shù)量計算
在hive里面執(zhí)行一個query時,我們可以發(fā)現(xiàn),hive的執(zhí)行引擎在使用tez和MR的時候,兩者生成的mapper數(shù)量差異較大。主要的原因是tez會對inputsplit做grouping的操作,將多個inputsplit組合成為更少的groups,然后為每一個group生成一個mapper任務(wù),而不是像MR一樣,為每一個inoutsplit生成一個mapper任務(wù)。下面,我們通過日志來分析一下整個過程。
1.MR模式
在MR模式下生成了116個

對應(yīng)的日志為:
Input size for job job_1566964005095_0003 = 31733311148. Number of splits = 116
在MR中,使用的是hadoop的FileInoutFormat,所以諾是一個文件大于一個Block的大小,則會被切分為多個inputSplit;諾是一個文件小于一個block的大小,則為一個inputsplit。在這個案例中國,文件個數(shù)為14個,每個為2.1G,一共是29.4G大小,生成的inputSplit數(shù)為116個,也就是說,這里的inputSplit為256M,這里要說明一下,MR中的默認的Input的spilt是128M,但是我們可以修改,通常修改為2的倍數(shù),在這里HIve的inputSplitSize為256M。
2.Tez模式
而在tez模式下,生成了map任務(wù)為32個

首先可以看到要處理的文件是14個,初始的splits的個數(shù)為476個(意味著,在Tez中設(shè)置的splitSize為64M)
日志如下:
|mapred.FileInputFormat|: Total input files to process : 14
|io.HiveInputFormat|: number of splits 476
獲取到splits為476個之后,Driver開始計算可用的slots(container)數(shù),這里計算得到3個slots,并打印waves為1.7,
在這個場景中,,集群的資源是8Vcores,12G內(nèi)存,tez的capacity_scheduler中指定的user limit factor是0.5,也就是說最多可用內(nèi)存為6個,在tez Driver中,申請的container資源的單位為Default Resources=<memory:1536, vCores:1>,這里要看Default Resource
所以理論上,可以申請到的container的個數(shù)為4個(6G/1536M=4),但是AM需要占用一個,所以用于計算的available slot(container)為3個。
在計算出可用的slots為3個后,Tez使用的split-waves(由tez.grouping.split-waves指定,默認值為1.7)計算出指定的預(yù)估的Map任務(wù)數(shù)目為 3*1.7=5 個task,對應(yīng)的日志如下,
|tez.HiveSplitGenerator|: Number of input splits: 476. 3 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat
|tez.SplitGrouper|: Estimated number of tasks: 5 for bucket 1
Grouping Input Splits
在tez分配任務(wù)的時候,不會像MR為每個split生成一個map任務(wù),而會將多個split進行g(shù)rouping。讓map任務(wù)可以更高效的完成。首先tez會根據(jù)計算得到 estimate number of tasks = 5 預(yù)估task的個數(shù)為5,這個5是有3個slot*split-waves1.7得到的,將476個splits聚合為5個SplitGroup,生成5個mapper執(zhí)行任務(wù)。
但是這里還需要考慮另外的一個值:lengthPerGroup,,tez會檢查lengthPerGroup是否在Tez.grouping.min-size(默認值為50,)和tez.grouping.max-size(默認為1G)之間。如果超過max-size,則將lengthPerGroup設(shè)置為max-size,如果小于min-size,則將lenghtPerGroup設(shè)置為min-size。
在這樣的場景下,數(shù)據(jù)總大小為29.5GB左右,預(yù)估為5個Group,則每個group需要處理的數(shù)據(jù)為5.9G,超過了Max splitLenght = 1G 所以 lengthPreGroup的大小為1G,splitLength的大小也要按照1G來劃分,計算出所需要的numsplits為30個左右,每個splitGroup為1G。
在計算出每個split group為1G后, 由于原來的splits的總數(shù)為476個,所以需要將476個split進行g(shù)rouping的操作,是的每個group大小為1個左右,按照這個方法,逾期的split個數(shù)為30個,(這里的30的結(jié)果是預(yù)估,沒有考慮文件個數(shù)等,所以不一定是最終的splits),且最終計算出每個group包含原先的15個split,也就是numSplitsInGroup=15,相關(guān)日志如下:
|grouper.TezSplitGrouper|: Grouping splits in Tez
|grouper.TezSplitGrouper|: Desired splits: 5 too small. Desired splitLength: 6346662229 Max splitLength: 1073741824 New desired splits: 30 Total length: 31733311148 Original splits: 476
|grouper.TezSplitGrouper|: Desired numSplits: 30 lengthPerGroup: 1057777038 numLocations: 1 numSplitsPerLocation: 476 numSplitsInGroup: 15 totalLength: 31733311148 numOriginalSplits: 476 . Grouping by length: true count: false nodeLocalOnly: false
原先總數(shù)為476,對splits進行g(shù)roup之后,每個group包含15split,最終group的數(shù)目計算為476/15=32個,也就是最終生成的mapper數(shù)量
日志如下:
|tez.SplitGrouper|: Original split count is 476 grouped split count is 32, for bucket: 1
|tez.HiveSplitGenerator|: Number of split groups: 32
所以在Tez中,inputSplit 數(shù)目雖然是476個,但是最終僅生成了32個map任務(wù)用于處理所有的 475個inputSplits,減少了過多mapper任務(wù)會帶來的額外開銷。
split waves
這里為什么要定義一個split waves,使用這個值可以讓Driver申請更多的Container,比如本場景中,本來僅僅只有3個slots可以使用,但是會根據(jù)這個乘數(shù)在多申請兩個Container的。但是這樣做的原因是什么呢。
- 首先他可以讓分配資源更靈活,:比如集群之后新增了計算節(jié)點,其他任務(wù)十分了資源等,所有即使有部分的map任務(wù)在等待資源,它們也會在后續(xù)被分配到資源的。
2.將數(shù)據(jù)分配給跟多的mapper,提高程序執(zhí)行的并行度,減少map任務(wù)處理的數(shù)據(jù)量,并緩解由于少部分map任務(wù)執(zhí)行較慢,導(dǎo)致的任務(wù)整體變慢的情況。