Spark Streaming使用場景及優(yōu)化總結(jié)

SparkStreaming適合場景

Storm 流式計算(扶梯)
優(yōu)點: 數(shù)據(jù)延遲度很低,Storm的事務(wù)機制要比SparkStreaming的事務(wù)機制要完善(什么是事務(wù)機制?對于一條數(shù)據(jù),不多處理也不少處理,對于一條數(shù)據(jù)恰好處理一次,比如金融,股票等要求實時性比較高,那么就需要選Storm)

缺點:一直持有著資源,每一條數(shù)據(jù)都要在集群中某一臺節(jié)點處理,要計算的數(shù)據(jù)會進行網(wǎng)絡(luò)傳輸,吞吐量小,另外Storm不適合做復(fù)雜的業(yè)務(wù)邏輯(適合匯總)

SparkStreaming 微批處理(類似于電梯),它并不是純的批處理
優(yōu)點:吞吐量大,可以做復(fù)雜的業(yè)務(wù)邏輯(保證每個job的處理小于batch interval)
缺點:數(shù)據(jù)延遲度較高

公司中為什么選用SparkStreaming要多一些?
1.秒級別延遲,通常應(yīng)用程序是可以接受的,
2.可以應(yīng)用機器學(xué)習(xí),SparkSQL...可擴展性比較好,數(shù)據(jù)吞吐量較高

Spark性能優(yōu)化

代碼優(yōu)化

  • 多個Action計算最好基于同一個RDD進行計算操作, 并且對相同的RDD進行Cache操作,避免重復(fù)計算,增加任務(wù)的執(zhí)行時間;并且持久化級別最好使用MEMORY_ONLY_SER來減少內(nèi)存使用;

  • 在使用join的地方看是否可以使用map算子和廣播變量的方式替代;

  • 使用高效的算子, 例如:使用reduceByKey/aggregateByKey來代替groupByKey,因為前者可以進行combiner操作,減少網(wǎng)絡(luò)IO;

當(dāng)進行聯(lián)合規(guī)約操作時,避免使用 groupByKey。舉個例子,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執(zhí)行的結(jié)果是一樣的,但是前者需要把全部的數(shù)據(jù)通過網(wǎng)絡(luò)傳遞一遍,

  • 使用MapPartition來代替Map操作, 尤其是在需要網(wǎng)絡(luò)連接的地方;
  • 使用foreachPartition代替foreach操作,可以對數(shù)據(jù)進行批量處理;
  • 在filter操作后,可以使用colease操作,可以減少任務(wù)數(shù);
  • 序列化盡量使用Kyro方式, 其性能更好;
  • 減少對復(fù)雜數(shù)據(jù)結(jié)構(gòu)的使用,可以有效減少序列化時間;
  • 對應(yīng)簡單的函數(shù),最好使用閉合結(jié)構(gòu),可以有效減少網(wǎng)絡(luò)IO;
  • 使用Repartition操作可以有效增加任務(wù)的處理并行度;

參數(shù)調(diào)整優(yōu)化部分

經(jīng)過實踐驗證,調(diào)整后有效的參數(shù)如下:

設(shè)置合理的資源;
Java垃圾回收器;
清理不必要的空間;

  • 根據(jù)資源情況,可以添加Executor的個數(shù)來有效,參數(shù)為 spark.executor.instances
  • 調(diào)整每個Executor的使用內(nèi)核數(shù), 參數(shù)為 spark.executor.cores
  • 調(diào)整每個Executor的內(nèi)存, 參數(shù)為 spark.executor.memory
  • shuffle write task的buffer大小, 參數(shù)為 spark.shuffle.file.buffer
  • shuffle read task的buffer大小, 參數(shù)為 spark.reducer.maxSizeInFlight
  • 每一個stage的task的默認(rèn)并行度, 默認(rèn)為200, 建議修改為1000左右, 參數(shù) spark.default.parallelism
  • 用于RDD的持久化使用的內(nèi)存比例,默認(rèn)0.6, 參數(shù) spark.storage.memoryFraction
  • 用戶shuffle使用的內(nèi)存比例, 默認(rèn)為0.2, 參數(shù) spark.shuffle.memoryFraction

其它優(yōu)化

  • 增加數(shù)據(jù)讀取的并行度,比如讀取Kafka的數(shù)據(jù),可以增加topic的partition數(shù)量和executor的個數(shù);
  • 限制讀取Kafka數(shù)據(jù)的速率,參數(shù) spark.streaming.kafka.maxRatePerPartition
  • 對于存在數(shù)據(jù)傾斜問題,有兩類情況:
  • 進行join操作,產(chǎn)生skew問題, 可以使用map+廣播變量類進行處理;
  • 對redece/aggregate等聚合操作,參數(shù)skew問題, 可以進行兩次聚合的思想來解決, * 核心是先進行key進行隨機數(shù)操作,是數(shù)據(jù)分布均勻,并進行聚合,最后是剔除隨機數(shù)據(jù),用實際數(shù)據(jù)來進行聚合操作。

SQL 優(yōu)化

  • 開啟spark.sql.autoBroadcastJoinThreshold;
  • 合理配置spark.sql.shuffle.partition;

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容