Hive on Tez 中 Map 任務(wù)的數(shù)量計(jì)算

Hive on Tez 中的Mapper 數(shù)量計(jì)算

在Hive 中執(zhí)行一個query時,我們可以發(fā)現(xiàn)Hive 的執(zhí)行引擎在使用 Tez 與 MR時,兩者生成mapper數(shù)量差異較大。主要原因在于 Tez 中對 inputSplit 做了 grouping 操作,將多個 inputSplit 組合成更少的 groups,然后為每個 group 生成一個 mapper 任務(wù),而不是為每個inputSplit 生成一個mapper 任務(wù)。下面我們通過日志分析一下這中間的整個過程。


調(diào)整inputFormt為CombineInputFormat合并小文件可以有效減少作業(yè)中Mapper的數(shù)量。

  1. MapReduce模式

在 mr 模式下,生成的container數(shù)為116個:

image

對應(yīng)日志條目為:

Input size for job job_1566964005095_0003 = 31733311148. Number of splits = 116

在MR中,使用的是Hadoop中的FileInputFormat,所以若是一個文件大于一個block的大小,則會切分為多個InputSplit;若是一個文件小于一個block大小,則為一個InputSplit。

在這個例子中,總文件個數(shù)為14個,每個均為2.1GB,一共29.4GB大小。生成的InputSplit數(shù)為116個,也就是說,每個block(這個場景下InputSplit 大小為一個block大小)的大小大約為256MB。

2.Tez模式

而在Tez模式下,生成的map任務(wù)為32個:

生成split groups的相關(guān)日志如下:
mapred.FileInputFormat|: Total input files to process : 14
io.HiveInputFormat|: number of splits 476
tez.HiveSplitGenerator|: Number of input splits: 476. 3 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat

tez.SplitGrouper|: # Src groups for split generation: 2
tez.SplitGrouper|: Estimated number of tasks: 5 for bucket 1
grouper.TezSplitGrouper|: Grouping splits in Tez
|grouper.TezSplitGrouper|: Desired splits: 5 too small. Desired splitLength: 6346662229 Max splitLength: 1073741824 New desired splits: 30 Total length: 31733311148 Original splits: 476
|grouper.TezSplitGrouper|: Desired numSplits: 30 lengthPerGroup: 1057777038 numLocations: 1 numSplitsPerLocation: 476 numSplitsInGroup: 15 totalLength: 31733311148 numOriginalSplits: 476 . Grouping by length: true count: false nodeLocalOnly: false
|grouper.TezSplitGrouper|: Doing rack local after iteration: 32 splitsProcessed: 466 numFullGroupsInRound: 0 totalGroups: 31 lengthPerGroup: 793332736 numSplitsInGroup: 11
|grouper.TezSplitGrouper|: Number of splits desired: 30 created: 32 splitsProcessed: 476
|tez.SplitGrouper|: Original split count is 476 grouped split count is 32, for bucket: 1
|tez.HiveSplitGenerator|: Number of split groups: 32

Avaiable Slots

首先可以看到,需要處理的文件數(shù)為14,初始splits數(shù)目為476(即意味著在這個場景下,一個block的大小約為64MB)。對應(yīng)日志條目如下:

|mapred.FileInputFormat|: Total input files to process : 14

|io.HiveInputFormat|: number of splits 476

獲取到splits的個數(shù)為476個后,Driver開始計(jì)算可用的slots(container)數(shù),這里計(jì)算得到3個slots,并打印了默認(rèn)的waves值為1.7。

在此場景中,集群一共資源為 8 vcore,12G 內(nèi)存,capacity-scheduler中指定的user limit factor 為0.5,也就是說:當(dāng)前用戶能使用的資源最多為 6G 內(nèi)存。在Tez Driver中,申請的container 資源的單位為: Default Resources=<memory:1536, vCores:1>

所以理論上可以申請到的container 數(shù)目為4(6G/1536MB = 4)個,而由于 Application Master 占用了一個container,所以最終available slots為3個。

在計(jì)算出了可用的slots為3個后,Tez 使用split-waves 乘數(shù)(由tez.grouping.split-waves指定,默認(rèn)為1.7)指定“預(yù)估”的Map 任務(wù)數(shù)目為:3 × 1.7 = 5 個tasks。對應(yīng)日志條目如下:

|tez.HiveSplitGenerator|: Number of input splits: 476. 3 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat

|tez.SplitGrouper|: Estimated number of tasks: 5 for bucket 1

Grouping Input Splits

在Tez分配任務(wù)時,不會像mr那樣為每個split生成一個map任務(wù),而是會將多個split進(jìn)行g(shù)rouping,讓map任務(wù)更高效地的完成。首先Tez會根據(jù)計(jì)算得到的 estimated number of tasks = 5,將splits聚合為5個Split Group,生成5個mapper執(zhí)行任務(wù)。

但是這里還需要考慮另一個值:lengthPerGroup。Tez會檢查lengthPerGroup是否在 tez.grouping.min-size (默認(rèn)為50MB)以及 tez.grouping.max-size(默認(rèn)為1GB) 定義范圍內(nèi)。如果超過了max-size,則指定lengthPerGroup為max-size,如果小于min-size,則指定lengthPerGroup為min-size。

在這個場景下,數(shù)據(jù)總大小為 31733311148 bytes(29.5GB左右,也是原數(shù)據(jù)大?。A(yù)估為5個Group, 則每個Group的 splitLength為6346662229 bytes(5.9GB 左右),超過了 Max splitLength = 1073741824 bytes( 1GB),所以重新按 splitLength = 1GB 來算,計(jì)算出所需的numSplits 數(shù)為 30 個,每個Split Group的大小為1GB。

在計(jì)算出了每個Split Group的大小為1GB后,由于原Split總數(shù)目為476,所以需要將這476個inputSplit進(jìn)行g(shù)rouping,使得每個Group的大小大約為1GB左右。按此方法計(jì)算,預(yù)期的splits數(shù)目應(yīng)為30個(但是僅是通過總數(shù)據(jù)大小/lengthPerGroup得出,尚未考慮inputSplits如何合并的問題,不一定為最終生成的map tasks數(shù)目)。且最終可計(jì)算得出每個group中可以包含15個原split,也就是numSplitsInGroup = 15。相關(guān)日志條目如下:

|grouper.TezSplitGrouper|: Grouping splits in Tez

|grouper.TezSplitGrouper|: Desired splits: 5 too small. Desired splitLength: 6346662229 Max splitLength: 1073741824 New desired splits: 30 Total length: 31733311148 Original splits: 476

|grouper.TezSplitGrouper|: Desired numSplits: 30 lengthPerGroup: 1057777038 numLocations: 1 numSplitsPerLocation: 476 numSplitsInGroup: 15 totalLength: 31733311148 numOriginalSplits: 476 . Grouping by length: true count: false nodeLocalOnly: false

原splits總數(shù)目為 476,在對splits進(jìn)行g(shù)rouping時,每個group中將會包含15個inputSplits,所以最終可以計(jì)算出的group數(shù)目為 476/15 = 32 個,也就是最終生成的mapper數(shù)量。

|tez.SplitGrouper|: Original split count is 476 grouped split count is 32, for bucket: 1

|tez.HiveSplitGenerator|: Number of split groups: 32

所以在Tez中,inputSplit 數(shù)目雖然是476個,但是最終僅生成了32個map任務(wù)用于處理所有的 475個inputSplits,減少了過多mapper任務(wù)會帶來的額外開銷。

Split Waves

這里為什么要定義一個split waves值呢?使用此值之后會讓Driver申請更多的container,比如此場景中本來僅有3個slots可用,但是會根據(jù)這個乘數(shù)再多申請2個container資源。但是這樣做的原因是什么呢?

1.首先它可以讓分配資源更靈活:比如集群之后添加了計(jì)算節(jié)點(diǎn)、其他任務(wù)完成后釋放了資源等。所以即使剛開始會有部分map任務(wù)在等待資源,它們在后續(xù)也會很快被分配到資源執(zhí)行任務(wù)

  1. 將數(shù)據(jù)分配給更多的map任務(wù)可以提高并行度,減少每個map任務(wù)中處理的數(shù)據(jù)量,并緩解由于少部分map任務(wù)執(zhí)行較慢,而導(dǎo)致的整體任務(wù)變慢的情況
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容