按照優(yōu)先級在關(guān)閉時執(zhí)行一系列操作,在spark內(nèi)用途很廣泛,主要是釋放資源,刪除文件等
使用
// SparkContext在初始化時注冊,設(shè)定優(yōu)先級和要調(diào)用的函數(shù)
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
try {
stop() // 關(guān)閉SparkContext時要釋放清理的對象
} catch {
case e: Throwable =>
logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
}
}
ShutdownHookManager
ShutdownHookManager的靜態(tài)函數(shù)addShutdownHook依賴SparkShutdownHookManager類進行具體的邏輯處理
//處理的核心
private lazy val shutdownHooks = {
val manager = new SparkShutdownHookManager()
manager.install()
manager
}
// 如果在jvm執(zhí)行shutdown hook過程中添加鉤子,jvm會拋出異常
def inShutdown(): Boolean = {
try {
val hook = new Thread {
override def run() {}
}
// scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(hook)
// scalastyle:on runtimeaddshutdownhook
Runtime.getRuntime.removeShutdownHook(hook)
} catch {
case ise: IllegalStateException => return true
}
false
}
//添加鉤子
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}
private [util] class SparkShutdownHookManager {
// 每個鉤子都記錄到優(yōu)先隊列里
private val hooks = new PriorityQueue[SparkShutdownHook]()
@volatile private var shuttingDown = false
/**
* Install a hook to run at shutdown and run all registered hooks in order.
*/
def install(): Unit = {
val hookTask = new Runnable() {
// runAll 函數(shù)會從優(yōu)先隊列中取出所有的鉤子并運行
override def run(): Unit = runAll()
}
// 依賴Hadoop的ShutdownHookManager機制
org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
}
}
Hadoop的ShutdownHookManager機制是通過JDK的addShutdownHook實現(xiàn),收到信號后,將所有的鉤子按照優(yōu)先級取出執(zhí)行
JDK ShutdownHook
java.lang.Runtime包內(nèi)的函數(shù):
addShutdownHook(Thread hook)
注冊一個JVM關(guān)閉的鉤子,JVM會為了響應(yīng)以下兩類事件而關(guān)閉:
- 程序正常退出
exits,當(dāng)最后的非守護線程退出時,或者在調(diào)用System.exit方法時 - 因為用戶中斷,終止
terminated虛擬機,例如Ctrl + C,或發(fā)生系統(tǒng)級別的事件,比如用戶退出(產(chǎn)生SIGHUP,系統(tǒng)的默認處理方式是終止進程)或系統(tǒng)關(guān)閉
shutdown hook是一個已初始化但尚未啟動的線程。虛擬機開始關(guān)閉時,它會以某種順序啟動所有已經(jīng)注冊的關(guān)閉鉤子,并讓它們同時運行。運行完所有的鉤子后,虛擬機就會停止。注意,關(guān)閉期間會繼續(xù)運行守護線程,如果通過調(diào)用exit方法來發(fā)起關(guān)閉,那么也會繼續(xù)運行非守護線程
一旦開始了關(guān)閉序列shutdown sequence,則只能通過調(diào)用halt方法來停止,此方法可強行終止虛擬機,在關(guān)閉過程中不能注冊新的關(guān)閉鉤子或取消注冊先前已注冊的鉤子。否則會導(dǎo)致拋出 IllegalStateException
shutdown hook在虛擬機生命周期中的特定時間運行,因此在編寫代碼時要特別注意,保證線程安全,并盡可能地避免死鎖
shutdown hook應(yīng)該快速地完成其工作。當(dāng)程序調(diào)用exit時,虛擬機應(yīng)該迅速地關(guān)閉并退出。由于用戶退出或系統(tǒng)關(guān)閉而終止虛擬機時,底層的操作系統(tǒng)可能只允許在固定的時間內(nèi)關(guān)閉并退出。因此在關(guān)閉鉤子中不應(yīng)該進行用戶交互或執(zhí)行長時間的計算
與其他所有線程一樣,通過調(diào)用線程所屬線程組的ThreadGroup.uncaughtException方法,可在關(guān)閉鉤子中處理未捕獲的異常。此方法的默認實現(xiàn)是將該異常的堆棧跟蹤打印至System.err并終止線程;它不會導(dǎo)致虛擬機退出或暫停。
僅在很少的情況下,虛擬機可能會中止abort,也就是沒有完全關(guān)閉就停止運行。虛擬機被外部終止時會出現(xiàn)這種現(xiàn)象,比如在Unix上使用SIGKILL信號。如果native方法出錯,虛擬機也會終止,例如內(nèi)部數(shù)據(jù)結(jié)構(gòu)損壞或試圖訪問不存在的內(nèi)存。如果虛擬機中止,則無法保證是否將運行關(guān)閉鉤子
File.deleteOnExit()進程結(jié)束后刪除文件,其實現(xiàn)機制就是ShutdownHook
JDK信號處理相關(guān)
主要是sun.misc包內(nèi)兩個類:Signal和SignalHandler
Signal構(gòu)造函數(shù)
// 參數(shù)為對應(yīng)信號的名稱
public Signal(String var1)
通過
kill -l可以查看信號的名稱
SignalHandler接口的核心是處理函數(shù)handle(Signal var1)
public interface SignalHandler {
SignalHandler SIG_DFL = new NativeSignalHandler(0L);
SignalHandler SIG_IGN = new NativeSignalHandler(1L);
void handle(Signal var1);
}
靜態(tài)方法Signal.handle(Signal, SignalHandler)注冊Signal對應(yīng)的處理器
在org.apache.spark.util.SignalUtils類中就添加了對"TERM", "HUP", "INT"三個信號的處理
def registerLogger(log: Logger): Unit = synchronized {
if (!loggerRegistered) {
Seq("TERM", "HUP", "INT").foreach { sig =>
SignalUtils.register(sig) {
log.error("RECEIVED SIGNAL " + sig)
false
}
}
loggerRegistered = true
}
}
內(nèi)部創(chuàng)建對應(yīng)的信號Signal,以及對應(yīng)的ActionHandler,其實現(xiàn)了SignalHandler接口的handle方法,內(nèi)部會執(zhí)行注冊的方法,這里就是進行l(wèi)og輸出