OkHttp源碼解析

參考:

源碼版本:3.14.9
源碼地址:https://github.com/square/okhttp

此篇文章主要講解OkHttp的請(qǐng)求過(guò)程,以及源碼中一些比較難懂的知識(shí)點(diǎn)的使用,并把這些知識(shí)點(diǎn)抽取成一個(gè)Demo,可以比較深刻的去理解源碼的執(zhí)行過(guò)程。Demo 包含 :

  1. 源碼中 Dispatcher類(lèi)的調(diào)度過(guò)程
  2. 以及RealCall類(lèi)中getResponseWithInterceptorChain()方法中責(zé)任鏈模式的調(diào)度過(guò)程。
image.png

簡(jiǎn)單調(diào)用

       String url = "www.baidu.com";
       OkHttpClient client = new OkHttpClient.Builder().build();
       Request request = new Request.Builder().url(url).build();

       Call call = client.newCall(request);
       call.enqueue(new Callback() {
           @Override
           public void onFailure(Call call, IOException e) {

           }

           @Override
           public void onResponse(Call call, Response response) throws IOException {

           }
       });

源碼解析流程

此處不講解OkHttpClient的初始化過(guò)程,以及Request的構(gòu)建過(guò)程,用到的是 Builder設(shè)計(jì)模式,直接從enqueue()方法開(kāi)始解析。

1. 請(qǐng)求發(fā)送過(guò)程

  • 注意:call.execute()是同步方法,call.enqueue()是異步方法,一般調(diào)用call.enqueue()

點(diǎn)擊查看call.enqueue()方法,進(jìn)入到 RealCall類(lèi)中的call.enqueue()方法,如下:

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

這個(gè) RealCall里面執(zhí)行的方法,首先會(huì)把這個(gè)Call加入到Dispatcher這個(gè)調(diào)度器里面,進(jìn)入Dispatcher類(lèi)中的 enqueue() 方法,如下:

void enqueue(AsyncCall call) {
  synchronized (this) {
    //將此次請(qǐng)求加入準(zhǔn)備請(qǐng)求隊(duì)列中
    readyAsyncCalls.add(call);

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) {
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  promoteAndExecute();
}

Dispatcher類(lèi)中包含了三個(gè)隊(duì)列,一個(gè)線程池:
為了管理所有的請(qǐng)求,Dispatcher采用了隊(duì)列+生產(chǎn)+消費(fèi)的模式。

  • 為同步執(zhí)行提供了runningSyncCalls來(lái)管理所有的同步請(qǐng)求;
  • 為異步執(zhí)行提供了runningAsyncCalls和readyAsyncCalls來(lái)管理所有的異步請(qǐng)求。其中readyAsyncCalls是在當(dāng)前可用資源不足時(shí),用于緩存請(qǐng)求的。
  • ExecutorService線程池

查看 promoteAndExecute()方法,

  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
    //調(diào)用 AsyncCall 類(lèi) executeOn的方法,在executeOn方法中開(kāi)啟線程池
      asyncCall.executeOn(executorService());//1
    }

    return isRunning;
  }

在1處會(huì)調(diào)用AsyncCall中的executeOn()方法

    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);//開(kāi)啟線程池
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

此時(shí)會(huì)調(diào)用AsyncCall中的execute()方法,

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        //核心函數(shù)
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

getResponseWithInterceptorChain()方法就是執(zhí)行攔截器鏈,直到返回 Response。通過(guò)責(zé)任鏈模式循環(huán)調(diào)用所有攔截器,每個(gè)攔截器可以根據(jù)Request和Reponse前后執(zhí)行相應(yīng)的邏輯。

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

這里把自定義的攔截器,還有內(nèi)置的必要的攔截器都添加到一個(gè)List里面了,最后用List的攔截器創(chuàng)建一個(gè)攔截器的鏈—— RealInterceptorChain。而這個(gè)RealInterceptorChain 其實(shí)是實(shí)現(xiàn)了Interceptor 里面的一個(gè)Chain 接口。

我們先了解一下RealInterceptorChain 這個(gè)類(lèi)是如何進(jìn)行工作的。

首先RealInterceptorChain,是實(shí)現(xiàn)了Interceptor.Chain 接口的一個(gè)類(lèi),這個(gè)接口有是三個(gè)方法,后面會(huì)介紹到,其中一個(gè)就是前面調(diào)用的proceed方法。

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
      ...
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    return response;
  }

省略了前后的一些代碼,發(fā)現(xiàn),其實(shí)這里又新建一個(gè) RealInterceptorChain,然后調(diào)用了這個(gè)攔截器的 intercept 的方法。那么在每個(gè)攔截器里面做了什么,這個(gè)攔截的方法又是做了什么呢?

首先看一下攔截器這個(gè)接口:

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;
    ...
  }
}

很明顯,每個(gè)實(shí)現(xiàn)Interceptor的攔截器。都會(huì)實(shí)現(xiàn)了intercept,這樣一個(gè)攔截的方法。而傳入的就是一個(gè)Chain。符合前面RealInterceptorChain的調(diào)用。

以一個(gè)攔截器的代碼為例:BridgeInterceptor,這個(gè)類(lèi)實(shí)現(xiàn)的intercept方法,而這個(gè)方法里面,除了那些前置或后續(xù)的操作之外,在中間你會(huì)發(fā)現(xiàn) 下面一句代碼:

Response networkResponse = chain.proceed(requestBuilder.build());

又通過(guò)傳進(jìn)來(lái)的Chain ,重新調(diào)用了proceed的方法。好吧,到這里應(yīng)該可以了解到這些攔截器,是如何一個(gè)個(gè)調(diào)用被調(diào)用的了,原理都是通過(guò)了中間的一個(gè)RealInterceptorChain,一個(gè)接一個(gè)的向下調(diào)用。然后得到的結(jié)果又一個(gè)一個(gè)向上傳。

總結(jié)一下,這些攔截器的調(diào)用:

剛開(kāi)始getResponseWithInterceptorChain的最后開(kāi)始調(diào)用了 chain.proceed,然后在RealInterceptorChain的里面,又新建一個(gè) RealInterceptorChain,并且調(diào)用當(dāng)前的攔截器的 intercept,并且把新建的Chain傳遞進(jìn)去,然后攔截器的 intercept方法里面,除了一些自身的操作外,又調(diào)用Chain的proceed方法。就這樣一直調(diào)用新建Chian, 一直調(diào)用到最后一個(gè)攔截器。

這里的調(diào)用其實(shí)是一個(gè)迭代,遞歸的結(jié)構(gòu)。請(qǐng)求做了前一步的處理才能到下一級(jí),一級(jí)級(jí)下去,到最后真正發(fā)出去請(qǐng)求,相對(duì)做了攔截。而得到結(jié)果,只有當(dāng)最后一個(gè)攔截器完全了請(qǐng)求,拿到結(jié)果之后,才會(huì)把結(jié)果往上一級(jí),上一級(jí)拿到結(jié)果,做了相應(yīng)的處理之后,再到上一級(jí),這就是對(duì)結(jié)果的攔截處理。

Demo

結(jié)構(gòu):

image.png

調(diào)用:

    public static void main(String[] args) throws IOException {
        //Dispatcher類(lèi)的調(diào)度過(guò)程
        new Dispatcher().enqueue(new AsyncCall("22","33"));

        //demo02調(diào)用RealCall類(lèi)中g(shù)etResponseWithInterceptorChain()
        //方法中責(zé)任鏈模式的調(diào)度過(guò)程
        Request request = new Request();
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add(new RetryAndFollowUpInterceptor());
        interceptors.add(new BridgeInterceptor());
        interceptors.add(new CacheInterceptor());
        interceptors.add(new ConnectInterceptor());

        Interceptor.Chain chain = new RealInterceptorChain(interceptors,0,request);

        Response response = chain.proceed(request);
    }

運(yùn)行結(jié)果:

---Dispatcher---enqueue-
--AsyncCall執(zhí)行execute方法--
1.失敗重試攔截器
2.請(qǐng)求和響應(yīng)轉(zhuǎn)換攔截器
3.緩存攔截器
4.與服務(wù)器建立連接攔截器
 RealInterceptorChain 第 3 次執(zhí)行
 RealInterceptorChain 第 2 次執(zhí)行
 RealInterceptorChain 第 1 次執(zhí)行
 RealInterceptorChain 第 0 次執(zhí)行

Demo挺簡(jiǎn)單的,可以自行下載:
https://github.com/Ityang/OKHttpDemo

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容