// tickMs 當前時間輪中一個時間格表示的時間跨度
// wheelSize 當前時間輪的格數(shù)
// startMs 時間輪的創(chuàng)建時間
// taskCounter 所有時間輪中任務的總數(shù)
// queue 所有時間輪共用的一個任務隊列,元素類型是TimerTaskList
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
// 當前時間輪的時間跨度
// 當前時間輪只能處理時間范圍在 currentTime~currentTime+interval之間的定時任務,
// 超過范圍要把任務添加到上層時間輪中
private[this] val interval = tickMs * wheelSize
// 每個成員對應時間輪里的一個時間格,保存TimerTaskList的數(shù)組
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
// currentTime是時間輪的指針,tickMs的整數(shù)倍(即只可能按時間格步長前進),將整個時間輪分為到期部分和未到期部分
// currentTime當前指向的時間格也屬于到期部分
// 初始化時近似等同于創(chuàng)建時間
private[this] var currentTime = startMs - (startMs % tickMs)
// 上層時間輪的引用
@volatile private[this] var overflowWheel: TimingWheel = null
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
false
} else if (expiration < currentTime + tickMs) {
// 任務已經過期了,即使是在當前指針指向的時間格也算過期
false
} else if (expiration < currentTime + interval) { // 在這個時間輪跨度內.添加到這個時間輪里
// 根據任務的失效時間分配時間格
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// 隨著currentTime后移,當前時間輪能處理的時間段也在不斷后移,
// 新來的TimerTaskEntity會添加到復用原來清理過的時間格
// 所以每次重置bucket的到期時間,保證最新
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// 超過時間輪跨度,添加到上層
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
// TimingWheel:嘗試推進當前和上層時間輪的指針
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
// 修整currentTime是tickMs的整數(shù)倍, 即減去整除后多余的余數(shù)
// 指針的前進并不是想象中的固定步長,而是直接跳到對應任務的超時時間
currentTime = timeMs - (timeMs % tickMs)
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
private[this] val expiration = new AtomicLong(-1L)
def setExpiration(expirationMs: Long): Boolean = {
// 這里判斷新添加任務的expiration和原來的是否一致,保證冪等
expiration.getAndSet(expirationMs) != expirationMs
}
}
// 執(zhí)行到期任務、阻塞等待最近到期任務
@threadsafe
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20,
startMs: Long = System.currentTimeMillis) extends Timer {
// 固定線程池,執(zhí)行到期任務
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread("executor-"+executorName, runnable, false)
})
// 所有時間輪共用隊列
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
// 最底層的時間輪
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue
)
// 同步時間輪currentTime修改的讀寫鎖
private[this] val readWriteLock = new ReentrantReadWriteLock()
// DelayedOperationPurgatory.tryCompleteElseWatch里如果未到時間的operation會觸發(fā)add
// DelayedOperationPurgatory是TimerTask的子類
// 實質就是加鎖版的addTimerTaskEntry
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
} finally {
readLock.unlock()
}
}
// 添加定時任務,未過期就加入時間輪,否則就執(zhí)行
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// 時間輪添加任務返回false說明已經過期,直接執(zhí)行該任務
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
// SystenTimer.advanClock
def advanceClock(timeoutMs: Long): Boolean = {
// 取出的是TimerTaskList類型成員
// 當TimerTaskList因為超時被輪詢出來并不一定代表里面所有的TimerTaskEntry一定就超時,
// 所以對于沒有超時的TimerTaskEntry需要重新加入到TimingWheel新的TimerTaskList中,對于超時的TimerTaskEntry則立即執(zhí)行任務。
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
// 定時隊列能取出任務說明任務已到期
writeLock.lock()
try {
while (bucket != null) {
// 從最底層的時間輪開始推進指針
timingWheel.advanceClock(bucket.getExpiration())
// 從隊列里取出的是TimerTaskList,然后遍歷List,每條Entity過期就執(zhí)行,未過期就重新從底層時間輪開始插入
// 不就重復插入了嗎?在哪里清空時間格的??
bucket.flush(reinsert)
// 此處poll不會阻塞
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
}
trait TimerTask extends Runnable {
// 延遲操作的延遲時長
val delayMs: Long
}
abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
// 此Operation是否完成
private val completed = new AtomicBoolean(false)
override def run(): Unit = {
if (forceComplete())
// 執(zhí)行延遲操作到期執(zhí)行的相關代碼
onExpiration()
}
def forceComplete(): Boolean = {
// 如果Operation沒有完成
// 這個CAS保證線程安全
if (completed.compareAndSet(false, true)) {
// 從TimerTaskList里刪除
cancel()
// 調用真正邏輯
onComplete()
true
} else {
false
}
}
// 具體子類的業(yè)務邏輯實現(xiàn)
def onComplete(): Unit
}
private class Watchers(val key: Any) {
// DelayedOperation隊列
private[this] val operations = new LinkedList[T]()
// 添加DelayOperation到隊列
def watch(t: T) {
operations synchronized operations.add(t)
}
def tryCompleteWatched(): Int = {
var completed = 0
operations synchronized {
// 遍歷operations隊列
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
// 其他線程完成了這個operation,這里就移除已完成的operation
iter.remove()
} else if (curr synchronized curr.tryComplete()) {
// 嘗試執(zhí)行未完成的operation,如果返回isCompleted=true,表示立刻就能完成就刪除
completed += 1
iter.remove()
}
}
}
// operations集合全部完成,從watchersForKey里刪除這個鍵值對
if (operations.size == 0)
removeKeyIfEmpty(key, this)
completed
}
}
class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer, // SystemTimer對象
brokerId: Int = 0, purgeInterval: Int = 1000, reaperEnabled: Boolean = true)
extends Logging with KafkaMetricsGroup {
// 管理watchers
// values是Watchers類型的對象,表示一個DelayedOperation集合,底層是LinkedList
// key是Watchers里DelayedOperation集合關心的對象(貌似關聯(lián)的key就GroupCoordinator和ReplicaManager
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
// 對watchersForKey同步的讀寫鎖
private val removeWatchersLock = new ReentrantReadWriteLock()
// delayedOperation的個數(shù)
private[this] val estimatedTotalOperations = new AtomicInteger(0)
// 主要作用:推進時間輪指針,定期清理watchersForKey中已完成的DelayedOperation
private val expirationReaper = new ExpiredOperationReaper()
if (reaperEnabled)
// 初始化時就啟動expirationReaper線程
expirationReaper.start()
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d".format(brokerId), false) {
// 輪詢檢查推進時間輪指針和清理完成的operation
override def doWork() {
// 時間輪和SystemTimer的指針都是從這個線程驅動推進的
advanceClock(200L)
}
def advanceClock(timeoutMs: Long) {
timeoutTimer.advanceClock(timeoutMs)
// 當DelayedOperationPurgatory與SystemTimer中的DelayOperation數(shù)量相差到一個閾值時,執(zhí)行清理工作
if (estimatedTotalOperations.get - delayed > purgeInterval) {
estimatedTotalOperations.getAndSet(delayed)
val purged = allWatchers.map(_.purgeCompleted()).sum
}
}
}
// 檢測指定單個DelayedOperation是否已經完成,若未完成則添加到watchesForKeys和SystemTimer中
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
var isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true
// 將DelayedOperation添加到所有key對應的Watchers中
// 一個DelayedOperation可能有多個watchKeys
var watchCreated = false
for(key <- watchKeys) {
if (operation.isCompleted())
// 若過程中被其他線程完成,則放棄后續(xù)添加過程
// ExpiredOperationReaper線程會定期清理watchersForKey,所以不需要清理之前添加的key
return false
// 添加DelayedOperation到watchersForKey中對應key的watchers
watchForOperation(key, operation)
if (!watchCreated) {
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}
isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true
// 將operation添加到SystemTimer里
// 同時SystemTimer也會把任務添加到時間輪里
if (! operation.isCompleted()) {
timeoutTimer.add(operation)
if (operation.isCompleted()) {
// 如果完成從SystemTimer里刪除
operation.cancel()
}
}
false
}
// 根據傳入的key,嘗試執(zhí)行對應的Watchers中的operation
def checkAndComplete(key: Any): Int = {
val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
watchers.tryCompleteWatched()
}
}
TimingWheel.scala
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
相關閱讀更多精彩內容
- 在創(chuàng)建scala項目的時候create Scala SDK: 這里選擇bin上一級目錄,然后點擊OK 這樣就出現(xiàn)了...
- 多維數(shù)組:數(shù)組的元素,還是數(shù)組,數(shù)組套數(shù)組,就是多維數(shù)組 構造指定行與列的二維數(shù)組:Array.ofDim方法 構...
- http://www.cnblogs.com/cbscan/articles/4147709.html
- Scala 篇 單例對象 在 Java 中實現(xiàn)單例對象通常需要自己實現(xiàn)一個類并創(chuàng)建 getInstance() 的...
- Overview 樣本類是 Scala 中使用關鍵字 case class 聲明的類。它可以隱式調用構造方法進行初...