Apache之HttpClient

本文基于下述版本進行分析

<dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
</dependency>

下述所有代碼進行了必要的刪減

發(fā)送請求流程

當(dāng)我們要訪問一個接口執(zhí)行HttpClientexecute()的方法時,會運用責(zé)任鏈模式走到MainClientExecexecute()中;

public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        //1. 從池中獲取連接
        Object userToken = context.getUserToken();
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        
        final RequestConfig config = context.getRequestConfig();
        final HttpClientConnection managedConn;
        
        //ConnectionRequestTimeout配置用在這里
        final int timeout = config.getConnectionRequestTimeout();
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        //第二個配置:檢查connection的有效性
        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            for (int execCount = 1;; execCount++) {
                if (!managedConn.isOpen()) {//沒有綁定socket
                    //上面已經(jīng)獲取了connection,這里就要把這個connection和一個socket綁定了
                    this.log.debug("Opening connection " + route);
                   //這里會創(chuàng)建tcp/ip連接,并把socket綁定到managedConn上
                   establishRoute(proxyAuthState, managedConn, route, request, context);
                }
                //在真正和服務(wù)器交互之前,還要設(shè)置好socketTimeOut
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }
                
                //2. 真正發(fā)送數(shù)據(jù)
                response = requestExecutor.execute(request, managedConn, context);

                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    //這個會影響releaseConnection()的行為
                    connHolder.markReusable(); 
                } else {
                    connHolder.markNonReusable();
                }          
            }
            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (...) {
           ...
        } 
}

大概總結(jié)一下上述流程:

  1. connectionPool中獲取connection(還有各種驗證);
  2. 使用這個connection發(fā)送數(shù)據(jù);
  3. 根據(jù)返回的response,設(shè)置一些參數(shù),比如keepAlive;
  4. 釋放這個連接并返回response中的數(shù)據(jù);

池中獲取連接

池化技術(shù)相信很多人都使用過,比如ThreadPool,JDBCPool(DataSource)等。接下來看一下HttpConnectionPool的工作原理。

//  PoolingHttpClientConnectionManager.java
public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        //這里是真正干活的
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
 }

lease()的返回值實際是自定義的一個Future,其實現(xiàn)的get()中調(diào)用了getPoolEntryBlocking(),在研究具體的代碼之前,需要先說明一下代碼中幾個集合的作用,便于理解,如下圖:

HttpClientPool.png

HttpClientPool(姑且稱之為HPool吧)中維護了多個pool(specific pool,姑且稱之為UPool吧), 一個url會對應(yīng)一個pool,不同顏色的connection可以理解為訪問不同的url創(chuàng)建的;其中的collection的含義如下:

  • leased: 總的借出去的connection;
  • available:可用的connection;
  • connection pool:url對應(yīng)的pool;
  • pending:等待的線程隊列;
    在程序中,leased和available實際的和為allocatedCount。
SpecificPool.png

UPool的結(jié)構(gòu)和HPool基本一致,只是這里面的connection才是真正被使用的,每次當(dāng)有線程來獲取connection的時候,會到一個具體的UPool中來查找connection。HPool中維護的leased、available和pending是用來統(tǒng)計的;

當(dāng)連接池里的connection超出限制時,當(dāng)前線程就會被放入pending中等待被喚醒;

了解了上述的設(shè)計,讀下面的代碼就輕而易舉了。

 private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            //定位 UPool
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {//死循環(huán)-1
                for (;;) { //死循環(huán)-2:循環(huán)直到從UPool中獲取一個沒有過期的connection
                    entry = pool.getFree(state);
                    /////////////////////////////////////////////////////////// getFree()方法體
                    public E getFree(final Object state) {
                            if (!this.available.isEmpty()) {//有可用的connection
                                  if (state != null) { //state與認(rèn)證有關(guān),先忽略
                                        final Iterator<E> it = this.available.iterator();
                                        while (it.hasNext()) {
                                              final E entry = it.next();
                                              if (state.equals(entry.getState())) {
                                                    it.remove();
                                                    this.leased.add(entry);
                                                    return entry;
                                              }
                                         }
                                   }
                                  final Iterator<E> it = this.available.iterator();
                                  while (it.hasNext()) {
                                        final E entry = it.next();
                                        if (entry.getState() == null) {
                                              it.remove(); //UPool的available中刪掉這個connection
                                              this.leased.add(entry);//UPool的leased中添加這個connection
                                              return entry;
                                        }
                                   }
                           }
                            //走到這里說明沒有可用的connection,下文一定會創(chuàng)建
                            return null;
                      }
                    ///////////////////////////////////////////////////////////
                    if (entry == null) {//沒有借到connection
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        //這個connection關(guān)閉了(這里是底層socket的關(guān)閉),也把HPool中available和leased中保存的刪掉,池里徹底沒有這個connection了
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }//死循環(huán)-2結(jié)束
                if (entry != null) {//上面借到了connection
                    //HPool中做相應(yīng)的處理以作統(tǒng)計用
                    this.available.remove(entry);
                    this.leased.add(entry);
                    //鉤子方法
                    onReuse(entry);
                    return entry;
                }

                // 走到這里說明沒有獲取到有效的connection,需要創(chuàng)建
                // 創(chuàng)建前先壓縮一下UPool,把暫時空閑的connection刪掉,騰出地兒
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                //UPool中的connection量沒到最大值才能新建
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    //也需要滿足HPool對connection數(shù)量總的限制
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // HPool中,總的可用的connection很多,幾乎沒有使用
                        // 為了讓當(dāng)前的url可以新創(chuàng)建一個connection,隨機刪除一個可用的connection
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        //已經(jīng)刪除了一個沒有使用的connection把地兒挪了出來,接著創(chuàng)建當(dāng)前url的connection
                        final C conn = this.connFactory.create(route);
                        //放入HPool和UPool的leased中
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                //走到這里說明pool已經(jīng)滿了,不能創(chuàng)建新的connection
                boolean success = false;
                //一個線程對應(yīng)一個future
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    //放入pending隊列中
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        //ConnectionRequestTimeout的設(shè)置最終會在這里起作用
                        //當(dāng)前線程park了直到deadline這個時間點
                        //1. 線程一直park到deadline,返回false;
                        //2. 還沒到deadline,被signal了,返回true;
                        //這是一個相對積極的信號,說明可能存在可用的connection。
                        //那么誰來調(diào)用signal呢?有兩種可能:a. releaseConnection();b. 當(dāng)前的獲取操作被cancel()
                        //3. 被中斷了,success也是false,直接走入finally;
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    // park被signal或睡到自然醒后,判斷當(dāng)前獲取connection的操作是否被cancel
                    // 這里的cancel和FutureTask的cancel還不太一樣。FutureTask的cancel是直接對線程進行interrupt(),這里只是對一個變量的值進行了改變;
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
               
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    //這里說明這個線程在deadline之前被中斷了,或者等到醒來都沒有新的connection可用
                    break;//跳出死循環(huán)-1
                }
            } //死循環(huán)-1 結(jié)束
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

釋放連接

在上文中提到,在response返回給客戶端之前會釋放連接,接下來我們看一下釋放的過程。

// ConnectionHolder.java
public void releaseConnection() {
        //一個connection只能釋放一次,因此要加鎖
        synchronized (this.managedConn) {
            if (this.released) {
                return;
            }
            this.released = true;
            //上文說過,reuseable會影響釋放的過程
            if (this.reusable) {
                //可重復(fù)使用的connection,其實就是把Pool中l(wèi)eased里的connection挪到available中的過程
                //response的http頭可能是這個樣子:   Keep-Alive: timeout=5, max=100
                //這里的validDuration實際上是服務(wù)端返回的keep-alive的時間,若沒有,就為-1
                this.manager.releaseConnection(this.managedConn,
                        this.state, this.validDuration, this.tunit);
            } else {
                try {  
                    //這里是真正的關(guān)閉,意味著socket也已經(jīng)關(guān)閉
                    this.managedConn.close();
                    log.debug("Connection discarded");
                } catch (final IOException ex) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug(ex.getMessage(), ex);
                    }
                } finally {
                    this.manager.releaseConnection(
                            this.managedConn, null, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

池中釋放就是把leased的connection挪到available中,但除了這個動作,還要有別的地方需要注意。available中可用connection并不是永遠都有效的,因為tcp/ip協(xié)議是全雙工方式工作,一個connection是否有效,要根據(jù)雙方的時時狀態(tài)來更新connection的生命周期。實際工作中,客戶端一般要隨服務(wù)端的狀態(tài)來改變。比如服務(wù)端返回值中顯示keepalive為10s,那么當(dāng)這個connection在available中的存活時間也不能超過10s,否則就有問題。

// PoolingHttpClientConnectionManager.java
public void releaseConnection(
            final HttpClientConnection managedConn,
            final Object state,
            final long keepalive, final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            final ManagedHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
                    entry.setState(state);
                    // 存活的最后時間點是 放入available的那一刻向后推keepalive;
                    // 當(dāng)然,如果這個時間點在我們初始化時設(shè)置的最后時間點之后,還是以設(shè)置的值為準(zhǔn)
                    entry.updateExpiry(keepalive, effectiveUnit);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (keepalive > 0) {
                            s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                }
            } finally {
                // 這里就是connection從leased到available的挪動
                // HPool和UPool都要進行挪動的操作并喚醒等待的線程
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
}

關(guān)閉連接

除了不能重復(fù)使用的connection需要關(guān)閉外,一些超時無用的connection也要關(guān)閉

// 這個方法可以傳入?yún)?shù),可以由業(yè)務(wù)方根據(jù)實際情況設(shè)定值
public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
        }
        this.pool.closeIdle(idleTimeout, tunit);
}

 // 這個方法沒有參數(shù),那么哪些算是expired的呢?
 // 由上節(jié)我們知道,在釋放連接的時候,會根據(jù)服務(wù)端的keepalive(沒有的話,也有默認(rèn)值) 設(shè)置expired的deadline;
public void closeExpiredConnections() {
        this.log.debug("Closing expired connections");
        this.pool.closeExpired();
}

idle: 從connection創(chuàng)建的時間點開始的idleTimeout時間范圍,是一個絕對的時間范圍;比如一個connection是10:00創(chuàng)建,idleTimeout設(shè)為60s,那么10:01以后這個connection就得關(guān)閉;

expire:expire需要一個deadline,這個deadline每次release的時候都會更新,值為release的時間點 + keepalive(或validityDeadline),是一個相對的時間范圍;比如一個connection最后一次release的時間點是10:00,keepalive=6min,validityDeadline=5min,那么deadline=10:05,如果這個connection再沒有使用過,則過了10:05,就算是過期的connection,應(yīng)該被關(guān)閉; 如果在10:04的時候又被借出去使用了,release的時間是10:10,keepalive還是為6min,那么過了10:15,這個connection就應(yīng)關(guān)閉了;

很多情況response的keepalive和validityDeadline都沒有值,那么這個時候deadline就是Long.MAX_VALUE了,這個時候只能通過idle的值來關(guān)閉不需要的connection了;

下面再說明一下幾個時間點

// 首次創(chuàng)建connection
public PoolEntry(final String id, final T route, final C conn,
            final long timeToLive, final TimeUnit tunit) {
        super();
        Args.notNull(route, "Route");
        Args.notNull(conn, "Connection");
        Args.notNull(tunit, "Time unit");
        this.id = id;
        this.route = route;
        this.conn = conn;
        this.created = System.currentTimeMillis();
        this.updated = this.created; //這個就是connection被創(chuàng)建的時間,會用于idle的判斷
        if (timeToLive > 0) { //這個值通過HttpClientBuilder.setConnectionTimeToLive()傳入
            final long deadline = this.created + tunit.toMillis(timeToLive);
            // If the above overflows then default to Long.MAX_VALUE
            this.validityDeadline = deadline > 0 ? deadline : Long.MAX_VALUE;
        } else {
            this.validityDeadline = Long.MAX_VALUE;
        }
        this.expiry = this.validityDeadline; //默認(rèn)的expire deadline
    }

上述兩種關(guān)閉connection的方式都是從時間入手,到了一個時間點,過期的connection都干掉?,F(xiàn)在假如把connection的idleTimeout設(shè)為10天,expired的deadline沒有設(shè)置,即為Long.MAX_VALUE,這個時候池里面的connection會有什么問題?服務(wù)器端的connection不會保留10天這么久,很快就會斷掉,那么此時池里的connection實際上就是半雙工狀態(tài)了,這個不正常的connection會被客戶端獲取到。為了解決這個問題,引入了validateAfterInactivity(默認(rèn)5s)

for (;;) {
    final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
    //池中獲取的connection要驗證
    if (validateAfterInactivity > 0)  {
        //比如10:00創(chuàng)建的connection,那么10:05后就要驗證了
        if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
            if (!validate(leasedEntry)) {
                //validate調(diào)用的是connection的isStale()
                //////////////////////////////////////////////////////////////
                public boolean isStale() {
                    if (!isOpen()) { //沒有綁定socket 或 socket關(guān)閉
                        return true;
                    }
                  
                    try {
                        //其實socket沒讀到數(shù)據(jù)也不能說明該socket無效
                        //這里我覺得是一種較悲觀的處理,寧可錯殺一千,不可放過一個
                        final int bytesRead = fillInputBuffer(1);
                        return bytesRead < 0;
                    } catch (final SocketTimeoutException ex) {
                        //這里要注意,SocketTimeoutException不能說明這個connection無效
                        return false; //上面的if無法進入,這個connection可能有問題
                    } catch (final IOException ex) {
                        return true;
                    }
            }
                //////////////////////////////////////////////////////////////
                leasedEntry.close();
                release(leasedEntry, false);
                continue;
            }
        }
    }
    entryRef.set(leasedEntry);
    done.set(true);
    onLease(leasedEntry);
    if (callback != null) {
        callback.completed(leasedEntry);
    }
    return leasedEntry;
}

最后,本文有點長,如果讀者覺得有哪里不對的地方,歡迎批評指正。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • HttpClient整理資料 1、httpClient HttpClient是Apache中的一個開源的項目。它實...
    小白豆豆5閱讀 30,532評論 5 38
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評論 19 139
  • 第一章 Nginx簡介 Nginx是什么 沒有聽過Nginx?那么一定聽過它的“同行”Apache吧!Ngi...
    JokerW閱讀 33,022評論 24 1,002
  • 昨天和朋友一起去看了郭敬明和落落的《悲傷逆流成河》的電影版,說出來你們可能不信,電影的最后幾幕情景,我哭...
    月羽九九閱讀 521評論 1 2
  • 昨天晚上和我的一個妹聊天,得知她最近跑回我出生的那個小縣城實習(xí)了。兩個多月前,她還在貴陽實習(xí),無意中問了一下她畢業(yè)...
    326公路閱讀 282評論 0 0

友情鏈接更多精彩內(nèi)容