PromiseTask源碼講解
在講解PromiseTask之前需要先介紹幾個(gè)之前沒(méi)有講述的接口定義,雖然PromiseTask繼承與DefaultPromise但是他們之間還是有差距的,之前一直再說(shuō)future的定義是一個(gè)任務(wù)管理器,那么DefaultPromise則就是實(shí)現(xiàn)管理器的公用方法,僅僅是對(duì)任務(wù)執(zhí)行的描述,但是并沒(méi)有真正的操作任務(wù),而PromiseTask則對(duì)任務(wù)做了操作的處理,所以他有一些獨(dú)特的接口需要實(shí)現(xiàn),那么下面將是對(duì)這些接口的定義。
//jdk定義的接口此接口就是為了將Runnable 和 future 整合到一起,這樣對(duì)此接口的實(shí)現(xiàn)都是可以進(jìn)行獨(dú)立運(yùn)行的future。
//感興趣的讀者可以去查找下在netty中是否使用DefaultPromise做了單獨(dú)的執(zhí)行任務(wù)。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
//jdk定義接口,線程執(zhí)行時(shí)必須傳入的接口定義,從而可以看出PromiseTask在內(nèi)部實(shí)現(xiàn)了run方法從而達(dá)到多線程運(yùn)行的效果。
public interface Runnable {
public abstract void run();
}
//這里繼承的Future接口是jdk的如果有疑問(wèn)請(qǐng)查看第一章線程結(jié)構(gòu)中有講解此接口的定義
//PromiseTask繼承與DefaultPromise并且實(shí)現(xiàn)了RunnableFuture接口
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
//之前在講解線程結(jié)構(gòu)的時(shí)候有講解過(guò)Callable的定義,如果有遺忘的讀者請(qǐng)回看回憶一下。
//之前一直在說(shuō)Future是任務(wù)管理,其中管理中有結(jié)果result,但是Runnable接口的run方法并沒(méi)有返回值,所以這里的做法是將Runnable接口轉(zhuǎn)換成Callable接口
//轉(zhuǎn)換很簡(jiǎn)單就是創(chuàng)建了一個(gè)適配器并且傳入結(jié)果和runnable接口
static <T> Callable<T> toCallable(Runnable runnable, T result) {
return new RunnableAdapter<T>(runnable, result);
}
//定義的內(nèi)部適配器類,此類繼承與Callable
private static final class RunnableAdapter<T> implements Callable<T> {
//定義Runnable接口的屬性
final Runnable task;
//和結(jié)果集屬性
final T result;
//構(gòu)造器中傳入Runnable和result
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
//當(dāng)運(yùn)行Callable的時(shí)候則會(huì)調(diào)用此方法,此方法的實(shí)現(xiàn)也很簡(jiǎn)單,調(diào)用Runnable的run方法并且返回創(chuàng)建適配器時(shí)傳入的result
@Override
public T call() {
task.run();
return result;
}
//重寫toString方法并且返回一些詳細(xì)信息
@Override
public String toString() {
return "Callable(task: " + task + ", result: " + result + ')';
}
}
//定義了Callable用于存儲(chǔ)線程執(zhí)行時(shí)需要執(zhí)行的任務(wù)
protected final Callable<V> task;
//構(gòu)造器,傳入一個(gè)線程執(zhí)行器和不要執(zhí)行的Runnable,最后還傳入了返回結(jié)果。
//之前說(shuō)定義說(shuō)過(guò)個(gè)Runnable是沒(méi)有返回值的所以需要在定義的時(shí)候就設(shè)置好結(jié)果,所以如果使用Runnable就必須要傳入結(jié)果
PromiseTask(EventExecutor executor, Runnable runnable, V result) {
this(executor, toCallable(runnable, result));
}
//構(gòu)造器,傳入執(zhí)行器和callable需要執(zhí)行的有結(jié)果任務(wù)。因?yàn)閭魅氲木褪莻€(gè)執(zhí)行有返回值的任務(wù)所以不用再傳入結(jié)果
//而上方的構(gòu)造器就是講Runnable封裝了下傳入此構(gòu)造器
PromiseTask(EventExecutor executor, Callable<V> callable) {
super(executor);
task = callable;
}
//hashCode 對(duì)象標(biāo)識(shí)
@Override
public final int hashCode() {
return System.identityHashCode(this);
}
//重寫了equals,任務(wù)直接的比較
@Override
public final boolean equals(Object obj) {
return this == obj;
}
//之前提到的run方法的實(shí)現(xiàn)。
//此處可能稍微有點(diǎn)抽象,因?yàn)椴](méi)有看到線程并且傳入的執(zhí)行器也沒(méi)有執(zhí)行,那是因?yàn)榇朔椒ㄔ谡{(diào)用的時(shí)候已經(jīng)有單獨(dú)的一個(gè)線程在調(diào)用了,所以看起來(lái)和正常方法沒(méi)有什么不一樣的。
@Override
public void run() {
try {
//首先判斷當(dāng)前任務(wù)狀態(tài)是否為不可取消狀態(tài),因?yàn)槿绻O(shè)置到這個(gè)狀態(tài)則表示當(dāng)前的任務(wù)正在運(yùn)行.
if (setUncancellableInternal()) {
//如果沒(méi)有運(yùn)行那么則調(diào)用task.call方法獲取執(zhí)行結(jié)果
V result = task.call();
//執(zhí)行完成則設(shè)置結(jié)果
setSuccessInternal(result);
}
} catch (Throwable e) {
//如果報(bào)錯(cuò)則設(shè)置異常結(jié)果
setFailureInternal(e);
}
}
//剩下的方法都是默認(rèn)的一些處理因?yàn)樗旅孢€會(huì)有繼承所以大多方法并沒(méi)有特殊含義。
//下面的方法再前面定義的時(shí)候都已經(jīng)講過(guò)所以這再不會(huì)進(jìn)行講解,因?yàn)椴](méi)有特殊的實(shí)現(xiàn),要么是調(diào)用父級(jí)要么是直接返回值數(shù)據(jù)。
@Override
public final Promise<V> setFailure(Throwable cause) {
throw new IllegalStateException();
}
protected final Promise<V> setFailureInternal(Throwable cause) {
super.setFailure(cause);
return this;
}
@Override
public final boolean tryFailure(Throwable cause) {
return false;
}
protected final boolean tryFailureInternal(Throwable cause) {
return super.tryFailure(cause);
}
@Override
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}
protected final Promise<V> setSuccessInternal(V result) {
super.setSuccess(result);
return this;
}
@Override
public final boolean trySuccess(V result) {
return false;
}
protected final boolean trySuccessInternal(V result) {
return super.trySuccess(result);
}
@Override
public final boolean setUncancellable() {
throw new IllegalStateException();
}
protected final boolean setUncancellableInternal() {
return super.setUncancellable();
}
//此處并沒(méi)有重寫父級(jí)的toString方法而是重寫了父類的toStringBuilder方法用于消息的處理。
@Override
protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ',');
return buf.append(" task: ")
.append(task)
.append(')');
}
}
到此PromiseTask到此結(jié)束了,接下來(lái)講解ScheduledFutureTask的實(shí)現(xiàn),但是在講解他之前需要再說(shuō)幾個(gè)定義和其內(nèi)部的實(shí)現(xiàn),因?yàn)閮?nèi)部依賴否則會(huì)講的非常模糊。
由于ScheduledFuture已經(jīng)在第二篇中講解過(guò)所以這里不再講解
//此接口是為了維護(hù)隊(duì)列內(nèi)部數(shù)據(jù)使用的,只有在DefaultPriorityQueue中使用到了,其他地方不應(yīng)該調(diào)用此接口定義的方法
//這里的維護(hù)僅僅是記錄ScheduledFutureTask具體任務(wù)在隊(duì)列中的下標(biāo)地址,而下面的兩個(gè)方法都是對(duì)下標(biāo)做的操作。
//這里比較抽象,讀者可以這樣想,任務(wù)并不是一個(gè)是多個(gè)但是他們都需要執(zhí)行,只能一個(gè)一個(gè)去執(zhí)行,這樣就需要一個(gè)隊(duì)列去進(jìn)行排序排到第幾個(gè)則為了后面能快速獲取到當(dāng)前任務(wù)的下標(biāo)所以這里實(shí)現(xiàn)了這個(gè)接口這算是個(gè)優(yōu)化點(diǎn),因?yàn)槿绻贿M(jìn)行記錄那么可能會(huì)導(dǎo)致,如果獲取當(dāng)前下標(biāo)則需要進(jìn)行遍歷比較,這樣是非常消耗性能的,所以這個(gè)做法挺不錯(cuò),可以拿來(lái)借鑒。
public interface PriorityQueueNode {
//定義了一個(gè)常量值,不存在隊(duì)列中的index
int INDEX_NOT_IN_QUEUE = -1;
//獲取在傳入隊(duì)列中的下標(biāo)地址,具體看實(shí)現(xiàn)。
int priorityQueueIndex(DefaultPriorityQueue<?> queue);
//設(shè)置當(dāng)前的任務(wù)在隊(duì)列中的下標(biāo)位置,int i則是對(duì)于的index
void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i);
}
在講解前說(shuō)過(guò),會(huì)有幾個(gè)定義,但是在筆者閱讀總結(jié)的時(shí)候發(fā)現(xiàn),剩下的定義是講解過(guò)的所以這里只講述為講過(guò)的定義。接下來(lái)就是對(duì)ScheduledFutureTask的實(shí)現(xiàn)講解。
//延遲任務(wù)管理,不僅支持延遲執(zhí)行也可以根據(jù)周期一直運(yùn)行。
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
//下一次任務(wù)的執(zhí)行id,
private static final AtomicLong nextTaskId = new AtomicLong();
//任務(wù)的創(chuàng)建時(shí)間,創(chuàng)建了當(dāng)前任務(wù)則代表已經(jīng)開(kāi)始,因?yàn)槿绻茄舆t任務(wù)那么就要從創(chuàng)建開(kāi)始進(jìn)行計(jì)時(shí)。
private static final long START_TIME = System.nanoTime();
//獲取當(dāng)前時(shí)間減去創(chuàng)建時(shí)間的時(shí)間差
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
//獲取最后一次的執(zhí)行時(shí)間,此方法一般用于循環(huán)任務(wù)
static long deadlineNanos(long delay) {
//使用當(dāng)前時(shí)間減去任務(wù)開(kāi)始時(shí)間并且加上周期不管怎么算都會(huì)是下一次的執(zhí)行時(shí)間的間隔
//這里稍微有點(diǎn)繞,此處并不是使用具體的時(shí)間進(jìn)行比較的而是使用時(shí)間段進(jìn)行比較的,比如開(kāi)始時(shí)間是00:00:00而當(dāng)前時(shí)間是00:00:01他們的時(shí)間段就是1s而下一次執(zhí)行周期計(jì)算應(yīng)該是2s如果這樣比較那么此條件不成立則不執(zhí)行,直到當(dāng)前時(shí)間00:00:02的時(shí)候才進(jìn)行執(zhí)行。而此方法就是獲取下一次執(zhí)行周期的計(jì)算結(jié)果。
long deadlineNanos = nanoTime() + delay;
//這里防止計(jì)算錯(cuò)誤導(dǎo)致程序錯(cuò)誤所以做了對(duì)應(yīng)的處理
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
//獲取當(dāng)前的id,CAS之前說(shuō)到過(guò),這里是將nextTaskId字段進(jìn)行加一進(jìn)行返回
private final long id = nextTaskId.getAndIncrement();
//記錄任務(wù)執(zhí)行的周期疊加數(shù),之前介紹了他的計(jì)算是計(jì)算的時(shí)間段而每個(gè)時(shí)間段執(zhí)行都需要疊加上周期這樣才能保證執(zhí)行時(shí)間的準(zhǔn)確
//這里提一下可能有人會(huì)發(fā)現(xiàn)START_TIME是static描述的不管是那個(gè)對(duì)象來(lái)使用都會(huì)是一樣開(kāi)始時(shí)間,而為了保證執(zhí)行的準(zhǔn)確性再添加任務(wù)的時(shí)候回將已過(guò)去的周期疊加到此字段,就是調(diào)用了deadlineNanos方法,這里提到可能會(huì)有些抽象,后面使用的時(shí)候自然就會(huì)清楚。
private long deadlineNanos;
//周期時(shí)長(zhǎng),這里需要注意這個(gè)周期有三個(gè)狀態(tài)
//等于0的時(shí)候不會(huì)循環(huán)執(zhí)行
//小于0則使用scheduleWithFixedDelay方法的算法,下一次執(zhí)行時(shí)間是上次執(zhí)行結(jié)束的時(shí)間加周期
//大于0則使用scheduleAtFixedRate方法的算法,下一次執(zhí)行時(shí)間是上一次執(zhí)行時(shí)間加周期
//大于小于的兩者差距在前面詳細(xì)的介紹過(guò)遺忘的讀者可以再去閱讀。
private final long periodNanos;
//剛才講述了PriorityQueueNode有說(shuō)存儲(chǔ)當(dāng)前node是在隊(duì)列中的那個(gè)下標(biāo),而此變量則是對(duì)列存儲(chǔ)的下標(biāo)
private int queueIndex = INDEX_NOT_IN_QUEUE;
//構(gòu)造器,傳入執(zhí)行器、運(yùn)行的Runnable,因?yàn)槭荝unnable所以傳入了result,執(zhí)行的時(shí)間。
//可以看出此方法是延遲執(zhí)行任務(wù)的構(gòu)造,因?yàn)闆](méi)有傳入周期,執(zhí)行一次即可結(jié)束。
//此處的執(zhí)行時(shí)間是執(zhí)行開(kāi)始時(shí)間,而這個(gè)時(shí)間的算法就是deadlineNanos方法的調(diào)用
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Runnable runnable, V result, long nanoTime) {
this(executor, toCallable(runnable, result), nanoTime);
}
//構(gòu)造器,傳入執(zhí)行器,運(yùn)行的Callable,執(zhí)行時(shí)間,周期
//此處只支持period大于0或者小于0,如果等于0則會(huì)拋出異常
//而period就是對(duì)periodNanos的賦值之前講述過(guò)他的差異
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}
//此處是對(duì)第一個(gè)構(gòu)造的一個(gè)調(diào)用實(shí)現(xiàn),因?yàn)榈谝粋€(gè)構(gòu)造傳入的是Runnable而ScheduledFutureTask使用的任務(wù)是Callable所以第一個(gè)構(gòu)造調(diào)用了一個(gè)轉(zhuǎn)換的方法然后調(diào)用此構(gòu)造
//可以看出默認(rèn)周期是0則代表此構(gòu)造是不重復(fù)運(yùn)行的
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
//獲取執(zhí)行器
@Override
protected EventExecutor executor() {
return super.executor();
}
//獲取執(zhí)行時(shí)間
public long deadlineNanos() {
return deadlineNanos;
}
//獲取當(dāng)前時(shí)間還有多久到下一次執(zhí)行時(shí)間
//獲取獲取時(shí)間和下一次執(zhí)行時(shí)間的差,如果當(dāng)前時(shí)間已經(jīng)超過(guò)下一次執(zhí)行時(shí)間則返回0
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
//上一個(gè)方法使用的是當(dāng)前時(shí)間而此方法使用的是傳入的指定時(shí)間
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
//將獲取到的時(shí)長(zhǎng)轉(zhuǎn)為指定的時(shí)間類型,獲取到的試納秒如果傳入的unit是秒或者毫秒則會(huì)轉(zhuǎn)成對(duì)象的時(shí)長(zhǎng)返回
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
//之前再說(shuō)Delayed接口的時(shí)候,此結(jié)構(gòu)基礎(chǔ)過(guò)Comparable接口所以整個(gè)方法實(shí)現(xiàn)的Comparable接口的方法
//此方法是比較兩個(gè)ScheduledFutureTask的周期任務(wù)是下次執(zhí)行的時(shí)長(zhǎng),因?yàn)榧热皇窃陉?duì)列中那么每次彈出的任務(wù)都會(huì)是頭部的,所以是為了將先執(zhí)行的任務(wù)排到隊(duì)列頭使用。
//此函數(shù)具體的返回值需要根據(jù)使用出做出判定此處不做解釋
@Override
public int compareTo(Delayed o) {
//如果兩個(gè)對(duì)象比較相等則返回0
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
//當(dāng)前的執(zhí)行時(shí)間減去傳入的執(zhí)行時(shí)間,獲取的就是他們的差數(shù)
long d = deadlineNanos() - that.deadlineNanos();
//如果小于0 則代表當(dāng)前的時(shí)間執(zhí)行早于傳入的時(shí)間則返回-1
if (d < 0) {
return -1;
//如果大于0則代表當(dāng)前任務(wù)晚于傳入的時(shí)間則返回1
} else if (d > 0) {
return 1;
//如果他倆下一個(gè)周期時(shí)間相等則代表d是0,則判斷他當(dāng)前的id是否小于傳入的id,如果小則代表當(dāng)前任務(wù)優(yōu)先于傳入的任務(wù)則返回-1
} else if (id < that.id) {
return -1;
//如果兩個(gè)id相等則拋出異常
} else if (id == that.id) {
throw new Error();
//否則傳入的任務(wù)優(yōu)先于當(dāng)前的任務(wù),此處結(jié)論是根據(jù)調(diào)用出總結(jié)。
} else {
return 1;
}
}
//最終的運(yùn)行run方法
@Override
public void run() {
//如果當(dāng)前線程不是傳入的執(zhí)行器線程則會(huì)拋出斷言異常當(dāng)然如果運(yùn)行時(shí)沒(méi)有開(kāi)啟斷言關(guān)鍵字那么次代碼無(wú)效
assert executor().inEventLoop();
try {
//檢查是否周期為0之前說(shuō)過(guò)如果是0則不進(jìn)行循環(huán)
if (periodNanos == 0) {
//與父級(jí)的使用相同設(shè)置為狀態(tài)為正在運(yùn)運(yùn)行
if (setUncancellableInternal()) {
//執(zhí)行任務(wù)
V result = task.call();
//設(shè)置為成功
setSuccessInternal(result);
}
} else {
//檢查當(dāng)前的任務(wù)是否被取消了
if (!isCancelled()) {
//如果沒(méi)有則調(diào)用call,因?yàn)槟苓M(jìn)入這里都是循環(huán)執(zhí)行的任務(wù)所以沒(méi)有返回值
task.call();
//并且判斷當(dāng)前的執(zhí)行器是否已經(jīng)關(guān)閉
if (!executor().isShutdown()) {
//將當(dāng)前的周期時(shí)間賦值給p
long p = periodNanos;
//如果當(dāng)前周期大于0則代表當(dāng)前時(shí)間添加周期時(shí)間
//這里需要注意當(dāng)前時(shí)間包括了不包括執(zhí)行時(shí)間
//這樣說(shuō)可能有點(diǎn)繞,這樣理解這里的p是本次執(zhí)行是在開(kāi)始的準(zhǔn)時(shí)間,什么是準(zhǔn)時(shí)間?就是無(wú)視任務(wù)的執(zhí)行時(shí)間以周期時(shí)間和執(zhí)行開(kāi)始時(shí)間計(jì)算。
//scheduleAtFixedRate方法的算法,通過(guò)下面的deadlineNanos+=p也是可以看出的。
if (p > 0) {
deadlineNanos += p;
} else {
//此處小于0 則就需要將當(dāng)前程序的運(yùn)行時(shí)間也要算進(jìn)去所以使用了當(dāng)前時(shí)間加周期,p因?yàn)樾∮?所以負(fù)負(fù)得正了
deadlineNanos = nanoTime() - p;
}
//如果還沒(méi)有取消當(dāng)前任務(wù)
if (!isCancelled()) {
//獲取任務(wù)隊(duì)列并且將當(dāng)前的任務(wù)在丟進(jìn)去,因?yàn)橐呀?jīng)計(jì)算完下一次執(zhí)行的時(shí)間了所以當(dāng)前任務(wù)已經(jīng)是一個(gè)新的任務(wù),最起碼執(zhí)行時(shí)間改變了
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
}
} catch (Throwable cause) {
//如果出現(xiàn)異常則設(shè)置為失敗
setFailureInternal(cause);
}
}
//取消當(dāng)前任務(wù)所以需要從任務(wù)隊(duì)列中移除當(dāng)前任務(wù)
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}
//取消不刪除則直接調(diào)用父級(jí)方法不做任務(wù)的刪除,
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
//和之前已經(jīng)重寫了父類的toString打印的詳細(xì)信息
@Override
protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ',');
return buf.append(" id: ")
.append(id)
.append(", deadline: ")
.append(deadlineNanos)
.append(", period: ")
.append(periodNanos)
.append(')');
}
//獲取在隊(duì)列中的位置
@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
return queueIndex;
}
//設(shè)置當(dāng)前任務(wù)在隊(duì)列中的位置
@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
queueIndex = i;
}
}
到此PromiseTask的分支基本講解完畢了還有一個(gè)RunnableScheduledFutureTask它的定義是一個(gè)內(nèi)部類到時(shí)候再講對(duì)于執(zhí)行器的時(shí)候會(huì)將此類一起講解。