看這篇文章前,請(qǐng)先移步Spark Streaming 數(shù)據(jù)產(chǎn)生與導(dǎo)入相關(guān)的內(nèi)存分析, 文章重點(diǎn)講的是從Kafka消費(fèi)到數(shù)據(jù)進(jìn)入BlockManager的這條線路的分析。
這篇內(nèi)容是個(gè)人的一些經(jīng)驗(yàn),大家用的時(shí)候還是建議好好理解內(nèi)部的原理,不可照搬
讓Receiver均勻的分布到你的Executor上
在Spark Streaming 數(shù)據(jù)產(chǎn)生與導(dǎo)入相關(guān)的內(nèi)存分析中我說(shuō)了這么一句話:
我發(fā)現(xiàn)在數(shù)據(jù)量很大的情況下,最容易掛掉的就是Receiver所在的Executor了。 建議Spark Streaming團(tuán)隊(duì)最好是能將數(shù)據(jù)寫入到多個(gè)BlockManager上。
從現(xiàn)在的API來(lái)看,是沒(méi)有提供這種途徑的。但是Spark Streaming 提供了同時(shí)讀多個(gè)topic的功能,每個(gè)topic是一個(gè)InputStream。 我們可以復(fù)用這個(gè)功能,具體代碼如下:
val kafkaDStreams = (1 to kafkaDStreamsNum).map { _ => KafkaUtils.createStream(
ssc,
zookeeper,
groupId,
Map("your topic" -> 1),
if (memoryOnly) StorageLevel.MEMORY_ONLY else StorageLevel.MEMORY_AND_DISK_SER_2)}
val unionDStream = ssc.union(kafkaDStreams)
unionDStream
kafkaDStreamsNum 是你自己定義的,希望有多少個(gè)Executor 啟動(dòng)Receiver 去接收kafka數(shù)據(jù)。我的經(jīng)驗(yàn)值是 1/4 個(gè)Executors 數(shù)目。因?yàn)閿?shù)據(jù)還要做replication 一般,所以這樣內(nèi)存最大可以占到 1/2 的storage.
另外,務(wù)必給你系統(tǒng)設(shè)置 spark.streaming.receiver.maxRate。假設(shè)你啟動(dòng)了 N個(gè) Receiver,那么你系統(tǒng)實(shí)際會(huì)接受到的數(shù)據(jù)不會(huì)超過(guò) N*MaxRate,也就是說(shuō),maxRate參數(shù)是針對(duì)每個(gè) Receiver 設(shè)置的。
減少非Storage 內(nèi)存的占用
也就是我們盡量讓數(shù)據(jù)都占用Spark 的Storage 內(nèi)存。方法是把spark.streaming.blockInterval 調(diào)小點(diǎn)。當(dāng)然也會(huì)造成一個(gè)副作用,就是input-block 會(huì)多。每個(gè)Receiver 產(chǎn)生的的input-block數(shù)為: batchInterval* 1000/blockInterval。 這里假設(shè)你的batchInterval 是以秒為單位的。 blockInterval 其實(shí)我也不知道會(huì)有啥影響。其實(shí)說(shuō)白了,就是為了防止GC的壓力。實(shí)時(shí)計(jì)算有一個(gè)很大問(wèn)題是GC。
減少單個(gè)Executor的內(nèi)存
一般在Spark Streaming中不建議把 Executor 的內(nèi)存調(diào)的太大。對(duì)GC是個(gè)壓力,大內(nèi)存一FullGC比較可怕,很可能會(huì)拖垮整個(gè)計(jì)算。 多Executor的容錯(cuò)性也會(huì)更好些。