前言
????本篇從上邊說的方式,自頂向下,從粗到細(xì)的思路來進(jìn)行學(xué)習(xí)。 知道druid核心功能、每個(gè)功能大概實(shí)現(xiàn)原理。
學(xué)習(xí)步驟 - 包結(jié)構(gòu)
druid
|
--src
--main
-- java
-- com.alibaba.druid
-- filter (過濾器相關(guān))
-- mock (猜測(cè)測(cè)試相關(guān))
-- pool (數(shù)據(jù)庫連接池相關(guān))
-- proxy (jdbc API相關(guān)代理實(shí)現(xiàn))
-- sql (sql解析相關(guān))
-- stat (增強(qiáng)功能:統(tǒng)計(jì)類信息)
-- support (支持類)
-- util (工具類)
-- wall (增強(qiáng)功能:防sql注入相關(guān))
-- META-INF
-- resources
-- scripts
-- test
-- pom.xml
看完包結(jié)構(gòu),以及大概猜測(cè)的含義,接下來,進(jìn)入每個(gè)包,看下具體含義,以及猜測(cè)是否正確。這里的順序呢,我們以理解的重要程度來作分析。
本篇目的:分析Pool包
打開pool包,我們發(fā)現(xiàn)有三個(gè)子包ha、vendor、xa,以及其他的針對(duì)jdbcApi的池化實(shí)現(xiàn)(datasource、connection、statement、resultset等)
ha子包
????從字面意思理解,是高可用(high availability)
- selector包
這里主要是圍繞都數(shù)據(jù)源的時(shí)候,具體使用哪個(gè)數(shù)據(jù)源的選舉展開的,所以,我們弄明白他的設(shè)計(jì)就可以了。(抓住主干)
-
DataSourceSelector:數(shù)據(jù)源選舉的抽象方法定義(接口)
DataSourceSelector的實(shí)現(xiàn)類.png
我們看到他的實(shí)現(xiàn)類有三個(gè),分別是按名稱、隨機(jī)、粘性選舉。
NamedDataSourceSelector
按名稱來選舉。大致實(shí)現(xiàn)邏輯:使用方通過ThreadLocal傳遞需要使用的數(shù)據(jù)源的名稱來獲取對(duì)應(yīng)的數(shù)據(jù)源。如果使用方有傳遞這個(gè)名稱,則按照這個(gè)名來從數(shù)據(jù)源map(這個(gè)是構(gòu)造時(shí)入?yún)ighAvailableDataSource自帶的)中取出來。如果沒有這個(gè)名稱,則會(huì)取默認(rèn)的名稱,然后再取數(shù)據(jù)源。RandomDataSourceSelector
隨機(jī)選取數(shù)據(jù)源。從數(shù)據(jù)源map(同上解釋)中,根據(jù)總大小使用隨機(jī)函數(shù)(Random.nextInt())選取一個(gè)數(shù)據(jù)源返回。StickyRandomDataSourceSelector
粘性選取數(shù)據(jù)源。這是一個(gè)比較特殊的選取器。它繼承隨機(jī)選取數(shù)據(jù)源,也就是說如果是第一次進(jìn)來,就是走的隨機(jī)選取。那區(qū)別在哪呢?在選取后,它會(huì)把這個(gè)數(shù)據(jù)源跟綁定到當(dāng)前線程(ThreadLocal),會(huì)持續(xù)一段時(shí)間(這個(gè)時(shí)間可以配置)。
2.HighAvailableDataSource:DataSource class which contains multiple DataSource objects. 包含多個(gè)數(shù)據(jù)源對(duì)象的數(shù)據(jù)源類
剛剛上面有提到過,三個(gè)選取器的構(gòu)造入?yún)⒕褪沁@個(gè)類的示例。它實(shí)現(xiàn)了javax.sql.DataSource,意味著它對(duì)外就是一個(gè)datasource示例。它的實(shí)例化工作在getConnection()方法,調(diào)用的時(shí)候,會(huì)初始化一個(gè)數(shù)據(jù)源集合(初始化邏輯通過DataSourceCreator),然后初始化數(shù)據(jù)源選取器,可以設(shè)置選取器類型(就是上面提到的三種),如果沒有設(shè)置,使用隨機(jī)類型。初始化完畢后,使用數(shù)據(jù)源選取器獲取數(shù)據(jù)源,然后返回該數(shù)據(jù)源的一個(gè)鏈接(這個(gè)是正常的jdbc獲取鏈接的思路)。
3.DataSourceCreator:An utility class to create DruidDataSource dynamically. 動(dòng)態(tài)創(chuàng)建DruidDataSource 的工具類。
上面有提到過,高可用數(shù)據(jù)源的底層初始化數(shù)據(jù)源集合就是使用該類。通過構(gòu)造進(jìn)來配置文件,以及配置參數(shù)的前綴,做解析,緊接著創(chuàng)建數(shù)據(jù)源集合。具體創(chuàng)建過程就是拿解析到的name、url、username、psw,直接構(gòu)建DruidDataSource,除了這幾個(gè)基本參數(shù)外,其他參數(shù)使用高可用數(shù)據(jù)源實(shí)例自己的默認(rèn)值(外面也可以設(shè)置)
總結(jié)下ha包,使用的場(chǎng)景就是你有多數(shù)據(jù)源的時(shí)候。通過統(tǒng)一的高可用數(shù)據(jù)源對(duì)外充當(dāng)只有一個(gè)數(shù)據(jù)源(具體實(shí)現(xiàn)邏輯對(duì)使用方不可見)。
比如:
- 按名稱選舉:主從數(shù)據(jù)源。通過提前設(shè)置數(shù)據(jù)源名稱(高可用數(shù)據(jù)源設(shè)置方法),做到數(shù)據(jù)源的切換。類似思路其實(shí)在sharding-jdbc的讀寫分離中有用到。
- 隨機(jī)選舉:多個(gè)從數(shù)據(jù)源,之前無差別。
- 粘性選擇:我沒想到好的場(chǎng)景,可以看看上面的差別,自己想想。
建議:高可用的數(shù)據(jù)源這里,我提一種場(chǎng)景 -- 多個(gè)從庫需要最近路由來做數(shù)據(jù)查詢。其實(shí)現(xiàn)在的選舉器是滿足不了的?,F(xiàn)在的選舉受DataSourceSelectorEnum枚舉的限制,不具有擴(kuò)展能力。自定義規(guī)則無法接入。為啥提這個(gè)呢,其實(shí)就是想實(shí)現(xiàn)類似dubbo(rpc框架)的最近路由或者按group路由。達(dá)到性能最高的目的。
vendor子包
????google一下,是供應(yīng)商、小攤小販的意思。其實(shí)點(diǎn)一點(diǎn)里面的類,大概值圍繞兩種接口的實(shí)現(xiàn)來做的。
-
ExceptionSorter
An interface to allow for exception evaluation. 允許評(píng)估異常的接口。直接這么看是比較生硬的,看下它定義的方法就知道啥意思了,boolean isExceptionFatal(SQLException e);該包下羅列了各種db的實(shí)現(xiàn)類以及mock等異常的判斷。
-
ValidConnectionChecker
顧名思義,檢查連接是否正常有效。這里以MySqlValidConnectionChecker為例,就是判斷判斷connection是否正常,兩種方式,1.使用connection的pingInternal進(jìn)行反射調(diào)用。 2.使用connection執(zhí)行自定義查詢語句。 如果沒有異常拋出,就認(rèn)為是有效的connection。
總結(jié):這個(gè)包名字起的也算到位,就是小功能的集合包。這與這個(gè)包下的功能何時(shí)使用,我們后面看其他包的時(shí)候再看下。
xa子包
????用于兼容分布式事務(wù)相關(guān)。這里可參考維基百科介紹的分布式事務(wù)概念。
DruidXADataSource
extends DruidDataSource implements XADataSource,這里其實(shí)就是擴(kuò)展了普通的datasource,以滿足分布式接口的需要。比如getXAConnection,就是先獲取普通的鏈接,然后根據(jù)不同的數(shù)據(jù)庫方言,使用對(duì)應(yīng)的工具類進(jìn)行創(chuàng)建XAConnection,對(duì)外再統(tǒng)一包裝一層DruidPooledXAConnection。目前支持的方言有:Mysql、postgresql、h2、還有下面擴(kuò)展的jtds。DruidPooledXAConnection
implements XAConnection,這里是對(duì)不同的方言對(duì)應(yīng)的XAConnection做統(tǒng)一的包裝。JtdsXAResource
implements XAResource,commit、end、rollback、start等方法,都是包裝在XASupport之上。自實(shí)現(xiàn)了isSameRMJtdsXAConnection
implements XAConnection ,包裝普通的connection(創(chuàng)建JtdsXAResource、獲取分布式鏈接id)
總結(jié):這里就是對(duì)xa的支持,包裝邏輯類似于前面介紹的selector,可以對(duì)比著理解。
包外的核心類:這里才是重點(diǎn)。
????假如之前沒有使用過druid很可能會(huì)一臉懵逼,不知道從哪入手。不過呢,我們一般可以從一個(gè)框架它的核心功能入手,然后一個(gè)點(diǎn)切進(jìn)去,你就會(huì)發(fā)現(xiàn)不知不覺就都串聯(lián)起來了。我們這里以DruidDataSource會(huì)切入點(diǎn),我們一般使用的時(shí)候,會(huì)直接配置DruidDataSource(spring xml配置方式),并且指定它的初始化方法為init方法。接下我們就從這個(gè)方法入手,看一看究竟干了啥,又是怎樣把這部分核心代碼給串聯(lián)起來的。
/**
* 數(shù)據(jù)源的初始化入口
*
* @throws SQLException
*/
public void init() throws SQLException {
// 成員變量,如果已經(jīng)初始化,直接return
if (inited) {
return;
}
// bug fixed for dead lock, for issue #2980
// 這里主要是初始化這個(gè)類,利用靜態(tài)方法注冊(cè)自己的驅(qū)動(dòng)類
DruidDriver.getInstance();
final ReentrantLock lock = this.lock;
try {
// 嘗試拿鎖
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
// 雙重檢查用于更安全(常用于同步后,再做層檢查)
if (inited) {
return;
}
// 調(diào)用棧信息
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
// 內(nèi)存級(jí)別的id原子加1
this.id = DruidDriver.createDataSourceId();
// 如果不是第一個(gè)數(shù)據(jù)源,則它下面的幾個(gè)屬性做內(nèi)存指令級(jí)別的直接更新,跨度10萬個(gè)
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
// 如果是druid自定義包裝的jdbcurl做解析
initFromWrapDriverUrl();
}
/**
* 通過filter(通過datasource的property參數(shù)注入)去裝飾datasource,每個(gè)filter關(guān)注的功能不同,比如configFilter(去解密秘鑰、設(shè)置其他屬性)
*/
for (Filter filter : filters) {
filter.init(this);
}
if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
}
if (JdbcConstants.MYSQL.equals(this.dbType)
|| JdbcConstants.MARIADB.equals(this.dbType)
|| JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
//初始化spi的fitler
initFromSPIServiceLoader();
// 如果沒有直接設(shè)置,會(huì)從url中識(shí)別出dbType,然后返回定義好的DriverClassName
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else {
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
throw new SQLException("url not set");
}
// 加載真實(shí)驅(qū)動(dòng)
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
// 一些基本的簡單檢查,比如oracle某些版本不支持等
initCheck();
// 根據(jù)驅(qū)動(dòng)類型,初始化異常的排序器,在 pool-ventor包下,主要是用來判斷異常的嚴(yán)重性
initExceptionSorter();
// 初始化鏈接的檢查器,同上ExceptionSorter同一包下,用于檢查鏈接是否正常
initValidConnectionChecker();
// 檢查校驗(yàn)鏈接是否有效的sql語句是否設(shè)定。
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
// 使用全局的數(shù)據(jù)源統(tǒng)計(jì)
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbType);
}
} else {
// 不使用全局?jǐn)?shù)據(jù)源統(tǒng)計(jì),新建
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
// 設(shè)置數(shù)據(jù)源統(tǒng)計(jì)是否支持重置,可以追下代碼,發(fā)現(xiàn)是servlet調(diào)用過來進(jìn)行reset。
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// 鏈接持有者 數(shù)組
connections = new DruidConnectionHolder[maxActive];
// 被驅(qū)逐鏈接持有者 數(shù)組
evictConnections = new DruidConnectionHolder[maxActive];
// ?;铈溄映钟姓?數(shù)組
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 創(chuàng)建指定的初始化個(gè)數(shù)的鏈接,并與holder綁定
if (createScheduler != null && asyncInit) {
// 如果是異步創(chuàng)建
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
}
} else if (!asyncInit) {
// 同步創(chuàng)建
// init connections
while (poolingCount < initialSize) {
try {
// 創(chuàng)建物理連接,里面會(huì)調(diào)用driver的連接方法,然后就會(huì)靜態(tài)緩存datasource信息,供其他地方使用
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
// 創(chuàng)建數(shù)據(jù)源打印統(tǒng)計(jì)信息的線程并啟動(dòng)開始打印
createAndLogThread();
// 創(chuàng)建并啟動(dòng)鏈接的創(chuàng)建線程(異步并且沒有調(diào)度線程池的時(shí)候使用,不會(huì)走上面兩種判斷的初始化)
createAndStartCreatorThread();
// 用于檢查最大生命周期、驅(qū)逐鏈接、最大超時(shí)時(shí)間等配置,用于剔除或者標(biāo)記鏈接
createAndStartDestroyThread();
// 保證前面兩個(gè)線程的創(chuàng)建
initedLatch.await();
init = true;
initedTime = new Date();
// 注冊(cè)Mbean,用于對(duì)外提供監(jiān)控等功能,可參考(https://blog.csdn.net/u013256816/article/details/52800742)
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
submitCreateTask(true);
}
} else {
this.emptySignal();
}
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} finally {
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited";
LOG.info(msg);
}
}
}
如上就是整個(gè)datasource的初始化過程,具體直接看注釋,我覺得還是夠嗆,還是建議自己讀一遍代碼,到具體方法了,就點(diǎn)進(jìn)去,看下一下方法干什么。這樣整個(gè)就了解了。
其實(shí)整個(gè)過程也就是:校驗(yàn)基本參數(shù)、初始化鏈接、校驗(yàn)鏈接、輸出統(tǒng)計(jì)信息等。
如上就是整個(gè)Pool包的包結(jié)構(gòu)情況。我們可以看出來,主要是圍繞數(shù)據(jù)源、鏈接相關(guān),展開一系列的周邊建設(shè)。
ok,pool包分析就到這里。有什么疑問,可以下面留言給我。
