連接池commons-pool源碼學習

上一篇簡單的hello world

ReaderUtil readerUtil = new ReaderUtil(new GenericObjectPool<StringBuffer>(new StringBufferFactory()));

理解commons pool,需要了解主要類,接口

  • PooledObject : 池化對象,在對象的基礎上,添加對象的屬性,方法,方便判斷對象的可用狀
  • ObjectPool : 對象池,利用它管理,獲取對象
  • PoolableObjectFactory : 可池化對象的維護工廠
  • GenericObjectPoolConfig : 連接池配置
  • LinkedBlockingDeque : 線程安全的阻塞隊列數(shù)據(jù)結構
  • Evictor : 驅(qū)逐者線程,當該線程啟動的是否,負責判斷隊列中的對象是否需要驅(qū)逐,驅(qū)逐完如果空閑對象數(shù)量小于最小可以使用的數(shù)量,維持最小的idel個對象,就創(chuàng)建等于最小數(shù)量的對象數(shù)

一步步來吧

  • PooledObject : 池化對象,在對象的基礎上,添加對象的屬性,方法,方便判斷對象的可用狀態(tài),比如池化StringBuffer字符操作對象
PooledObject pooledObject = new DefaultPooledObject(new StringBuffer());

public DefaultPooledObject(final T object) {
    //真正操作的還是StringBuffer對象,只是為了方便維護,讓DefaultPooledObject裝飾一番
    this.object = object;
}

ObjectPool : 對象池,利用它管理,獲取對象,看看主要方法

ObjectPool接口實現(xiàn)GenericObjectPool

public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
        //支持阻塞的線程安全隊列
        idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
        //設置連接池配置
        setConfig(config);
        //是否啟動驅(qū)逐線程
        startEvictor(getTimeBetweenEvictionRunsMillis());
    }

主要方法

//從空閑隊列LinkedBlockingDeque獲取連接對象
T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;

//把使用完的對象歸還到空閑隊列LinkedBlockingDeque
void returnObject(T obj) throws Exception;

//是否啟動驅(qū)逐線程
final void startEvictor(final long delay);
獲取連接對象
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        final AbandonedConfig ac = this.abandonedConfig;
        //如果配置了遺棄,當前空閑隊列對象數(shù)量小于2,并且正在使用中的對象數(shù)量大于(池中最多對象數(shù)量-3)
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            //正在使用狀態(tài)的,并且使用時間超過配置時間還沒有歸還的對象,則銷毀
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            p = idleObjects.pollFirst();
            if (p == null) {
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            if (blockWhenExhausted) {
                if (p == null) {
                    //如果沒有設置獲取等待超時就一直等待,直到有對象可以獲取
                    if (borrowMaxWaitMillis < 0) {
                        p = idleObjects.takeFirst();
                    } else {
                        //設置了獲取超時時間,如果超過設置時間還沒有獲取到,直接返回null
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            if (!p.allocate()) {
                p = null;
            }

            if (p != null) {
                try {
                    //激活對象
                    factory.activateObject(p);
                } catch (final Exception e) {
                    try {
                        //激活失敗,銷毀對象
                        destroy(p);
                    } catch (final Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        //檢查對象是否有效可用
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            //無效則銷毀
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }
removeAbandoned銷毀狀態(tài)在使用中,超過一段時間沒有歸還的對象
    private void removeAbandoned(final AbandonedConfig ac) {
        // Generate a list of abandoned objects to remove
        final long now = System.currentTimeMillis();
        final long timeout =
                now - (ac.getRemoveAbandonedTimeout() * 1000L);
        final ArrayList<PooledObject<T>> remove = new ArrayList<PooledObject<T>>();
        final Iterator<PooledObject<T>> it = allObjects.values().iterator();
        while (it.hasNext()) {
            final PooledObject<T> pooledObject = it.next();
            synchronized (pooledObject) {
                if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                        pooledObject.getLastUsedTime() <= timeout) {
                    pooledObject.markAbandoned();
                    remove.add(pooledObject);
                }
            }
        }

        // Now remove the abandoned objects
        final Iterator<PooledObject<T>> itr = remove.iterator();
        while (itr.hasNext()) {
            final PooledObject<T> pooledObject = itr.next();
            if (ac.getLogAbandoned()) {
                pooledObject.printStackTrace(ac.getLogWriter());
            }
            try {
                invalidateObject(pooledObject.getObject());
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }
    }
歸還連接
    public void returnObject(final T obj) {
        //從所有對象池中獲取返回的對象
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));

        if (p == null) {
            //如果沒有遺棄配置AbandonedConfig,拋出異常,有則直接返回
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        synchronized(p) {
            final PooledObjectState state = p.getState();
            //判斷對象狀態(tài)是否是正在使用,如果不是拋出異常,是則修改對象狀態(tài)為正在歸還,防止被遺棄
            if (state != PooledObjectState.ALLOCATED) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
            p.markReturning(); // Keep from being marked abandoned
        }

        final long activeTime = p.getActiveTimeMillis();

        if (getTestOnReturn()) {
            //如果對象無效
            if (!factory.validateObject(p)) {
                try {
                    //銷毀對象,在空閑隊列,所有集合中剔除對象,并且更新銷毀對象數(shù)量,創(chuàng)建對象數(shù)量
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    //試圖確保空閑池中存在有可用的實例
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                updateStatsReturn(activeTime);
                return;
            }
        }

        try {
            //鈍化對象,下次之前可以再復用該對象,比如對象是StringBuffer,可以用setLength(0)清空
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                //同上
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                //同上
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        //釋放資源
        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        //空閑隊列是否已經(jīng)等于配置的最多空閑數(shù)量,如果是則銷毀對象,不是則歸還到空閑隊列中
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            //如果配置的是先進先出,先進后出歸還到空閑隊列中
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

PoolableObjectFactory : 可池化對象的維護工廠

public interface PooledObjectFactory<T> {
  /**
   * 創(chuàng)建對象
   * Create an instance that can be served by the pool and wrap it in a
   * {@link PooledObject} to be managed by the pool.
   *
   * @return a {@code PooledObject} wrapping an instance that can be served by the pool
   *
   * @throws Exception if there is a problem creating a new instance,
   *    this will be propagated to the code requesting an object.
   */
  PooledObject<T> makeObject() throws Exception;

  /**
   * 銷毀對象
   * Destroys an instance no longer needed by the pool.
   * <p>
   * It is important for implementations of this method to be aware that there
   * is no guarantee about what state <code>obj</code> will be in and the
   * implementation should be prepared to handle unexpected errors.
   * <p>
   * Also, an implementation must take in to consideration that instances lost
   * to the garbage collector may never be destroyed.
   * </p>
   *
   * @param p a {@code PooledObject} wrapping the instance to be destroyed
   *
   * @throws Exception should be avoided as it may be swallowed by
   *    the pool implementation.
   *
   * @see #validateObject
   * @see ObjectPool#invalidateObject
   */
  void destroyObject(PooledObject<T> p) throws Exception;

  /**
   * 檢驗對象的有效性
   * Ensures that the instance is safe to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be validated
   *
   * @return <code>false</code> if <code>obj</code> is not valid and should
   *         be dropped from the pool, <code>true</code> otherwise.
   */
  boolean validateObject(PooledObject<T> p);

  /**
   * 激活對象
   * Reinitialize an instance to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be activated
   *
   * @throws Exception if there is a problem activating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void activateObject(PooledObject<T> p) throws Exception;

  /**
   * 鈍化對象,簡單來說就是在歸還對象的時候,清空對象,下次借用的可以直接使用
   * Uninitialize an instance to be returned to the idle object pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be passivated
   *
   * @throws Exception if there is a problem passivating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void passivateObject(PooledObject<T> p) throws Exception;
}

看看jedis的實現(xiàn)JedisFactory,方便理解

class JedisFactory implements PooledObjectFactory<Jedis> {

  @Override
  public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.getDB() != database) {
      jedis.select(database);
    }

  }

  @Override
  public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
      try {
        try {
          jedis.quit();
        } catch (Exception e) {
        }
        jedis.disconnect();
      } catch (Exception e) {

      }
    }

  }

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    final HostAndPort hostAndPort = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
        soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

    try {
      jedis.connect();
      if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }

    return new DefaultPooledObject<Jedis>(jedis);

  }

  @Override
  public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    // TODO maybe should select db 0? Not sure right now.
  }

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}

GenericObjectPoolConfig : 連接池配置

  • lifo: true為先進先出;false為先進后出,默認為true,表示對象的出借方式
  • maxWaitMillis: 當連接池資源耗盡時,調(diào)用者最大等待阻塞的時間(ms),默認為-1表示永不超時,建議設置值,如果資源一直等待超時,會卡死服務
  • maxTotal: 連接池中最大連接數(shù),默認為8.
  • maxIdle: 連接池中最大空閑的連接數(shù),默認為8.該參數(shù)一般盡量與maxTotal相同,以提高并發(fā)數(shù)
  • minIdle: 連接池中最小空閑的連接數(shù),默認為0,該參數(shù)一般盡量比maxIdle小一些
  • blockWhenExhausted: 當連接池資源耗盡時,是否會阻塞等待,默認為true:阻塞
  • testOnBorrow: 調(diào)用者獲取連接池資源時,是否檢測是有有效,如果無效則從連接池中移除,并嘗試繼續(xù)獲取。默認為false。建議保持默認值
  • testOnReturn: 向連接池歸還連接時,是否檢測“連接”對象的有效性。默認為false。建議保持默認值
  • testOnCreate:向連接池添加創(chuàng)建對象時,是否檢測“連接”對象的有效性。默認為false。建議保持默認值
  • testWhileIdle: 當驅(qū)逐空閑隊列的連接對象時,是否允許空閑時進行有效性測試,默認為false
  • timeBetweenEvictionRunsMillis: “空閑連接”驅(qū)逐線程,檢測的周期,毫秒數(shù)。如果為負值,表示不運行“驅(qū)逐線程”。默認為-1
  • numTestsPerEvictionRun:驅(qū)逐線程一次運行檢查多少條“連接”,不要設置太大,太大需要更多的時間來執(zhí)行
  • minEvictableIdleTimeMillis: 連接空閑的最小時間,達到此值后空閑連接將可能會被移除。負值(-1)表示不移除
  • softMinEvictableIdleTimeMillis: 連接空閑的最小時間,達到此值后空閑鏈接將會被移除,且保留“minIdle”個空閑連接數(shù)。默認為-1.

PooledObject對象的狀態(tài)

/**
 * Provides the possible states that a {@link PooledObject} may be in.
 *
 * @version $Revision: $
 *
 * @since 2.0
 */
public enum PooledObjectState {
    /**
     * In the queue, not in use.
     * 位于隊列中,未使用
     */
    IDLE,

    /**
     * In use.
     * 在使用
     */
    ALLOCATED,

    /**
     * In the queue, currently being tested for possible eviction.
     * 位于隊列中,當前正在測試,可能會被回收
     */
    EVICTION,

    /**
     * Not in the queue, currently being tested for possible eviction. An
     * attempt to borrow the object was made while being tested which removed it
     * from the queue. It should be returned to the head of the queue once
     * eviction testing completes.
     * TODO: Consider allocating object and ignoring the result of the eviction
     *       test.
     * 不在隊列中,當前正在測試,可能會被回收。從池中借出對象時需要從隊列出移除并進行測試
     */
    EVICTION_RETURN_TO_HEAD,

    /**
     * In the queue, currently being validated.
     * 2.0沒有用到
     */
    VALIDATION,

    /**
     * Not in queue, currently being validated. The object was borrowed while
     * being validated and since testOnBorrow was configured, it was removed
     * from the queue and pre-allocated. It should be allocated once validation
     * completes.
     * 2.0沒有用到
     */
    VALIDATION_PREALLOCATED,

    /**
     * Not in queue, currently being validated. An attempt to borrow the object
     * was made while previously being tested for eviction which removed it from
     * the queue. It should be returned to the head of the queue once validation
     * completes.
     * 2.0沒有用到
     */
    VALIDATION_RETURN_TO_HEAD,

    /**
     * Failed maintenance (e.g. eviction test or validation) and will be / has
     * been destroyed
     * 回收或驗證失敗,將銷毀
     */
    INVALID,

    /**
     * Deemed abandoned, to be invalidated.
     * 即將無效
     */
    ABANDONED,

    /**
     * Returning to the pool.
     * 正在返還到池中
     */
    RETURNING
}

LinkedBlockingDeque是保存空閑隊列的地方,借出,歸還都在這里

雙向鏈表實現(xiàn)的雙向并發(fā)阻塞隊列。該阻塞隊列同時支持FIFO和FILO兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除);并且,該阻塞隊列是支持線程安全

private static final class Node<E> {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node<E> prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node<E> next;

        /**
         * Create a new list node.
         *
         * @param x The list item
         * @param p Previous item
         * @param n Next item
         */
        Node(final E x, final Node<E> p, final Node<E> n) {
            item = x;
            prev = p;
            next = n;
        }
}
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 緩存池的作用就是減少重復創(chuàng)建資源,不是到每次需要使用資源再創(chuàng)建,而是提前準備好資源,每次需要時,從池中獲取,用完歸...
    zyzab閱讀 890評論 0 0
  • 一、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是,通過 New 關鍵字創(chuàng)...
    Java旅行者閱讀 4,865評論 0 44
  • 前言 記錄個人在2017年08月的學習和總結,不定期更新 2017-08-02 有序的Map HashMap是無序...
    Kevin_ZGJ閱讀 450評論 0 0
  • layout: posttitle: 《Java并發(fā)編程的藝術》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,018評論 1 19
  • day79《韓三篇》 “革命”和“民主”是兩個名詞,這兩個名詞是完全不等同的,革命不保證就能帶來民主?,F(xiàn)今中國...
    Molly_zhang閱讀 709評論 0 0

友情鏈接更多精彩內(nèi)容