Netty的線程結(jié)構(gòu)圖

? ? ?可以從上圖看出netty的線程實(shí)現(xiàn)由很多種但是他們都是來(lái)自于jdk自帶的Executor和ExecutorService。
? ? ?這里稍微提一下Executor他很簡(jiǎn)單只有一個(gè)execut方法而這個(gè)方法只有一個(gè)參數(shù)是Runnable,對(duì)于開(kāi)發(fā)者這個(gè)類型很熟悉所有線程的執(zhí)行都需要有他,并且該接口也只有一個(gè)方法就是run()。
? ? ?上文大概說(shuō)了下Executor的結(jié)構(gòu),這里說(shuō)下他的定義,他是一個(gè)執(zhí)行器的定義,因?yàn)閺膱D中可以看出他是一個(gè)接口。
ExecutorService是一個(gè)服務(wù)可以當(dāng)做管理工具,他管理了執(zhí)行器的創(chuàng)建和停止,既然是執(zhí)行器服務(wù)那么就代表著他是可以管理多個(gè)執(zhí)行器的否則也就沒(méi)有意義,但是也是一個(gè)接口對(duì)于他的定義如下。
//停止執(zhí)行器,雖然停止但是會(huì)將在停止前已經(jīng)存在的任務(wù)將會(huì)嘗試停止如果失敗則繼續(xù)執(zhí)行完成為止。
void shutdown();
//停止執(zhí)行器,相比上方的停止這個(gè)就暴力一些,立馬停止并且取消停止前已經(jīng)存在的任務(wù)并且嘗試關(guān)閉正在執(zhí)行的任務(wù)。
List<Runnable> shutdownNow();
//執(zhí)行器服務(wù)是否停止服務(wù)(由子類實(shí)現(xiàn)決定)
boolean isShutdown();
//該方法是對(duì)shutdown或shutdownNow的狀態(tài)獲取是否終止完成。如果沒(méi)有調(diào)用這兩個(gè)方法那么此方法永遠(yuǎn)是false(具體情況有子類決定)
boolean isTerminated();
//等待終止服務(wù),此方法不會(huì)有任何的操作僅僅是等待任務(wù)的執(zhí)行,會(huì)有兩個(gè)結(jié)果1、要么執(zhí)行結(jié)束2、要么執(zhí)行時(shí)間超過(guò)操作時(shí)間
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//提交一個(gè)可以帶返回值的任務(wù)(下方會(huì)詳細(xì)介紹Future這個(gè)類)
//Callable只有一個(gè)方法 V call(),F(xiàn)uture會(huì)默認(rèn)將他的返回存到自己的成員變量中返回,調(diào)用get方法獲取
<T> Future<T> submit(Callable<T> task);
//提交一個(gè)Runnable任務(wù)會(huì)默認(rèn)的將傳入的result存入到一個(gè)Future中在進(jìn)行返回
<T> Future<T> submit(Runnable task, T result);
//提交一個(gè)Runnable任務(wù)會(huì)默認(rèn)將null存入Future的成員變量中通過(guò)get獲得
Future<?> submit(Runnable task);
//傳入一個(gè)Callable類型集合并且等到集合處理完成一起返回,最終返回一個(gè)對(duì)應(yīng)結(jié)果的Future集合。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//相比上方此方法設(shè)置了超時(shí)時(shí)間,等待任務(wù)的超時(shí),如果超時(shí)則進(jìn)行任務(wù)的取消處理,繼續(xù)會(huì)放回對(duì)應(yīng)的結(jié)果列表但是如果是被取消的結(jié)果則會(huì)拋出取消異常,這代表著如果在不缺點(diǎn)是否成功的時(shí)候需要調(diào)用isCancelled方法此方法校驗(yàn)當(dāng)前的Future是否取消如果取消了則不要調(diào)用get因?yàn)闀?huì)出異常。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//傳入指定的任務(wù)集合只要其中任何一個(gè)任務(wù)完成則取消其他任務(wù)并且返回完成的任務(wù)的返回值,這里并不是Future而是Callable的返回類型
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
//傳入指定的任務(wù)集合只要啟動(dòng)任何一個(gè)任務(wù)完成則取消其他的任務(wù)執(zhí)行并且返回第一執(zhí)行完成的任務(wù)結(jié)果,如果連第一個(gè)任務(wù)執(zhí)行都超時(shí)了那么不會(huì)返回結(jié)果會(huì)直接拋出TimeOut的超時(shí)異常。
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService的定義到此結(jié)束了。接下來(lái)講述他依賴的類型Future和Callable。
//這個(gè)接口非常簡(jiǎn)單定義一個(gè)call方法此方法只有兩個(gè)結(jié)果要么拋出業(yè)務(wù)異常,要么返回計(jì)算的結(jié)果,而結(jié)果類型則是創(chuàng)建時(shí)設(shè)置的泛型V。
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
//嘗試取消任務(wù)執(zhí)行,如果取消失敗則可能是任務(wù)已完成或者已經(jīng)取消或其他原因?qū)е率。绻晒Ψ譃閮煞N情況1、任務(wù)未運(yùn)行則永遠(yuǎn)不會(huì)執(zhí)行2、任務(wù)已經(jīng)執(zhí)行mayInterruptIfRunning需要判斷他是否為true如果是則嘗試中斷執(zhí)行。
boolean cancel(boolean mayInterruptIfRunning);
//如果是主動(dòng)取消的則返回true,否則任何形式的結(jié)束此方法都是false
boolean isCancelled();
//在任何情況下都會(huì)返回true,除非是新new的調(diào)用此方法是false具體看子類狀態(tài)的實(shí)現(xiàn)。
boolean isDone();
//等待計(jì)算完成獲取結(jié)果,有等待就代表是阻塞的。如果執(zhí)行異常則會(huì)拋出
V get() throws InterruptedException, ExecutionException;
//等待指定的時(shí)間獲取結(jié)果如果超時(shí)則拋出TimeoutException,如果執(zhí)行異常則會(huì)拋出
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException,TimeoutException;
}
這里介紹完了ExecutorService的結(jié)構(gòu)和他依賴的結(jié)構(gòu),因?yàn)槎际嵌x所以就只用知道他們是干嘛的使用者又是怎么組合這些定義最終完成什么功能的,所以接下來(lái)還是定義。。。,不過(guò)是關(guān)于ScheduledExecutorService的定義,細(xì)心的讀者會(huì)發(fā)現(xiàn)筆者的講解流程是根據(jù)線程池的結(jié)構(gòu)圖一層一層講解的。
//可以看出他繼承與ExecutorService,并且對(duì)ExecutorService的結(jié)構(gòu)進(jìn)行了擴(kuò)展
public interface ScheduledExecutorService extends ExecutorService {
//創(chuàng)建一個(gè)延遲執(zhí)行的任務(wù),此任務(wù)延遲時(shí)間有delay與unit控制,返回值Future的get方法返回null,因?yàn)閞un方法并沒(méi)有返回值所以返回默認(rèn)值
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//是上方方法的重構(gòu),將Runnable改為Callable,因?yàn)閭魅氲脑嘋allable所以在Future的get方法返回call方法的返回值,看不明白的讀者可以根據(jù)上方的Callable的講解進(jìn)行理解。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//之前創(chuàng)建的延遲啟動(dòng)的任務(wù)都只執(zhí)行一次,此方法是根據(jù)initialDelay為運(yùn)行起點(diǎn)等待設(shè)置的啟動(dòng)時(shí)長(zhǎng),然后以period為周期的循環(huán)執(zhí)行任務(wù)。除非取消任務(wù)或者中斷調(diào)用者線程否則次任務(wù)永久運(yùn)行,因?yàn)槭怯谰眠\(yùn)行所以無(wú)返回值,默認(rèn)都為NULL。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//scheduleAtFixedRate的啟動(dòng)一樣都是指定initialDelay的時(shí)長(zhǎng)后運(yùn)行,也是在指定的周期中重復(fù)運(yùn)行,除非遇到異?;蚓€程中斷和取消執(zhí)行否則將永久運(yùn)行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
//那么scheduleAtFixedRate和scheduleWithFixedDelay他們的區(qū)別是什么?
//scheduleWithFixedDelay:在計(jì)算周期的時(shí)候會(huì)將任務(wù)的完成時(shí)間算進(jìn)去,意思就是如果任務(wù)執(zhí)行了1s而周期是2s那么下次執(zhí)行的時(shí)間會(huì)是啟動(dòng)時(shí)間+3s。
//對(duì)應(yīng)公式:下一次執(zhí)行開(kāi)始時(shí)間=上一次執(zhí)行結(jié)束時(shí)間+周期+上一次執(zhí)行的時(shí)長(zhǎng)
//scheduleAtFixedRate:恰恰相反,計(jì)算周期的時(shí)候不會(huì)管你的執(zhí)行時(shí)間會(huì)繼續(xù)按照設(shè)置的周期計(jì)算,比如任務(wù)執(zhí)行了1s而周期是2s那么就會(huì)根據(jù)你的第一次啟動(dòng)時(shí)間+2s。
//對(duì)應(yīng)公式:下一次執(zhí)行開(kāi)始時(shí)間=上次執(zhí)行結(jié)束時(shí)間+周期
//這里會(huì)有一個(gè)疑問(wèn),那就是如果我周期設(shè)置為2s而任務(wù)執(zhí)行了3s,scheduleAtFixedRate方法不是不計(jì)操作時(shí)長(zhǎng)的嗎?那么會(huì)不會(huì)出現(xiàn)第一次執(zhí)行還未結(jié)束第二次已經(jīng)開(kāi)始呢?這個(gè)問(wèn)題是不存在的,因?yàn)樵谇懊嬷v解的時(shí)候就說(shuō)了結(jié)束周期任務(wù)中有一條就是當(dāng)線程中斷就會(huì)結(jié)束周期,意思就是一個(gè)線程維護(hù)一個(gè)周期任務(wù),一旦線程中斷那么任務(wù)也就結(jié)束了,如果多個(gè)線程對(duì)應(yīng)一個(gè)任務(wù)的時(shí)候只有一個(gè)線程在運(yùn)行任務(wù)其他線程都在等待狀態(tài)。
//這里這樣說(shuō)可能讀者會(huì)納悶這個(gè)結(jié)論怎么來(lái)的,等介紹完結(jié)構(gòu)后會(huì)講解他的實(shí)現(xiàn),到時(shí)候各位再結(jié)合今天的講解看那就一目了然了。
}
//在ScheduledExecutorService中依賴了ScheduledFuture,而從下方可以看出ScheduledFuture是一個(gè)空的接口他繼承與Delayed和Future
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
//Delayed接口只定義了getDelay方法,此方法用于獲取剩余的時(shí)長(zhǎng):getDelay=下次執(zhí)行開(kāi)始時(shí)間-當(dāng)前時(shí)間。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
//可以看出Delayed類繼承了Comparable,此接口是用來(lái)比較兩個(gè)對(duì)象的具體看子類實(shí)現(xiàn),這里說(shuō)下他的返回值,當(dāng)前的數(shù)比較結(jié)果:-1小于、0等于、1大于。
public interface Comparable<T> {
public int compareTo(T o);
}
頂層的基本說(shuō)完了,剩下零零散散的會(huì)在使用的地方再提及,接下來(lái)就是對(duì)netty定義的講解。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
//是否正在關(guān)閉,只有調(diào)用shutdownGracefully兄弟方法或者其他方法,這個(gè)是根據(jù)實(shí)現(xiàn)走的所以再說(shuō)實(shí)現(xiàn)的時(shí)候在細(xì)講。
boolean isShuttingDown();
//優(yōu)美關(guān)閉線程池,怎么優(yōu)美了看他實(shí)現(xiàn)就是了。
Future<?> shutdownGracefully();
//上方的重載,具體實(shí)現(xiàn)按照代碼講解,這里總結(jié)為之過(guò)早。
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回終止時(shí)執(zhí)行的Future操作結(jié)果。實(shí)現(xiàn)時(shí)細(xì)講。
Future<?> terminationFuture();
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
@Deprecated
void shutdown();
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
@Deprecated
List<Runnable> shutdownNow();
//返回一個(gè)由EventExecutorGroup管理的EventExecutor
EventExecutor next();
//此方法重寫與Iterable接口下方單獨(dú)介紹
@Override
Iterator<EventExecutor> iterator();
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
Future<?> submit(Runnable task);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
<T> Future<T> submit(Runnable task, T result);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
<T> Future<T> submit(Callable<T> task);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//此方法來(lái)自于ScheduledExecutorService具體講解查看上文
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
//此接口用來(lái)標(biāo)記當(dāng)前類可進(jìn)行迭代并且支持for語(yǔ)句進(jìn)行遍歷
public interface Iterable<T> {
//獲取當(dāng)前類的迭代器,迭代器不會(huì)進(jìn)行深入講解,因?yàn)檫@個(gè)涉及到數(shù)據(jù)的存儲(chǔ)方式內(nèi)存等。如果講又是一大堆,這里讀者只用知道Iterator是獲取元素的,常用的兩個(gè)方法hasNext()是否還有下一個(gè)數(shù)據(jù)和next()返回當(dāng)前數(shù)據(jù)并且做好獲取下一個(gè)的準(zhǔn)備,這里光說(shuō)可能不理解,舉個(gè)例子數(shù)組[1,2],調(diào)用next則從下標(biāo)0獲取并且下標(biāo)+1,再次next則獲取下標(biāo)為1的那么自然返回2,再次next會(huì)發(fā)現(xiàn)沒(méi)有了那么迭代器報(bào)錯(cuò),所以就需要使用hasNext方法判斷是否有下一個(gè),而他的判斷方法根據(jù)現(xiàn)實(shí)數(shù)據(jù)結(jié)構(gòu)走,如果按上面的例子那么就是判斷當(dāng)前的下標(biāo)是否小于數(shù)組的length。
Iterator<T> iterator();
//java8的特性接口的default方法和接受一個(gè)函數(shù)進(jìn)行調(diào)用。
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
//分割迭代器,支持java8的stream操作,這里暫不進(jìn)行介紹。
default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}
//此事件執(zhí)行器繼承與EventExecutorGroup事件執(zhí)行組,這里暫不討論他的數(shù)據(jù)結(jié)構(gòu)后面實(shí)現(xiàn)的時(shí)候自然會(huì)出現(xiàn)他是怎么個(gè)結(jié)構(gòu)。
//在EventExecutorGroup接口中可以看到有對(duì)EventExecutor的依賴,EventExecutorGroup的迭代就是EventExecutor。
public interface EventExecutor extends EventExecutorGroup {
//如果是EventExecutor的實(shí)現(xiàn)則此方法代表返回當(dāng)前類this。
@Override
EventExecutor next();
//返回EventExecutor的管理者,這里可能有些迷糊在后面會(huì)在實(shí)現(xiàn)中細(xì)講他們的關(guān)系。
EventExecutorGroup parent();
//判斷當(dāng)前線程是否是執(zhí)行器的執(zhí)行線程,一般是調(diào)用下面的方法傳入當(dāng)前的線程
boolean inEventLoop();
//傳入一個(gè)線程判斷是否是當(dāng)前執(zhí)行器的執(zhí)行線程
boolean inEventLoop(Thread thread);
//返回一個(gè)新的應(yīng)答,這里的Promise是Future的實(shí)現(xiàn),下面會(huì)介紹
<V> Promise<V> newPromise();
//返回一個(gè)新的應(yīng)答
<V> ProgressivePromise<V> newProgressivePromise();
//返回一個(gè)SucceededFuture對(duì)象并且他的isSuccess方法為true,使用get獲取結(jié)果是不會(huì)阻塞會(huì)返回你傳入的result。
//這里的Future并不是前面介紹的這個(gè)Future是netty自己定義的,下面會(huì)詳細(xì)介紹
<V> Future<V> newSucceededFuture(V result);
//傳入一個(gè)拋出的異常描述,返回FailedFuture類的一個(gè)實(shí)例,此實(shí)例與上方恰好相反,isSuccess為false,并且get的時(shí)候拋出異常。
<V> Future<V> newFailedFuture(Throwable cause);
}
//netty定義的Future可以看出他繼承與jdk的Future
public interface Future<V> extends java.util.concurrent.Future<V> {
//當(dāng)操作完成時(shí)返回true
boolean isSuccess();
//操作取消的時(shí)候返回true
boolean isCancellable();
//獲取執(zhí)行異常,執(zhí)行成功則返回null
Throwable cause();
//添加執(zhí)行監(jiān)聽(tīng)器,當(dāng)執(zhí)行操作完成時(shí)會(huì)調(diào)用此監(jiān)聽(tīng)器也就是isDone為true的時(shí)候調(diào)用
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//添加一個(gè)集合的監(jiān)聽(tīng)器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//刪除監(jiān)聽(tīng)器,只會(huì)刪除第一次出現(xiàn)的,如果一個(gè)監(jiān)聽(tīng)器添加了多次那么只會(huì)刪除第一次出現(xiàn)的
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//與上方相同只是支持傳入多個(gè)監(jiān)聽(tīng)器進(jìn)行刪除
Future<V> mremoveListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//同步等待結(jié)果,要么返回結(jié)果要么拋出異常,具體按照實(shí)現(xiàn)類講解。
Future<V> sync() throws InterruptedException;
//此方法不會(huì)拋出中斷異常,因?yàn)閮?nèi)部做了處理,這是筆者看實(shí)現(xiàn)總結(jié)的,當(dāng)然不止一種實(shí)現(xiàn)所以暫時(shí)理解即可后續(xù)會(huì)詳細(xì)介紹
Future<V> syncUninterruptibly();
//含義與sync方法相同
Future<V> await() throws InterruptedException;
//含義與syncUninterruptibly方法相同
Future<V> awaitUninterruptibly();
//指定任務(wù)執(zhí)行的等待時(shí)長(zhǎng),等待結(jié)果結(jié)束后根據(jù)執(zhí)行結(jié)果isDone方法結(jié)果返回,具體以實(shí)現(xiàn)為主
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
//調(diào)用上方的方法,是指?jìng)魅氲膖imeoutMillis是毫秒
boolean await(long timeoutMillis) throws InterruptedException;
//上面的await方法如果出現(xiàn)中斷會(huì)拋出中斷異常,此方法不會(huì)拋出異常,傳參和第一個(gè)await一樣
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
//不會(huì)拋出中斷異常傳參和第二個(gè)await一樣
boolean awaitUninterruptibly(long timeoutMillis);
//非阻塞獲取結(jié)果如果futrue執(zhí)行成功那么返回call的返回值,如果失敗或者還正在處理則返回null
V getNow();
//暫時(shí)與父類的含義一樣,具體根據(jù)實(shí)現(xiàn)子類理解。
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
結(jié)構(gòu)到這里差不多結(jié)束了剩下的結(jié)構(gòu)需要根據(jù)具體的實(shí)現(xiàn)查看所以這里暫不介紹,下一篇文章將會(huì)講解netty中的future,從本文可以看出有很多關(guān)于future的使用,可以看出netty非常依賴于他那么下節(jié)將會(huì)進(jìn)行future的講解。