HikariCP源碼分析之ConcurrentBag一

歡迎訪問我的博客,同步更新: 楓山別院

源代碼版本2.4.5-SNAPSHOT

大家好,今天我們一起分析下 HikariCP 的核心ConcurrentBag,它是管理連接池的最重要的核心類。從它的名字大家可以看得出來,它是一個并發(fā)管理類,性能非常好,這是它性能甩其他連接池十條街的秘密所在。

代碼概覽

我們先看一下代碼,注意這不是全部的代碼,省略了不太重要的部分。大家可以看到我加了非常詳細的注釋,對詳解不太感興趣的朋友可以直接讀一下代碼即可,不過這部分歷時好幾個夜晚我才寫完,大家可以稍稍捧個場:

//可用連接同步器, 用于線程間空閑連接數(shù)的通知, synchronizer.currentSequence()方法可以獲取當(dāng)前數(shù)量
//其實就是一個計數(shù)器, 連接池中創(chuàng)建了一個連接或者還回了一個連接就 + 1, 但是連接池的連接被借走, 是不會 -1 的, 只加不減
//用于在線程從連接池中獲取連接時, 查詢是否有空閑連接添加到連接池, 詳見borrow方法
private final QueuedSequenceSynchronizer synchronizer;
//sharedList保存了所有的連接
private final CopyOnWriteArrayList<T> sharedList;
//threadList可能會保存sharedList中連接的引用
private final ThreadLocal<List<Object>> threadList;
//對HikariPool的引用, 用于請求創(chuàng)建新連接
private final IBagStateListener listener;
//當(dāng)前等待獲取連接的線程數(shù)
private final AtomicInteger waiters;
//標(biāo)記連接池是否關(guān)閉的狀態(tài)
private volatile boolean closed;


/**
 * 該方法會從連接池中獲取連接, 如果沒有連接可用, 會一直等待timeout超時
 *
 * @param timeout  超時時間
 * @param timeUnit 時間單位
 * @return a borrowed instance from the bag or null if a timeout occurs
 * @throws InterruptedException if interrupted while waiting
 */
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
   //①
   //先嘗試從ThreadLocal中獲取
   List<Object> list = threadList.get();
   if (weakThreadLocals && list == null) {
      //如果ThreadLocal是 null, 就初始化, 防止后面 npe
      list = new ArrayList<>(16);
      threadList.set(list);
   }
   //②
   //如果ThreadLocal中有連接的話, 就遍歷, 嘗試獲取
   //從后往前反向遍歷是有好處的, 因為最后一次使用的連接, 空閑的可能性比較大, 之前的連接可能會被其他線程偷竊走了
   for (int i = list.size() - 1; i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }
   //③
   //如果沒有從ThreadLocal中獲取到連接, 那么就sharedList連接池中遍歷, 獲取連接, timeout時間后超時
   //因為ThreadLocal中保存的連接是當(dāng)前線程使用過的, 才會在ThreadLocal中保留引用, 連接池中可能還有其他空閑的連接, 所以要遍歷連接池
   //看一下requite(final T bagEntry)方法的實現(xiàn), 還回去的連接放到了ThreadLocal中
   timeout = timeUnit.toNanos(timeout);
   Future<Boolean> addItemFuture = null;
   //記錄從連接池獲取連接的開始時間, 后面用
   final long startScan = System.nanoTime();
   final long originTimeout = timeout;
   long startSeq;
   //將等待連接的線程計數(shù)器加 1
   waiters.incrementAndGet();
   try {
      do {
         // scan the shared list
         do {
            //④
            //當(dāng)前連接池中的連接數(shù), 在連接池中添加新連接的時候, 該值會增加
            startSeq = synchronizer.currentSequence();
            for (T bagEntry : sharedList) {
               if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                  // if we might have stolen another thread's new connection, restart the add...
                  //⑤
                  //如果waiters大于 1, 說明除了當(dāng)前線程之外, 還有其他線程在等待空閑連接
                  //這里, 當(dāng)前線程的addItemFuture是 null, 說明自己沒有請求創(chuàng)建新連接, 但是拿到了連接, 這就說明是拿到了其他線程請求創(chuàng)建的連接, 這就是所謂的偷竊了其他線程的連接, 然后當(dāng)前線程請求創(chuàng)建一個新連接, 補償給其他線程
                  if (waiters.get() > 1 && addItemFuture == null) {
                     //提交一個異步添加新連接的任務(wù)
                     listener.addBagItem();
                  }
                  return bagEntry;
               }
            }
         } while (startSeq < synchronizer.currentSequence()); //如果連接池中的空閑連接數(shù)量比循環(huán)之前多了, 說明有新連接加入, 繼續(xù)循環(huán)獲取
         //⑥
         //循環(huán)完一遍連接池(也可能循環(huán)多次, 如果正好在第一次循環(huán)完連接池后有新連接加入, 那么會繼續(xù)循環(huán)), 還是沒有能拿到空閑連接, 就請求創(chuàng)建新的連接
         if (addItemFuture == null || addItemFuture.isDone()) {
            addItemFuture = listener.addBagItem();
         }
         //計算 剩余的超時時間 = 用戶設(shè)置的connectionTimeout - (系統(tǒng)當(dāng)前時間 - 開始獲取連接的時間_代碼①處 即從連接池中獲取連接一共使用的時間)
         timeout = originTimeout - (System.nanoTime() - startScan);
      } while (timeout > 10_000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout)); //③
      //⑦
      //這里的循環(huán)條件比較復(fù)雜
      //1. 如果剩余的超時時間, 大于10_000納秒
      //2. startSeq的數(shù)量, 即空閑連接數(shù)超過循環(huán)之前的數(shù)量
      //3. 沒有超過超時時間timeout
      //滿足以上 3 個條件才會繼續(xù)循環(huán), 否則阻塞線程, 直到滿足以上條件
      //如果一直等到timeout超時時間用完都沒有滿足條件, 結(jié)束阻塞, 往下走
      //有可能會動態(tài)改變的條件, 只有startSeq數(shù)量改變, 是②處添加的創(chuàng)建連接請求
   } finally {
      waiters.decrementAndGet();
   }

   return null;
}

/**
 * 該方法將借出去的連接還回到連接池中
 * 不通過該方法還回的連接會造成內(nèi)存泄露
 *
 * @param bagEntry the value to return to the bag
 * @throws NullPointerException  if value is null
 * @throws IllegalStateException if the requited value was not borrowed from the bag
 */
public void requite(final T bagEntry) {
   //⑧
   //lazySet方法不能保證連接會立刻被設(shè)置成可用狀態(tài), 這是個延遲方法
   //這是一種優(yōu)化, 如果要立即生效的話, 可能會需要使用volatile等, 讓其他線程立即發(fā)現(xiàn), 這會降低性能, 使用lazySet浪費不了多少時間, 但是不會浪費性能
   bagEntry.lazySet(STATE_NOT_IN_USE);

   //⑨
   //將連接放回到threadLocal中
   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList != null) {
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
   //通知等待線程, 有可用連接
   synchronizer.signal();
}

/**
 * 在連接池中添加一個連接
 * 新連接都是添加到sharedList中, threadList是sharedList中的部分連接的引用
 *
 * @param bagEntry an object to add to the bag
 */
public void add(final T bagEntry) {
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }
   //⑩
   sharedList.add(bagEntry);
   synchronizer.signal();
}

/**
 * 從連接池中移除一個連接.
 * 這個方法只能用于從<code>borrow(long, TimeUnit)</code> 或者 <code>reserve(T)</code>方法中獲取到的連接
 * 也就是說, 這個方法只能移除處于使用中和保留狀態(tài)的連接
 *
 * @param bagEntry the value to remove
 * @return true if the entry was removed, false otherwise
 * @throws IllegalStateException if an attempt is made to remove an object
 *                               from the bag that was not borrowed or reserved first
 */
public boolean remove(final T bagEntry) {
   //?
   //嘗試標(biāo)記移除使用中和保留狀態(tài)的連接, 如果標(biāo)記失敗, 就是空閑的連接, 直接返回 false
   //也就是檢查連接的狀態(tài), 不能移除空閑的連接或者已經(jīng)標(biāo)記移除的連接
   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
      return false;
   }
   //如果上面標(biāo)記成功了, 那么從連接池中移除這個連接
   final boolean removed = sharedList.remove(bagEntry);
   if (!removed && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
   }

   // synchronizer.signal();
   return removed;
}

上面的代碼是ConcurrentBag中的成員變量和最重要的四個方法,ConcurrentBag中的屬性我們穿插在代碼中解釋。

borrow方法

borrow方法應(yīng)該是整個 HikariCP 中最最核心的方法,它是我們從連接池中獲取連接的時候最終會調(diào)用到的方法,一切秘密都在這里了。我們分析下:

①ThreadLocal

//①
//先嘗試從ThreadLocal中獲取
List<Object> list = threadList.get();
if (weakThreadLocals && list == null) {
  //如果ThreadLocal是 null, 就初始化, 防止后面 npe
  list = new ArrayList<>(16);
  threadList.set(list);
}

threadList是在ConcurrentBag上的成員變量,它的定義是private final ThreadLocal<List> threadList;,可見它是一個ThreadLocal,也就是每個線程都有獨享的一個 List,它是用于保存當(dāng)前線程用過的連接。注意這里我說的是"用過",不是所有的連接。因為還有一個成員變量private final CopyOnWriteArrayList<T> sharedList;,它是真正的保存所有的連接的地方,它是一個CopyOnWriteArrayList,在寫入的時候會先復(fù)制一個 sharedList2,然后修改這個新的 sharedList2,最后將變量地址指向新的sharedList2,不是直接寫入當(dāng)前的sharedList,典型的空間換時間的一個做法,可以避免寫入前要鎖住sharedList,從而導(dǎo)致降低性能。

HikariCP 使用過的連接,在還回連接池的時候,是直接放在了ThreadLocal中。說到這里,可能會有同學(xué)問了:sharedList中保存了所有的連接,當(dāng)用戶借走了一個連接,不是應(yīng)該把這個連接從sharedList中移除,然后還回來的時候再把連接加入到sharedList中?為什么還回去的時候,沒有放到sharedList中呢?

首先,明確一點,HikariCP 不是這樣做的。為什么呢?如果用戶借用連接的時候,你從sharedList中移除了,那么相當(dāng)于這個連接脫離了 HikariCP 的管理,后面 HikariCP 還怎么管理這個連接呢?比如這個連接的生命周期到時間了,連接都讓用戶拐跑了,我還怎么關(guān)閉這個連接呢?所以,所有的連接都不能脫離掌控,一個都不能少。其實,我們在sharedList中保存的僅僅是數(shù)據(jù)庫連接的引用,這些連接是所有的線程都可見的,各個線程也可以隨意保存連接的引用,只是要使用的時候必須要走borrow方法,按流程來。

為什么要放到線程的threadList中?

因為下次獲取的時候比較方便,也許會提高性能。每個線程都優(yōu)先從自己的本地線程中拿,競爭的可能性大大降低啊,也許這個連接剛剛用完到再次獲取的時間極短,這個連接很可能還空閑著。只有在本地線程中的連接都不能使用的時候,才去sharedList這個 HikariCP的總倉庫里獲取。

舉一個生活例子:假如你是一個連鎖店老板,提供汽車出租服務(wù),有一個總倉庫,所有的連鎖店都從這里提車出租給用戶。剛開始,你是每租一輛車都去倉庫直接提貨,用戶還車的時候,你直接送到倉庫。過了一段時間,你覺得這樣不行啊,太浪費時間了,而且所有的連鎖店都這樣,各個店的老板都去提車,太忙了,還得排隊。要不用戶還回來的車先放店里吧,這樣下次有用戶租車就不用去倉庫了,直接給他,方便很多,店里沒車了再去總倉提車。其他連鎖店都開始這么搞,大家都先用店里的車不夠再去總倉。生意火爆,有一天店里沒車了,你去倉庫提車,倉庫管理員說:倉庫也沒車了,天通苑的連鎖店里有閑著的,你去那里提吧,于是你把天通苑連鎖店的車借走了。所以各個連鎖店之間也有相互借車。

例子可能不太恰當(dāng),一時也想不到同樣道理的生活例子,但是就這個意思。HikariCP 也是這樣,用戶使用的連接,還回連接池的時候,直接放到線程的本地threadList中,如果用戶又要借用連接,先看本地有沒有,優(yōu)先使用本地連接,只有本地沒有或者都不可用的時候,再去 HikariCP 的連接池里獲取。但是跟借車不同,因為我們本地是保存的sharedList中連接的引用,雖然你還有這個連接的引用,但是很可能它已經(jīng)被其他線程從sharedList借走了,這就是HikariCP所謂的線程間的連接竊取。所以線程在本地的threadList就算拿到了連接,也必須檢查下狀態(tài),是不是可用的。

說到這里,還沒有解析代碼,扯遠了。①出代碼就是先從本地的threadList里取出連接的 List,然后檢查下List 是否為空,是空的直接初始化一個 List,因為下面要用到,防止拋空指針了。大家可以看到判空的時候,還有一個條件是weakThreadLocals,這個標(biāo)識是表示threadList是否是弱引用。如果是弱引用,那么很可能 GC 的時候會被回收掉,所以變成 null 了,但是如果不是弱引用的話,那么它是在初始化ConcurrentBag的時候,就是一個FastList了,不用擔(dān)心是 null。那么什么情況下threadList會是弱引用呢?當(dāng) HikariCP 運行在容器中時,會使用弱引用,因為在容器重新部署的時候,可能會導(dǎo)致內(nèi)存泄露,具體大家可以看下#39 的 issue。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容