02. 我看到的Druid包結(jié)構(gòu) -- pool

前言

????本篇從上邊說的方式,自頂向下,從粗到細(xì)的思路來進(jìn)行學(xué)習(xí)。 知道druid核心功能、每個(gè)功能大概實(shí)現(xiàn)原理。

學(xué)習(xí)步驟 - 包結(jié)構(gòu)

druid
 |
  --src
    --main
      -- java
          -- com.alibaba.druid
              -- filter                       (過濾器相關(guān))
              -- mock                    (猜測(cè)測(cè)試相關(guān))
              -- pool                      (數(shù)據(jù)庫連接池相關(guān))
              -- proxy                    (jdbc API相關(guān)代理實(shí)現(xiàn))
              -- sql                        (sql解析相關(guān))
              -- stat                       (增強(qiáng)功能:統(tǒng)計(jì)類信息)
              -- support                 (支持類)
              -- util                         (工具類)
              -- wall                        (增強(qiáng)功能:防sql注入相關(guān))

          -- META-INF
      -- resources
      -- scripts
    -- test
  
  -- pom.xml    

看完包結(jié)構(gòu),以及大概猜測(cè)的含義,接下來,進(jìn)入每個(gè)包,看下具體含義,以及猜測(cè)是否正確。這里的順序呢,我們以理解的重要程度來作分析。

本篇目的:分析Pool包

打開pool包,我們發(fā)現(xiàn)有三個(gè)子包ha、vendor、xa,以及其他的針對(duì)jdbcApi的池化實(shí)現(xiàn)(datasource、connection、statement、resultset等)


ha子包

????從字面意思理解,是高可用(high availability)

  1. selector包
    這里主要是圍繞都數(shù)據(jù)源的時(shí)候,具體使用哪個(gè)數(shù)據(jù)源的選舉展開的,所以,我們弄明白他的設(shè)計(jì)就可以了。(抓住主干)
  • DataSourceSelector:數(shù)據(jù)源選舉的抽象方法定義(接口)


    DataSourceSelector的實(shí)現(xiàn)類.png

    我們看到他的實(shí)現(xiàn)類有三個(gè),分別是按名稱、隨機(jī)、粘性選舉。

  • NamedDataSourceSelector
    按名稱來選舉。大致實(shí)現(xiàn)邏輯:使用方通過ThreadLocal傳遞需要使用的數(shù)據(jù)源的名稱來獲取對(duì)應(yīng)的數(shù)據(jù)源。如果使用方有傳遞這個(gè)名稱,則按照這個(gè)名來從數(shù)據(jù)源map(這個(gè)是構(gòu)造時(shí)入?yún)ighAvailableDataSource自帶的)中取出來。如果沒有這個(gè)名稱,則會(huì)取默認(rèn)的名稱,然后再取數(shù)據(jù)源。

  • RandomDataSourceSelector
    隨機(jī)選取數(shù)據(jù)源。從數(shù)據(jù)源map(同上解釋)中,根據(jù)總大小使用隨機(jī)函數(shù)(Random.nextInt())選取一個(gè)數(shù)據(jù)源返回。

  • StickyRandomDataSourceSelector
    粘性選取數(shù)據(jù)源。這是一個(gè)比較特殊的選取器。它繼承隨機(jī)選取數(shù)據(jù)源,也就是說如果是第一次進(jìn)來,就是走的隨機(jī)選取。那區(qū)別在哪呢?在選取后,它會(huì)把這個(gè)數(shù)據(jù)源跟綁定到當(dāng)前線程(ThreadLocal),會(huì)持續(xù)一段時(shí)間(這個(gè)時(shí)間可以配置)。

2.HighAvailableDataSource:DataSource class which contains multiple DataSource objects. 包含多個(gè)數(shù)據(jù)源對(duì)象的數(shù)據(jù)源類
剛剛上面有提到過,三個(gè)選取器的構(gòu)造入?yún)⒕褪沁@個(gè)類的示例。它實(shí)現(xiàn)了javax.sql.DataSource,意味著它對(duì)外就是一個(gè)datasource示例。它的實(shí)例化工作在getConnection()方法,調(diào)用的時(shí)候,會(huì)初始化一個(gè)數(shù)據(jù)源集合(初始化邏輯通過DataSourceCreator),然后初始化數(shù)據(jù)源選取器,可以設(shè)置選取器類型(就是上面提到的三種),如果沒有設(shè)置,使用隨機(jī)類型。初始化完畢后,使用數(shù)據(jù)源選取器獲取數(shù)據(jù)源,然后返回該數(shù)據(jù)源的一個(gè)鏈接(這個(gè)是正常的jdbc獲取鏈接的思路)。

3.DataSourceCreator:An utility class to create DruidDataSource dynamically. 動(dòng)態(tài)創(chuàng)建DruidDataSource 的工具類。
上面有提到過,高可用數(shù)據(jù)源的底層初始化數(shù)據(jù)源集合就是使用該類。通過構(gòu)造進(jìn)來配置文件,以及配置參數(shù)的前綴,做解析,緊接著創(chuàng)建數(shù)據(jù)源集合。具體創(chuàng)建過程就是拿解析到的name、url、username、psw,直接構(gòu)建DruidDataSource,除了這幾個(gè)基本參數(shù)外,其他參數(shù)使用高可用數(shù)據(jù)源實(shí)例自己的默認(rèn)值(外面也可以設(shè)置)

總結(jié)下ha包,使用的場(chǎng)景就是你有多數(shù)據(jù)源的時(shí)候。通過統(tǒng)一的高可用數(shù)據(jù)源對(duì)外充當(dāng)只有一個(gè)數(shù)據(jù)源(具體實(shí)現(xiàn)邏輯對(duì)使用方不可見)。
比如:

  • 按名稱選舉:主從數(shù)據(jù)源。通過提前設(shè)置數(shù)據(jù)源名稱(高可用數(shù)據(jù)源設(shè)置方法),做到數(shù)據(jù)源的切換。類似思路其實(shí)在sharding-jdbc的讀寫分離中有用到。
  • 隨機(jī)選舉:多個(gè)從數(shù)據(jù)源,之前無差別。
  • 粘性選擇:我沒想到好的場(chǎng)景,可以看看上面的差別,自己想想。

建議:高可用的數(shù)據(jù)源這里,我提一種場(chǎng)景 -- 多個(gè)從庫需要最近路由來做數(shù)據(jù)查詢。其實(shí)現(xiàn)在的選舉器是滿足不了的?,F(xiàn)在的選舉受DataSourceSelectorEnum枚舉的限制,不具有擴(kuò)展能力。自定義規(guī)則無法接入。為啥提這個(gè)呢,其實(shí)就是想實(shí)現(xiàn)類似dubbo(rpc框架)的最近路由或者按group路由。達(dá)到性能最高的目的。


vendor子包

????google一下,是供應(yīng)商、小攤小販的意思。其實(shí)點(diǎn)一點(diǎn)里面的類,大概值圍繞兩種接口的實(shí)現(xiàn)來做的。

  • ExceptionSorter
    An interface to allow for exception evaluation. 允許評(píng)估異常的接口。直接這么看是比較生硬的,看下它定義的方法就知道啥意思了,boolean isExceptionFatal(SQLException e);

    該包下羅列了各種db的實(shí)現(xiàn)類以及mock等異常的判斷。

  • ValidConnectionChecker
    顧名思義,檢查連接是否正常有效。

    這里以MySqlValidConnectionChecker為例,就是判斷判斷connection是否正常,兩種方式,1.使用connection的pingInternal進(jìn)行反射調(diào)用。 2.使用connection執(zhí)行自定義查詢語句。 如果沒有異常拋出,就認(rèn)為是有效的connection。

總結(jié):這個(gè)包名字起的也算到位,就是小功能的集合包。這與這個(gè)包下的功能何時(shí)使用,我們后面看其他包的時(shí)候再看下。


xa子包

????用于兼容分布式事務(wù)相關(guān)。這里可參考維基百科介紹的分布式事務(wù)概念。

  • DruidXADataSource
    extends DruidDataSource implements XADataSource,這里其實(shí)就是擴(kuò)展了普通的datasource,以滿足分布式接口的需要。比如getXAConnection,就是先獲取普通的鏈接,然后根據(jù)不同的數(shù)據(jù)庫方言,使用對(duì)應(yīng)的工具類進(jìn)行創(chuàng)建XAConnection,對(duì)外再統(tǒng)一包裝一層DruidPooledXAConnection。目前支持的方言有:Mysql、postgresql、h2、還有下面擴(kuò)展的jtds。

  • DruidPooledXAConnection
    implements XAConnection,這里是對(duì)不同的方言對(duì)應(yīng)的XAConnection做統(tǒng)一的包裝。

  • JtdsXAResource
    implements XAResource,commit、end、rollback、start等方法,都是包裝在XASupport之上。自實(shí)現(xiàn)了isSameRM

  • JtdsXAConnection
    implements XAConnection ,包裝普通的connection(創(chuàng)建JtdsXAResource、獲取分布式鏈接id)

總結(jié):這里就是對(duì)xa的支持,包裝邏輯類似于前面介紹的selector,可以對(duì)比著理解。


包外的核心類:這里才是重點(diǎn)。
????假如之前沒有使用過druid很可能會(huì)一臉懵逼,不知道從哪入手。不過呢,我們一般可以從一個(gè)框架它的核心功能入手,然后一個(gè)點(diǎn)切進(jìn)去,你就會(huì)發(fā)現(xiàn)不知不覺就都串聯(lián)起來了。我們這里以DruidDataSource會(huì)切入點(diǎn),我們一般使用的時(shí)候,會(huì)直接配置DruidDataSource(spring xml配置方式),并且指定它的初始化方法為init方法。接下我們就從這個(gè)方法入手,看一看究竟干了啥,又是怎樣把這部分核心代碼給串聯(lián)起來的。

    /**
     * 數(shù)據(jù)源的初始化入口
     *
     * @throws SQLException
     */
    public void init() throws SQLException {
//        成員變量,如果已經(jīng)初始化,直接return
        if (inited) {
            return;
        }

        // bug fixed for dead lock, for issue #2980
//        這里主要是初始化這個(gè)類,利用靜態(tài)方法注冊(cè)自己的驅(qū)動(dòng)類
        DruidDriver.getInstance();

        final ReentrantLock lock = this.lock;
        try {
//            嘗試拿鎖
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
//            雙重檢查用于更安全(常用于同步后,再做層檢查)
            if (inited) {
                return;
            }

//            調(diào)用棧信息
            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

//            內(nèi)存級(jí)別的id原子加1
            this.id = DruidDriver.createDataSourceId();
//            如果不是第一個(gè)數(shù)據(jù)源,則它下面的幾個(gè)屬性做內(nèi)存指令級(jí)別的直接更新,跨度10萬個(gè)
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
//                如果是druid自定義包裝的jdbcurl做解析
                initFromWrapDriverUrl();
            }

            /**
             * 通過filter(通過datasource的property參數(shù)注入)去裝飾datasource,每個(gè)filter關(guān)注的功能不同,比如configFilter(去解密秘鑰、設(shè)置其他屬性)
             */
            for (Filter filter : filters) {
                filter.init(this);
            }

            if (this.dbType == null || this.dbType.length() == 0) {
                this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
            }

            if (JdbcConstants.MYSQL.equals(this.dbType)
                    || JdbcConstants.MARIADB.equals(this.dbType)
                    || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
                boolean cacheServerConfigurationSet = false;
                if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                    cacheServerConfigurationSet = true;
                } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                    cacheServerConfigurationSet = true;
                }
                if (cacheServerConfigurationSet) {
                    this.connectProperties.put("cacheServerConfiguration", "true");
                }
            }

            if (maxActive <= 0) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (maxActive < minIdle) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (getInitialSize() > maxActive) {
                throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
            }

            if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
                throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
            }

            if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
                throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
            }

            if (this.driverClass != null) {
                this.driverClass = driverClass.trim();
            }

            //初始化spi的fitler
            initFromSPIServiceLoader();

//            如果沒有直接設(shè)置,會(huì)從url中識(shí)別出dbType,然后返回定義好的DriverClassName
            if (this.driver == null) {
                if (this.driverClass == null || this.driverClass.isEmpty()) {
                    this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
                }

                if (MockDriver.class.getName().equals(driverClass)) {
                    driver = MockDriver.instance;
                } else {
                    if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                        throw new SQLException("url not set");
                    }
//                    加載真實(shí)驅(qū)動(dòng)
                    driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
                }
            } else {
                if (this.driverClass == null) {
                    this.driverClass = driver.getClass().getName();
                }
            }

//            一些基本的簡單檢查,比如oracle某些版本不支持等
            initCheck();
//          根據(jù)驅(qū)動(dòng)類型,初始化異常的排序器,在 pool-ventor包下,主要是用來判斷異常的嚴(yán)重性
            initExceptionSorter();
//          初始化鏈接的檢查器,同上ExceptionSorter同一包下,用于檢查鏈接是否正常
            initValidConnectionChecker();
//            檢查校驗(yàn)鏈接是否有效的sql語句是否設(shè)定。
            validationQueryCheck();


            if (isUseGlobalDataSourceStat()) {
//                使用全局的數(shù)據(jù)源統(tǒng)計(jì)
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbType);
                }
            } else {
//                不使用全局?jǐn)?shù)據(jù)源統(tǒng)計(jì),新建
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
            }
//            設(shè)置數(shù)據(jù)源統(tǒng)計(jì)是否支持重置,可以追下代碼,發(fā)現(xiàn)是servlet調(diào)用過來進(jìn)行reset。
            dataSourceStat.setResetStatEnable(this.resetStatEnable);

//            鏈接持有者 數(shù)組
            connections = new DruidConnectionHolder[maxActive];
//            被驅(qū)逐鏈接持有者 數(shù)組
            evictConnections = new DruidConnectionHolder[maxActive];
//            ?;铈溄映钟姓?數(shù)組
            keepAliveConnections = new DruidConnectionHolder[maxActive];

            SQLException connectError = null;

//             創(chuàng)建指定的初始化個(gè)數(shù)的鏈接,并與holder綁定
            if (createScheduler != null && asyncInit) {
//               如果是異步創(chuàng)建
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
//                同步創(chuàng)建
                // init connections
                while (poolingCount < initialSize) {
                    try {
//                        創(chuàng)建物理連接,里面會(huì)調(diào)用driver的連接方法,然后就會(huì)靜態(tài)緩存datasource信息,供其他地方使用
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

//            創(chuàng)建數(shù)據(jù)源打印統(tǒng)計(jì)信息的線程并啟動(dòng)開始打印
            createAndLogThread();
//          創(chuàng)建并啟動(dòng)鏈接的創(chuàng)建線程(異步并且沒有調(diào)度線程池的時(shí)候使用,不會(huì)走上面兩種判斷的初始化)
            createAndStartCreatorThread();
//            用于檢查最大生命周期、驅(qū)逐鏈接、最大超時(shí)時(shí)間等配置,用于剔除或者標(biāo)記鏈接
            createAndStartDestroyThread();

//            保證前面兩個(gè)線程的創(chuàng)建
            initedLatch.await();
            init = true;

            initedTime = new Date();
//            注冊(cè)Mbean,用于對(duì)外提供監(jiān)控等功能,可參考(https://blog.csdn.net/u013256816/article/details/52800742)
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (RuntimeException e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (Error e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;

        } finally {
            inited = true;
            lock.unlock();

            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }
    }

如上就是整個(gè)datasource的初始化過程,具體直接看注釋,我覺得還是夠嗆,還是建議自己讀一遍代碼,到具體方法了,就點(diǎn)進(jìn)去,看下一下方法干什么。這樣整個(gè)就了解了。
其實(shí)整個(gè)過程也就是:校驗(yàn)基本參數(shù)、初始化鏈接、校驗(yàn)鏈接、輸出統(tǒng)計(jì)信息等。

如上就是整個(gè)Pool包的包結(jié)構(gòu)情況。我們可以看出來,主要是圍繞數(shù)據(jù)源、鏈接相關(guān),展開一系列的周邊建設(shè)。

ok,pool包分析就到這里。有什么疑問,可以下面留言給我。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容