1. 為什么要用MQ? MQ帶來了什么好處? 帶來了什么壞處?
-
為什么要用MQ?
MQ(message queue) 消息隊(duì)列. 最早的應(yīng)用都是單體架構(gòu)設(shè)計(jì)的, 隨著業(yè)務(wù)的發(fā)展, 單體架構(gòu)不足以滿足的時(shí)候, 服務(wù)化改造就出現(xiàn)了. 服務(wù)化改造出現(xiàn)帶來了一個(gè)問題就是服務(wù)之間的通信. 同步的通信代表就是RPC框架, 如Dubbo. 而異步通信代表就是MQ.
-
MQ帶來的好處?
MQ帶來的好處主要有三個(gè) : 異步, 解耦, 削峰.
在我看來, 解耦, 削峰都是異步帶來的產(chǎn)物.
解耦是因?yàn)樵黾恿薓Q這樣一個(gè)中間層, 服務(wù)的上游無需關(guān)注下游的狀態(tài), 下游也無需關(guān)注上游的狀態(tài).
削峰是在業(yè)務(wù)遇到突發(fā)的高峰時(shí), 大量的請求涌入, 通過MQ這種方式就可以先把請求積壓下來, 然后對下游消費(fèi)方可以削掉這個(gè)峰值的流量, 下游可以允許的pull消息來消費(fèi).
異步是請求過來,寫入消息隊(duì)列中, 就可以返回了. 后續(xù)下游通過消息隊(duì)列中消息來做消費(fèi)即可.
-
MQ帶來的壞處?
增加了一個(gè)MQ作為中間層, 帶來了以上這么多好處但是同時(shí)也有壞處.
系統(tǒng)的可用性降低. 增加一個(gè)組件的同時(shí), 就需要保證這個(gè)組件的可用性. 需要考慮在MQ不可用的時(shí)候, 整個(gè)服務(wù)會不會變得不可用, 甚至整個(gè)服務(wù)鏈路是否變得不可用.
系統(tǒng)的復(fù)雜性增高. 如何保證消息的冪等, 如何保證消息不丟失, 如何保證消息的順序型都是MQ帶來的問題.
一致性問題. 如何確保消費(fèi)真實(shí)的被消費(fèi)到.
2. 市面上常見的MQ有哪些, 性能對比.
RabbitMQ, Kafka, rocketMQ等數(shù)據(jù)可以百度查到.
QMQ的開發(fā)者又談到QMQ的性能和rocket的性能處于同一個(gè)數(shù)量級.
3. QMQ的特點(diǎn)是什么?

4. QMQ的存儲模型?
Kafka和RocketMQ都是基于partition的存儲模型, 也就是每一個(gè)subject分成一個(gè)或者多個(gè)partition, 同時(shí)consumer消費(fèi)的時(shí)候也是和partition一一對應(yīng)的.
如下 :

在這種設(shè)計(jì)的模式下, 如果consumer數(shù)目大于partition的數(shù)據(jù), 就會出現(xiàn)consumer處于空閑的狀態(tài).
如果partition數(shù)據(jù)大于consumer的數(shù)據(jù)就會出現(xiàn)部分consumer繁忙的狀況.
以上是用基于partition去做負(fù)載均衡所帶來的問題. 由于這種靜態(tài)綁定的關(guān)系, 如果遇到了消費(fèi)速度更不上消費(fèi)的速度, 單單的增加consumer是不夠的. 需要增加partition. 尤其是在kafka里, partition是一個(gè)比較重的資源, 增加太多的partition還需要考慮集群的處理能力; 同時(shí)當(dāng)高峰期過了之后, 如果想縮容consumer也是比較麻煩的, 因?yàn)閜artition只能增加, 不能減少.
上述設(shè)計(jì), 同時(shí)帶了另一個(gè)問題, 就是如果有消息積壓, 我們增加partition也是沒有用的, 因?yàn)橄M(fèi)已經(jīng)擠壓到已存在的partition中, 新增partition只能夠消費(fèi)新分配過來的數(shù)據(jù).

以上是QMQ的存儲模型, 方框上方的數(shù)字代表該方框自己在log中的偏移量, 方框中的數(shù)據(jù)代表該項(xiàng)的內(nèi)容. 如何message log上方的3,6,9表示這幾條消息在message log中的偏移量. 而consume log中方框內(nèi)的數(shù)據(jù)3,6,9,20對應(yīng)著message log的偏移, 表示這幾條消息都在topic1中, consume log 方框上方的1,2,3,4代表這幾個(gè)方框在consume log中的邏輯偏移. 下面的pull log 方框中的1,2,3,4對應(yīng)著consume log的邏輯偏移, 而pull log方框外的數(shù)字表示pull log的邏輯偏移.
message log 是所有消息的主存儲體, 所有topic的消息都進(jìn)入該log.
consume log 存儲的是message log的索引.
pull log 每個(gè)consumer拉取消息的時(shí)候會產(chǎn)生pull log, pull log 記錄的是拉取消息在consume log中的sequence.
這個(gè)時(shí)候消費(fèi)者就可以使用pull log上的sequence來表示消費(fèi)的進(jìn)度, 這樣一來我們就解耦了consumer和partition之間的耦合關(guān)系. 兩者可以任意擴(kuò)展.
5. QMQ是如何保證高可用的?
分片 + 復(fù)制.
分片
QMQ不是基于partition的, 可以通過增加更多的機(jī)器提高一個(gè)topic的可用性. 消息按照一定的負(fù)載均衡策略, 分配到不同的機(jī)器上, 某臺機(jī)器離線之后, producer將不再將消息發(fā)送到server.
復(fù)制
QMQ通過主從復(fù)制來提高單機(jī)高可用. QMQ將服務(wù)器劃分為過個(gè)group, 每一個(gè)group都包含多個(gè)master和slave, 消息的發(fā)送和消費(fèi)全部指向master, slave只保證可用性.
6. QMQ是如何保證冪等的?
Exactly once 消費(fèi)
一般的消息分為At Most Once, At Least Once, Exactly once. 而最后一種屬于我們最期望的一種模型, 同時(shí)這種模型的實(shí)現(xiàn)也不容易. 由于網(wǎng)絡(luò)和應(yīng)用依賴的復(fù)雜性, Exactly once基本不可行, 但是我們可以通過冪等處理來實(shí)現(xiàn)最終的Exactly once.
什么時(shí)候會出現(xiàn)重復(fù)消費(fèi)
發(fā)消息的時(shí)候, 網(wǎng)絡(luò)抖動, 導(dǎo)致發(fā)送超時(shí), 但是實(shí)際上server已經(jīng)成功收到消息, 只是server的ACK回到Producer的時(shí)候超時(shí)了. 這個(gè)時(shí)候Producer端為了確保不丟失往往會重試, 就會導(dǎo)致消息發(fā)送多次.
consumer在收到消息,進(jìn)行業(yè)務(wù)處理, 業(yè)務(wù)處理的過程中需要有外部依賴, 比如調(diào)用一個(gè)HTTP的接口, 這種情況也會有實(shí)際成功但是結(jié)果超時(shí)的情況, 這個(gè)時(shí)候會重發(fā)消息.
-
consumer收到消息處理成果后, 返回ack給server的時(shí)候由于網(wǎng)絡(luò)等原因?qū)е耡ck丟失, 也會導(dǎo)致消息重復(fù)消費(fèi).
QMQ怎么保證冪等.
基于DB的冪等處理器. 通過數(shù)據(jù)庫事務(wù)保證業(yè)務(wù)和去重是原子操作.
基于Redis的冪等處理器.
8. 延時(shí)消息和hash wheel timer算法.
QMQ中用到的HashWheelTimer是采用Netty得HashWheelTimer實(shí)現(xiàn)的。

如上面的這個(gè)圖, 假設(shè)時(shí)間輪大小為8(這個(gè)輪子底層用了一個(gè)數(shù)組實(shí)現(xiàn)的) 1s轉(zhuǎn)動一格, 每一格指向一個(gè)鏈表, 這個(gè)鏈表保存著待執(zhí)行的任務(wù)(TimeOutTask).
假設(shè)當(dāng)前位于2位置, 要添加一個(gè)3s后的任務(wù),則2+3=5, 在第五格的鏈表中添加一個(gè)節(jié)點(diǎn)指向任務(wù)即可, 標(biāo)識為round=0.
假設(shè)當(dāng)前位于2位置, 要添加一個(gè)10s后的任務(wù), (2 + 10) % 8 = 4 , 則在第4格添加一個(gè)節(jié)點(diǎn)指向任務(wù), 并標(biāo)識round=1, 則當(dāng)時(shí)間輪第二次經(jīng)過第4格的時(shí)候, 會執(zhí)行任務(wù).
時(shí)間輪只會執(zhí)行round=0的任務(wù), 并會把格子上的其他任務(wù)的round減1.
public class HashedWheelTimer implements Timer {
static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker<HashedWheelTimer> leak;
private final Worker worker = new Worker();
private final Thread workerThread;
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
private volatile long startTime;
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
// 最終的構(gòu)造器.
// 默認(rèn)的tickDuration為100ms, 即走過每一個(gè)格子要100ms的時(shí)間
// 默認(rèn)的ticksPerWheel是512 即默認(rèn)的大小是512
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
boolean leakDetection, long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(
String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration,
Long.MAX_VALUE / wheel.length));
}
// 啟動一個(gè)worker線程
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
// 這里是過載保護(hù), 同時(shí)有64個(gè)Timer的時(shí)候,會拋出異常
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES
.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
// we have not yet shutdown then we want to make sure we decrement the active instance count.
// 一共最多有64個(gè)實(shí)例, 在最終被gc之前, 實(shí)例數(shù)目-1.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
}
}
// 構(gòu)造方法中調(diào)用, 構(gòu)建整個(gè)wheel
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// corner case
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// wheel數(shù)組的數(shù)量, 如果不是2的整數(shù)冪, 計(jì)算成為2的整數(shù)冪, 向上去冪
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// HashWheelBucket 數(shù)組, 其實(shí)就是hash輪的本質(zhì).
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
// 數(shù)組中填充對象, 這個(gè)對象里面有兩個(gè)HashedWheelTimeout指針,即頭指針和尾指針。
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
//按照翻譯, 顯式啟動background 線程, 如果沒有顯式啟動的話, 也會自動啟動。
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
// worker線程會啟動并在運(yùn)行時(shí)間startTime改為1. 這個(gè)為了確保worker啟動
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class
.getSimpleName());
}
// 通過cas方式將當(dāng)前狀態(tài)修改為shutdown狀態(tài)
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
// 通過csd確保是shutdown狀態(tài)之后, 實(shí)例數(shù)減一.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
// 如果worker線程的狀態(tài)是存活的, 就調(diào)用它的中斷方法, 中斷掉.
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
// 讓出cpu資源.
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
// 實(shí)例數(shù)減一
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts();
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (shouldLimitTimeouts()) {
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
+ ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts
+ ")");
}
}
// 這個(gè)調(diào)用的start()方法是自動啟動Timer , 也可手動啟動.
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// 計(jì)算出到期的時(shí)間
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 創(chuàng)建hashedWheelTimeout對象
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 加入對列中
timeouts.add(timeout);
return timeout;
}
private boolean shouldLimitTimeouts() {
return maxPendingTimeouts > 0;
}
// 過多Timer
private static void reportTooManyInstances() {
String resourceType = simpleClassName(HashedWheelTimer.class);
logger.error("You are creating too many " + resourceType + " instances. " + resourceType
+ " is a shared resource that must be reused across the JVM,"
+ "so that only a few instances are created.");
}
// 執(zhí)行worker
private final class Worker implements Runnable {
// timeout的HashSet
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// 格子數(shù)
private long tick;
@Override
public void run() {
// 初始化開始時(shí)間
startTime = System.nanoTime();
if (startTime == 0) {
// 修改狀態(tài)
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
// 返回deadline時(shí)間
final long deadline = waitForNextTick();
if (deadline > 0) {
// 計(jì)算當(dāng)前桶的index位置
int idx = (int) (tick & mask);
// 處理取消的任務(wù)
processCancelledTasks();
// 撈出budget
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// 防止操作太多, 浪費(fèi)太多計(jì)算時(shí)間, 一次最多操作10w個(gè)
for (int i = 0; i < 100000; i++) {
// 從任務(wù)隊(duì)列中撈出來
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long calculated = timeout.deadline / tickDuration;
// 計(jì)算round
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 計(jì)算ticks
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// 計(jì)算在wheel里的位置
int stopIndex = (int) (ticks & mask);
// 放入雙向鏈表中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
*
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// 時(shí)間到了
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
// windows平臺需要計(jì)算一下
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
// 線程睡眠這些時(shí)間
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
private static final class HashedWheelTimeout implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
long remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;
// The bucket to which the timeout was added
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public Timer timer() {
return timer;
}
@Override
public TimerTask task() {
return task;
}
@Override
public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else if (timer.shouldLimitTimeouts()) {
timer.pendingTimeouts.decrementAndGet();
}
}
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}
@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: ");
if (remaining > 0) {
buf.append(remaining).append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining).append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(", task: ").append(task()).append(')').toString();
}
}
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(
String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
// 任務(wù)取消則移除
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 如果round > 0 說明還要轉(zhuǎn)好幾圈, 所以圈數(shù)-1
timeout.remainingRounds--;
}
timeout = next;
}
}
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
if (timeout.timer.shouldLimitTimeouts()) {
timeout.timer.pendingTimeouts.decrementAndGet();
}
return next;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
public void clearTimeouts(Set<Timeout> set) {
for (; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}
}
參考 :
https://wisewong.github.io/archives/e2d1a18d.html
https://cloud.tencent.com/developer/news/368207