TimingWheel.scala

// tickMs 當前時間輪中一個時間格表示的時間跨度
// wheelSize 當前時間輪的格數(shù)
// startMs 時間輪的創(chuàng)建時間
// taskCounter 所有時間輪中任務的總數(shù)
// queue 所有時間輪共用的一個任務隊列,元素類型是TimerTaskList
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  // 當前時間輪的時間跨度
  // 當前時間輪只能處理時間范圍在 currentTime~currentTime+interval之間的定時任務,
  // 超過范圍要把任務添加到上層時間輪中
  private[this] val interval = tickMs * wheelSize
  // 每個成員對應時間輪里的一個時間格,保存TimerTaskList的數(shù)組
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  // currentTime是時間輪的指針,tickMs的整數(shù)倍(即只可能按時間格步長前進),將整個時間輪分為到期部分和未到期部分
  // currentTime當前指向的時間格也屬于到期部分
  // 初始化時近似等同于創(chuàng)建時間
  private[this] var currentTime = startMs - (startMs % tickMs)
  // 上層時間輪的引用
  @volatile private[this] var overflowWheel: TimingWheel = null

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      false
    } else if (expiration < currentTime + tickMs) {
      // 任務已經過期了,即使是在當前指針指向的時間格也算過期
      false
    } else if (expiration < currentTime + interval) { // 在這個時間輪跨度內.添加到這個時間輪里
      // 根據任務的失效時間分配時間格
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // 隨著currentTime后移,當前時間輪能處理的時間段也在不斷后移,
      // 新來的TimerTaskEntity會添加到復用原來清理過的時間格
      // 所以每次重置bucket的到期時間,保證最新
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // 超過時間輪跨度,添加到上層
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  // TimingWheel:嘗試推進當前和上層時間輪的指針
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      // 修整currentTime是tickMs的整數(shù)倍, 即減去整除后多余的余數(shù)
      // 指針的前進并不是想象中的固定步長,而是直接跳到對應任務的超時時間
      currentTime = timeMs - (timeMs % tickMs)

      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val expiration = new AtomicLong(-1L)

  def setExpiration(expirationMs: Long): Boolean = {
    // 這里判斷新添加任務的expiration和原來的是否一致,保證冪等
    expiration.getAndSet(expirationMs) != expirationMs
  }

}

// 執(zhí)行到期任務、阻塞等待最近到期任務
@threadsafe
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20,
                  startMs: Long = System.currentTimeMillis) extends Timer {
  // 固定線程池,執(zhí)行到期任務
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
    def newThread(runnable: Runnable): Thread =
      Utils.newThread("executor-"+executorName, runnable, false)
  })

  // 所有時間輪共用隊列
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  // 最底層的時間輪
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue
  )
  // 同步時間輪currentTime修改的讀寫鎖
  private[this] val readWriteLock = new ReentrantReadWriteLock()

  // DelayedOperationPurgatory.tryCompleteElseWatch里如果未到時間的operation會觸發(fā)add
  // DelayedOperationPurgatory是TimerTask的子類
  // 實質就是加鎖版的addTimerTaskEntry
  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
    } finally {
      readLock.unlock()
    }
  }

  // 添加定時任務,未過期就加入時間輪,否則就執(zhí)行
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // 時間輪添加任務返回false說明已經過期,直接執(zhí)行該任務
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

  // SystenTimer.advanClock
  def advanceClock(timeoutMs: Long): Boolean = {
    // 取出的是TimerTaskList類型成員
    // 當TimerTaskList因為超時被輪詢出來并不一定代表里面所有的TimerTaskEntry一定就超時,
    // 所以對于沒有超時的TimerTaskEntry需要重新加入到TimingWheel新的TimerTaskList中,對于超時的TimerTaskEntry則立即執(zhí)行任務。
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      // 定時隊列能取出任務說明任務已到期
      writeLock.lock()
      try {
        while (bucket != null) {
          // 從最底層的時間輪開始推進指針
          timingWheel.advanceClock(bucket.getExpiration())
          // 從隊列里取出的是TimerTaskList,然后遍歷List,每條Entity過期就執(zhí)行,未過期就重新從底層時間輪開始插入
          // 不就重復插入了嗎?在哪里清空時間格的??
          bucket.flush(reinsert)
          // 此處poll不會阻塞
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

trait TimerTask extends Runnable {
  // 延遲操作的延遲時長
  val delayMs: Long
}

abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
  // 此Operation是否完成
  private val completed = new AtomicBoolean(false)

  override def run(): Unit = {
    if (forceComplete())
      // 執(zhí)行延遲操作到期執(zhí)行的相關代碼
      onExpiration()
  }

  def forceComplete(): Boolean = {
    // 如果Operation沒有完成
    // 這個CAS保證線程安全
    if (completed.compareAndSet(false, true)) {
      // 從TimerTaskList里刪除
      cancel()
      // 調用真正邏輯
      onComplete()
      true
    } else {
      false
    }
  }

  // 具體子類的業(yè)務邏輯實現(xiàn)
  def onComplete(): Unit
}

private class Watchers(val key: Any) {

  // DelayedOperation隊列
  private[this] val operations = new LinkedList[T]()

  // 添加DelayOperation到隊列
  def watch(t: T) {
    operations synchronized operations.add(t)
  }

  def tryCompleteWatched(): Int = {

    var completed = 0
    operations synchronized {
      // 遍歷operations隊列
      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // 其他線程完成了這個operation,這里就移除已完成的operation
          iter.remove()
        } else if (curr synchronized curr.tryComplete()) {
          // 嘗試執(zhí)行未完成的operation,如果返回isCompleted=true,表示立刻就能完成就刪除
          completed += 1
          iter.remove()
        }
      }
    }

    // operations集合全部完成,從watchersForKey里刪除這個鍵值對
    if (operations.size == 0)
      removeKeyIfEmpty(key, this)

    completed
  }

}

class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
         timeoutTimer: Timer, // SystemTimer對象
         brokerId: Int = 0, purgeInterval: Int = 1000, reaperEnabled: Boolean = true)
        extends Logging with KafkaMetricsGroup {

  // 管理watchers
  // values是Watchers類型的對象,表示一個DelayedOperation集合,底層是LinkedList
  // key是Watchers里DelayedOperation集合關心的對象(貌似關聯(lián)的key就GroupCoordinator和ReplicaManager
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
  // 對watchersForKey同步的讀寫鎖
  private val removeWatchersLock = new ReentrantReadWriteLock()
  // delayedOperation的個數(shù)
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  // 主要作用:推進時間輪指針,定期清理watchersForKey中已完成的DelayedOperation
  private val expirationReaper = new ExpiredOperationReaper()

  if (reaperEnabled)
    // 初始化時就啟動expirationReaper線程
    expirationReaper.start()

  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d".format(brokerId), false) {
    // 輪詢檢查推進時間輪指針和清理完成的operation
    override def doWork() {
      // 時間輪和SystemTimer的指針都是從這個線程驅動推進的
      advanceClock(200L)
    }

    def advanceClock(timeoutMs: Long) {
      timeoutTimer.advanceClock(timeoutMs)
      // 當DelayedOperationPurgatory與SystemTimer中的DelayOperation數(shù)量相差到一個閾值時,執(zhí)行清理工作
      if (estimatedTotalOperations.get - delayed > purgeInterval) {
        estimatedTotalOperations.getAndSet(delayed)
        val purged = allWatchers.map(_.purgeCompleted()).sum
      }
    }
  }

  // 檢測指定單個DelayedOperation是否已經完成,若未完成則添加到watchesForKeys和SystemTimer中
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    var isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 將DelayedOperation添加到所有key對應的Watchers中
    // 一個DelayedOperation可能有多個watchKeys
    var watchCreated = false
    for(key <- watchKeys) {
      if (operation.isCompleted())
        // 若過程中被其他線程完成,則放棄后續(xù)添加過程
        // ExpiredOperationReaper線程會定期清理watchersForKey,所以不需要清理之前添加的key
        return false
      // 添加DelayedOperation到watchersForKey中對應key的watchers
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        estimatedTotalOperations.incrementAndGet()
      }
    }

    isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 將operation添加到SystemTimer里
    // 同時SystemTimer也會把任務添加到時間輪里
    if (! operation.isCompleted()) {
      timeoutTimer.add(operation)
      if (operation.isCompleted()) {
        // 如果完成從SystemTimer里刪除
        operation.cancel()
      }
    }

    false
  }

  // 根據傳入的key,嘗試執(zhí)行對應的Watchers中的operation
  def checkAndComplete(key: Any): Int = {
    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
      watchers.tryCompleteWatched()
  }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容