線程是每個應(yīng)用都必須關(guān)系的事情,畢竟任何服務(wù)器的資源都是有限的,服務(wù)線程過少的容易發(fā)生阻塞,服務(wù)線程過多的話上下文切換的開銷又會影響效率,所以合適的線程模型對于一個高性能的應(yīng)用來說必不可少。Dubbo作為一個帶有服務(wù)治理功能的RPC框架,在線程模型上也有自己的處理,今天就讓我們一起來看一下Dubbo的線程模型。
下面我們要看一下默認(rèn)情況下的線程模型:
首先明確一個基本概念:IO線程和業(yè)務(wù)線程的區(qū)別
- IO線程:配置在netty連接點的用于處理網(wǎng)絡(luò)數(shù)據(jù)的線程,主要處理編解碼等直接與網(wǎng)絡(luò)數(shù)據(jù)打交道的事件。
- 業(yè)務(wù)線程:用于處理具體業(yè)務(wù)邏輯的線程,可以理解為自己在provider上寫的代碼所執(zhí)行的線程環(huán)境。
Dubbo默認(rèn)采用的是長鏈接的方式,即默認(rèn)情況下一個consumer和一個provider之間只會建立一條鏈接,這種情況下IO線程的工作就是編碼和解碼數(shù)據(jù),監(jiān)聽具體的數(shù)據(jù)請求,直接通過Channel發(fā)布數(shù)據(jù)等等;二業(yè)務(wù)線程就是處理IO線程處理之后的數(shù)據(jù),業(yè)務(wù)線程并不知道任何跟網(wǎng)絡(luò)相關(guān)的內(nèi)容,只是純粹的處理業(yè)務(wù)邏輯,在業(yè)務(wù)處理邏輯的時候往往存在復(fù)雜的邏輯,所以業(yè)務(wù)線程池的配置往往都要比IO線程池的配置大很多。
Dubbo中線程相關(guān)參數(shù)的含義
iothreads:指定IO線程池(worker)的線程數(shù)量,默認(rèn)情況下為CPU個數(shù)+1,因為這個線程的工作內(nèi)容比較簡單,所以一般情況下我們不會去配置這個值,除非IO線程的響應(yīng)速度明顯拖慢了整個工程的響應(yīng),IO線程的默認(rèn)類型是CacheThreadPool,一分鐘的線程死亡時間。
threadpool:業(yè)務(wù)線程的具體線程類型,默認(rèn)采用的fixed線程池,即線程數(shù)量一定的線程池,這種線程池的好處就是不會頻繁創(chuàng)建線程線程,適合線業(yè)務(wù)比較密集的應(yīng)用。因為這個數(shù)據(jù)只管關(guān)系到服務(wù)的并發(fā)情況,所以在需要的時候可以適當(dāng)調(diào)整該數(shù)量來增加工程的并發(fā)。
threads:該參數(shù)就是業(yè)務(wù)線程池的核心線程數(shù)配置,默認(rèn)情況下為200。如果空間有條件的話可以適當(dāng)?shù)靥嵘摂?shù)量,例如提升至400或者500都是可以的。
queues:該數(shù)量指定來在初始化業(yè)務(wù)線程池時候是否需要排隊隊列,如果不設(shè)置的話,業(yè)務(wù)線程池的排隊隊列是SynchronousQueue,即不允許業(yè)務(wù)事件排隊,如果線程池沒有空閑線程之后會直接排除異常信息。但是如果配置來queues之后則會使用LinkedBlockingQueue作為排隊隊列,queues則代表隊列的初始隊列。因為queues的配置直接關(guān)系到排隊,所以在一般情況下建議不要配置,因為線程池滿的情況下一般期望是直接失敗,然后調(diào)用其他的機(jī)器,而不是再次隊列繼續(xù)等待,繼續(xù)等待不僅可能會拉低響應(yīng)時間,而且很有可能會超時。
acceptes:我們知道threadpool,threads和queues都是控制業(yè)務(wù)線程池的字段,而acceptes就是控制IO線程池的字段。這個字段標(biāo)示著服務(wù)端可接受的最大長連接數(shù),默認(rèn)情況下為不限制,但是有時候為來保護(hù)服務(wù)器防止連接數(shù)過多導(dǎo)致請求失敗率過高,則可以考慮設(shè)置該字段為一個定值。
connections:既然服務(wù)端可以設(shè)置最大接收的連接數(shù),那么客戶端也可以設(shè)置與服務(wù)端建立的連接數(shù)。connections可以配置在reference上表示要同對應(yīng)的服務(wù)器建立的長鏈接數(shù)量,默認(rèn)為只建立一條鏈接,如果配置來connections的話則會建立N條長鏈接以提供消費者的吞吐量。但是有一點需要注意是如果conenctions的數(shù)量配置大于服務(wù)端的accepts的話,超出的部分會直接報錯,表示不支持更多的鏈接,該值不宜配置過多,因為如果多個消費者都配置來該值的話很容易到值服務(wù)端的accepts超過預(yù)期數(shù)量而報錯。
-
dispatcher:這個字段代表的是IO線程池和業(yè)務(wù)線程池的邊界,具體有這么幾種類型,下面我們一一詳細(xì)看看:
- all:所有消息都派發(fā)到線程池,包括請求,響應(yīng),連接事件,斷開事件,心跳等。對應(yīng)的是AllChannelHandler(具體這個Handler的處理位置以及他的作用,見前幾篇博客,這里不再強(qiáng)調(diào))
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //鏈接事件通過線程池處理 public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } } //鏈接斷開事件通過線程池處理 public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); }catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t); } } //數(shù)據(jù)接收事件通過線程池處理 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常事件通過線程池梳理 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); }catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t); } } private ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } }- direct:所有消息都不派發(fā)到線程池,全部在IO線程上直接執(zhí)行。(這種做法在絕大多數(shù)情況下都不合理,因為畢竟業(yè)務(wù)邏輯相關(guān)對IO事件都是復(fù)雜的)。具體的實現(xiàn)方式就是在裝飾者的層級上直接下調(diào),不再包裝線程池。
- message:只有請求響應(yīng)消息派發(fā)到線程池,其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行。
/** * 只有請求響應(yīng)消息派發(fā)到線程池,其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行 */ public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //接收到消息時候觸發(fā),無論是服務(wù)端接收到請求數(shù)據(jù)還是客戶端接收到返回數(shù)據(jù) public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } }具體的做法其實很簡單,相對于all來說只不過只會會將received事件在線程池中處理,其他的一概以默認(rèn)方式處理(IO線程池)。
- execution:官方的說法是:只請求消息派發(fā)到線程池,不含響應(yīng),響應(yīng)和其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行。相對于message來說,限制的更死了,也就是只有服務(wù)端的業(yè)務(wù)邏輯才會執(zhí)行在業(yè)務(wù)線程池中執(zhí)行。消費端如果收到的消息之后,處理邏輯還是IO線程上執(zhí)行。但是實際情況是我看到的代碼顯示execution與all的處理邏輯幾乎一樣,并沒有體現(xiàn)出官方的說法。 具體實現(xiàn)如下:
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //處理鏈接建立事件 public void connected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); } //處理鏈接斷開事件 public void disconnected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); } //處理數(shù)據(jù)收到的事件 public void received(Channel channel, Object message) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } //處理異常事件 public void caught(Channel channel, Throwable exception) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); } }有可能是我的理解還不到位,對于其其他的用處沒有理解到,如果這里有問題的話還請大家指出。
- connection:在IO線程上,將連接建立以及斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到線程池。具體實現(xiàn)如下:
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit ; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME); //初始化一個單獨處理鏈接建立和斷開的無界隊列連接池 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); //預(yù)警排隊數(shù)量 queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } //連接建立事件直接在單獨的線程池中處理 public void connected(Channel channel) throws RemotingException { try{ checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } } //連接斷開事件直接在單獨的線程池中處理 public void disconnected(Channel channel) throws RemotingException { try{ checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED)); }catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t); } } //數(shù)據(jù)接收事件還是在業(yè)務(wù)線程池中處理 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常事件還是在業(yè)務(wù)線程池中處理 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try{ cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception)); }catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t); } } //檢查排隊數(shù)量是否大于預(yù)警數(shù)量(默認(rèn)為1000),如果炒過的話就打WARNING日志 private void checkQueueLength(){ if (connectionExecutor.getQueue().size() > queuewarninglimit){ logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit)); } } }這種dispatcher的意義就在于將將連接事件與IO線程池和業(yè)務(wù)線程池分開處理,是其不會相互干擾。假如在網(wǎng)絡(luò)不穩(wěn)定的環(huán)境下,不會因為頻繁的網(wǎng)絡(luò)抖動影響實際的業(yè)務(wù)處理效率。
關(guān)于dubbo線程模型的內(nèi)容應(yīng)該都已經(jīng)講完了,具體怎么配置還要根據(jù)實際的業(yè)務(wù)場景。