Spark Streaming Dynamic Resource Allocation 文檔(非官方特性)

必要配置

通過下面參數(shù)開啟DRA

spark.streaming.dynamicAllocation.enabled=true

設(shè)置最大最小的Executor 數(shù)目:

spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50

可選配置

這些參數(shù)可以不用配置,都已經(jīng)提供了一個較為合理的默認值

開啟日志:

spark.streaming.dynamicAllocation.debug=true

設(shè)置DRA 生效延時:

spark.streaming.dynamicAllocation.delay.rounds=10

設(shè)置DRA 計算資源量時參考的周期數(shù):

spark.streaming.dynamicAllocation.rememberBatchSize=1

設(shè)置DRA 釋放資源的步調(diào):

spark.streaming.dynamicAllocation.releaseRounds=5

設(shè)置DRA 資源額外保留比例:

spark.streaming.dynamicAllocation.reserveRate=0.2

DRA 算法說明

減少資源時,采用啟發(fā)式算法。根據(jù)之前周期的處理時間,計算需要保留的資源量(A),然后嘗試分多輪試探性的減少(B),每個計算周期都會重復(fù)A,B動作,最后會收斂到一個具體的數(shù)值。

如果一旦發(fā)生延時,則會立馬向Yarn申請spark.streaming.dynamicAllocation.maxExecutors 個Executor,以保證可以最快速度消除延時。富余出來的資源會通過減少資源的動作慢慢進行減少,讓程序趨于穩(wěn)定。

發(fā)生減少資源的動作,則剔除的掉的Executor 會被立刻(幾毫秒/納秒)屏蔽,并且不再分配Task,之后再由Yarn異步移除。

添加資源的動作,則由Yarn決定

注意事項

請務(wù)必保證你Package 的App包不包含spark 相關(guān)的組件。否則你會看到自己的設(shè)置并不生效,因為運行的時候用了你的App里的spark-core,spark-streaming jar包了。

一些可以參考的調(diào)整

如果系統(tǒng)趨向穩(wěn)定后,經(jīng)過人工觀察發(fā)現(xiàn)其實還可以再降資源,則可以嘗試調(diào)低

spark.streaming.dynamicAllocation.releaseRounds=5
spark.streaming.dynamicAllocation.reserveRate=0.2

建議releaseRounds 不低于2,reserveRate 不低于0.05。避免系統(tǒng)發(fā)生顛簸。

測試代碼

object IamGod {
  def main(args: Array[String]): Unit = {

    def createContext = {
      val conf = new SparkConf().setAppName("DRA Test")
      val ssc = new StreamingContext(conf, Seconds(30))

      val items1 = Seq.fill(30)(Seq((10 + scala.util.Random.nextInt(10)) * 1000))
      val items2 = Seq.fill(30)(Seq((30 + scala.util.Random.nextInt(10)) * 1000))
      val items3 = Seq.fill(30)(Seq((20 + scala.util.Random.nextInt(10)) * 1000))

      val fileInput = new TestInputStream[Int](ssc, items1 ++ items2 ++ items3, 10)

      val logs = fileInput.map(f => Thread.sleep(f))

      logs.foreachRDD { rdd =>
        rdd.count()
      }

      ssc
    }

    val ssc = createContext

    ssc.start()
    ssc.awaitTermination()

  }

}

前面引用了一個測試類:

class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](_ssc) {

  def start() {}

  def stop() {}

  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo("Computing RDD for time " + validTime)
    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
    val selectedInput = if (index < input.size) input(index) else Seq[T]()

    // lets us test cases where RDDs are not created
    if (selectedInput == null) {
      return None
    }

    // Report the input data's information to InputInfoTracker for testing
    val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
    Some(rdd)
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容