Spark ShutdownHook

按照優(yōu)先級在關(guān)閉時執(zhí)行一系列操作,在spark內(nèi)用途很廣泛,主要是釋放資源,刪除文件等

使用

    // SparkContext在初始化時注冊,設(shè)定優(yōu)先級和要調(diào)用的函數(shù)
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop() // 關(guān)閉SparkContext時要釋放清理的對象
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }

ShutdownHookManager

ShutdownHookManager的靜態(tài)函數(shù)addShutdownHook依賴SparkShutdownHookManager類進行具體的邏輯處理

  //處理的核心
  private lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
  }

  // 如果在jvm執(zhí)行shutdown hook過程中添加鉤子,jvm會拋出異常
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread {
        override def run() {}
      }
      // scalastyle:off runtimeaddshutdownhook
      Runtime.getRuntime.addShutdownHook(hook)
      // scalastyle:on runtimeaddshutdownhook
      Runtime.getRuntime.removeShutdownHook(hook)
    } catch {
      case ise: IllegalStateException => return true
    }
    false
  }
  
  //添加鉤子
  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
  }
private [util] class SparkShutdownHookManager {
  // 每個鉤子都記錄到優(yōu)先隊列里
  private val hooks = new PriorityQueue[SparkShutdownHook]()
  @volatile private var shuttingDown = false

  /**
   * Install a hook to run at shutdown and run all registered hooks in order.
   */
  def install(): Unit = {
    val hookTask = new Runnable() {
      // runAll 函數(shù)會從優(yōu)先隊列中取出所有的鉤子并運行
      override def run(): Unit = runAll()
    }
    // 依賴Hadoop的ShutdownHookManager機制
    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }
}

Hadoop的ShutdownHookManager機制是通過JDK的addShutdownHook實現(xiàn),收到信號后,將所有的鉤子按照優(yōu)先級取出執(zhí)行

JDK ShutdownHook

java.lang.Runtime包內(nèi)的函數(shù):

addShutdownHook(Thread hook)

注冊一個JVM關(guān)閉的鉤子,JVM會為了響應(yīng)以下兩類事件而關(guān)閉:

  • 程序正常退出exits,當(dāng)最后的非守護線程退出時,或者在調(diào)用System.exit方法時
  • 因為用戶中斷,終止terminated虛擬機,例如Ctrl + C,或發(fā)生系統(tǒng)級別的事件,比如用戶退出(產(chǎn)生SIGHUP,系統(tǒng)的默認處理方式是終止進程)或系統(tǒng)關(guān)閉

shutdown hook是一個已初始化但尚未啟動的線程。虛擬機開始關(guān)閉時,它會以某種順序啟動所有已經(jīng)注冊的關(guān)閉鉤子,并讓它們同時運行。運行完所有的鉤子后,虛擬機就會停止。注意,關(guān)閉期間會繼續(xù)運行守護線程,如果通過調(diào)用exit方法來發(fā)起關(guān)閉,那么也會繼續(xù)運行非守護線程

一旦開始了關(guān)閉序列shutdown sequence,則只能通過調(diào)用halt方法來停止,此方法可強行終止虛擬機,在關(guān)閉過程中不能注冊新的關(guān)閉鉤子或取消注冊先前已注冊的鉤子。否則會導(dǎo)致拋出 IllegalStateException

shutdown hook在虛擬機生命周期中的特定時間運行,因此在編寫代碼時要特別注意,保證線程安全,并盡可能地避免死鎖

shutdown hook應(yīng)該快速地完成其工作。當(dāng)程序調(diào)用exit時,虛擬機應(yīng)該迅速地關(guān)閉并退出。由于用戶退出或系統(tǒng)關(guān)閉而終止虛擬機時,底層的操作系統(tǒng)可能只允許在固定的時間內(nèi)關(guān)閉并退出。因此在關(guān)閉鉤子中不應(yīng)該進行用戶交互或執(zhí)行長時間的計算

與其他所有線程一樣,通過調(diào)用線程所屬線程組的ThreadGroup.uncaughtException方法,可在關(guān)閉鉤子中處理未捕獲的異常。此方法的默認實現(xiàn)是將該異常的堆棧跟蹤打印至System.err并終止線程;它不會導(dǎo)致虛擬機退出或暫停。

僅在很少的情況下,虛擬機可能會中止abort,也就是沒有完全關(guān)閉就停止運行。虛擬機被外部終止時會出現(xiàn)這種現(xiàn)象,比如在Unix上使用SIGKILL信號。如果native方法出錯,虛擬機也會終止,例如內(nèi)部數(shù)據(jù)結(jié)構(gòu)損壞或試圖訪問不存在的內(nèi)存。如果虛擬機中止,則無法保證是否將運行關(guān)閉鉤子

File.deleteOnExit()進程結(jié)束后刪除文件,其實現(xiàn)機制就是ShutdownHook

JDK信號處理相關(guān)

主要是sun.misc包內(nèi)兩個類:SignalSignalHandler

Signal構(gòu)造函數(shù)

// 參數(shù)為對應(yīng)信號的名稱
public Signal(String var1)

通過kill -l可以查看信號的名稱

SignalHandler接口的核心是處理函數(shù)handle(Signal var1)

public interface SignalHandler {
    SignalHandler SIG_DFL = new NativeSignalHandler(0L);
    SignalHandler SIG_IGN = new NativeSignalHandler(1L);

    void handle(Signal var1);
}

靜態(tài)方法Signal.handle(Signal, SignalHandler)注冊Signal對應(yīng)的處理器

org.apache.spark.util.SignalUtils類中就添加了對"TERM", "HUP", "INT"三個信號的處理

  def registerLogger(log: Logger): Unit = synchronized {
    if (!loggerRegistered) {
      Seq("TERM", "HUP", "INT").foreach { sig =>
        SignalUtils.register(sig) {
          log.error("RECEIVED SIGNAL " + sig)
          false
        }
      }
      loggerRegistered = true
    }
  }

內(nèi)部創(chuàng)建對應(yīng)的信號Signal,以及對應(yīng)的ActionHandler,其實現(xiàn)了SignalHandler接口的handle方法,內(nèi)部會執(zhí)行注冊的方法,這里就是進行l(wèi)og輸出

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容