Dubbo線程模型

線程是每個應(yīng)用都必須關(guān)系的事情,畢竟任何服務(wù)器的資源都是有限的,服務(wù)線程過少的容易發(fā)生阻塞,服務(wù)線程過多的話上下文切換的開銷又會影響效率,所以合適的線程模型對于一個高性能的應(yīng)用來說必不可少。Dubbo作為一個帶有服務(wù)治理功能的RPC框架,在線程模型上也有自己的處理,今天就讓我們一起來看一下Dubbo的線程模型。

下面我們要看一下默認(rèn)情況下的線程模型:
首先明確一個基本概念:IO線程和業(yè)務(wù)線程的區(qū)別

  • IO線程:配置在netty連接點的用于處理網(wǎng)絡(luò)數(shù)據(jù)的線程,主要處理編解碼等直接與網(wǎng)絡(luò)數(shù)據(jù)打交道的事件。
  • 業(yè)務(wù)線程:用于處理具體業(yè)務(wù)邏輯的線程,可以理解為自己在provider上寫的代碼所執(zhí)行的線程環(huán)境。

Dubbo默認(rèn)采用的是長鏈接的方式,即默認(rèn)情況下一個consumer和一個provider之間只會建立一條鏈接,這種情況下IO線程的工作就是編碼和解碼數(shù)據(jù),監(jiān)聽具體的數(shù)據(jù)請求,直接通過Channel發(fā)布數(shù)據(jù)等等;二業(yè)務(wù)線程就是處理IO線程處理之后的數(shù)據(jù),業(yè)務(wù)線程并不知道任何跟網(wǎng)絡(luò)相關(guān)的內(nèi)容,只是純粹的處理業(yè)務(wù)邏輯,在業(yè)務(wù)處理邏輯的時候往往存在復(fù)雜的邏輯,所以業(yè)務(wù)線程池的配置往往都要比IO線程池的配置大很多。

Dubbo中線程相關(guān)參數(shù)的含義

  • iothreads:指定IO線程池(worker)的線程數(shù)量,默認(rèn)情況下為CPU個數(shù)+1,因為這個線程的工作內(nèi)容比較簡單,所以一般情況下我們不會去配置這個值,除非IO線程的響應(yīng)速度明顯拖慢了整個工程的響應(yīng),IO線程的默認(rèn)類型是CacheThreadPool,一分鐘的線程死亡時間。

  • threadpool:業(yè)務(wù)線程的具體線程類型,默認(rèn)采用的fixed線程池,即線程數(shù)量一定的線程池,這種線程池的好處就是不會頻繁創(chuàng)建線程線程,適合線業(yè)務(wù)比較密集的應(yīng)用。因為這個數(shù)據(jù)只管關(guān)系到服務(wù)的并發(fā)情況,所以在需要的時候可以適當(dāng)調(diào)整該數(shù)量來增加工程的并發(fā)。

  • threads:該參數(shù)就是業(yè)務(wù)線程池的核心線程數(shù)配置,默認(rèn)情況下為200。如果空間有條件的話可以適當(dāng)?shù)靥嵘摂?shù)量,例如提升至400或者500都是可以的。

  • queues:該數(shù)量指定來在初始化業(yè)務(wù)線程池時候是否需要排隊隊列,如果不設(shè)置的話,業(yè)務(wù)線程池的排隊隊列是SynchronousQueue,即不允許業(yè)務(wù)事件排隊,如果線程池沒有空閑線程之后會直接排除異常信息。但是如果配置來queues之后則會使用LinkedBlockingQueue作為排隊隊列,queues則代表隊列的初始隊列。因為queues的配置直接關(guān)系到排隊,所以在一般情況下建議不要配置,因為線程池滿的情況下一般期望是直接失敗,然后調(diào)用其他的機(jī)器,而不是再次隊列繼續(xù)等待,繼續(xù)等待不僅可能會拉低響應(yīng)時間,而且很有可能會超時。

  • acceptes:我們知道threadpool,threads和queues都是控制業(yè)務(wù)線程池的字段,而acceptes就是控制IO線程池的字段。這個字段標(biāo)示著服務(wù)端可接受的最大長連接數(shù),默認(rèn)情況下為不限制,但是有時候為來保護(hù)服務(wù)器防止連接數(shù)過多導(dǎo)致請求失敗率過高,則可以考慮設(shè)置該字段為一個定值。

  • connections:既然服務(wù)端可以設(shè)置最大接收的連接數(shù),那么客戶端也可以設(shè)置與服務(wù)端建立的連接數(shù)。connections可以配置在reference上表示要同對應(yīng)的服務(wù)器建立的長鏈接數(shù)量,默認(rèn)為只建立一條鏈接,如果配置來connections的話則會建立N條長鏈接以提供消費者的吞吐量。但是有一點需要注意是如果conenctions的數(shù)量配置大于服務(wù)端的accepts的話,超出的部分會直接報錯,表示不支持更多的鏈接,該值不宜配置過多,因為如果多個消費者都配置來該值的話很容易到值服務(wù)端的accepts超過預(yù)期數(shù)量而報錯。

  • dispatcher:這個字段代表的是IO線程池和業(yè)務(wù)線程池的邊界,具體有這么幾種類型,下面我們一一詳細(xì)看看:

    • all:所有消息都派發(fā)到線程池,包括請求,響應(yīng),連接事件,斷開事件,心跳等。對應(yīng)的是AllChannelHandler(具體這個Handler的處理位置以及他的作用,見前幾篇博客,這里不再強(qiáng)調(diào))
    public class AllChannelHandler extends WrappedChannelHandler {
      
      public AllChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      
      //鏈接事件通過線程池處理
      public void connected(Channel channel) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
          }
      }
      //鏈接斷開事件通過線程池處理
      public void disconnected(Channel channel) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
          }
      }
      //數(shù)據(jù)接收事件通過線程池處理
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = getExecutorService();
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
      //異常事件通過線程池梳理
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          ExecutorService cexecutor = getExecutorService(); 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
          }catch (Throwable t) {
              throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
          }
      }
    
      private ExecutorService getExecutorService() {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) { 
              cexecutor = SHARED_EXECUTOR;
          }
          return cexecutor;
      }
      }
    
    • direct:所有消息都不派發(fā)到線程池,全部在IO線程上直接執(zhí)行。(這種做法在絕大多數(shù)情況下都不合理,因為畢竟業(yè)務(wù)邏輯相關(guān)對IO事件都是復(fù)雜的)。具體的實現(xiàn)方式就是在裝飾者的層級上直接下調(diào),不再包裝線程池。
    • message:只有請求響應(yīng)消息派發(fā)到線程池,其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行。
      /**
       * 只有請求響應(yīng)消息派發(fā)到線程池,其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行
       */
      public class MessageOnlyChannelHandler extends WrappedChannelHandler {
      
      public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      //接收到消息時候觸發(fā),無論是服務(wù)端接收到請求數(shù)據(jù)還是客戶端接收到返回數(shù)據(jù)
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) {
              cexecutor = SHARED_EXECUTOR;
          }
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
    
      }
    

    具體的做法其實很簡單,相對于all來說只不過只會會將received事件在線程池中處理,其他的一概以默認(rèn)方式處理(IO線程池)。

    • execution:官方的說法是:只請求消息派發(fā)到線程池,不含響應(yīng),響應(yīng)和其它連接斷開事件,心跳等消息,直接在IO線程上執(zhí)行。相對于message來說,限制的更死了,也就是只有服務(wù)端的業(yè)務(wù)邏輯才會執(zhí)行在業(yè)務(wù)線程池中執(zhí)行。消費端如果收到的消息之后,處理邏輯還是IO線程上執(zhí)行。但是實際情況是我看到的代碼顯示execution與all的處理邏輯幾乎一樣,并沒有體現(xiàn)出官方的說法。 具體實現(xiàn)如下:
      public class ExecutionChannelHandler extends WrappedChannelHandler {
      
      public ExecutionChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
      }
      //處理鏈接建立事件
      public void connected(Channel channel) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
      }
      //處理鏈接斷開事件
      public void disconnected(Channel channel) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
      }
      //處理數(shù)據(jù)收到的事件
      public void received(Channel channel, Object message) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
      }
      //處理異常事件
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
      }
      }
      
    

    有可能是我的理解還不到位,對于其其他的用處沒有理解到,如果這里有問題的話還請大家指出。

    • connection:在IO線程上,將連接建立以及斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到線程池。具體實現(xiàn)如下:
      public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    
      protected final ThreadPoolExecutor connectionExecutor;
      private final int queuewarninglimit ;
      
      public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
          super(handler, url);
          String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
          //初始化一個單獨處理鏈接建立和斷開的無界隊列連接池
          connectionExecutor = new ThreadPoolExecutor(1, 1,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                                       new NamedThreadFactory(threadName, true),
                                       new AbortPolicyWithReport(threadName, url)
              );  
          //預(yù)警排隊數(shù)量
          queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
      }
      
      //連接建立事件直接在單獨的線程池中處理
      public void connected(Channel channel) throws RemotingException {
          try{
              checkQueueLength();
              connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
          }
      }
      //連接斷開事件直接在單獨的線程池中處理
      public void disconnected(Channel channel) throws RemotingException {
          try{
              checkQueueLength();
              connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
          }catch (Throwable t) {
              throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t);
          }
      }
      //數(shù)據(jù)接收事件還是在業(yè)務(wù)線程池中處理
      public void received(Channel channel, Object message) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) {
              cexecutor = SHARED_EXECUTOR;
          }
          try {
              cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
          } catch (Throwable t) {
              throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
          }
      }
      //異常事件還是在業(yè)務(wù)線程池中處理
      public void caught(Channel channel, Throwable exception) throws RemotingException {
          ExecutorService cexecutor = executor;
          if (cexecutor == null || cexecutor.isShutdown()) { 
              cexecutor = SHARED_EXECUTOR;
          } 
          try{
              cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
          }catch (Throwable t) {
              throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
          }
      }
      //檢查排隊數(shù)量是否大于預(yù)警數(shù)量(默認(rèn)為1000),如果炒過的話就打WARNING日志
      private void checkQueueLength(){
          if (connectionExecutor.getQueue().size() > queuewarninglimit){
              logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
          }
      }
      }
      
    

    這種dispatcher的意義就在于將將連接事件與IO線程池和業(yè)務(wù)線程池分開處理,是其不會相互干擾。假如在網(wǎng)絡(luò)不穩(wěn)定的環(huán)境下,不會因為頻繁的網(wǎng)絡(luò)抖動影響實際的業(yè)務(wù)處理效率。


關(guān)于dubbo線程模型的內(nèi)容應(yīng)該都已經(jīng)講完了,具體怎么配置還要根據(jù)實際的業(yè)務(wù)場景。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評論 19 139
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,627評論 18 399
  • netty常用API學(xué)習(xí) netty簡介 Netty是基于Java NIO的網(wǎng)絡(luò)應(yīng)用框架. Netty是一個NIO...
    花丶小偉閱讀 6,118評論 0 20
  • 學(xué)好PPT,這是一門必修課。 上周,小宇受邀請在全省培訓(xùn)“微課堂”進(jìn)行了一次線上的PPT交流分享。小宇接觸PPT可...
    波波不是一棵菠菜閱讀 829評論 0 2
  • 這一日早晨鎮(zhèn)上的鐵匠鋪來了位奇怪的客人,這個客人身穿道袍,身形消瘦,頭上隨意的挽著一個道髻,用一根不知道哪里來的竹...
    從九樓開始飛翔閱讀 791評論 0 1

友情鏈接更多精彩內(nèi)容