OkHttp設(shè)計及源碼解析

Okhttp設(shè)計與源碼解析

本文針對OkHttp的設(shè)計及源碼作一下解析:

**Okhttp如何做到請求的控制?
Okhttp核心設(shè)計Interceptor
Okhttp如何支持https? **

1.核心類庫、簡單使用、主要流程


(此圖片來自網(wǎng)絡(luò),作者不詳)

從上述UML類圖可以看出,Okhttp在設(shè)計時采用的門面模式,將整個系統(tǒng)的復(fù)雜性給隱藏起來,將子系統(tǒng)接口通過一個客戶端OkHttpClient統(tǒng)一暴露出來,使用戶與內(nèi)部系統(tǒng)之間達(dá)到解構(gòu)的目的,這種方式在很多開源框架上都有所體現(xiàn),像Glide、Picaso、ImageLoader、EventBus之類的,都有所體現(xiàn)。

Okhttp使用上和HttpClient基本一致,沒啥好說的。下面是基本的get異步請求:

OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder().url("http://www.baidu.com").build();
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.i("ggg",e.getMessage());
        }

        @Override
        public void onResponse(Call call,Response response) throws IOException {
            // NOT UI Thread
            if (response.isSuccessful()) {
                Log.i("ggg",response.code()+"");
                Log.i("ggg",response.headers()+"");
                Log.i("ggg",response.body().string());
            }
            response.close();
        }
    });

有兩點是需要注意的:一是Okhttp建議使用一個OkHttpClient,二是Reponse使用完需要關(guān)閉。
Okhttp的基本加載流程如下:

上圖中的攔截器即Interceptor,后面會著重講。

2.請求的分發(fā)處理,如何做到對請求數(shù)量控制前提下做到高并發(fā)、低阻塞?

想到并發(fā)必然想到多線程,繼而就自然想到線程池,通常我們在使用線程池的時候,通過ThreadPoolExecutor構(gòu)造方法:

 public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

來創(chuàng)建線程池,那么要想做到高并發(fā)低阻塞,就意味著maximumPoolSize的值需要比較大,Okhttp在構(gòu)造線程池時,使用了如下參數(shù):

    public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

可以看到,maximumPoolSize基本沒有限制大小,工作隊列上,采用了SynchronousQueue,這是一個沒有容量的隊列。當(dāng)執(zhí)行enqueue的時候,事實上,Okhttp并沒有直接將任務(wù)交給線程池來執(zhí)行,而是通過講任務(wù)調(diào)度交給了Dispatcher這個類似代理服務(wù)器,來進行調(diào)度,這樣才做到了在控制同一個host請求數(shù)量的前提下,達(dá)到高并發(fā)低阻塞,我們來看看enqueue的實現(xiàn):

@Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed)
            throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

當(dāng)調(diào)用Realcall的enqueue時,會重新創(chuàng)建一個AsyncCall,AsyncCall是RealCall的一個內(nèi)部類,因此可以看作是RealCall的一個超集。然后交給了Dispatcher來進行調(diào)度:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
        runningAsyncCalls.add(call);
        executorService().execute(call);
    } else {
        readyAsyncCalls.add(call);
    }
}

這段代碼邏輯很清晰,如果正在執(zhí)行請求與同一個Host請求沒有達(dá)到上限,那么將該次請求加入到正在執(zhí)行集合中,并交由線程池立刻執(zhí)行,否則講該次請求加入到等待隊列中,然后我們繼續(xù)看AsyncCall的執(zhí)行代碼:

        @Override
    protected void execute() {
        //這個標(biāo)記用于確?;卣{(diào)只得到一次執(zhí)行
        boolean signalledCallback = false;
        try {
            //這句代碼會通過攔截鏈來獲取響應(yīng)
            Response response = getResponseWithInterceptorChain();
            //請求被取消
            if (retryAndFollowUpInterceptor.isCanceled()) {
                signalledCallback = true;
                responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
                signalledCallback = true;
                responseCallback.onResponse(RealCall.this, response);
            }
        } catch (IOException e) {
            if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
                responseCallback.onFailure(RealCall.this, e);
            }
        } finally {
            //無論執(zhí)行過程發(fā)生了什么,通知Dispatcher該次請求已執(zhí)行完畢
            client.dispatcher().finished(this);
        }
    }

再看看finished方法:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
        if (!calls.remove(call))
            throw new AssertionError("Call wasn't in-flight!");
        if (promoteCalls)
            //異步請求會執(zhí)行這里
            promoteCalls();
        runningCallsCount = runningCallsCount();
        idleCallback = this.idleCallback;
    }
    //如果沒有正在執(zhí)行的請求并且設(shè)置了空閑回調(diào),那么會執(zhí)行此分發(fā)器的空閑回掉
    if (runningCallsCount == 0 && idleCallback != null) {
        idleCallback.run();
    }
}

如果是異步的請求,那么當(dāng)這個請求走到finally中的finished方法時,會調(diào)用promoteCalls:

    private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests)
        return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty())
        return; // No ready calls to promote.
    //注意這里的等待請求是一個先進先出的隊列結(jié)構(gòu)
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();) {
        AsyncCall call = i.next();
        //如果主機請求限制沒到上限就會取出執(zhí)行
        if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
        }
        //一直遍歷到達(dá)到總數(shù)上限為止
        if (runningAsyncCalls.size() >= maxRequests)
            return; // Reached max capacity.
    }
}

也就是說,當(dāng)一個請求結(jié)束時,會馬上使用該線程繼續(xù)執(zhí)行等待中的請求,避免了線程的閑置與浪費,這里,finally的使用是一個亮點,這在AsyncCall的父類NamedRunable中,同樣有所體現(xiàn):

public abstract class NamedRunnable implements Runnable {
    protected final String name;

    public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
    }

    @Override
    public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
            execute();
        } finally {
            Thread.currentThread().setName(oldName);
        }
    }

    protected abstract void execute();

}

這個類實現(xiàn)了,在該線程執(zhí)行該Runable時,線程名字與該Runable一致。

3.Okhttp的核心設(shè)計,Interceptor

不管是同步還是異步的請求,最終都會走到方法getResponseWithInterceptorChain來獲得Reponse,從字面意思,這個方法可以理解為根據(jù)攔截鏈獲得Reponse,后面我們會說到怎么自定義攔截器:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //如果使用時我們沒有自定義過攔截器,那么這里addAll的集合其實是空的
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        //如果使用時我們沒有自定義過攔截器,那么這里addAll的集合其實是空的
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //這里構(gòu)造方法接收五個參數(shù),目前除了攔截器,當(dāng)前攔截器位置,初始的請求,其它為null。
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest);
    return chain.proceed(originalRequest);
}

這里代碼最終創(chuàng)建了一個RealInterceptorChain,即攔截鏈,并調(diào)用了其proceed方法,來獲取Reponse,繼續(xù)跟進去:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
        RealConnection connection) throws IOException {
    if (index >= interceptors.size())
        throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
                + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException(
                "network interceptor " + interceptor + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
}

這段代碼比較長,但是核心代碼其實就三行:

RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

創(chuàng)建一個攔截鏈,將其index+1,并拿到當(dāng)前的攔截器,然后調(diào)用攔截器的intercept方法,根據(jù)我們前面的getResponseWithInterceptorChain方法可以知道,這第一個應(yīng)該是retryAndFollowUpInterceptor(前提是沒有自定義攔截器),即用來作重試與重定向處理的攔截器:

@Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
            callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
        if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
        }

        Response response = null;
        boolean releaseConnection = true;
        try {
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            //如果上面這句代碼執(zhí)行時沒有發(fā)生異常,那么標(biāo)記置為false,即連接是正??捎玫?            releaseConnection = false;
        } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), false, request)) {
                //如果無法恢復(fù)連接 直接拋異常結(jié)束循環(huán)
                throw e.getLastConnectException();
            }
            //可恢復(fù),標(biāo)記置為false,即連接是正??捎玫?,不需要釋放
            releaseConnection = false;
            //循環(huán)繼續(xù),即重試
            continue;
        } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request))
                throw e;
            releaseConnection = false;
            continue;
        } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {
                //最終如果連接不可用,那么需要釋放掉,因為catch塊拋出了異常,這里需要在finally處理
                streamAllocation.streamFailed(null);
                streamAllocation.release();
            }
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
            //如果有重定向之前的reponse
            response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build())
                    .build();
        }

        Request followUp = followUpRequest(response);

        if (followUp == null) {
            //如果沒有重定向請求
            if (!forWebSocket) {
                streamAllocation.release();
            }
            //大部分情況下,會以返回response結(jié)束
            return response;
        }

        closeQuietly(response.body());
        //重定向
        if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }

        if (followUp.body() instanceof UnrepeatableRequestBody) {
            streamAllocation.release();
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
        }

        if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(client.connectionPool(),
                    createAddress(followUp.url()), callStackTrace);
        } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                    + " didn't close its backing stream. Bad interceptor?");
        }
        //
        request = followUp;
        priorResponse = response;
    }
}

方法也很長,但是流程比較簡單,其中有一個StreamAllocation比較重要,但在這里我們先不講。在try語塊中,重新又回到了RealInterceptorChain的proceed方法之中,這個方法執(zhí)行完畢后,會根據(jù)結(jié)果做不同處理,如失敗、返回Reponse、重試、重定向。值得注意的是,這里的RealInterceptorChain是我們在之前new出來的,index為加1之后的,對應(yīng)的Interceptor為BridgeInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
        MediaType contentType = body.contentType();
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
        }

        long contentLength = body.contentLength();
        if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
        } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
        }
    }

    if (userRequest.header("Host") == null) {
        requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
        requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
        transparentGzip = true;
        requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
        requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
        requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);

    if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
        GzipSource responseBody = new GzipSource(networkResponse.body().source());
        Headers strippedHeaders = networkResponse.headers().newBuilder().removeAll("Content-Encoding")
                .removeAll("Content-Length").build();
        responseBuilder.headers(strippedHeaders);
        responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
}

代碼雖長,但是很簡明,主要是對請求的header、body與響應(yīng)的header、body做了一些處理,然后繼續(xù)調(diào)用攔截鏈的proceed方法來處理對request處理后的request,這一次,我們來到了CacheInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    //拿到該request的緩存Response
    Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;

    long now = System.currentTimeMillis();
    //CacheStrategy即決定了發(fā)網(wǎng)絡(luò)請求還是使用緩存
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    //如果沒有緩存,或者緩存無效,并且請求頭沒有設(shè)置“Cache-Control: only-if-cached”,networkRequest則不為null
    Request networkRequest = strategy.networkRequest;
    //如果緩存存在或者有效,那么cacheResponse不為null
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
        //統(tǒng)計被請求、發(fā)出網(wǎng)絡(luò)請求、使用緩存的數(shù)目
        cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
        //緩存雖存在,但是已經(jīng)失效了,關(guān)閉資源
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
        //直接504超時
        return new Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(504)
                .message("Unsatisfiable Request (only-if-cached)").body(Util.EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
        //緩存有效,自此,整個攔截鏈調(diào)用就結(jié)束了,這里就會回到我們之前的RetryAndFollowUpInterceptor的循環(huán)中去了
        return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
    }

    Response networkResponse = null;
    try {
        //緩存無效,那么繼續(xù)走我們的攔截鏈
        networkResponse = chain.proceed(networkRequest);
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        //如果攔截鏈繼續(xù)走執(zhí)行完返回的header有not modified,那么取緩存更新并返回
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                    .cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse))
                    .build();
            networkResponse.body().close();

            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }
    //如果是新數(shù)據(jù),做緩存處理后返回
    Response response = networkResponse.newBuilder().cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse)).build();

    if (HttpHeaders.hasBody(response)) {
        CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
        response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
}

這里注釋比較詳細(xì),根據(jù)注釋就能明白整個緩存處理的流程,我們繼續(xù)走攔截鏈,當(dāng)沒有緩存或者緩存已經(jīng)失效時,上述代碼會執(zhí)行:

 networkResponse = chain.proceed(networkRequest);

繼續(xù)我們的攔截鏈,這一次,來到了ConnectInterceptor:

@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

終于有個比較短的intercept了,但其實這里代碼確是最復(fù)雜的。注意上述代碼中,從RealInterceptorChain中拿到了一個StreamAllocation,這個我們之前沒有提及,事實上,可以將它理解為一個協(xié)調(diào)器,用來協(xié)調(diào)connection、stream、call三者之間的關(guān)系,在http/2中每個connection可能對應(yīng)多個stream,而同一個call可能也對應(yīng)著多個stream。它是在第一個攔截器,即retryAndFollowUpInterceptor被初始化的,我們先看看它時如何初始化的:

   public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
}

其初始化需要一個連接池、物理地址、路由選擇器,新的類比較多,我們用到的時候再解釋每個是干嘛用的,這里我們只看看Address這個參數(shù)是怎么獲取的:

private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
        //如果是https,那么還需要初始化sslsocket、主機認(rèn)證、證書編解碼相關(guān)類
        sslSocketFactory = client.sslSocketFactory();
        hostnameVerifier = client.hostnameVerifier();
        certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory,
            hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(),
            client.protocols(), client.connectionSpecs(), client.proxySelector());
}

回到ConnectInterceptor的intercept方法:

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

這句代碼通過調(diào)用streamAllocation的newStream拿到了一個HttpCodec,HttpCodec實質(zhì)上就是一個輔助類,主要負(fù)責(zé)在Socket連接中,遵循h(huán)ttp的規(guī)范來進行流的輸入輸出,其內(nèi)部是通過OK IO這個jar包中的相關(guān)API實現(xiàn)流的讀寫操作,OK IO我們就不去詳細(xì)介紹了,內(nèi)部通過java nio相關(guān)api實現(xiàn)的。只要記住,凡是繼承sink的就是輸出流,繼承source的就是輸入流即可,HttpCodec有兩個子類,Http1Codec 與Http2Codec,分別對應(yīng)著http/1.x和http/2.x,我們來看看newStream的具體實現(xiàn):

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
        RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout,
                connectionRetryEnabled, doExtensiveHealthChecks);
        HttpCodec resultCodec = resultConnection.newCodec(client, this);

        synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
        }
    } catch (IOException e) {
        throw new RouteException(e);
    }
}

這里的關(guān)鍵代碼在于如何拿到一個RealConnection,ReConnection實質(zhì)上是一個對Socket連接的封裝類,與Sdk的HttpUrlConnection功能上是差不多的,最終獲取RealConnection的代碼走到了方法:

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
        boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
        if (released)
            throw new IllegalStateException("released");
        if (codec != null)
            throw new IllegalStateException("codec != null");
        if (canceled)
            throw new IOException("Canceled");

        // 嘗試去獲取之前已經(jīng)獲取的connection
        RealConnection allocatedConnection = this.connection;
        if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            //如果這個連接存在并且沒有新的請求直接拿來用
            return allocatedConnection;
        }

        // 嘗試從連接池中獲取連接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
            return connection;
        }

        selectedRoute = route;
    }
    //這里阻塞去獲取能用的服務(wù)器,如果遍歷完沒有可用的服務(wù)器,會拋出異常結(jié)束程序
    if (selectedRoute == null) {
        selectedRoute = routeSelector.next();
    }

    RealConnection result;
    //涉及到連接池的讀寫,加上同步
    synchronized (connectionPool) {
        if (canceled)
            throw new IOException("Canceled");

        //獲取新的服務(wù)器地址之后,再次嘗試去連接池中找到連接來復(fù)用
        Internal.instance.get(connectionPool, address, this, selectedRoute);
        if (connection != null)
            return connection;

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        //這個方法將創(chuàng)建的connection賦值給成員變量connection
        acquire(result);
    }
    //這里釋放了鎖
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    //涉及到連接池的讀寫這里又鎖上了
    synchronized (connectionPool) {
        // 將連接存入連接池
        Internal.instance.put(connectionPool, result);

        // If another multiplexed connection to the same address was created concurrently, then
        // release this connection and acquire that one.
        //這里意思是如果同時有一個http/2.0的connection存在,那么我們廢棄剛剛創(chuàng)建的那個改用那個連接
        //之所以會存在同時創(chuàng)建的connection,注意到這段代碼中間握手的代碼并沒有加上同步,因為它是阻塞代碼
        //那么很可能我們這條線程在執(zhí)行握手的時候,別的線程也在握手中,并且先于我們走到到將連接放入連接池
        // 的代碼,之后我們再進來,就可能存在指向同一個地址的連接。
        if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
        }
    }
    //關(guān)閉socket的代碼放在同步外面,因為這句代碼沒有必要同步,加快釋放鎖的時間
    closeQuietly(socket);

    return result;
}

這個方法代碼比較復(fù)雜,所以注釋的比較詳細(xì),我們再來看看RealConnection的握手過程

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
        boolean connectionRetryEnabled) {
    //協(xié)議已經(jīng)確定
    if (protocol != null)
        throw new IllegalStateException("already connected");

    RouteException routeException = null;
    //傳輸層協(xié)議(tls)與加密套件,即OkHttpClient的默認(rèn)數(shù)組中的
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    //創(chuàng)建一個傳輸協(xié)議選擇器
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    if (route.address().sslSocketFactory() == null) {
        //說明不是https
        if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
            throw new RouteException(
                    new UnknownServiceException("CLEARTEXT communication not enabled for client"));
        }
        String host = route.address().url().host();
        //沒看懂,因為isCleartextTrafficPermitted永遠(yuǎn)返回的true
        if (!Platform.get().isCleartextTrafficPermitted(host)) {
            throw new RouteException(new UnknownServiceException(
                    "CLEARTEXT communication to " + host + " not permitted by network security policy"));
        }
    }

    while (true) {
        try {
            //如果是https而又是通過http代理訪問的,最終都會走到connnectSocket
            if (route.requiresTunnel()) {
                connectTunnel(connectTimeout, readTimeout, writeTimeout);
            } else {
                connectSocket(connectTimeout, readTimeout);
            }
            establishProtocol(connectionSpecSelector);
            break;
        } catch (IOException e) {
            closeQuietly(socket);
            closeQuietly(rawSocket);
            socket = null;
            rawSocket = null;
            source = null;
            sink = null;
            handshake = null;
            protocol = null;
            http2Connection = null;

            if (routeException == null) {
                routeException = new RouteException(e);
            } else {
                routeException.addConnectException(e);
            }

            if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
                throw routeException;
            }
        }
    }

    if (http2Connection != null) {
        synchronized (connectionPool) {
            allocationLimit = http2Connection.maxConcurrentStreams();
        }
    }
}

最終,代碼走到了connectSock中真正建立了連接:

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket() : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
        //這里,便真正地建立了連接,Platform用于平臺適配
        Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
        ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
        ce.initCause(e);
        throw ce;
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
}

這里主要做了三件事,創(chuàng)建了Socket、拿到了Socket建立連接后的輸入流、輸出流、有了這三者,和服務(wù)器的通訊就建立起來了,然后便通過establishProtocol(connectionSpecSelector),建立起與服務(wù)器進行通訊的協(xié)議:

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    if (route.address().sslSocketFactory() == null) {
        //都不是https,那只能1.x了
        protocol = Protocol.HTTP_1_1;
        socket = rawSocket;
        return;
    }
    
    connectTls(connectionSpecSelector);
    //如果支持http2.0協(xié)議,那么創(chuàng)建一個http2Connection,并開啟,這里實際是開啟一個線程去不斷地進行讀操作
    if (protocol == Protocol.HTTP_2) {
        socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
        http2Connection = new Http2Connection.Builder(true)
                .socket(socket, route.address().url().host(), source, sink).listener(this).build();
        http2Connection.start();
    }
}

我們再看看connectTls:

private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
        // Create the wrapper over the connected socket.
        sslSocket = (SSLSocket) sslSocketFactory.createSocket(rawSocket, address.url().host(),
                address.url().port(), true /* autoClose */);

        // Configure the socket's ciphers, TLS versions, and extensions.
        //這里是去驗證服務(wù)器握手傳遞過來的數(shù)據(jù),包括公鑰,傳輸協(xié)議版本等信息
        ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
        if (connectionSpec.supportsTlsExtensions()) {
            Platform.get().configureTlsExtensions(sslSocket, address.url().host(), address.protocols());
        }

        // Force handshake. This can throw!
        sslSocket.startHandshake();
        //通過握手,拿到Session,并將傳輸協(xié)議版本,支持的解密套件,及SSL證書解析出來
        Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());

        // Verify that the socket's certificates are acceptable for the target host.
        if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
            X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
            throw new SSLPeerUnverifiedException(
                    "Hostname " + address.url().host() + " not verified:" + "\n    certificate: "
                            + CertificatePinner.pin(cert) + "\n    DN: " + cert.getSubjectDN().getName()
                            + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
        }

        // Check that the certificate pinner is satisfied by the certificates presented.
        //檢查該證書是否有效,注意這里如果沒有檢查沒通過將會拋出AssertException
        //結(jié)束程序,因此,訪問自簽名證書且沒有將證書配置到OkHttpClient的https網(wǎng)站,將會導(dǎo)致程序crash
        address.certificatePinner().check(address.url().host(), unverifiedHandshake.peerCertificates());
        //到這里證書驗證通過
        // Success! Save the handshake and the ALPN protocol.
        String maybeProtocol = connectionSpec.supportsTlsExtensions()
                ? Platform.get().getSelectedProtocol(sslSocket) : null;
        socket = sslSocket;
        source = Okio.buffer(Okio.source(socket));
        sink = Okio.buffer(Okio.sink(socket));
        handshake = unverifiedHandshake;
        //拿到服務(wù)器支持的協(xié)議
        protocol = maybeProtocol != null ? Protocol.get(maybeProtocol) : Protocol.HTTP_1_1;
        success = true;
    } catch (AssertionError e) {
        if (Util.isAndroidGetsocknameError(e))
            throw new IOException(e);
        throw e;
    } finally {
        if (sslSocket != null) {
            Platform.get().afterHandshake(sslSocket);
        }
        if (!success) {
            closeQuietly(sslSocket);
        }
    }
}

到這里,整個握手的階段就已經(jīng)完成了,也就是RealConnection創(chuàng)建成功,我們回到方法streamAllocation.newStream中,現(xiàn)在我們有了Connnection來初始化一個HttpCodec,來看看它的初始化過程,RealConnection通過newCodec來進行初始化HttpCodec:

 public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    //根據(jù)協(xié)議版本的不同,創(chuàng)建不同的HttpCodec
    if (http2Connection != null) {
        return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
        socket.setSoTimeout(client.readTimeoutMillis());
        source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
        sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
        return new Http1Codec(client, streamAllocation, source, sink);
    }
}

還記得我們getResponseWithInterceptorChain方法中創(chuàng)建的第一個RealInterceptorChain,那時大多數(shù)參數(shù)還是null,攔截鏈執(zhí)行到這兒,總算是把參數(shù)都湊齊了,我們繼續(xù)回到ConnectInterceptor的intercept方法中,由于隔得遠(yuǎn),這里干脆再貼下代碼:

@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

終于是來到了攔截鏈的最后一環(huán),這一次的proceed將對應(yīng)著會調(diào)用到CallServerInterceptor的intercept方法:

@Override
public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //寫入請求頭
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
        // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
        // Continue" response before transmitting the request body. If we don't get that, return what
        // we did get (such as a 4xx response) without ever transmitting the request body.
        //100-continue用于客戶端在發(fā)送POST數(shù)據(jù)給服務(wù)器前,征詢服務(wù)器情況,看服務(wù)器是否處
        // 理POST的數(shù)據(jù),如果不處理,客戶端則不上傳POST數(shù)據(jù),如果處理,則POST上傳數(shù)據(jù)。在現(xiàn)實應(yīng)
        // 用中,通過在POST大數(shù)據(jù)時,才會使用100-continue協(xié)議
        if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
        }

        // Write the request body, unless an "Expect: 100-continue" expectation failed.
        //如果沒有100-continue頭,那么會執(zhí)行這里
        if (responseBuilder == null) {
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
        }
    }
    //到這里 整個request數(shù)據(jù)就寫進去了,下面這句代碼flush了一下
    httpCodec.finishRequest();

    if (responseBuilder == null) {
        //這里builder會記錄下這次響應(yīng)的頭信息
        responseBuilder = httpCodec.readResponseHeaders(false);
    }
    //創(chuàng)建一個響應(yīng),依次為綁定request,獲取握手的session數(shù)據(jù),請求發(fā)出去時間,接收到響應(yīng)時間
    Response response = responseBuilder.request(request)
            .handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis()).build();

    int code = response.code();
    if (forWebSocket && code == 101) {
        //切換協(xié)議,這里需要確保拿到的不是空的body
        response = response.newBuilder().body(Util.EMPTY_RESPONSE).build();
    } else {
        //注意這里responsebody是一個輸入流,意味著可能是大文件
        response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();
    }
    //響應(yīng)頭包含close,標(biāo)識關(guān)閉,此時可以復(fù)用Connection
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
        streamAllocation.noNewStreams();
    }
    //響應(yīng)成功,標(biāo)識無內(nèi)容,但是又明明有內(nèi)容
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
        throw new ProtocolException(
                "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}

代碼不是很復(fù)雜,主要是一些讀流寫流的操作罷了,如果以上環(huán)節(jié)都沒有出現(xiàn)問題,那么這個Response會一路return到攔截鏈的開始,繼而通過方法getResponseWithInterceptorChain()返回,從而我們在AsyncCall中的回調(diào)得到了執(zhí)行,自此,整個Interceptor的流程就結(jié)束了。我們將整個流程作出草圖:

整個設(shè)計是非常優(yōu)雅的,可以說是責(zé)任鏈模式的完美體現(xiàn),又有點類似AOP設(shè)計思想。不但將從Request獲取Reponse整個過程從RealCall中剝離出來,而且分層設(shè)計,每個攔截器均有可能完成這一工作,每個攔截器自行決定是自己完成整個工作,還是完成部分,然后交由下一個攔截器來完成,各個攔截器之間代碼獨立,各司其職,耦合度極低,即使需要替換某一個攔截器,也完全不會影響到其他攔截器的運行,有點類似于工廠的流水線。
責(zé)任鏈模式在Android中的體現(xiàn)即觸摸事件的傳遞機制,這里我們不再展開,下面說說AOP,面向切面編程的思想。

面向切面編程(AOP是Aspect Oriented Program的首字母縮寫) ,我們知道,面向?qū)ο蟮奶攸c是繼承、多態(tài)和封裝。而封裝就要求將功能分散到不同的對象中去,這在軟件設(shè)計中往往稱為職責(zé)分配。實際上也就是說,讓不同的類設(shè)計不同的方法。這樣代碼就分散到一個個的類中去了。這樣做的好處是降低了代碼的復(fù)雜程度,使類可重用。 但是人們也發(fā)現(xiàn),在分散代碼的同時,也增加了代碼的重復(fù)性。什么意思呢?比如說,我們在兩個類中,可能都需要在每個方法中做日志。按面向?qū)ο蟮脑O(shè)計方法,我們就必須在兩個類的方法中都加入日志的內(nèi)容。也許他們是完全相同的,但就是因為面向?qū)ο蟮脑O(shè)計讓類與類之間無法聯(lián)系,而不能將這些重復(fù)的代碼統(tǒng)一起來。 也許有人會說,那好辦啊,我們可以將這段代碼寫在一個獨立的類獨立的方法里,然后再在這兩個類中調(diào)用。但是,這樣一來,這兩個類跟我們上面提到的獨立的類就有耦合了,它的改變會影響這兩個類。那么,有沒有什么辦法,能讓我們在需要的時候,隨意地加入代碼呢?這種在運行時,動態(tài)地將代碼切入到類的指定方法、指定位置上的編程思想就是面向切面的編程。 (注:本段文字來自知乎,鏈接https://www.zhihu.com/question/24863332/answer/48376158,著作權(quán)歸作者所有)

雖然真正的AOP是與動態(tài)代理密不可分的,但為什么說這里類似AOP的設(shè)計思想呢?從上述的攔截鏈,我們可以發(fā)現(xiàn),Okhttp提供了兩個位置供開發(fā)者去自定義攔截器,一個是攔截鏈的開頭,一個是與服務(wù)器數(shù)據(jù)交互之前,為什么不是其他位置呢?事實上,Okhttp將整個攔截鏈流程,分為兩個切面,連接層與通訊層,從而,開發(fā)者自定義攔截器的地方恰好就是連接點:


雖然在使用Okhttp時,我們一般比較少去對攔截器進行自定義,但是可以看到,Okhttp早就為這種擴展預(yù)留好了接口,事實上,包括java的線程池、android的asynctask對aop的設(shè)計思路均有一定體現(xiàn),從而做到了以不變應(yīng)萬變,極大地提高了程序的擴展性,在Okhttp的wiki上可以找到這樣一段話:
Since making this change we’ve been able to simplify OkHttp’s internals substantially. The code is faster and easier to understand: the whole thing is just a stack of built-in interceptors.

下面貼兩個Okhttp wiki上的兩個自定義的Interceptor,打印Log:

class LoggingInterceptor implements Interceptor {

    @Override public Response intercept(Interceptor.Chain chain) throws IOException {
        Request request = chain.request();

        long t1 = System.nanoTime();
        logger.info(String.format("Sending request %s on %s%n%s",
        request.url(), chain.connection(), request.headers()));

        Response response = chain.proceed(request);

        long t2 = System.nanoTime();
        logger.info(String.format("Received response for %s in %.1fms%n%s",
        response.request().url(), (t2 - t1) / 1e6d, response.headers()));

        return response;
    }
}

Gzip壓縮:

final class GzipRequestInterceptor implements Interceptor {

    @Override public Response intercept(Interceptor.Chain chain) throws IOException {
        Request originalRequest = chain.request();
        if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
            return chain.proceed(originalRequest);
        }

        Request compressedRequest = originalRequest.newBuilder()
        .header("Content-Encoding", "gzip")
        .method(originalRequest.method(), gzip(originalRequest.body()))
        .build();
        return chain.proceed(compressedRequest);
    }

    private RequestBody gzip(final RequestBody body) {
        return new RequestBody() {
            
            @Override public MediaType contentType() {
                 return body.contentType();
            }

            @Override public long contentLength() {
                return -1; // We don't know the compressed length in advance!
            }

            @Override public void writeTo(BufferedSink sink) throws IOException {
                BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
                body.writeTo(gzipSink);
                gzipSink.close();
            }
        };
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容