OkHttp

  1. OkHttp 使用。
  • 創(chuàng)建OkHttpClient 對(duì)象。
  • build Request 對(duì)象,注入Call對(duì)象,new 出Call對(duì)象。
  • 調(diào)用execute(同步)或者 enqueue(異步)發(fā)送請(qǐng)求
package com.example.myapplication;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class TestOkHttp {
   static OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .readTimeout(5, TimeUnit.MINUTES).build();

   //同步
    public static void synRequest(){
        Request request = new Request.Builder().url("http://www.baidu.com").get().build();
        Call call = okHttpClient.newCall(request);
        try {
            Response response = call.execute();
            System.out.println(response.body().string());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //異步
    public static void AsynRequest(){
        Request request = new Request.Builder().url("http://www.baidu.com").get().build();
        Call call = okHttpClient.newCall(request);
        Callback callback = new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                System.out.println(response.body().string());
            }
        };
        call.enqueue(callback);
    }
    public static void main(String[] args) {
        synRequest();
        AsynRequest();
    }
}

簡(jiǎn)圖
  1. OkHttp源碼分析。
image.png
  • Dispatcher 分發(fā)器
    用于維護(hù)請(qǐng)求的狀態(tài),并維護(hù)一個(gè)線程池,執(zhí)行請(qǐng)求。
  /** Executes calls. Created lazily. */
  /** 線程池 */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  /** 準(zhǔn)備就緒的請(qǐng)求隊(duì)列 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  /** (異步)正在執(zhí)行的請(qǐng)求隊(duì)列 ,并包含了已經(jīng)執(zhí)行,被取消的請(qǐng)求*/
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  /** (同步)正在執(zhí)行的請(qǐng)求隊(duì)列 ,并包含了已經(jīng)執(zhí)行,被取消的請(qǐng)求*/
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  • 異步請(qǐng)求任務(wù)調(diào)度
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
//遍歷就緒請(qǐng)求隊(duì)列
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
//判斷不能大于請(qǐng)求最大值
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//判斷執(zhí)行線程不行超過(guò)最大線程數(shù)
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
//從就緒隊(duì)列移除
        i.remove();
//請(qǐng)求線程計(jì)數(shù)+1
        asyncCall.callsPerHost().incrementAndGet();
//添加到線程池執(zhí)行隊(duì)列
        executableCalls.add(asyncCall);
//添加到正在執(zhí)行的異步隊(duì)列
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
//遍歷執(zhí)行請(qǐng)求
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
  • interceptor 攔截器
image.png

image.png

執(zhí)行請(qǐng)求最主要的是攔截器鏈,通過(guò)\color{red}{chain.proceed}和攔截器方法\color{red}{interceptor.intercept(next)}形成遞歸調(diào)用(chain.proceed這個(gè)被攔截器調(diào)用的代碼我就不貼了)

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

 public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

\color{red}{總結(jié)3點(diǎn):}
\color{red}{1. 在發(fā)起請(qǐng)求前對(duì)響應(yīng)做處理。}
\color{red}{2. 調(diào)用下個(gè)攔截器返回response。}
\color{red}{3. 對(duì)response處理返回給上一個(gè)攔截器。}

\color{red}{攔截器做什么?}

  1. RetryAndFollowUpInterceptor.png
  2. BridgeInterceptor.png
  3. 緩存攔截器,主要是采用\color{red}{DiskLRUCache},用get方法,key的md5。
    緩存策略:
  • 緩存中沒有Response、不使用緩存、https沒有成功握手,hasConditions、!isCacheable;cacheResponse=null,重新請(qǐng)求。
  • 其余的符合條件在head中打上標(biāo)記返回緩存。
  1. ConnectInterceptor.png
  2. ConnectInterceptor.png

6.根據(jù)\color{red}{streamallocation}的數(shù)量啟動(dòng)清理連接池,采用的GC的標(biāo)記-清除算法--\color{red}{pruneAndGetAllocationCount}

image.png

  1. \color{red}{CallServerInterceptor}主要看下面代碼\color{red}{exchange}執(zhí)行請(qǐng)求
    @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        if (request.body().isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
      exchange.noRequestBody();
    }

    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }

    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }

    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }

    exchange.responseHeadersEnd(response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容