groupByKey
val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))

groupByKey的性能,相對(duì)來(lái)說(shuō),是有問(wèn)題的
因?yàn)?,它是不?huì)進(jìn)行本地聚合的,而是原封不動(dòng)的,把ShuffleMapTask的輸出,拉取到ResultTask的內(nèi)存中,所以這樣的話,會(huì)導(dǎo)致,所有的數(shù)據(jù),都要進(jìn)行網(wǎng)絡(luò)傳輸,從而導(dǎo)致網(wǎng)絡(luò)傳輸?shù)男阅荛_(kāi)銷(xiāo)很大
但是,有些場(chǎng)景下,用其他算法實(shí)現(xiàn)不了的,比如reduceByKey,sortByKey,countByKey實(shí)現(xiàn)不了的話,還是只能用groupByKey().map()來(lái)實(shí)現(xiàn),比如可能你需要拿到某個(gè)key對(duì)應(yīng)的所有的value,進(jìn)行自定義的業(yè)務(wù)邏輯處理
reduceByKey
val counts = pairs.reduceByKey(_ + _)

HashShuffleWriter的writer()方法,是先判斷了一下,如果是isMapCombined,那么就在本地進(jìn)行聚合,聚合之后,再寫(xiě)入磁盤(pán)文件
對(duì)于,僅僅是要對(duì)key對(duì)應(yīng)的values進(jìn)行聚合為一個(gè)值的場(chǎng)景,用reduceByKey是非常合適的,因?yàn)闀?huì)先在ShuffleMapTask端寫(xiě)入本地磁盤(pán)文件的時(shí)候,進(jìn)行本地聚合,再寫(xiě)入磁盤(pán)文件,此時(shí),就會(huì)導(dǎo)致數(shù)據(jù)量大幅度縮減,甚至可能達(dá)到數(shù)據(jù)量縮減了幾倍,甚至十幾倍、幾十倍的程度
這樣的話,也就相當(dāng)于,ShuffleMapTask端的數(shù)據(jù),傳輸?shù)絉educeTasl端的數(shù)據(jù),數(shù)據(jù)量大幅度縮減,性能大幅度增加,甚至達(dá)到減少數(shù)據(jù)量的時(shí)間,幾倍、十幾倍、幾十倍
如果能用reduceByKey,那就用reduceByKey,因?yàn)樗鼤?huì)在map端,先進(jìn)行本地combine,可以大大減少要傳輸?shù)絩educe端的數(shù)據(jù)量,減小網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷(xiāo)。
只有在reduceByKey處理不了時(shí),才用groupByKey().map()來(lái)替代。