2018.03.29_01
問題描述
如何成為技術(shù)大牛
根據(jù)阿里的分享
do more
do better
do exercise
如何成為技術(shù)大牛
2018.03.29_02
問題描述
淘寶如何保持寶貝數(shù)量的一致性
思路
淘寶寶貝數(shù)量先減去購賣數(shù),不為0就可以同步處理,處理失敗再加回來。如果寶貝數(shù)量減到一,則競(jìng)爭(zhēng)。
2018.03.28_01
問題描述
worker 與 executor 線程和進(jìn)程的相關(guān)理解,以及引申為spark 分區(qū)等概念
相關(guān)思路討論
executor 是進(jìn)程,在其中執(zhí)行的task 是線程,spark 所謂的多線程,是一個(gè)executor 中可以多個(gè)task 執(zhí)行。
1.1 MR 采用了多進(jìn)程模型,Spark采用了多線程模式,這里指的是同一個(gè)節(jié)點(diǎn)上多個(gè)任務(wù)的運(yùn)行模式。因?yàn)闊o論MR 和 Spark 整體上看,都是多進(jìn)程:MR是由多個(gè)獨(dú)立的Task 進(jìn)程組成,Spark 應(yīng)用程序的運(yùn)行環(huán)境是由多個(gè)獨(dú)立的Executor 進(jìn)程構(gòu)建的臨時(shí)資源池構(gòu)成的。
多進(jìn)程模型便于細(xì)粒度控制每個(gè)任務(wù)占用的資源,但會(huì)消耗較多的啟動(dòng)時(shí)間,不適合運(yùn)行低延遲類型的作業(yè),這是MapReduce廣為詬病的原因之一。而多線程模型則相反,該模型使得Spark很適合運(yùn)行低延遲類型的作業(yè)??傊?,Spark同節(jié)點(diǎn)上的任務(wù)以多線程的方式運(yùn)行在一個(gè)JVM進(jìn)程中,可帶來以下好處:
1)任務(wù)啟動(dòng)速度快,與之相反的是MapReduce Task進(jìn)程的慢啟動(dòng)速度,通常需要1s左右;
2)同節(jié)點(diǎn)上所有任務(wù)運(yùn)行在一個(gè)進(jìn)程中,有利于共享內(nèi)存。這非常適合內(nèi)存密集型任務(wù),尤其對(duì)于那些需要加載大量詞典的應(yīng)用程序,可大大節(jié)省內(nèi)存。
3)同節(jié)點(diǎn)上所有任務(wù)可運(yùn)行在一個(gè)JVM進(jìn)程(Executor)中,且Executor所占資源可連續(xù)被多批任務(wù)使用,不會(huì)在運(yùn)行部分任務(wù)后釋放掉,這避免了每個(gè)任務(wù)重復(fù)申請(qǐng)資源帶來的時(shí)間開銷,對(duì)于任務(wù)數(shù)目非常多的應(yīng)用,可大大降低運(yùn)行時(shí)間。與之對(duì)比的是MapReduce中的Task:每個(gè)Task單獨(dú)申請(qǐng)資源,用完后馬上釋放,不能被其他任務(wù)重用,盡管1.0支持JVM重用在一定程度上彌補(bǔ)了該問題,但2.0尚未支持該功能。
盡管Spark的過線程模型帶來了很多好處,但同樣存在不足,主要有:
1)由于同節(jié)點(diǎn)上所有任務(wù)運(yùn)行在一個(gè)進(jìn)程中,因此,會(huì)出現(xiàn)嚴(yán)重的資源爭(zhēng)用,難以細(xì)粒度控制每個(gè)任務(wù)占用資源。與之相反的是MapReduce,它允許用戶單獨(dú)為Map Task和Reduce Task設(shè)置不同的資源,進(jìn)而細(xì)粒度控制任務(wù)占用資源量,有利于大作業(yè)的正常平穩(wěn)運(yùn)行
ref: 董西城的解釋spark分區(qū)數(shù),task數(shù)目,core數(shù),worker節(jié)點(diǎn)個(gè)數(shù),excutor數(shù)量梳理
輸入可能以多個(gè)文件的形式存儲(chǔ)在HDFS上,每個(gè)File都包含了很多塊,稱為Block。
當(dāng)Spark讀取這些文件作為輸入時(shí),會(huì)根據(jù)具體數(shù)據(jù)格式對(duì)應(yīng)的InputFormat進(jìn)行解析,一般是將若干個(gè)Block合并成一個(gè)輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。
隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對(duì)應(yīng)的關(guān)系。
隨后這些具體的Task每個(gè)都會(huì)被分配到集群上的某個(gè)節(jié)點(diǎn)的某個(gè)Executor去執(zhí)行。
- 每個(gè)節(jié)點(diǎn)可以起一個(gè)或多個(gè)Executor。
- 每個(gè)Executor由若干core組成,每個(gè)Executor的每個(gè)core一次只能執(zhí)行一個(gè)Task。
- 每個(gè)Task執(zhí)行的結(jié)果就是生成了目標(biāo)RDD的一個(gè)partiton。
這里的core是虛擬的core而不是機(jī)器的物理CPU核,可以理解為就是Executor的一個(gè)工作線程。
至于partition的數(shù)目:
- 對(duì)于數(shù)據(jù)讀入階段,例如sc.textFile,輸入文件被劃分為多少InputSplit就會(huì)需要多少初始Task。
- 在Map階段partition數(shù)目保持不變。
- 在Reduce階段,RDD的聚合會(huì)觸發(fā)shuffle操作,聚合后的RDD的partition數(shù)目跟具體操作有關(guān),例如repartition操作會(huì)聚合成指定分區(qū)數(shù),還有一些算子是可配置的。
ref 梳理
2.1 一種理解:
executor 不是越多越好,假設(shè)你給spark 分配總的資源 為48 個(gè)vcore
(這個(gè)不是物理的CPU核,一個(gè)物理CPU核可以劃分成多個(gè)vcore,是為了更細(xì)粒度的資源控制,
這樣對(duì)應(yīng)小任務(wù)較多情況,能提升資源利用率,打個(gè)比方,我一個(gè)CPU能分成4個(gè)vcore 和 1 個(gè)vcore,
我任務(wù)1,2,3,4 其實(shí)都只要使用1個(gè)vcore 即1/4 個(gè)物理CPU,在分成4個(gè)vcore 的時(shí)候,
一個(gè)CPU能并行處理4個(gè)任務(wù),在分1個(gè)vcore時(shí),只能串行)
然后96G 的內(nèi)存
及 48c 96g
假設(shè)你的參數(shù)如下:
num-executor 8
executor-cores 8
8*8 =64 > 48 了,這個(gè)時(shí)候,只會(huì)給你 48/8 =6 個(gè)executor
同理對(duì)于
num-executor 8
executor-menory 18
8*18=144 > 96 了, 這個(gè)時(shí)候只會(huì)給你 96/8 =12 個(gè)executor
當(dāng) cores 和 menory 都超的時(shí)候,取小的,總之資源不能超總的。
在每個(gè)stage的時(shí)候,會(huì)根據(jù)partition的數(shù)據(jù),劃分出task的數(shù)量,一個(gè)vcore 同一時(shí)間只能處理一個(gè)task
假設(shè)有4個(gè)executor ,每個(gè)executor 有4個(gè)vcore,即同時(shí)處理的task數(shù)量為16 (這是你集群的處理能力)
那么當(dāng)你的分區(qū),只有4個(gè)的時(shí)候,即只有4個(gè)task,就意味著你的集群資源是空著的,沒有利用滿。(最好reparation)
當(dāng)你的分區(qū)有32個(gè)的時(shí)候, 意味著需要 32/16 2輪能處理完。這時(shí)候,你再調(diào)大并行度,也沒有用,集群資源就那么多。
還有關(guān)于并行度的提高,也并不是越大越好。
1是小文件的問題
2是任務(wù)太小的話,啟動(dòng)任務(wù)的時(shí)間占比 相對(duì)任務(wù)的處理時(shí)間占比也會(huì)很高,這樣得不償失。
2.2 另一種思路
一般每個(gè)partition對(duì)應(yīng)一個(gè)task。在我的測(cè)試過程中,如果沒有設(shè)置spark.default.parallelism參數(shù),spark計(jì)算出來的partition非常巨大,與我的cores非常不搭。我在兩臺(tái)機(jī)器上(8cores *2 +6g * 2)上,spark計(jì)算出來的partition達(dá)到2.8萬個(gè),也就是2.9萬個(gè)tasks,每個(gè)task完成時(shí)間都是幾毫秒或者零點(diǎn)幾毫秒,執(zhí)行起來非常緩慢。在我嘗試設(shè)置了 spark.default.parallelism 后,任務(wù)數(shù)減少到10,執(zhí)行一次計(jì)算過程從minute降到20second
spark.default.parallelism 的說明見 說明
一句話,就是觸發(fā)shuffle 操作之后的默認(rèn)分區(qū)數(shù),相當(dāng)于手動(dòng)reparation
2018.03.28_02
問題描述
兩個(gè)巨大的表,默認(rèn)都要用reduce join. 且其中一個(gè)表中join依賴的相同key的數(shù)據(jù)大量重復(fù).考慮到數(shù)據(jù)傾斜,這個(gè)partitioner怎么實(shí)現(xiàn)?
思路
這是一個(gè)半連接的問題
- 先把重復(fù)比較厲害的key 過濾了,(合并了) 再做map側(cè)的join
ref:
如何利用spark快速計(jì)算笛卡爾積?
spark千萬數(shù)據(jù)join問題?
MapReduce表連接之半連接SemiJoin
2018.03.28_03
問題描述
spark sql 的partition 數(shù)目
思路
- 我是這么理解Spark 200G 內(nèi)存處理2T數(shù)據(jù)的。
打個(gè)比方,內(nèi)存為100G 有2000G 的數(shù)據(jù),一共10000個(gè)partition,每個(gè)partition就是200M (方便起見按1G=1000M 計(jì)算)
假設(shè)觸發(fā)shuffle的stage1,每個(gè)partition處理的約為200M數(shù)據(jù)。
默認(rèn)的配置,一個(gè)executor 1C2G。 2G內(nèi)存處理200M數(shù)據(jù),還是沒問題的。
這樣就有100/2=50 個(gè)executor,處理10000個(gè)task 一共需要10000/50 = 200輪。
stage1的數(shù)據(jù)處理的結(jié)果,你可以選擇cache 到內(nèi)存,也可以選擇到disk。這樣再進(jìn)行stage2的操作。
(假設(shè)處理結(jié)果為1000G 還是10000個(gè)partition, 這樣只能到disk 了)
那stage2處理方式一樣,從disk讀數(shù)據(jù),然后進(jìn)行10000/50 = 200輪 的操作。每個(gè)executor 有2G,處理100M 還是可以的。
- hive的分區(qū)數(shù)目和Spark 的partition 數(shù)無關(guān)
你可以去hdfs上查看,hive的分區(qū)目錄下也有分片的文件如:(假設(shè)分區(qū)字段為時(shí)間)
table/20180101/part-00000
~
table/20180101/part-00010
那這個(gè)分區(qū)下的文件,對(duì)應(yīng)Spark 中10個(gè)partition
然后通過spark.sql.shuffle.partitions 參數(shù)來調(diào)節(jié)執(zhí)行sql中shuffle 時(shí)的task數(shù)量
2017.03.27_01
問題描述:
如這樣的 rdd(k,v):RDD[(String, String)]
需要對(duì)第一個(gè)字段 k進(jìn)行歸并,然后統(tǒng)計(jì)v去重之后出現(xiàn)次數(shù)最多的字符(分組 top1)。
例如
a,A
a,B
a,A
b,C
b,D
c,D
c,D
結(jié)果為 (a,A) (b,C) (c,D)
rdd.map(x=>(x._1 + "|" + x._2, 1))
.reduceByKey(_ + _)
.map(x=>(x._1.split("\\|", -1)(0), (x._1.split("\\|", -1)(1),x._2)))
.reduceByKey((x,y) =>{
val re = if (x._2 > y._2){
x
}else {
y
}
re
})
.map(x=>(x._1,x._2._1))
擴(kuò)展
- topN 的問題該如何處理? 除了groupBy 還有別的思路么? 效率?
- 次數(shù)相同該如何處理?
2018.03.27_02
問題描述:
如這樣的 rdd(k,v):RDD[(String, String)]
需要對(duì)第一個(gè)字段 k進(jìn)行歸并,然后統(tǒng)計(jì)v去重之后出現(xiàn)的次數(shù)。
例如
a,A
a,B
a,A
b,C
b,D
c,D
c,D
結(jié)果為 (a,2) (b,2) (c,1)
目前整理的幾種思路是:
- 先進(jìn)行reduceBy,將v合并為字符串,之后再拆分、去重,統(tǒng)計(jì)出現(xiàn)次數(shù)
map(a,b).reduceByKey((x1,x2)=>{
val sum =x1+"\t"+x2
sum}).map{x._2.split("\t").distinct.size}
- 將k,v 拼接為一個(gè)字符串,去重,之后再差分,然后再進(jìn)行reduceBy
rdd.map(x=>(x._1 +"|" x._2)).distinct().map((_.split("|")(0),1)).reduceByKey(_+_)
- 將v 改為Set,reduce 之后再統(tǒng)計(jì)size
rdd.map(x=>(x._1, Set(x._2)).reduceByKey(_ ++ _).map(x=>(x._1, x._2.size))
- 使用dropDuplicates()
需要spark 2.x ,將rdd轉(zhuǎn)為DataSet 之后再進(jìn)行操作。
擴(kuò)展
- 四種思路比較? 是否有更好的解決方案?
- 當(dāng)v 也是多個(gè)字段該如何處理?
- Set 和 String的序列化效率差異?即 String 合并之后再拆分的開銷,和使用Set 序列化增長(zhǎng)的開銷 哪個(gè)比較大?