來源:https://mp.weixin.qq.com/s/Kv1Qq4118I2itYwPYyQUoA
1寫在前面
天翼電子商務(wù)有限公司(簡稱“甜橙金融”)是中國電信的全資子公司,2011 年 3 月成立于北京,作為中國人民銀行核準(zhǔn)的第三方支付機(jī)構(gòu),是兼具金融、電信、互聯(lián)網(wǎng)文化的跨界國家高新技術(shù)企業(yè)。目前公司對(duì)實(shí)時(shí)性計(jì)算的需要及應(yīng)用越來越多,本文選取了其中之一的 Spark Streaming 來介紹如何實(shí)現(xiàn)高吞吐量并具備容錯(cuò)機(jī)制的實(shí)時(shí)流應(yīng)用。在甜橙金融監(jiān)控系統(tǒng)項(xiàng)目中,需要對(duì)每天億萬級(jí)(10T)的日志記錄進(jìn)行實(shí)時(shí)的指標(biāo)統(tǒng)計(jì),在生產(chǎn)者一端,我們通過 Flume 將數(shù)據(jù)存入 Kafka 當(dāng)中, 而在消費(fèi)者一端,我們利用 Spark Streaming 從 Kafka 中不斷的拉取數(shù)據(jù)進(jìn)行指標(biāo)統(tǒng)計(jì)并存入外部存儲(chǔ)中。
本文將從以下幾個(gè)方面進(jìn)行介紹,目的是帶領(lǐng)大家對(duì)實(shí)時(shí)流處理有個(gè)初步的認(rèn)識(shí),一起交流學(xué)習(xí)。
監(jiān)控系統(tǒng)架構(gòu)及存在的主要問題
Spark Streaming 流處理框架介紹
Streaming 相關(guān)的優(yōu)化
Streaming 任務(wù)的監(jiān)控
寫在最后
2監(jiān)控系統(tǒng)架構(gòu)及存在的主要問題系統(tǒng)架構(gòu)介紹
整個(gè)實(shí)時(shí)監(jiān)控系統(tǒng)的架構(gòu)是先由 Flume 收集服務(wù)器產(chǎn)生的日志 Log 和前端埋點(diǎn)數(shù)據(jù), 然后實(shí)時(shí)把這些信息發(fā)送到 Kafka 分布式發(fā)布訂閱消息系統(tǒng),接著由 Spark Streaming 消費(fèi) Kafka 中的消息,同時(shí)消費(fèi)記錄由 Zookeeper 集群統(tǒng)一管理,這樣即使 Kafka 宕機(jī)重啟后也能找到上次的消費(fèi)記錄繼而進(jìn)行消費(fèi)。在這里 Spark Streaming 首先從 MySQL 讀取規(guī)則然后進(jìn)行 ETL 清洗并計(jì)算多個(gè)聚合指標(biāo),最后將結(jié)果的一部分存儲(chǔ)到 Hbase 中,另一部分重新發(fā)回到 Kafka 中再消費(fèi)更新到 MySQL 中,監(jiān)控前端實(shí)時(shí)獲取指標(biāo)進(jìn)行展示。
主要問題
在上面的框架介紹中,下游監(jiān)控系統(tǒng)的指標(biāo)數(shù)據(jù)來源于 Spark Streaming 的實(shí)時(shí)計(jì)算,可見 Streaming 計(jì)算處于極為重要的環(huán)節(jié),而計(jì)算性能不足就會(huì)成為整個(gè)系統(tǒng)的瓶頸。大部分時(shí)候我們 Spark 指標(biāo)計(jì)算都能應(yīng)付過來,但是在節(jié)日流量翻倍的情況下就力不從心了,為應(yīng)對(duì)這種情況之前采取的措施一般是關(guān)閉一些非關(guān)鍵性日志接口把監(jiān)控流量降下來。雖然此舉能暫時(shí)解決問題,但仍需要治標(biāo)更治本的方法。
首先來看看優(yōu)化前 Streaming 的計(jì)算能力。
圖一所示為每批次(30 秒)800W+ 日志流量下,Spark Streaming 計(jì)算大概需要 50 多秒。雖無明顯延時(shí),但計(jì)算能力很弱雞 14w/s

圖 1
隨著流量不斷的增大,如圖 2 所示為每批次(時(shí)間 30 秒)1000W+ 條日志流量下,Spark 計(jì)算已嚴(yán)重超時(shí),越來越多的 batch 加入到 queue 的隊(duì)列等待處理,此時(shí)監(jiān)控系統(tǒng)基本失效。

圖 2
既然痛點(diǎn)已找到,那么剩下要做的就是想辦法去優(yōu)化。下文在講如何優(yōu)化前,先帶大家認(rèn)識(shí)下流式處理框架中的兩個(gè)經(jīng)典好搭檔 Spark Streaming + Kafka。
3Spark Streaming + Kafka 流處理框架為什么選擇 Spark Streaming 和 Kafka
Kafka 支持分布式及出色的吞吐量
-
Spark Streaming 流式處理框架已被各大公司廣泛應(yīng)用且成熟度高,支持大部分的數(shù)據(jù)源和存儲(chǔ),如下圖所示其豐富生態(tài)圈
[圖片上傳中...(image-6bc74e-1536914295737-0)]
Kafka 與 Spark Streaming 集成度高
Spark Streaming 初識(shí)
Spark Streaming 接受實(shí)時(shí)輸入數(shù)據(jù)并將數(shù)據(jù)切分成多個(gè) batches, 然后由 Spark engine 進(jìn)行計(jì)算并將結(jié)果輸出到外部存儲(chǔ)。
接下來看看 Spark Streaming 從 Kafka 中接受數(shù)據(jù)的兩種方式。
基于 Receiver 方式
這種方式使用 Receiver 方式接受數(shù)據(jù),實(shí)現(xiàn)是以 Kafka 高階用戶 API 接口,收到的數(shù)據(jù)會(huì)存到 Spark executor,之后 Spark Streaming 提交 Job 處理這些數(shù)據(jù)。為了保證數(shù)據(jù)不會(huì)丟失,需要開啟 Write Ahead Logs,流程如下圖所示:
基于 Direct 方式
在 Spark 1.3 之后,引入了 Direct 方式以提供更強(qiáng)的端到端的保證。不同于 Receiver 方式,其會(huì)周期性的獲取 Kafka 每個(gè) topic 中每個(gè) Partition 最新的 offsets。之后 Spark job 會(huì)基于 Kafka simple API 讀取 Kafka 相應(yīng) Offset 數(shù)據(jù)并進(jìn)行處理,流程如下圖所示:

該方式相對(duì)于 Receiver 方式具有以下優(yōu)勢(shì):
簡化的并行度:基于 Receiver 的方式中要提高數(shù)據(jù)傳輸并行度我們需要?jiǎng)?chuàng)建多個(gè) Receiver 實(shí)例之后再 Union 起來合并成一個(gè) Dstream。而 Direct 方式中提供了更為簡單的映射關(guān)系,Kafka 中的 partition 與 Spark RDD 中的 partition 是一一映射的,因而可以并行讀取數(shù)據(jù)。
高效性:在 Receiver 的方式中,為了達(dá)到零數(shù)據(jù)丟失需要將數(shù)據(jù)備份到 Write Ahead Log 中,這樣系統(tǒng)中就保存了兩份數(shù)據(jù)浪費(fèi)資源。而 Direct 方式只要知道當(dāng)前消費(fèi)的 Offsets 就能恢復(fù)出相應(yīng)的數(shù)據(jù)。
精確一次的語義保證:基于 Receiver 的方式中,通過 Kafka 的高階 API 接口從 Zookeeper 中獲取 offset 值,這也是傳統(tǒng)的從 Kafka 中讀取數(shù)據(jù)的方式,但由于 Spark Streaming 消費(fèi)的數(shù)據(jù)和 Zookeeper 中記錄的 offset 不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。而第二種方式,直接使用了簡單的低階 Kafka API,Offsets 可以利用 Spark Streaming 的 checkpoints 進(jìn)行記錄來消除這種不一致性。
以上翻譯自官方文檔。既然 Direct 方式有這么多優(yōu)點(diǎn),那么在我們的監(jiān)控系統(tǒng)中理所當(dāng)然也用了這種方式,同時(shí)為了能使基于 Zookeeper 的 Kafka monitor 工具生效,我們也實(shí)現(xiàn)了 Offset 的管理,具體流程如下:
Spark Streaming 任務(wù)啟動(dòng)后首先去 Zookeeper 中去讀取特定 topic 中每個(gè) Partition 的 offset 并組裝 fromOffsets 變量;
Spark Streaming 獲取到 fromOffsets 后通過 KafkaUtils.createDirectStream 去消費(fèi) Kafka 的數(shù)據(jù);
讀取 Kafka 數(shù)據(jù)然后進(jìn)行批的邏輯處理,如下圖所示為該 Job 的 DAG,包括一些基本的 RDD 算子操作 (flatMap, reduceByKey, Map 等), 并將計(jì)算結(jié)果存儲(chǔ)到 Hbase 和回吐到 Kafka 中,最后更新 offsets 到 Zookeeper 中。

4Spark Streaming 性能優(yōu)化及任務(wù)監(jiān)控
重點(diǎn)來了,那么說起優(yōu)化,我們首先想到的就是最大限度利用集群資源,將硬件性能壓榨到極致,先看看如何在用 spark-submit 提交命令的時(shí)候進(jìn)行資源調(diào)優(yōu)。
資源參數(shù)調(diào)優(yōu)
增加 Driver 和 Executor 的內(nèi)存(driver-memory、executor-memory)
通過增加 Driver 和 Executor 的內(nèi)存數(shù)量,可以減小程序 Out of memory 和 意外崩潰 產(chǎn)生的概率,當(dāng)然也不能無限制增加以免造成資源的浪費(fèi)或者導(dǎo)致其它任務(wù)申請(qǐng)資源失敗。
設(shè)置合理的 CPU 個(gè)數(shù)
--num-executors 和 --executor-cores 兩個(gè)參數(shù)配合使用來調(diào)節(jié)計(jì)算資源占有情況。通常對(duì)于集群中一定量的 CPU Core,設(shè)置較多的 Executor 個(gè)數(shù)和較少的 Executor core 個(gè)數(shù)來達(dá)到資源最大使用率。
結(jié)合內(nèi)存和 CPU 參數(shù),我們來舉個(gè)例子,看看怎么設(shè)置會(huì)比較合理。
假設(shè)在擁有 6 個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)有 16 個(gè) Core 和 64G 內(nèi)存集群中提交 Job, 一種可能的配置參數(shù)如下:
--num-executors 6 –executor-cores 15 –executor-memory 63G
這種方式其實(shí)不太合理,原因如下:
由于我們的 OS 以及 Hadoop daemons 要占用一定內(nèi)存,因此 yarn.nodemanager.resources.memory-mb 和 yarn.nodemanager.resources.cpu-vcores 不可能占用 100% 資源,一般是 63 * 1024 和 15Core.
Application master 也會(huì)占用一個(gè) core, 因此在 master 節(jié)點(diǎn)上也不可能設(shè)置為 15 個(gè) core
每個(gè) executor 設(shè)置 15Core 會(huì)造成低效的 HDFS I/O 吞吐量
鑒于上面的原因,一種更為合理的的設(shè)置是:
--num-executors 17 –executor-cores 5 –executor-memory 19G
增加 parallelism:增加 Spark Partition 數(shù)量
Partition 即 Spark 中的數(shù)據(jù)分區(qū),每個(gè) task 在同一時(shí)間只能處理一個(gè) Partition 的數(shù)據(jù),這個(gè)值不能設(shè)置的太小也不能設(shè)置的太大。
設(shè)置的太大,每個(gè)分區(qū)中的數(shù)據(jù)很少,因此會(huì)需要更多的 task 來處理這些數(shù)據(jù),增加任務(wù)調(diào)度器的負(fù)擔(dān)
設(shè)置的太小,每個(gè)分區(qū)中的數(shù)據(jù)很多,也會(huì)對(duì)內(nèi)存造成壓力,executor 無法最大程度利用集群計(jì)算資源。
通過 spark.default.parallelism 可以設(shè)置 spark 默認(rèn)的分區(qū)數(shù)量,在這里我們?cè)O(shè)置的 1000.
此外在 Spark Streaming + Kafka 的案例中,我們采用 Direct 方式從 Kafka 中獲取數(shù)據(jù),此時(shí) Kafka partition 的數(shù)量和 Spark RDD 的分區(qū)數(shù)量是 1:1 映射的關(guān)系,而調(diào)優(yōu)之前該 topic 創(chuàng)建時(shí)的分區(qū)數(shù)量是 64,并發(fā)度太小導(dǎo)致集群資源利用不夠。我們一開始采取的優(yōu)化方式是創(chuàng)建 InputDstream 之后先 Repartition 到一個(gè)更大的并行度,然后進(jìn)行邏輯計(jì)算,結(jié)果證明該方式較之前性能上有一定提升但還是沒有達(dá)到我們想要的理想結(jié)果,這是由于 repartition 會(huì)造成 Shuffle 操作,而 Shuffle 比較耗時(shí),會(huì)引起大量的磁盤 IO, 序列化、反序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮鳎虼艘M量避免。之后我們直接從數(shù)據(jù)源頭 Kafka 那邊增加 Topic 分區(qū)數(shù)(240),從而極大的提升了處理效率。如圖所示:
設(shè)置合理的批處理時(shí)間和 Kafka 數(shù)據(jù)拉取速率
在 StreamingContext 初始化的時(shí)候需要設(shè)置批處理時(shí)間,而這個(gè)值不能設(shè)置的太小,太小不僅會(huì)導(dǎo)致 SparkStreaming 頻繁的提交作業(yè)增加系統(tǒng)調(diào)度的負(fù)擔(dān),如果處理不過來容易造成作業(yè)的積壓發(fā)生阻塞。此外還要根據(jù)生產(chǎn)者寫入 Kafka 的速率以及 Streaming 本身消費(fèi)數(shù)據(jù)的速率設(shè)置合理的 Kafka 讀取速率(spark.streaming.kafka.maxRatePerPartition),使得 Spark Streaming 的每個(gè) task 都能在 Batch 時(shí)間內(nèi)及時(shí)處理完 Partition 內(nèi)的數(shù)據(jù),使 Scheduling Delay 盡可能的小。
最后還可以設(shè)置 spark.steaming.backpressure.enabled 為 true,這就使得如果在某一時(shí)刻數(shù)據(jù)量突然增大導(dǎo)致處理時(shí)間遠(yuǎn)大于 Batch interval 的情況下,告訴 Kafka 你需要降低發(fā)送速率了。下圖所示為理想的處理狀態(tài)。
使用 Kryo 序列化
Spark Streaming 在傳輸、使用對(duì)象的時(shí)候要用到序列化和反序列化,而 Kryo 序列化方式比 Java 序列化機(jī)制性能高 10 倍,因此我們可在使用的時(shí)候注冊(cè)自定義類型,如下函數(shù)所示:
設(shè)置 Streaming job 的并行度
這里的 job 主要由兩個(gè)參數(shù)決定:
Spark.scheduler.mode(FIFO/FAIR)
Spark.streaming.concurrentjobs
在每個(gè) batch 內(nèi),可能有一批 Streaming job, 默認(rèn)是 1,這些 job 由 jobExecutor 執(zhí)行并提交,而 JobExecutor 是一個(gè)默認(rèn)池子大小為 1 的線程池,大小由參數(shù) Spark.streaming 。concurrentjobs 控制。如果 concurrentjobs 設(shè)置為 2,那么只要資源允許,那么會(huì)同時(shí)提交執(zhí)行兩個(gè) job,否則仍順序執(zhí)行。
開發(fā)調(diào)優(yōu)
Hbase 輸出操作
在我們的項(xiàng)目中,需要將 Spark Streaming 計(jì)算完的結(jié)果存入到 Hbase 中,這里我們采用的是批量 Put 數(shù)據(jù)到 Hbase 中,而非每次插入單條數(shù)據(jù),參考如下事例:
輸出到 Kafka
此外我們還會(huì)將計(jì)算結(jié)果回吐到 Kafka 中。通常你可能會(huì) Google “Spark Streaming to kafka”來尋找案例,而大多數(shù)情況你會(huì)找到下面這樣的例子,當(dāng)然很大程度上你也會(huì)這么寫。針對(duì) Partition 中的每條數(shù)據(jù)建立一個(gè) Kafka Producer, 然后再發(fā)送數(shù)據(jù),這種做法不靈活且低效。

比較高效的做法有兩種:
定義 Kafka producer 為 lazy 并廣播到每個(gè) executor 上,之后就可以用這個(gè) producer 發(fā)送數(shù)據(jù),事例如下:
使用也比較方便:
或者使用單例模式:
遇到的坑
經(jīng)過上述調(diào)優(yōu)方案后,Spark Streaming 實(shí)時(shí)處理能力較之前有了質(zhì)的提高,但是我們也經(jīng)常會(huì)發(fā)現(xiàn)一些異?,F(xiàn)象。在流量逐步升高的情況下,會(huì)出現(xiàn)丟包的情況,Streaming 的計(jì)算性能也受到了很大的影響。通過使用 Zabbix 工具查看網(wǎng)卡流量,發(fā)現(xiàn)有時(shí)候 eth3 網(wǎng)卡出口流量能達(dá)到 638Mbps, 如下圖所示,而我們的網(wǎng)卡是千兆網(wǎng),并且在存在多個(gè) kafka Consumer 的情況下就不難解釋之前的丟包現(xiàn)象了,同樣 spark 計(jì)算過程中需要傳輸數(shù)據(jù),因?yàn)槭艿綆挼南拗埔矔?huì)導(dǎo)致計(jì)算性能的下降。

隨后我們將 Kafka 集群中的網(wǎng)卡換到萬兆,重新提交 Spark Streaming 任務(wù)后發(fā)現(xiàn)計(jì)算性能提升數(shù)倍:上圖為調(diào)優(yōu)前約 15w/s 的處理量,下圖為調(diào)優(yōu)后每秒 50w/s 的處理量。

VS
當(dāng)在一個(gè) Batch 時(shí)間內(nèi)輸入數(shù)據(jù)達(dá)到 1000W 以上事件時(shí),Streaming 仍能很好的 handle,計(jì)算性能仍是 50W+/s 的處理速率,相比調(diào)優(yōu)前基本失效的狀態(tài)也大大提高了穩(wěn)定性。
任務(wù)監(jiān)控
對(duì)于 Spark Streaming 任務(wù)的監(jiān)控可以直觀的通過 Spark Web UI ,該頁面包括 Input Rate, Scheduling Delay、Processing Time 等,但是這種方法運(yùn)維成本較高,需要人工不間斷的巡視。
另一種推薦的方式可以通過 StreamingListener 接口獲取 Scheduling Delay 和 Processing Time,事例如下:
除此之外你還可以自己寫 Python 腳本在 yarn 管理界面解析該應(yīng)用的 ApplicationMaster 的地址,之后再通過 Spark Streaming 的 UI 去獲取相關(guān)參數(shù)。
5寫在最后
目前我們?cè)谧?Structured Streaming 的測試,相關(guān)文檔參見:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
該實(shí)時(shí)流框架端到端的延遲為 100ms,而且在 Spark 最新版本 2.3 中支持 Continuous Processing 模式,延遲能降到更低 1ms,對(duì)比 Spark Streaming 就要好很多。
總之性能優(yōu)化的路還很長,這就需要我們不斷的嘗試新的技術(shù)新的框架,最后希望本文能給正在做 spark streaming 實(shí)時(shí)流優(yōu)化的同學(xué)帶來一些幫助,歡迎大家一起交流。
6參考文獻(xiàn):
1.http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
2.https://ngorchakova.github.io/jvmwarstories/spark-kafka-sink/
作者介紹
張璐波,甜橙金融大數(shù)據(jù)實(shí)時(shí)計(jì)算專家,高級(jí)架構(gòu)師,對(duì)海量數(shù)據(jù)實(shí)時(shí)處理有深入的研究,涉及 Kafka, Spark Streaming, Structured Streaming, Flink, Strom 等開源流計(jì)算框架。目前主要工作集中在實(shí)時(shí)風(fēng)控,智能風(fēng)控以及流計(jì)算平臺(tái)的搭建。