Note: this article is reposted from my personal blog (Spark - Using WeakReference to Clean Up Objects).
Recently I saw a question on StackOverflow asking when Spark reclaims variables such as Accumulators and Broadcasts. I had already run into this interesting part of the code while reading the Spark source.
Spark ContextCleaner
When Spark is idle (no tasks running), logs like the following are common:
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108284023
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108283658
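Tracing this back to the source code: when SparkContext starts, it creates a ContextCleaner (as long as spark.cleaner.referenceTracking is enabled, which is the default). The ContextCleaner starts a daemon thread called cleaningThread. This thread loops continuously, cleaning up RDDs, shuffles, Broadcast variables, Accumulators, checkpoints and other objects that are no longer in use.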
This raises a question. Take Accumulator as an example: when an Accumulator is no longer in use (no object references it any more), how does ContextCleaner find out?
Let's first look at the core of the cleanup loop in ContextCleaner.keepCleaning():
// Executed repeatedly by cleaningThread until the cleaner is stopped.
// remove(timeout) returns null if nothing was enqueued in time, hence the Option wrapper.
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
  .map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
  reference.foreach { ref =>
    logDebug("Got cleaning task " + ref.task)
    // The reference has been dequeued, so it no longer needs to be kept alive
    referenceBuffer.remove(ref)
    // Dispatch on the kind of cleanup recorded when the object was registered
    ref.task match {
      case CleanRDD(rddId) =>
        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
      case CleanShuffle(shuffleId) =>
        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
      case CleanBroadcast(broadcastId) =>
        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
      case CleanAccum(accId) =>
        doCleanupAccum(accId, blocking = blockOnCleanupTasks)
      case CleanCheckpoint(rddId) =>
        doCleanCheckpoint(rddId)
    }
  }
}
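The Option(...) wrapper in the loop exists because ReferenceQueue.remove(timeout) returns null when nothing is enqueued within the timeout.

The standalone Scala sketch below isolates just this poll-and-dispatch step. It makes the following assumptions:
- TaskRef and the three CleanupTask case classes are hypothetical stand-ins modeled on the names matched above. They are not Spark's classes.
- The handlers only return a log-style string instead of doing any real cleanup.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Hypothetical cleanup tasks, named after the cases matched in Spark's loop
sealed trait CleanupTask
final case class CleanRDD(rddId: Int) extends CleanupTask
final case class CleanBroadcast(broadcastId: Long) extends CleanupTask
final case class CleanAccum(accId: Long) extends CleanupTask

// Hypothetical stand-in for CleanupTaskWeakReference: weakly points at the watched object,
// strongly holds the task describing what to clean up
final class TaskRef(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object PollOnce {
  // remove(timeout) blocks for at most timeoutMs and returns null if nothing was enqueued,
  // so Option(...) turns "nothing to clean yet" into None
  def pollOnce(queue: ReferenceQueue[AnyRef], timeoutMs: Long): Option[String] =
    Option(queue.remove(timeoutMs)).map(_.asInstanceOf[TaskRef]).map { ref =>
      ref.task match {
        case CleanRDD(id)       => s"Cleaned RDD $id"
        case CleanBroadcast(id) => s"Cleaned broadcast $id"
        case CleanAccum(id)     => s"Cleaned accumulator $id"
      }
    }
}

println(PollOnce.pollOnce(new ReferenceQueue[AnyRef](), 50)) // None: nothing was enqueued within 50 ms
```

To exercise the dispatch without waiting for a GC, you can push a reference onto its queue by hand with the standard Reference.enqueue() method.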
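So ContextCleaner learns what needs cleaning by polling referenceQueue. Each entry it takes out is a CleanupTaskWeakReference whose task (for example CleanAccum) describes what to clean up. Next, starting from referenceQueue, let's look at what a WeakReference in the JVM actually is.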
Java WeakReference
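Java distinguishes several kinds of references:
- Strong reference: an ordinary reference, which is what we normally create. An object that is reachable through strong references is never collected by the GC.
- WeakReference: for example, Spark's CleanupTaskWeakReference(task, objectForCleanup, referenceQueue).
  - It extends WeakReference with objectForCleanup as its referent; task is just an ordinary field.
  - Once the referent is reachable only through weak references, the GC clears the reference.
  - Because the reference was created with a queue, the WeakReference object itself (not the referent) is then added to referenceQueue.
- SoftReference: a stronger reference than WeakReference. A softly reachable object can be collected, but the GC decides when, based on memory demand, so it is not necessarily cleared at the next GC. All soft references are guaranteed to be cleared before an OutOfMemoryError is thrown.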
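A minimal sketch of the WeakReference + ReferenceQueue behavior described in the list, using only the standard java.lang.ref API. It checks three things:
- While a strong reference exists, the referent survives a GC.
- After the strong reference is dropped, the object handed back by the queue is the WeakReference itself.
- At that point, get() returns null.

System.gc() is only a request to the JVM, so the sketch retries a few times. On a JVM started with -XX:+DisableExplicitGC the reference would likely never be enqueued. SoftReference is not demonstrated here.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefDemo {
  // Requests GCs until the queue yields a reference or the attempts run out.
  // System.gc() is only a hint, so this is best-effort, not a guarantee.
  def awaitEnqueued(queue: ReferenceQueue[AnyRef], attempts: Int = 20): Option[AnyRef] = {
    var found: Option[AnyRef] = None
    var i = 0
    while (found.isEmpty && i < attempts) {
      System.gc()
      found = Option(queue.remove(100)) // wait up to 100 ms for the reference to be enqueued
      i += 1
    }
    found
  }

  def run(): (Boolean, Boolean, Boolean) = {
    val queue = new ReferenceQueue[AnyRef]()
    var strong: AnyRef = new Array[Byte](1024) // ordinary (strong) reference
    val weak = new WeakReference[AnyRef](strong, queue)

    System.gc()
    // Reading `strong` after the GC keeps the array strongly reachable across it
    val survivedWhileStrong = weak.get() eq strong

    strong = null // from now on only the WeakReference points at the array
    val enqueued = awaitEnqueued(queue)
    // The queue hands back the WeakReference object itself, not the referent
    val sameRefObject = enqueued.exists(_ eq weak)
    (survivedWhileStrong, sameRefObject, weak.get() eq null)
  }
}

println(WeakRefDemo.run())
```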
So ContextCleaner combines WeakReference with a referenceQueue to clean up objects. When an Accumulator is registered, registerForCleanup is called as well, via registerAccumulatorForCleanup, which passes CleanAccum(a.id) as the task:
/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
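referenceBuffer holds strong references to the CleanupTaskWeakReference objects themselves. This ensures a reference object cannot be garbage-collected before it has been enqueued and processed; if it were collected, its cleanup task would simply be lost.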
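For illustration, here is a minimal standalone sketch of the registration side. It assumes nothing from Spark: MiniCleaner, TaskRef, CleanAccum and FakeAccumulator are all hypothetical names. It follows the same pattern as registerForCleanup:
- The referent of each TaskRef is the object being watched.
- The task is an ordinary (strong) field.
- referenceBuffer is a concurrent set that keeps the reference objects themselves strongly reachable until they are processed.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Hypothetical task type and accumulator stand-in, for illustration only
final case class CleanAccum(accId: Long)
final class FakeAccumulator(val id: Long)

// Mirrors the shape of CleanupTaskWeakReference: weak pointer to the accumulator,
// strong pointer to the task
final class TaskRef(val task: CleanAccum, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

final class MiniCleaner {
  val referenceQueue = new ReferenceQueue[AnyRef]()
  // Without this set, a TaskRef could itself become unreachable and be collected
  // before the GC ever enqueues it, and its cleanup would silently never run
  val referenceBuffer: java.util.Set[TaskRef] =
    Collections.newSetFromMap(new ConcurrentHashMap[TaskRef, java.lang.Boolean]())

  def registerAccumulatorForCleanup(acc: FakeAccumulator): Unit =
    referenceBuffer.add(new TaskRef(CleanAccum(acc.id), acc, referenceQueue))
}

val cleaner = new MiniCleaner
val acc = new FakeAccumulator(42L)
cleaner.registerAccumulatorForCleanup(acc)
println(cleaner.referenceBuffer.size()) // 1
```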
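In other words, Spark records the registered Accumulator's id in a CleanupTask (CleanAccum) and creates a WeakReference whose referent is the Accumulator itself and which carries that task. The cleanup then proceeds in two steps:
- Once the Accumulator is no longer strongly referenced, the GC clears this WeakReference and enqueues it in referenceQueue.
- cleaningThread takes it off referenceQueue and performs the cleanup described by ref.task (see the cleanup loop above).

Nothing is enqueued until a GC actually runs on the driver. For this reason ContextCleaner also triggers System.gc() periodically (spark.cleaner.periodicGC.interval, 30min by default). This is one reason the "Cleaned accumulator" logs appear even while Spark is idle.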
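Putting the pieces together, the sketch below is a toy, single-task version of the whole mechanism. It combines a daemon thread that polls the queue the way keepCleaning does with a registration method similar to registerForCleanup. It rests on these assumptions:
- All class and method names are hypothetical.
- The 100 ms poll timeout is an arbitrary choice.
- Real cleanup is replaced by recording the id and printing a log-style line.

Since System.gc() is only a request, the demo asks for a GC several times. With -XX:+DisableExplicitGC the accumulator would most likely never be cleaned.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}

// Hypothetical stand-ins for Spark's classes
final case class CleanAccum(accId: Long)
final class FakeAccumulator(val id: Long)
final class TaskRef(val task: CleanAccum, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

final class MiniCleaner {
  private val referenceQueue = new ReferenceQueue[AnyRef]()
  private val referenceBuffer =
    Collections.newSetFromMap(new ConcurrentHashMap[TaskRef, java.lang.Boolean]())
  val cleaned = new CopyOnWriteArrayList[Long]() // ids of accumulators cleaned so far
  @volatile private var stopped = false

  // Daemon thread, so it never keeps the JVM alive on its own
  private val cleaningThread = new Thread(new Runnable {
    def run(): Unit = keepCleaning()
  }, "Mini Context Cleaner")
  cleaningThread.setDaemon(true)

  def start(): Unit = cleaningThread.start()

  def stop(): Unit = {
    stopped = true
    cleaningThread.interrupt() // wakes the thread if it is blocked in remove()
    cleaningThread.join()
  }

  def registerAccumulatorForCleanup(acc: FakeAccumulator): Unit =
    referenceBuffer.add(new TaskRef(CleanAccum(acc.id), acc, referenceQueue))

  private def keepCleaning(): Unit =
    while (!stopped) {
      try {
        // Wait up to 100 ms for the GC to enqueue a cleared reference
        Option(referenceQueue.remove(100)).map(_.asInstanceOf[TaskRef]).foreach { ref =>
          referenceBuffer.remove(ref) // processed, so the strong handle is no longer needed
          ref.task match {
            case CleanAccum(id) =>
              cleaned.add(id) // a real cleaner would release the accumulator's state here
              println(s"Cleaned accumulator $id")
          }
        }
      } catch {
        case _: InterruptedException if stopped => // stop() was called; the loop condition ends it
      }
    }
}

// Hypothetical helper: the accumulator it creates is unreachable as soon as it returns
def registerTemporaryAccumulator(cleaner: MiniCleaner, id: Long): Unit =
  cleaner.registerAccumulatorForCleanup(new FakeAccumulator(id))

val cleaner = new MiniCleaner
cleaner.start()
registerTemporaryAccumulator(cleaner, 7L)
var attempts = 0
// System.gc() is only a request, so ask repeatedly while the cleaner thread catches up
while (!cleaner.cleaned.contains(7L) && attempts < 50) {
  System.gc()
  Thread.sleep(100)
  attempts += 1
}
cleaner.stop()
```

One design point carries over from Spark: CleanAccum stores only the accumulator's id. If the task held the accumulator itself, that strong field on the reference would keep the accumulator reachable, and the reference would never be enqueued.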