Spark - 利用WeakReference來(lái)清理對(duì)象

注:本文轉(zhuǎn)自我的個(gè)人博客(Spark - 利用WeakReference來(lái)清理對(duì)象)。

最近在stackoverflow上看到有人好奇Spark是在什么時(shí)機(jī)對(duì)Accumulator或者Broadcast這樣的變量進(jìn)行回收的。自己在看源碼的時(shí)候發(fā)現(xiàn)了這個(gè)有趣的地方。


Spark ContextCleaner

我們?cè)赟park閑置(沒有任務(wù)執(zhí)行)時(shí),容易看到下面的日志:

19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108284023
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108283658

追溯到源碼中,可以得知SparkContext在啟動(dòng)時(shí),初始化了ContextCleaner,并以daemon方式啟動(dòng)了一個(gè)cleaningThread線程,這個(gè)線程的作用就是不斷循環(huán),回收清理RDD、Broadcast變量、Accumulator等無(wú)效對(duì)象。
這個(gè)時(shí)候可以提出一個(gè)問題:以Accumulator為例,當(dāng)某個(gè)Accumulator不再使用(沒有被任何對(duì)象引用)時(shí),ContextCleaner是如何知道這個(gè)信息的?

先看一下整個(gè)清理過程:

val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
  .map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
  reference.foreach { ref =>
    logDebug("Got cleaning task " + ref.task)
    referenceBuffer.remove(ref)
    ref.task match {
      case CleanRDD(rddId) =>
        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
      case CleanShuffle(shuffleId) =>
        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
      case CleanBroadcast(broadcastId) =>
        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
      case CleanAccum(accId) =>
        doCleanupAccum(accId, blocking = blockOnCleanupTasks)
      case CleanCheckpoint(rddId) =>
        doCleanCheckpoint(rddId)
    }
  }
}

可以看到,ContextCleaner是通過一個(gè)referenceQueue找到了需要回收的對(duì)象(CleanAccum)。接下來(lái),從referenceQueue入手,看看JVM中的WeakReference是什么樣的存在。


Java WeakReference

Java中對(duì)Reference有幾種不同的分類:

  • StrongReference: 通常我們定義的對(duì)象就屬于這種,較難被GC。
  • WeakReference: 如Spark中封裝的CleanupTaskWeakReference(task, objectForCleanup, referenceQueue) ,如果引用的對(duì)象(task)只和當(dāng)前的WeakReference對(duì)象聯(lián)結(jié),那么在GC中會(huì)被回收,并放入referenceQueue中。
  • SoftReference: 相對(duì)WeakReference較強(qiáng)的引用,可以回收,但不一定是在下次GC中。

所以在ContextCleaner中,Spark采用了WeakReference + referenceQueue的方式來(lái)實(shí)現(xiàn)對(duì)象的回收。當(dāng)我們注冊(cè)一個(gè)Accumulator時(shí),會(huì)同時(shí)調(diào)用registerForCleanup:

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

referenceBuffer的作用是保證WeakReference在處理前不被GC。

Spark將注冊(cè)的Accumulator封裝到CleanupTask,并基于task初始化了一個(gè)WeakReference。當(dāng)Accumulator不再被引用時(shí),task會(huì)被放入referenceQueue中,而此時(shí)cleaningThread從referenceQueue中提取即將要GC的對(duì)象做處理(見上面的清理過程代碼)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容