Okio 源碼解析(二):超時(shí)機(jī)制

簡(jiǎn)介

上一篇文章(Okio 源碼解析(一):數(shù)據(jù)讀取流程)分析了 Okio 數(shù)據(jù)讀取的流程,從中可以看出 Okio 的便捷與高效。Okio 的另外一個(gè)優(yōu)點(diǎn)是提供了超時(shí)機(jī)制,并且分為同步超時(shí)與異步超時(shí)。本文具體分析這兩種超時(shí)的實(shí)現(xiàn)。

同步超時(shí)

回顧一下 Okio.source 的代碼:

public static Source source(InputStream in) {
    // 生成一個(gè) Timeout 對(duì)象
    return source(in, new Timeout());
  }

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
            // 超時(shí)檢測(cè)
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

Source 的構(gòu)造方法中,傳入了一個(gè) Timeout 對(duì)象。在下面創(chuàng)建的匿名的 Source 對(duì)象的 read 方法中,先調(diào)用了 timeout.throwIfReached(),這里顯然是判斷是否已經(jīng)超時(shí),代碼如下:

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }

這里邏輯很簡(jiǎn)單,如果超時(shí)了則拋出異常。在 TimeOut 中有幾個(gè)變量用于設(shè)定超時(shí)的時(shí)間:

private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;

由于 throwIfReached 是在每次讀取數(shù)據(jù)之前調(diào)用并且與數(shù)據(jù)讀取在同一個(gè)線程,所以如果讀取操作阻塞,則無(wú)法及時(shí)拋出異常。

異步超時(shí)

異步超時(shí)與同步超時(shí)不同,其開(kāi)了新的線程用于檢測(cè)是否超時(shí),下面是 Socket 的例子。

Okio 可以接受一個(gè) Socket 對(duì)象構(gòu)建 Source,代碼如下:

public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    // 返回 timeout 封裝的 source
    return timeout.source(source);
  }

相比于 InputStream,這里的額外操作是引入了 AsyncTimeout 來(lái)封裝 socket。timeout 方法生成一個(gè) AsyncTimeout 對(duì)象,看一下代碼:

private static AsyncTimeout timeout(final Socket socket) {
    return new AsyncTimeout() {
      @Override protected IOException newTimeoutException(@Nullable IOException cause) {
        InterruptedIOException ioe = new SocketTimeoutException("timeout");
        if (cause != null) {
          ioe.initCause(cause);
        }
        return ioe;
      }
        // 超時(shí)后調(diào)用
      @Override protected void timedOut() {
        try {
          socket.close();
        } catch (Exception e) {
          logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) {
            logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
          } else {
            throw e;
          }
        }
      }
    };
  }

上面的代碼生成了一個(gè)匿名的 AsyncTimeout,其中有個(gè) timedout 方法,這個(gè)方法是在超時(shí)的時(shí)候被調(diào)用,可以看出里面的操作主要是關(guān)閉 socket。有了 AsyncTimeout 之后,調(diào)用其 source 方法來(lái)封裝 socketInputStream。

下面具體看看 AsyncTimeout 。

AsyncTimeout

AsyncTimeout 繼承了 Timeout,提供了異步的超時(shí)機(jī)制。每一個(gè) AsyncTimeout 對(duì)象包裝一個(gè) source,并與其它 AsyncTimeout 組成一個(gè)鏈表,根據(jù)超時(shí)時(shí)間的長(zhǎng)短插入。AsyncTimeout 內(nèi)部會(huì)新開(kāi)一個(gè)叫做 WatchDog 的線程,根據(jù)超時(shí)時(shí)間依次處理 AsyncTimout 鏈表的節(jié)點(diǎn)。

下面是 AsyncTimeout 的一些內(nèi)部變量:

// 鏈表頭結(jié)點(diǎn)
static @Nullable AsyncTimeout head;
// 此節(jié)點(diǎn)是否在隊(duì)列中
private boolean inQueue;
// 鏈表中下一個(gè)節(jié)點(diǎn)
private @Nullable AsyncTimeout next;

其中 head 是鏈表的頭結(jié)點(diǎn),next 是下一個(gè)節(jié)點(diǎn),inQueue 則標(biāo)識(shí)此 AsyncTimeout 是否處于鏈表中。

在上面的 Okio.source(Socket socket) 中,最后返回的是 timeout.source(socket),下面是其代碼:

public final Source source(final Source source) {
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        boolean throwOnTimeout = false;
        // enter
        enter();
        try {
          long result = source.read(sink, byteCount);
          throwOnTimeout = true;
          return result;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public void close() throws IOException {
        boolean throwOnTimeout = false;
        try {
          source.close();
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }

      @Override public Timeout timeout() {
        return AsyncTimeout.this;
      }

      @Override public String toString() {
        return "AsyncTimeout.source(" + source + ")";
      }
    };
  }

AsyncTimtout#source 依然是返回一個(gè)匿名的 Source 對(duì)象,只不過(guò)是將參數(shù)中真正的 source 包裝了一下,在 source.read 之前添加了 enter 方法,在 catch 以及 finally 中添加了 exit 方法。enterexit 是重點(diǎn),其中 enter 中會(huì)將當(dāng)前的 AsyncTimeout 加入鏈表,具體代碼如下:

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
  }
private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // 如果鏈表為空,則新建一個(gè)頭結(jié)點(diǎn),并且啟動(dòng) Watchdog線程
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) {
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      node.timeoutAt = node.deadlineNanoTime();
    } else {
      throw new AssertionError();
    }

    // 按時(shí)間將節(jié)點(diǎn)插入鏈表
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
  }

真正插入鏈表的操作在 scheduleTimeout 中,如果 head 節(jié)點(diǎn)還不存在則新建一個(gè)頭結(jié)點(diǎn),并且啟動(dòng) Watchdog 線程。接著就是計(jì)算超時(shí)時(shí)間,然后遍歷鏈表進(jìn)行插入。如果插入在鏈表的最前面(head 節(jié)點(diǎn)后面的第一個(gè)節(jié)點(diǎn)),則主動(dòng)進(jìn)行喚醒 Watchdog 線程,從這里可以猜到 Watchdog 線程在等待超時(shí)的過(guò)程中是調(diào)用了 AsyncTimeout.classwait 進(jìn)入了休眠狀態(tài)。那么就來(lái)看看 WatchDog 線程的實(shí)際邏輯:

private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }


WatchDog 主要是調(diào)用 awaitTimeout() 獲取一個(gè)已超時(shí)的 timeout,如果不為空并且是 head 節(jié)點(diǎn),說(shuō)明鏈表中已經(jīng)沒(méi)有其它節(jié)點(diǎn),可以結(jié)束線程,否則調(diào)用 timedOut.timedOut()timeOut() 是一個(gè)空方法,由用戶實(shí)現(xiàn)超時(shí)后應(yīng)該采取的操作。 awaitTimeout 是獲取超時(shí)節(jié)點(diǎn)的方法:

  static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // 隊(duì)列為空的話等待有節(jié)點(diǎn)進(jìn)入隊(duì)列或者達(dá)到超時(shí)IDLE_TIMEOUT_MILLIS的時(shí)間
    if (node == null) {
      long startNanos = System.nanoTime();
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
          ? head  // The idle timeout elapsed.
          : null; // The situation has changed.
    }

    // 計(jì)算等待時(shí)間
    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      // 調(diào)用 wait
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }

    // 第一個(gè)節(jié)點(diǎn)超時(shí),移除并返回這個(gè)節(jié)點(diǎn)
    head.next = node.next;
    node.next = null;
    return node;
  }

enter 相反,exit 則是視情況拋出異常并且移除鏈表中的節(jié)點(diǎn),這里就不放具體代碼了。

總結(jié)

Okio 通過(guò) Timeout 以及 AsyncTimeout 分別提供了同步超時(shí)和異步超時(shí)功能,同步超時(shí)是在每次讀取數(shù)據(jù)前判斷是否超時(shí),異步超時(shí)則是將 AsyncTimeout 組成有序鏈表,并且開(kāi)啟一個(gè)線程來(lái)監(jiān)控,到達(dá)超時(shí)則觸發(fā)相關(guān)操作。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容