okhttp3 源碼分析

在上一篇 Retrofit 學(xué)習(xí)第三彈—源碼分析篇 分析了 Retrofit 的源碼,分析到請求 Call 位置,是調(diào)用的 okhttp3 中的 OkHttpClient 來完成請求的,所以 Retrofit 是基于 okhttp3 的一個(gè)封裝,通過注解來設(shè)定參數(shù)構(gòu)造出 Request,然后通過 OkHttpClient 創(chuàng)建 Call 實(shí)例。下面就來分析下 okhttp3 的流程。

1. okhttp 使用回顧

簡單看下 okhttp Get 的請求過程:

public static String request(String url) throws IOException {
    // 構(gòu)建 OkHttpClient
    OkHttpClient client = genericClient(headMaps);
    //新建一個(gè) Request 對(duì)象
    Request request = new Request.Builder()
            .url(url)
            .build();
    // 同步請求        
    Response response = client.newCall(request).execute();
    if (response.isSuccessful()) {
        return response.body().string();
    }else{
        throw new IOException("Unexpected code " + response);
    }
}

// 構(gòu)建 OkHttpClient 實(shí)例
private static OkHttpClient genericClient(Map<String, String> headMaps) {
    // HttpLoggingInterceptor
    HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor(
            new HttpLoggingInterceptor.Logger() {
                @Override
                public void log(String message) {
                    LogUtil.d("httpInfo", message);
                }
            }
    );
    httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
    OkHttpClient httpClient = new OkHttpClient.Builder()
            // 添加 header 信息
            .addInterceptor(new Interceptor() {
                @Override
                public Response intercept(Chain chain) throws IOException {
                    Request.Builder newBuilder = chain.request().newBuilder();
                    if (headMaps != null && !headMaps.isEmpty()) {
                        newBuilder.headers(Headers.of(headMaps));
                    }
                    return chain.proceed(newBuilder.build());
                }
            })
            // 添加 log 打印
            .addNetworkInterceptor(httpLoggingInterceptor)
            // 連接超時(shí)時(shí)間
            .connectTimeout(60, TimeUnit.SECONDS)
            // 讀取超時(shí)時(shí)間
            .readTimeout(60 * 30, TimeUnit.SECONDS)
            // 寫入超時(shí)時(shí)間
            .writeTimeout(60 * 30, TimeUnit.SECONDS)
            .build();

    return httpClient;
}

以上就是 OkHttpClient 構(gòu)建后,完成 get 同步請求的一個(gè)過程,包括通過 OkHttpClient.Builder 設(shè)置超時(shí)時(shí)間,設(shè)置攔截器等,后面會(huì)對(duì)自己設(shè)置的攔截器部分詳細(xì)講解。

2. 整體流程

okhttp_process.png

先來看下 okhttp 的整體流程,先對(duì)整體流程有個(gè)印象,然后再一步步分析具體的步驟:

  • (1) 直接構(gòu)建 OkHttpClient;或者通過 Builder 構(gòu)建,設(shè)置參數(shù)

  • (2) 構(gòu)建 Request,url、header、body、請求方法等

  • (3) 構(gòu)建 RealCall,通過 RealCall 執(zhí)行請求,同步 execute() 或者異步 enqueue(Callback responseCallback)

  • (4) 只要通過 getResponseWithInterceptorChain() 方法,開啟請求的責(zé)任鏈:自定義攔截器、RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor

  • (5) 中間異常處理,請求成功 Response 處理等

3. OkHttpClient

創(chuàng)建 OkHttpClient 是通過 Builder 構(gòu)建的,看下 Builder 中的參數(shù):

final Dispatcher dispatcher;  //分發(fā)器
final Proxy proxy;  //代理
final List<Protocol> protocols; //協(xié)議 Http1 Http2
final List<ConnectionSpec> connectionSpecs; //傳輸層版本和連接協(xié)議
final List<Interceptor> interceptors; //攔截器
final List<Interceptor> networkInterceptors; //網(wǎng)絡(luò)攔截器,一般設(shè)置 HttpLoggingInterceptor
final ProxySelector proxySelector; //代理選擇
final CookieJar cookieJar; //cookie
final Cache cache; //緩存
final InternalCache internalCache;  //內(nèi)部緩存
final SocketFactory socketFactory;  //socket 工廠
final SSLSocketFactory sslSocketFactory; //安全套接層socket 工廠,用于HTTPS
final CertificateChainCleaner certificateChainCleaner; // 驗(yàn)證確認(rèn)響應(yīng)證書 適用 HTTPS 請求連接的主機(jī)名。
final HostnameVerifier hostnameVerifier;    //  主機(jī)名字確認(rèn)
final CertificatePinner certificatePinner;  //  證書鏈
final Authenticator proxyAuthenticator;     //代理身份驗(yàn)證
final Authenticator authenticator;      // 本地身份驗(yàn)證
final ConnectionPool connectionPool;    //連接池,復(fù)用連接
final Dns dns;  //域名
final boolean followSslRedirects;  //安全套接層重定向
final boolean followRedirects;  //本地重定向
final boolean retryOnConnectionFailure; //重試連接失敗
final int connectTimeout;    //連接超時(shí)
final int readTimeout; //read 超時(shí)
final int writeTimeout; //write 超時(shí) 

很多參數(shù)我們在使用時(shí)都是默認(rèn)的,即使我們不設(shè)置參數(shù) Builder 中也會(huì)給出默認(rèn)的參數(shù),如超時(shí)時(shí)間

    connectTimeout = 10_000;
    readTimeout = 10_000;
    writeTimeout = 10_000;

4. RealCall

有了 OkHttpClient 實(shí)例,然后需要一個(gè) Request 參數(shù),設(shè)置 url,請求方法等,通過 newCall(Request request) 方法來得到 RealCall,RealCall 是請求執(zhí)行的對(duì)象,請求分為同步請求和異步請求

  • void enqueue(Callback responseCallback) 異步請求

  • Response execute() 同步請求

上面例子中 Response response = client.newCall(request).execute(); 開啟了整個(gè) GET 請求。

RealCall 構(gòu)造方法:

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    // 失敗重試以及重定向 攔截器
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

同步請求 execute()

public Response execute() throws IOException {
    // (1) 一個(gè) Call 只能請求一次,重復(fù)請求拋出異常
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      // (2) 將請求添加到分發(fā)器的同步請求集合中
      client.dispatcher().executed(this);
      // (3) 開啟請求責(zé)任鏈,請求真正執(zhí)行的位置
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      // (4) 請求結(jié)束,將請求從分發(fā)器的同步結(jié)合中移除
      client.dispatcher().finished(this);
    }
 }

(1) 一個(gè) Call 對(duì)象只能執(zhí)行一次,重復(fù)請求拋出異常,再次請求會(huì)重新創(chuàng)建 RealCall 對(duì)象;

(2) 將請求添加到 dispatcher 分發(fā)器的同步請求集合中,對(duì)于同步請求,分發(fā)器僅僅是記錄一下同步請求的集合,并沒有做更多的操作;

(3) okhttp 網(wǎng)絡(luò)請求是通過一連串的攔截器來完成的,基于責(zé)任鏈的方式,也是最重要的過程;

(4) 請求結(jié)束,告知分發(fā)器,將請求從同步集合中移除。

對(duì)于同步請求 Dispatcher 并沒有過多參與,僅僅是在請求時(shí)將請求添加同步請求集合中, 請求完畢,從集合中移除;而異步請求,分發(fā)器才真正發(fā)揮作用, 里面是通過線程池來執(zhí)行異步請求。

請求真正執(zhí)行的位置是第 (3) 步,開啟請求的責(zé)任鏈

Response getResponseWithInterceptorChain() throws IOException {
    //  interceptors 集合
    List<Interceptor> interceptors = new ArrayList<>();
    // (1) 使用者自定義攔截器
    interceptors.addAll(client.interceptors());
    // (2) 失敗重試以及重定向 攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    // (3) 橋接攔截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // (4) 緩存攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // (5) 連接攔截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
    // (6) 網(wǎng)絡(luò)攔截器,一般是設(shè)置打印 log 的攔截器,HttpLoggingInterceptor
      interceptors.addAll(client.networkInterceptors());
    }
    // (7) 請求服務(wù)器攔截器(最后一步)
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // (8) 創(chuàng)建責(zé)任鏈
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    // (9) 開啟責(zé)任鏈
    return chain.proceed(originalRequest);
}

getResponseWithInterceptorChain() 方法主要是創(chuàng)建各個(gè)攔截器,并添加到集合中,然后創(chuàng)建責(zé)任鏈 RealInterceptorChain,該責(zé)任鏈持有各個(gè)攔截器,開啟責(zé)任鏈,執(zhí)行每一個(gè)攔截器,最終完成網(wǎng)絡(luò)請求。注意攔截器添加的順序,也就是責(zé)任鏈的執(zhí)行順序。

5. 責(zé)任鏈模式

在分析各個(gè)攔截器執(zhí)行的過程前,插入責(zé)任鏈模式的一個(gè)小例子,方便對(duì)責(zé)任鏈的理解。

責(zé)任鏈可以抽象成一個(gè)鏈條 IChain

// 責(zé)任鏈
public interface IChain {
    void proceed();
}

鏈條有 N 個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有一個(gè)節(jié)點(diǎn)處理器 INodeHandler

// 節(jié)點(diǎn)處理器
public interface INodeHandler {

    void handle(IChain chain);
}

實(shí)際例子:假設(shè)有一個(gè)汽車生產(chǎn)線,生產(chǎn)組裝汽車 Car,簡化分為 3 個(gè)步驟,組裝車身,安裝輪胎,添加內(nèi)飾。汽車生產(chǎn)線相當(dāng)于責(zé)任鏈,組裝的 3 個(gè)步驟相當(dāng)于 3 個(gè)節(jié)點(diǎn)處理器。處理過程開始時(shí),首先將各個(gè)節(jié)點(diǎn)處理器添加到責(zé)任鏈的節(jié)點(diǎn)處理器結(jié)合中,每個(gè)節(jié)點(diǎn)執(zhí)行完畢,改變責(zé)任鏈中當(dāng)前需要執(zhí)行的索引,也就是完成一個(gè)步驟,告知責(zé)任鏈需要執(zhí)行下一個(gè)步驟,最終所有節(jié)點(diǎn)都執(zhí)行完畢,一個(gè)汽車也就生產(chǎn)完了。下面給出主要的代碼。

汽車類 Car

public class Car {

    private String body;
    private String inner;
    private String wheel;

    public Car() {
    }

    ...
}

汽車生產(chǎn)線責(zé)任鏈

public class CarChain implements IChain {

    private Car car;
    private List<INodeHandler> handlerList = new ArrayList<>();
    private int index;

    public CarChain(Car car) {
        this.car = car;
    }

    public Car getCar() {
        return car;
    }

    public void setCar(Car car) {
        this.car = car;
    }

    public List<INodeHandler> getHandlerList() {
        return handlerList;
    }

    public void setHandlerList(List<INodeHandler> handlerList) {
        this.handlerList = handlerList;
    }

    public int getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }

    // 責(zé)任鏈執(zhí)行,每次執(zhí)行索引對(duì)應(yīng)的節(jié)點(diǎn)
    @Override
    public void proceed() {
        if (handlerList == null || handlerList.size() < 1){
            throw new RuntimeException("handlerList is empty");
        }
        if (index >= handlerList.size()) {
            System.out.println("Car is finished");
            System.out.println("A new car : " + car.toString());
            return;
        }
        handlerList.get(index).handle(this);
    }
}

責(zé)任鏈的 3 個(gè)節(jié)點(diǎn)處理器

// 車身節(jié)點(diǎn)處理器
public class CarBodyHandler implements INodeHandler{

    @Override
    public void handle(IChain chain) {
        CarChain carChain = (CarChain) chain;
        carChain.getCar().setBody("Body");
        carChain.setIndex(carChain.getIndex() + 1);
        carChain.proceed();
    }
}

// 輪胎節(jié)點(diǎn)處理器
public class CarWheelHandler implements INodeHandler{

    @Override
    public void handle(IChain chain) {
        CarChain carChain = (CarChain) chain;
        carChain.getCar().setWheel("Wheel");
        carChain.setIndex(carChain.getIndex() + 1);
        carChain.proceed();
    }
}

// 內(nèi)飾節(jié)點(diǎn)處理器
public class CarInnerHandler implements INodeHandler{

    @Override
    public void handle(IChain chain) {
        CarChain carChain = (CarChain) chain;
        carChain.getCar().setInner("Inner");
        carChain.setIndex(carChain.getIndex() + 1);
        carChain.proceed();
    }
}

測試責(zé)任鏈

public static void main(String[] args) {
    Car car = new Car();
    System.out.println("before -- " + car.toString());
    CarChain carChain = new CarChain(car);
    List<INodeHandler> nodeHandlers = new ArrayList<>();
    nodeHandlers.add(new CarBodyHandler());
    nodeHandlers.add(new CarWheelHandler());
    nodeHandlers.add(new CarInnerHandler());
    carChain.setHandlerList(nodeHandlers);
    carChain.proceed();
}

6. RealInterceptorChain

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // 假設(shè)已經(jīng)有了流 stream,確保 connection 和 請求的端口是一致的,是可用的
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // 假設(shè)已經(jīng)有了流 stream,確保前一個(gè)攔截器調(diào)用了 責(zé)任鏈的  proceed()  方法
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // 調(diào)用下一個(gè)攔截器,將索引 +1 處理,可以看到和上面的例子的方式是一樣的
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    // 調(diào)用攔截器進(jìn)行處理
    Response response = interceptor.intercept(next);

    // 確保如果不是最后一個(gè)攔截器,僅需要調(diào)用 proceed() 方法
    // 因?yàn)樵谶@個(gè)方法中傳入的參數(shù)是前面攔截器一步一步準(zhǔn)備的,以便于最后一個(gè)攔截器請求服務(wù)器
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // 請求結(jié)果不能為空
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }
    // 請求結(jié)果的 body 不能為空
    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
 }

RealInterceptorChain 是執(zhí)行的責(zé)任鏈,傳入的參數(shù)很多,有些參數(shù)不是開始就有的,需要每個(gè)攔截器在執(zhí)行過程中準(zhǔn)備的,所以需要除了最后一個(gè)攔截器 CallServerInterceptor 以外,其他攔截器都需要調(diào)用 RealInterceptorChain 的 proceed() 方法,以便于為下一步準(zhǔn)備參數(shù)。

7. Interceptor

下面來看看各個(gè)攔截器,根據(jù)執(zhí)行的順序來分析。

注意:上面提到,除了 攔截器 CallServerInterceptor 以外,其他攔截器都需要調(diào)用 RealInterceptorChain 的 proceed() 方法,分析每個(gè)攔截器時(shí),我們需要關(guān)注一下。

7.1 自定義攔截器

在最開始的 get 請求的例子中,添加了一個(gè)自定義的攔截器,用于設(shè)置網(wǎng)絡(luò)請求的 header,主要通過責(zé)任鏈 Chain 拿到請求,然后重新構(gòu)建一個(gè)請求,構(gòu)建過程中將 header 設(shè)置進(jìn)去,然后調(diào)用 proceed() 方法,向下傳遞。

OkHttpClient httpClient = new OkHttpClient.Builder()
            // 添加 header 信息
            .addInterceptor(new Interceptor() {
                @Override
                public Response intercept(Chain chain) throws IOException {
                    Request.Builder newBuilder = chain.request().newBuilder();
                    if (headMaps != null && !headMaps.isEmpty()) {
                        newBuilder.headers(Headers.of(headMaps));
                    }
                    return chain.proceed(newBuilder.build());
                }
            })
            .build();

7.2 RetryAndFollowUpInterceptor

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // 通過路線連接失敗,請求將不會(huì)再發(fā)送
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // 與服務(wù)器嘗試通信失敗,請求不會(huì)再發(fā)送。
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // 拋出未檢查的異常,釋放資源
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp = followUpRequest(response, streamAllocation.route());

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }

(1) 失敗重試和重定向攔截器主要構(gòu)建 StreamAllocation 參數(shù),StreamAllocation 類主要協(xié)調(diào) Connections、Streams、Calls,Connections 是對(duì)遠(yuǎn)程服務(wù)器連接的物理 socket;Streams是在 Connections 上的 Http 請求/響應(yīng) 對(duì);Calls 是一系列的 Streams

(2)通過調(diào)用 chain.proceed(request, streamAllocation, null, null);向下傳遞,并對(duì)很多種異常情況做出了處理

(3) 對(duì)返回結(jié)果 response 處理

7.3 BridgeInterceptor

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    
    // (1)將用戶的request轉(zhuǎn)換為發(fā)送到server的請求
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    // (2)將服務(wù)器的響應(yīng)轉(zhuǎn)換成返回結(jié)果的響應(yīng),主要添加一些附加信息,如請求等
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

BridgeInterceptor 是橋接攔截器,主要將用戶請求轉(zhuǎn)換成對(duì)服務(wù)器的請求,從代碼中可以看出,添加了很多請求頭中的信息,如 Content-Type、Content-Length、Host、Connection、Accept-Encoding、Cookie、User-Agent;同時(shí)在請求結(jié)果回來時(shí),也能夠?qū)憫?yīng)做出轉(zhuǎn)換,在響應(yīng)中添加附加信息等。

7.4 CacheInterceptor

  @Override 
  public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    // 創(chuàng)建緩存
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    // 有緩存,但是不可用,關(guān)閉緩存的結(jié)果
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // 不允許網(wǎng)絡(luò)請求時(shí)返回失敗的結(jié)果
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 不需要請求時(shí),從緩存中取出結(jié)果
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    // 繼續(xù)執(zhí)行責(zé)任鏈
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // 如果有緩存結(jié)果,需要選擇性返回響應(yīng)
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // 更新緩存
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

CacheInterceptor 緩存攔截器沒有過多說的,主要是對(duì)請求進(jìn)行緩存,然后繼續(xù)執(zhí)行責(zé)任鏈,同樣, 當(dāng)響應(yīng)返回時(shí),需要根據(jù)結(jié)果對(duì)比緩存的響應(yīng),選擇性返回,并且更新緩存。

7.5 ConnectInterceptor

RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

連接攔截器主要是構(gòu)建 RealConnection 對(duì)象和 HttpCodec 對(duì)象,其中 RealConnection 是在連接池中查找,HttpCodec 是對(duì) Http 請求和相應(yīng)的一個(gè)編碼和解碼的抽象,它的構(gòu)建需要 StreamAllocation、RealConnection,StreamAllocation 在 RetryAndFollowUpInterceptor 中已經(jīng)構(gòu)建,RealConnection 是從連接池中查找。有了 streamAllocation, httpCodec, connection 這個(gè)幾個(gè)參數(shù),通過調(diào)用 realChain.proceed(request, streamAllocation, httpCodec, connection); 傳遞到責(zé)任鏈的下一個(gè)步驟中。

7.6 netWorkInterceptor

這一個(gè)攔截器是我們自己添加的攔截器,如果沒有設(shè)置,則不會(huì)執(zhí)行,這個(gè)攔截器一般是用于打印 log 信息,將請求信息和響應(yīng)信息打印出來,一般使用 HttpLoggingInterceptor,我們只需要自定義一下打印的格式即可。

7.7 CallServerInterceptor

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    // (1)寫入請求
    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // (2)如果有請求體,寫入請求體,POST 請求
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    // (3)請求碼判斷
    int code = response.code();
    if (code == 100) {
      responseBuilder = httpCodec.readResponseHeaders(false);
      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

    if (forWebSocket && code == 101) {
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // (4) openResponseBody 獲取響應(yīng)體信息
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
    // (5)往上級(jí) ConnectInterceptor 返回 Response。
    return response;
  }

CallServerInterceptor 是請求責(zé)任鏈的最后一個(gè)動(dòng)作,完成后就可以獲取到 Response。CallServerInterceptor 主要完成這個(gè)幾個(gè)任務(wù),信息的寫和讀取主要是利用 Okio 來完成。

(1) 寫入請求頭信息。

(2) 有請求體的情況下,寫入請求體信息。

(3) 結(jié)束請求。

(4) 讀取響應(yīng)頭信息。

(5) 往上一級(jí) ConnectInterceptor 返回一個(gè)網(wǎng)絡(luò)請求回來的 Response。

小結(jié)

okhttp 的請求實(shí)際上就是分解為各個(gè)攔截器的動(dòng)作,各個(gè)攔截器按照先后的順序,依次執(zhí)行,采用責(zé)任鏈模式,最終完成請求,責(zé)任鏈在請求過程中逐步攜帶請求的參數(shù)信息, 逐步的意思就是來源前面攔截器創(chuàng)建的參數(shù),后面的攔截器依賴前面攔截器的參數(shù),參數(shù)由責(zé)任鏈傳遞過來;此外,在請求結(jié)果返回時(shí),責(zé)任鏈中又從后面回溯到前面,依次處理返回結(jié)果,就是這樣一去一回的過程。

okhttp_process_1.png

異步請求

上面是同步請求的詳細(xì)過程,下面再簡單看一下異步請求的過程。

異步執(zhí)行過程通過調(diào)用 client.newCall(request).enqueue(callback);

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
 }

realCall 的 enqueue() 方法實(shí)際上是調(diào)用分發(fā)器 dispatcher 的 enqueue() 方法,同時(shí)將 回調(diào)方法包裝了一下,包裝成一個(gè) AsyncCall,AsyncCall 實(shí)際上是一個(gè) Runnable。

synchronized void enqueue(AsyncCall call) {
    // 正在異步執(zhí)行的請求數(shù)量小于最大限制,并且正在要請求的這個(gè) call,指向的目標(biāo)服務(wù)器沒有達(dá)到最大數(shù)量,小于 5 個(gè)
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else { 
      // 添加到異步請求的準(zhǔn)備集合當(dāng)中
      readyAsyncCalls.add(call);
    }
  }

(1) Dispatcher 將請求添加到集合中,添加過程中先判斷正在異步執(zhí)行的請求數(shù)量是否最大限制,并且正在要請求的這個(gè) call,指向的目標(biāo)服務(wù)器都否達(dá)到最大數(shù)量,小于默認(rèn) 5 個(gè),也就說當(dāng)前請求數(shù)量沒有達(dá)到最大數(shù)量,請求的目標(biāo)主機(jī) 不超過 5 個(gè)時(shí)(可修改數(shù)量),就添加到正在執(zhí)行的集合中,否則添加到等待集合中,等待執(zhí)行。

(2) executorService 是一個(gè)線程池,異步執(zhí)行 AsyncCall。

@Override 
protected void execute() {
  boolean signalledCallback = false;
  try {
    // (1) 上面同步過程分析過的,網(wǎng)絡(luò)請求的入口,責(zé)任鏈開啟的地方
    Response response = getResponseWithInterceptorChain();
    // (2) 請求取消
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
    // (3) 請求成功
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}

線程池開始執(zhí)行一個(gè) AsyncCall 時(shí),調(diào)用它的 execute() 方法??梢钥吹綄?shí)際上執(zhí)行過程和同步過程基本一致,只不過最后請求結(jié)果返回時(shí),通過 responseCallback 回調(diào)給請求者。

(1) getResponseWithInterceptorChain() 請求責(zé)任鏈開啟,和上面同步分析過程一致

(2) 取消請求或者請求過程中發(fā)生異常,通過回調(diào)接口告知請求失敗 responseCallback.onFailure()

(3) 請求成功 responseCallback.onResponse(RealCall.this, response);將請求結(jié)果返回

以上就是 okhttp 請求過程分析,里面涉及一些網(wǎng)絡(luò)請求,okio 的東西,沒有具體講,后面再有文章單獨(dú)來講,重點(diǎn)需要掌握責(zé)任鏈這個(gè)設(shè)計(jì)模式的使用,文章中也有一個(gè)小例子,沒有看懂 okhttp 責(zé)任鏈模式,可以回頭看看上面的小例子,當(dāng)然責(zé)任鏈模式還有其他形式,如通過每個(gè)節(jié)點(diǎn)來設(shè)置后續(xù)節(jié)點(diǎn),這樣就需要持有責(zé)任鏈。

參考

OKHttp源碼解析

OKHTTP攔截器CallServerInterceptor的簡單分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 重點(diǎn) 本文打算從三點(diǎn)來剖析OkHttp3: 網(wǎng)絡(luò)請求的整理流程-會(huì)使用 攔截器模式-易擴(kuò)展 緩存和連接池-高性能 ...
    lycknight閱讀 381評(píng)論 0 3
  • OkHttp3的使用 1、創(chuàng)建OkHttpClient;2、創(chuàng)建Request請求對(duì)象;3、OkHttpClien...
    Samuel_Tom閱讀 359評(píng)論 0 0
  • 在OkHttp3中,其靈活性很大程度上體現(xiàn)在可以 intercept 其任意一個(gè)環(huán)節(jié),而這個(gè)優(yōu)勢便是okhttp3...
    Jdqm閱讀 41,013評(píng)論 7 71
  • <<不必費(fèi)神>> 事情就這么簡單,他是一個(gè)專注的人。 舉起筷子,他想把天空那片烏云夾來吃了。
    小莊主人閱讀 165評(píng)論 0 0
  • 還記得之前聽過老沈講的一個(gè)關(guān)于長生的話題。至今印象深刻! 在生物研究中發(fā)現(xiàn),大部分正常體細(xì)胞在體外培養(yǎng)時(shí),不能無限...
    蝸行2019閱讀 2,248評(píng)論 0 1

友情鏈接更多精彩內(nèi)容