An Analysis of the Druid Connection Pool Implementation

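There are a few questions I mainly want to answer.
1. Does the connection pool use a queue, and if so, is it bounded or unbounded?
2. What is the rejection policy?
3. How does Spring pick up the pool?
4. How does Spring obtain a database connection through the pool?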

The project is a Spring Boot application that uses Druid through druid-spring-boot-starter, so the analysis begins with the starter.

Analysis of druid-spring-boot-starter

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure

DruidDataSourceAutoConfigure
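This class imports a number of configurations. Going by the names, everything related to statistics (anything containing Stat) can be ignored for now. The part to focus on is the DataSource bean: because of @ConditionalOnMissingBean, it only takes effect when no other data source bean has been defined. It returns a wrapper class, DruidDataSourceWrapper.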

@Configuration
@ConditionalOnClass({DruidDataSource.class})
@AutoConfigureBefore({DataSourceAutoConfiguration.class})
@EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class})
@Import({DruidSpringAopConfiguration.class, DruidStatViewServletConfiguration.class, DruidWebStatFilterConfiguration.class, DruidFilterConfiguration.class})
public class DruidDataSourceAutoConfigure {
    private static final Logger LOGGER = LoggerFactory.getLogger(DruidDataSourceAutoConfigure.class);

    public DruidDataSourceAutoConfigure() {
    }

    @Bean(
        initMethod = "init"
    )
    @ConditionalOnMissingBean
    public DataSource dataSource() {
        LOGGER.info("Init DruidDataSource");
        return new DruidDataSourceWrapper();
    }
}

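DruidDataSourceWrapper (excerpt)
The class extends DruidDataSource and implements InitializingBean. In afterPropertiesSet it fills in the core connection properties (username, password, URL, driver class name) from the generic Spring DataSourceProperties whenever the Druid-specific value has not been set. It also adds a number of methods for registering filters.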

public void afterPropertiesSet() throws Exception {
        if (super.getUsername() == null) {
            super.setUsername(this.basicProperties.determineUsername());
        }

        if (super.getPassword() == null) {
            super.setPassword(this.basicProperties.determinePassword());
        }

        if (super.getUrl() == null) {
            super.setUrl(this.basicProperties.determineUrl());
        }

        if (super.getDriverClassName() == null) {
            super.setDriverClassName(this.basicProperties.getDriverClassName());
        }

    }
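
The fallback rule in afterPropertiesSet can be illustrated without Spring. The following is a minimal sketch, not the starter's code: FallbackSketch and resolve are hypothetical names, and the JDBC URLs are made-up sample values. It only shows the "keep the Druid-specific value if present, otherwise ask the basic properties" decision.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the wrapper's fallback rule; not part of Druid or Spring.
class FallbackSketch {
    // Returns the Druid-specific value when it is set, otherwise the lazily
    // computed value from the generic data source properties.
    static String resolve(String druidValue, Supplier<String> basicValue) {
        return druidValue != null ? druidValue : basicValue.get();
    }

    public static void main(String[] args) {
        // Druid-specific URL missing: the basic value is used.
        System.out.println(resolve(null, () -> "jdbc:mysql://localhost/demo"));
        // Druid-specific URL present: it wins and the supplier is never called.
        System.out.println(resolve("jdbc:mysql://druid-host/demo", () -> "unused"));
    }
}
```

The supplier mirrors the determineXxx() calls in the excerpt above, which are only evaluated when the wrapper's own getter returns null.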

DruidDataSource
This is the core class. It extends the DruidAbstractDataSource template and implements the DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource and MBeanRegistration interfaces.
Some fields worth noting:

    private volatile DruidConnectionHolder[] connections;
    private DruidConnectionHolder[] evictConnections;
    private DruidConnectionHolder[] keepAliveConnections;
    private DruidDataSource.CreateConnectionThread createConnectionThread;
    private DruidDataSource.DestroyConnectionThread destroyConnectionThread;
    private LogStatsThread   logStatsThread;
    public static ThreadLocal<Long> waitNanosLocal = new ThreadLocal();
    private final CountDownLatch initedLatch;
    private volatile boolean enable;

    protected static final AtomicLongFieldUpdater<DruidDataSource> recycleErrorCountUpdater = AtomicLongFieldUpdater.newUpdater(DruidDataSource.class, "recycleErrorCount");
    protected static final AtomicLongFieldUpdater<DruidDataSource> connectErrorCountUpdater = AtomicLongFieldUpdater.newUpdater(DruidDataSource.class, "connectErrorCount");
    protected static final AtomicLongFieldUpdater<DruidDataSource> resetCountUpdater = AtomicLongFieldUpdater.newUpdater(DruidDataSource.class, "resetCount");

The first field, connections, clearly holds the database connections, and DruidConnectionHolder is the class that wraps each of them.
DruidConnectionHolder

  protected final DruidAbstractDataSource       dataSource;
  protected final long                          connectionId;
  protected final Connection                    conn;
  protected final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();
  protected volatile long                       lastActiveTimeMillis;
  protected volatile long                       lastValidTimeMillis;
  private long                                  useCount                 = 0;
  private long                                  lastNotEmptyWaitNanos;
  protected final boolean                       defaultAutoCommit;
  protected boolean                             discard                  = false;
  public DruidConnectionHolder(DruidAbstractDataSource dataSource, Connection conn, long connectNanoSpan,
                                 Map<String, Object> variables, Map<String, Object> globleVariables)
                                                                                                    throws SQLException{
        this.dataSource = dataSource;
        this.conn = conn;
        this.createNanoSpan = connectNanoSpan;
        this.variables = variables;
        this.globleVariables = globleVariables;

        this.connectTimeMillis = System.currentTimeMillis();
        this.lastActiveTimeMillis = connectTimeMillis;

        this.underlyingAutoCommit = conn.getAutoCommit();

        if (conn instanceof WrapperProxy) {
            this.connectionId = ((WrapperProxy) conn).getId();
        } else {
            this.connectionId = dataSource.createConnectionId();
        }

        {
            boolean initUnderlyHoldability = !holdabilityUnsupported;
            if (JdbcConstants.SYBASE.equals(dataSource.dbType) //
                || JdbcConstants.DB2.equals(dataSource.dbType) //
                || JdbcConstants.HIVE.equals(dataSource.dbType) //
                || JdbcConstants.ODPS.equals(dataSource.dbType) //
            ) {
                initUnderlyHoldability = false;
            }
            if (initUnderlyHoldability) {
                try {
                    this.underlyingHoldability = conn.getHoldability();
                } catch (UnsupportedOperationException e) {
                    holdabilityUnsupported = true;
                    LOG.warn("getHoldability unsupported", e);
                } catch (SQLFeatureNotSupportedException e) {
                    holdabilityUnsupported = true;
                    LOG.warn("getHoldability unsupported", e);
                } catch (SQLException e) {
                    // bug fixed for hive jdbc-driver
                    if ("Method not supported".equals(e.getMessage())) {
                        holdabilityUnsupported = true;
                    }
                    LOG.warn("getHoldability error", e);
                }
            }
        }

        this.underlyingReadOnly = conn.isReadOnly();
        try {
            this.underlyingTransactionIsolation = conn.getTransactionIsolation();
        } catch (SQLException e) {
            // compartible for alibaba corba
            if ("HY000".equals(e.getSQLState())
                    || "com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException".equals(e.getClass().getName())) {
                // skip
            } else {
                throw e;
            }
        }

        this.defaultHoldability = underlyingHoldability;
        this.defaultTransactionIsolation = underlyingTransactionIsolation;
        this.defaultAutoCommit = underlyingAutoCommit;
        this.defaultReadOnly = underlyingReadOnly;
    }

The constructor reads the connection's current transaction isolation level and records connectionId, autoCommit and connectTimeMillis. Nothing here needs special attention, so back to the DruidDataSource class.

    private volatile DruidConnectionHolder[] connections;
    private DruidConnectionHolder[]          evictConnections;
    private DruidConnectionHolder[]          keepAliveConnections;

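connections holds the idle connections currently in the pool (its first poolingCount slots are in use). evictConnections collects the connections selected for eviction during a shrink pass, and keepAliveConnections collects the connections selected for a keep-alive check.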

Next comes the DruidDataSource initialization method. At more than 200 lines, it is fairly long.

public void init() throws SQLException {
        if (inited) {
            return;
        }

        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            if (inited) {
                return;
            }

            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

            this.id = DruidDriver.createDataSourceId();
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

            for (Filter filter : filters) {
                filter.init(this);
            }

            if (this.dbType == null || this.dbType.length() == 0) {
                this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
            }

            if (JdbcConstants.MYSQL.equals(this.dbType)
                    || JdbcConstants.MARIADB.equals(this.dbType)
                    || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
                boolean cacheServerConfigurationSet = false;
                if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                    cacheServerConfigurationSet = true;
                } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                    cacheServerConfigurationSet = true;
                }
                if (cacheServerConfigurationSet) {
                    this.connectProperties.put("cacheServerConfiguration", "true");
                }
            }

            if (maxActive <= 0) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (maxActive < minIdle) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (getInitialSize() > maxActive) {
                throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
            }

            if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
                throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
            }

            if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
                throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
            }

            if (this.driverClass != null) {
                this.driverClass = driverClass.trim();
            }

            initFromSPIServiceLoader();

            if (this.driver == null) {
                if (this.driverClass == null || this.driverClass.isEmpty()) {
                    this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
                }

                if (MockDriver.class.getName().equals(driverClass)) {
                    driver = MockDriver.instance;
                } else {
                    driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
                }
            } else {
                if (this.driverClass == null) {
                    this.driverClass = driver.getClass().getName();
                }
            }

            initCheck();

            initExceptionSorter();
            initValidConnectionChecker();
            validationQueryCheck();

            if (isUseGlobalDataSourceStat()) {
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbType);
                }
            } else {
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
            }
            dataSourceStat.setResetStatEnable(this.resetStatEnable);

            connections = new DruidConnectionHolder[maxActive];
            evictConnections = new DruidConnectionHolder[maxActive];
            keepAliveConnections = new DruidConnectionHolder[maxActive];

            SQLException connectError = null;

            if (createScheduler != null) {
                for (int i = 0; i < initialSize; ++i) {
                    createTaskCount++;
                    CreateConnectionTask task = new CreateConnectionTask(true);
                    this.createSchedulerFuture = createScheduler.submit(task);
                }
            } else if (!asyncInit) {
                try {
                    // init connections
                    for (int i = 0; i < initialSize; ++i) {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount] = holder;
                        incrementPoolingCount();
                    }

                    if (poolingCount > 0) {
                        poolingPeak = poolingCount;
                        poolingPeakTime = System.currentTimeMillis();
                    }
                } catch (SQLException ex) {
                    LOG.error("init datasource error, url: " + this.getUrl(), ex);
                    connectError = ex;
                }
            }

            createAndLogThread();
            createAndStartCreatorThread();
            createAndStartDestroyThread();

            initedLatch.await();
            init = true;

            initedTime = new Date();
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        createTaskCount++;
                        CreateConnectionTask task = new CreateConnectionTask(true);
                        this.createSchedulerFuture = createScheduler.submit(task);
                    }
                } else {
                    this.emptySignal();
                }
            }

        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (RuntimeException e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (Error e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;

        } finally {
            inited = true;
            lock.unlock();

            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }
    }

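For the first data source the id is 1, so no id offsets are applied. After init the pool holds poolingCount idle connections (2 in this setup), one for each iteration of the initialSize loop. The connections, evictConnections and keepAliveConnections arrays are all allocated with length maxActive. init then sets up three threads, CreateConnectionThread, LogStatsThread and DestroyConnectionThread, all of which run as daemon threads.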
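
The size checks near the top of init() are easy to get wrong when tuning a pool, so here is a small sketch of them in isolation. PoolSizeCheck and validate are hypothetical names, and the exception messages are simplified; only the three comparisons are taken from the init() listing above.

```java
// Hypothetical helper that mirrors the three size checks in init().
class PoolSizeCheck {
    static void validate(int initialSize, int minIdle, int maxActive) {
        if (maxActive <= 0) {
            throw new IllegalArgumentException("illegal maxActive " + maxActive);
        }
        if (maxActive < minIdle) {
            throw new IllegalArgumentException("illegal maxActive " + maxActive + ", minIdle " + minIdle);
        }
        if (initialSize > maxActive) {
            throw new IllegalArgumentException("illegal initialSize " + initialSize + ", maxActive " + maxActive);
        }
    }

    public static void main(String[] args) {
        validate(2, 2, 8); // accepted: 0 < minIdle <= maxActive and initialSize <= maxActive
        try {
            validate(10, 0, 8); // rejected: initialSize exceeds maxActive
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because all three arrays are allocated with length maxActive, these checks are what guarantees that the initial connections fit into connections[].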

PhysicalConnectionInfo pyConnectInfo = this.createPhysicalConnection();

Each call creates one physical database connection.
This can be observed in PERFORMANCE_SCHEMA.threads,
where two new rows appear after initialization.



Now look at the threads on the Java side.

java -classpath "sa-jdi.jar" sun.jvm.hotspot.HSDB

HSDB shows which threads have been created.



CreateConnectionThread

          while(true) {
                PhysicalConnectionInfo connection;
                label338: {
                    while(true) {
                        try {
                            DruidDataSource.this.lock.lockInterruptibly();
                        } catch (InterruptedException var33) {
                            break;
                        }

                        long discardCount = DruidDataSource.this.discardCount;
                        boolean discardChanged = discardCount - lastDiscardCount > 0L;
                        lastDiscardCount = discardCount;

                        try {
                            boolean emptyWait = true;
                            if (DruidDataSource.this.createError != null && DruidDataSource.this.poolingCount == 0 && !discardChanged) {
                                emptyWait = false;
                            }

                            if (emptyWait && DruidDataSource.this.asyncInit && DruidDataSource.this.createCount < (long)DruidDataSource.this.initialSize) {
                                emptyWait = false;
                            }

                            if (emptyWait) {
                                if (DruidDataSource.this.poolingCount >= DruidDataSource.this.notEmptyWaitThreadCount && (!DruidDataSource.this.keepAlive || DruidDataSource.this.activeCount + DruidDataSource.this.poolingCount >= DruidDataSource.this.minIdle)) {
                                    DruidDataSource.this.empty.await();
                                }

                                if (DruidDataSource.this.activeCount + DruidDataSource.this.poolingCount >= DruidDataSource.this.maxActive) {
                                    DruidDataSource.this.empty.await();
                                    continue;
                                }
                            }
                        } catch (InterruptedException var31) {
                            DruidDataSource.this.lastCreateError = var31;
                            DruidDataSource.this.lastErrorTimeMillis = System.currentTimeMillis();
                            if (!DruidDataSource.this.closing) {
                                DruidDataSource.LOG.error("create connection Thread Interrupted, url: " + DruidDataSource.this.jdbcUrl, var31);
                            }
                            break;
                        } finally {
                            DruidDataSource.this.lock.unlock();
                        }

                        connection = null;

                        try {
                            connection = DruidDataSource.this.createPhysicalConnection();
                            break label338;
                        } catch (SQLException var28) {
                            DruidDataSource.LOG.error("create connection SQLException, url: " + DruidDataSource.this.jdbcUrl + ", errorCode " + var28.getErrorCode() + ", state " + var28.getSQLState(), var28);
                            ++errorCount;
                            if (errorCount <= DruidDataSource.this.connectionErrorRetryAttempts || DruidDataSource.this.timeBetweenConnectErrorMillis <= 0L) {
                                break label338;
                            }

                            DruidDataSource.this.setFailContinuous(true);
                            if (DruidDataSource.this.failFast) {
                                DruidDataSource.this.lock.lock();

                                try {
                                    DruidDataSource.this.notEmpty.signalAll();
                                } finally {
                                    DruidDataSource.this.lock.unlock();
                                }
                            }

                            if (!DruidDataSource.this.breakAfterAcquireFailure) {
                                try {
                                    Thread.sleep(DruidDataSource.this.timeBetweenConnectErrorMillis);
                                    break label338;
                                } catch (InterruptedException var27) {
                                }
                            }
                            break;
                        } catch (RuntimeException var29) {
                            DruidDataSource.LOG.error("create connection RuntimeException", var29);
                            DruidDataSource.this.setFailContinuous(true);
                        } catch (Error var30) {
                            DruidDataSource.LOG.error("create connection Error", var30);
                            DruidDataSource.this.setFailContinuous(true);
                            break;
                        }
                    }

                    return;
                }

                if (connection != null) {
                    boolean result = DruidDataSource.this.put(connection);
                    if (!result) {
                        JdbcUtils.close(connection.getPhysicalConnection());
                        DruidDataSource.LOG.info("put physical connection to pool failed.");
                    }

                    errorCount = 0;
                }
            }

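This thread creates the physical database connections. It sleeps on the empty condition and only goes on to create a connection when more threads are waiting than there are pooled connections (or, with keepAlive enabled, when the pool is below minIdle), and only while activeCount + poolingCount is still below maxActive.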
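
The hand-off between borrowers and the creator thread is easier to follow in a stripped-down form. The sketch below is an illustration under simplifying assumptions, not Druid's code: CreatorThreadSketch and its members are hypothetical names, connections are plain strings, keepAlive, error handling and returning connections are left out, and an interrupt makes borrow return null instead of propagating. It keeps the two conditions from the listing above: borrowers signal empty, the creator signals notEmpty.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the creator-thread hand-off; not Druid's implementation.
class CreatorThreadSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition empty = lock.newCondition();    // the creator waits here
    private final Condition notEmpty = lock.newCondition(); // borrowers wait here
    private final Deque<String> idle = new ArrayDeque<>();  // stand-in for connections[]
    private final int maxActive;
    private int activeCount;
    private int waiters;

    CreatorThreadSketch(int maxActive) {
        this.maxActive = maxActive;
        Thread creator = new Thread(this::createLoop, "creator-sketch");
        creator.setDaemon(true);
        creator.start();
    }

    private void createLoop() {
        int created = 0;
        while (true) {
            lock.lock();
            try {
                // Wait while nobody needs a connection or the pool is at its limit.
                while (idle.size() >= waiters || activeCount + idle.size() >= maxActive) {
                    empty.await();
                }
            } catch (InterruptedException e) {
                return;
            } finally {
                lock.unlock();
            }
            String conn = "conn-" + (++created); // created outside the lock
            lock.lock();
            try {
                if (activeCount + idle.size() < maxActive) { // re-check before adding
                    idle.addLast(conn);
                    notEmpty.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    // Returns a connection, or null if none became available within the timeout.
    String borrow(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (idle.isEmpty()) {
                if (nanos <= 0L) {
                    return null;
                }
                waiters++;
                empty.signal(); // ask the creator for a new connection
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                } finally {
                    waiters--;
                }
            }
            activeCount++;
            return idle.pollLast();
        } finally {
            lock.unlock();
        }
    }
}
```

With maxActive set to 2, two calls to borrow each receive a freshly created connection, while a third call times out and returns null because the creator refuses to go past the limit.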
DestroyConnectionThread

        public void run() {
            initedLatch.countDown();

            for (;;) {
                // evict starting from the front of the array
                try {
                    if (closed) {
                        break;
                    }

                    if (timeBetweenEvictionRunsMillis > 0) {
                        Thread.sleep(timeBetweenEvictionRunsMillis);
                    } else {
                        Thread.sleep(1000); //
                    }

                    if (Thread.interrupted()) {
                        break;
                    }

                    destroyTask.run();
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
  public class DestroyTask implements Runnable {

        @Override
        public void run() {
            shrink(true, keepAlive);

            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }
public void shrink(boolean checkTime, boolean keepAlive) {
    // evict connections that have been idle for longer than the configured time
}
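
Since the body of shrink is elided above, the following is only a rough sketch of how such an eviction pass could select its victims. ShrinkSketch and selectEvictions are hypothetical names, and the rule is my simplified reading: idle entries are assumed to be ordered oldest-first (consistent with the "evict starting from the front" comment), entries beyond minIdle are evicted once idle for at least minEvictableIdleTimeMillis, and any entry idle for more than maxEvictableIdleTimeMillis is evicted regardless. The keep-alive handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical selection rule for an eviction pass; a simplification, not Druid's shrink().
class ShrinkSketch {
    // Returns the indexes (from the front) of the idle entries that would be evicted.
    static List<Integer> selectEvictions(long[] lastActiveMillis, long now, int minIdle,
                                         long minEvictableIdleMillis, long maxEvictableIdleMillis) {
        List<Integer> evict = new ArrayList<>();
        int surplus = lastActiveMillis.length - minIdle; // entries above minIdle
        for (int i = 0; i < lastActiveMillis.length; i++) {
            long idleMillis = now - lastActiveMillis[i];
            if (idleMillis < minEvictableIdleMillis) {
                break; // oldest-first ordering: the remaining entries are younger
            }
            if (i < surplus || idleMillis > maxEvictableIdleMillis) {
                evict.add(i);
            }
        }
        return evict;
    }

    public static void main(String[] args) {
        long[] lastActive = {0L, 0L, 900L, 950L};
        // Two entries are above minIdle and have been idle for 1000 ms.
        System.out.println(selectEvictions(lastActive, 1000L, 2, 500L, 5000L));
    }
}
```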

Total connections held = pooled (idle) connections + active (borrowed) connections

int allCount = this.poolingCount + this.activeCount;

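That gives a rough picture of how the Druid pool is initialized. Next, starting from the method that hands out connections, let's look at how a thread obtains a database connection.
DruidDataSource.getConnectionDirect (returns a DruidPooledConnection)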

    public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;

        DruidPooledConnection poolableConnection;
        while(true) {
            while(true) {
                try {
                    poolableConnection = this.getConnectionInternal(maxWaitMillis);
                    break;
                } catch (GetConnectionTimeoutException var19) {
                    // timed out without getting a poolableConnection; exception handling omitted
                }
            }

            if (this.testOnBorrow) {
                // validate the connection (for example with a ping) before handing it out
                boolean validate = this.testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                if (validate) {
                    break;
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }

                Connection realConnection = poolableConnection.conn;
                this.discardConnection(realConnection);
            } else {
                Connection realConnection = poolableConnection.conn;
                if (poolableConnection.conn.isClosed()) {
                    this.discardConnection((Connection)null);
                } else {
                    if (!this.testWhileIdle) {
                        break;
                    }

                    long currentTimeMillis = System.currentTimeMillis();
                    long lastActiveTimeMillis = poolableConnection.holder.lastActiveTimeMillis;
                    long idleMillis = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
                    if (timeBetweenEvictionRunsMillis <= 0L) {
                        timeBetweenEvictionRunsMillis = 60000L;
                    }

                    if (idleMillis < timeBetweenEvictionRunsMillis && idleMillis >= 0L) {
                        break;
                    }

                    boolean validate = this.testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                    if (validate) {
                        break;
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }

                    this.discardConnection(realConnection);
                }
            }
        }

        if (this.removeAbandoned) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.connectStackTrace = stackTrace;
            poolableConnection.setConnectedTimeNano();
            poolableConnection.traceEnable = true;
            this.activeConnectionLock.lock();

            try {
                this.activeConnections.put(poolableConnection, PRESENT);
            } finally {
                this.activeConnectionLock.unlock();
            }
        }

        if (!this.defaultAutoCommit) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
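
The validation branches above boil down to one decision: should this connection be tested before it is handed out? Here is a small sketch of that decision as a pure function. IdleCheckSketch and needsValidation are hypothetical names, and the isClosed() branch is left out; the thresholds follow the listing, including the 60000 ms fallback when timeBetweenEvictionRunsMillis is not positive.

```java
// Hypothetical helper condensing the testOnBorrow / testWhileIdle decision.
class IdleCheckSketch {
    static boolean needsValidation(boolean testOnBorrow, boolean testWhileIdle,
                                   long idleMillis, long timeBetweenEvictionRunsMillis) {
        if (testOnBorrow) {
            return true; // always validate on borrow
        }
        if (!testWhileIdle) {
            return false; // no validation configured
        }
        if (timeBetweenEvictionRunsMillis <= 0L) {
            timeBetweenEvictionRunsMillis = 60000L; // same fallback as in the listing
        }
        // Validate when the connection has been idle long enough; a negative idle
        // time (for example after a clock change) also triggers validation.
        return idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0L;
    }

    public static void main(String[] args) {
        System.out.println(needsValidation(false, true, 5_000L, 60_000L));  // false
        System.out.println(needsValidation(false, true, 90_000L, 60_000L)); // true
    }
}
```

The practical effect is that with testWhileIdle enabled, recently used connections skip the validation round trip, and only connections that sat idle for at least one eviction interval are tested.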

The connection itself is obtained by calling poolableConnection = this.getConnectionInternal(maxWaitMillis);

    private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
        if (this.closed) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw new DataSourceClosedException("dataSource already closed at " + new Date(this.closeTimeMillis));
        } else if (!this.enable) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw new DataSourceDisableException();
        } else {
            long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
            int maxWaitThreadCount = this.maxWaitThreadCount;
            boolean createDirect = false;

            DruidConnectionHolder holder;
            while(true) {
                if (createDirect) {
                    // create a new physical connection directly (code omitted)
                    this.lock.lock();

                    // update the state fields (code omitted)
                }

                // acquire the lock interruptibly (code omitted)

                try {
                    if (/* the number of waiting threads exceeds maxWaitThreadCount */) {
                        connectErrorCountUpdater.incrementAndGet(this);
                        throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + this.lock.getQueueLength());
                    }

                    if (/* the failure count exceeds the allowed maximum */) {
                        // write the log message (code omitted)

                        throw new SQLException(errorMsg, this.lastFatalError);
                    }

                    ++this.connectCount;
                    if (this.createScheduler != null && this.poolingCount == 0 && this.activeCount < this.maxActive && creatingCountUpdater.get(this) == 0 && this.createScheduler instanceof ScheduledThreadPoolExecutor) {
                        // this branch is not taken by default, because this.createScheduler is null unless configured
                        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor)this.createScheduler;
                        if (executor.getQueue().size() > 0) {
                            createDirect = true;
                            continue;
                        }
                    }
                    // if a timeout is set, wait with that timeout (already converted to nanoseconds above)
                    if (maxWait > 0L) {
                        holder = this.pollLast(nanos);
                    } else {
                        holder = this.takeLast();
                    }

                    if (holder != null) {
                        ++this.activeCount;
                        if (this.activeCount > this.activePeak) {
                            this.activePeak = this.activeCount;
                            this.activePeakTime = System.currentTimeMillis();
                        }
                    }
                    break;
                } catch (InterruptedException var24) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException(var24.getMessage(), var24);
                } catch (SQLException var25) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw var25;
                } finally {
                    this.lock.unlock();
                }
            }
            if (holder == null) {
            // holder is null: exception handling, omitted here
            } else {
                holder.incrementUseCount();
                DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
                return poolalbeConnection;
            }
        }
    }

holder = this.pollLast(nanos); is the call that obtains the DruidConnectionHolder.

    private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
        long estimate = nanos;

        while(this.poolingCount == 0) {
            this.emptySignal();
            if (this.failFast && this.failContinuous.get()) {
                throw new DataSourceNotAvailableException(this.createError);
            }

            if (estimate <= 0L) {
                waitNanosLocal.set(nanos - estimate);
                return null;
            }

            ++this.notEmptyWaitThreadCount;
            if (this.notEmptyWaitThreadCount > this.notEmptyWaitThreadPeak) {
                this.notEmptyWaitThreadPeak = this.notEmptyWaitThreadCount;
            }

            try {
                long startEstimate = estimate;
                estimate = this.notEmpty.awaitNanos(estimate);
                ++this.notEmptyWaitCount;
                this.notEmptyWaitNanos += startEstimate - estimate;
                if (!this.enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new DataSourceDisableException();
                }
            } catch (InterruptedException var10) {
                this.notEmpty.signal();
                ++this.notEmptySignalCount;
                throw var10;
            } finally {
                --this.notEmptyWaitThreadCount;
            }

            if (this.poolingCount != 0) {
                break;
            }

            if (estimate <= 0L) {
                waitNanosLocal.set(nanos - estimate);
                return null;
            }
        }

        this.decrementPoolingCount();
        DruidConnectionHolder last = this.connections[this.poolingCount];
        this.connections[this.poolingCount] = null;
        long waitNanos = nanos - estimate;
        last.setLastNotEmptyWaitNanos(waitNanos);
        return last;
    }

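The method loops until a connection is available, but it does not spin: inside the loop the thread blocks on notEmpty.awaitNanos and re-checks poolingCount each time it wakes up. There is no explicit queue object here. Idle connections sit in the fixed-size connections array, which is used as a stack, and the waiting threads are parked on the lock's notEmpty condition.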
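
To make the waiting behaviour concrete, here is a minimal sketch of a fixed-size array used as a stack with a timed wait on a Condition. ArrayStackPool is a hypothetical class, not Druid's; it omits the creator thread, failFast and all statistics, and unlike pollLast above it returns null on interrupt instead of rethrowing.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded LIFO pool built on an array, a lock and one condition.
class ArrayStackPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] slots; // fixed capacity, like connections[maxActive]
    private int poolingCount;     // number of occupied slots

    ArrayStackPool(int capacity) {
        this.slots = new Object[capacity];
    }

    // Pushes an item; returns false when the array is already full.
    boolean putLast(Object item) {
        lock.lock();
        try {
            if (poolingCount >= slots.length) {
                return false;
            }
            slots[poolingCount++] = item;
            notEmpty.signal(); // wake one waiting borrower
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Pops the most recently added item, waiting up to the timeout; null on timeout.
    Object pollLast(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (poolingCount == 0) {
                if (nanos <= 0L) {
                    return null;
                }
                try {
                    // Releases the lock and parks the thread; returns the time left.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            Object last = slots[--poolingCount];
            slots[poolingCount] = null;
            return last;
        } finally {
            lock.unlock();
        }
    }
}
```

The while loop is there to re-check the state after every wake-up (including spurious ones), which is the standard way to use a Condition and is different from busy waiting: between checks the thread consumes no CPU.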
Now look at the connection counts again.
Once initialization has finished, there are two connections in the Sleep state.


[Figure: initialization complete]

Setting a breakpoint here simulates three threads requesting a connection at the same time. The first two threads get the two pooled connections. The third finds the pool empty, waits with a timeout, and returns once CreateConnectionThread has created a new connection.



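Summary: at this point all four opening questions have an answer.
1. Does the connection pool use a queue, and if so, is it bounded or unbounded?
It does not use an explicit queue. Idle connections live in an array bounded by maxActive, and a thread that finds the pool empty waits inside a while loop on the notEmpty condition, returning when the timeout expires.
2. What is the rejection policy?
Once the number of waiting threads exceeds the maximum, a SQLException is thrown immediately (provided maxWaitThreadCount has been set).
3. How does Spring pick up the pool?
The starter registers a DataSource bean, using the DruidDataSourceWrapper wrapper class.
4. How does Spring obtain a database connection through the pool?
If an idle connection is available, it is returned directly. Otherwise the caller wakes the creator thread and waits for a connection to be created or returned. If one arrives, it is returned; if not, the timeout handling takes over.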
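
The rejection rule from answer 2 can be sketched in a few lines. The exact condition is elided in the listing of getConnectionInternal, so the comparison below is an assumption on my part: WaitLimitSketch and checkWaiters are hypothetical names, a non-positive limit is assumed to mean "no limit", and the boundary is assumed to be "waiters at or above the limit". Only the exception type and the message format are taken from the listing.

```java
import java.sql.SQLException;

// Hypothetical version of the waiting-thread limit; the exact condition is an assumption.
class WaitLimitSketch {
    static void checkWaiters(int maxWaitThreadCount, int currentWaiters) throws SQLException {
        // Assumed: a limit of zero or below disables the check entirely.
        if (maxWaitThreadCount > 0 && currentWaiters >= maxWaitThreadCount) {
            throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount
                    + ", current wait Thread count " + currentWaiters);
        }
    }

    public static void main(String[] args) {
        try {
            checkWaiters(2, 1); // below the limit: allowed to wait
            checkWaiters(2, 2); // at the limit: rejected
        } catch (SQLException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of failing fast like this is that a caller gets an immediate error instead of joining an ever-growing crowd of threads blocked on the pool.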

Follow-up question:
Once a connection has been used, how is it returned to DruidDataSource?
With JPA, the call goes through
NonContextualJdbcConnectionAccess

  public void releaseConnection(Connection connection) throws SQLException {
        try {
            this.listener.jdbcConnectionReleaseStart();
            this.connectionProvider.closeConnection(connection);
        } finally {
            this.listener.jdbcConnectionReleaseEnd();
        }

    }

Here this.connectionProvider.dataSource is the DruidDataSourceWrapper (the bean defined at the very beginning).
DatasourceConnectionProviderImpl

    public void closeConnection(Connection connection) throws SQLException {
        connection.close();
    }

DruidPooledConnection

   public void close() throws SQLException {
        if (!this.disable) {
            DruidConnectionHolder holder = this.holder;
            if (holder == null) {
                if (this.dupCloseLogEnable) {
                    LOG.error("dup close");
                }

            } else {
                DruidAbstractDataSource dataSource = holder.getDataSource();
                boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
                if (!isSameThread) {
                    dataSource.setAsyncCloseConnectionEnable(true);
                }

                if (dataSource.isAsyncCloseConnectionEnable()) {
                    this.syncClose();
                } else {
                    Iterator var4 = holder.getConnectionEventListeners().iterator();

                    while(var4.hasNext()) {
                        ConnectionEventListener listener = (ConnectionEventListener)var4.next();
                        listener.connectionClosed(new ConnectionEvent(this));
                    }

                    List<Filter> filters = dataSource.getProxyFilters();
                    if (filters.size() > 0) {
                        FilterChainImpl filterChain = new FilterChainImpl(dataSource);
                        filterChain.dataSource_recycle(this);
                    } else {
                        this.recycle();
                    }

                    this.disable = true;
                }
            }
        }
    }

After a chain of calls, the recycle method of DruidDataSource is eventually invoked to take the connection back.
DruidDataSource

    protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
        // return the connection to the pool
    }
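
The key idea in this return path is that close() on the pooled wrapper does not close the physical connection but hands it back to the pool. The sketch below shows that pattern in isolation. RecycleSketch, Pool and Lease are hypothetical names, physical connections are plain strings, and locking is reduced to synchronized methods; it is not how DruidPooledConnection is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of "close() means recycle"; not Druid's classes.
class RecycleSketch {
    static class Pool {
        private final Deque<String> idle = new ArrayDeque<>();
        private int created;

        // Reuses the most recently returned connection, or creates a new one.
        synchronized Lease borrow() {
            String raw = idle.isEmpty() ? "physical-" + (++created) : idle.pollLast();
            return new Lease(this, raw);
        }

        synchronized void recycle(String raw) {
            idle.addLast(raw);
        }

        synchronized int idleCount() {
            return idle.size();
        }
    }

    // The wrapper handed to callers; closing it returns the connection to the pool.
    static class Lease implements AutoCloseable {
        private final Pool pool;
        private String raw;

        Lease(Pool pool, String raw) {
            this.pool = pool;
            this.raw = raw;
        }

        String raw() {
            return raw;
        }

        @Override
        public void close() {
            if (raw == null) {
                return; // duplicate close: nothing left to recycle
            }
            pool.recycle(raw);
            raw = null;
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        try (Lease lease = pool.borrow()) {
            System.out.println("using " + lease.raw());
        }
        System.out.println("idle after close: " + pool.idleCount());
    }
}
```

This is why application code can treat a pooled connection like any other JDBC connection: the usual close() call, or a try-with-resources block, is all that is needed to give it back.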