Okio源碼分析

1 概述

Okio是一個(gè)對(duì)java.io和java.nio進(jìn)行補(bǔ)充的庫(kù),使數(shù)據(jù)訪問(wèn),保存和處理變得更容易。

Okio的主要功能是圍繞著B(niǎo)yteString 和 Buffer 兩個(gè)類展開(kāi)的:
1> ByteString是一個(gè)immutable的字節(jié)序列。在java中,String代表的是字符串,ByteString和String很相似,只不過(guò)是用來(lái)處理字節(jié)串的,同時(shí)也提供了常用的操作,比如對(duì)數(shù)據(jù)進(jìn)行十六進(jìn)制(hex)、base64UTF-8 格式的編碼和解碼,equals、substring等操作。

2> Buffer是一個(gè)mutable的字節(jié)序列。 和ArrayList類似,不需要提前設(shè)置緩沖區(qū)大小。讀取數(shù)據(jù)和寫入數(shù)據(jù)和隊(duì)列類似,從它的head讀取數(shù)據(jù),往它的tail寫入數(shù)據(jù),而且不用考慮容量、位置等因素。

java.io設(shè)計(jì)的一個(gè)優(yōu)雅部分是如何將stream分層以進(jìn)行加密和壓縮等轉(zhuǎn)換。 Okio包括自己的stream類型,稱為Source和Sink,和InputStream和OutputStream的工作方式類似,但有一些關(guān)鍵的區(qū)別:
1> Timeouts: 提供對(duì)底層I/O訪問(wèn)的超時(shí)機(jī)制。
2> Source和Sink的API非常簡(jiǎn)潔,易于實(shí)現(xiàn)。
3> 雖然Source和Sink的只提供了三個(gè)方法,但是BufferedSource和BufferedSink接口提供了更豐富的方法(比如針對(duì)不同類型的read和write方法),以應(yīng)對(duì)更加復(fù)雜的場(chǎng)景。
4> 不在區(qū)分byte stream和char stream,它們都是數(shù)據(jù),可以按照任意類型進(jìn)行讀寫。

2 Segment和SegmentPool

Segment的源碼不到200行,直接通過(guò)源碼來(lái)理解Segment的實(shí)現(xiàn)原理也是很簡(jiǎn)單的,首先來(lái)看一下Segment中的所有的字段:

/** Segment可以保存的最大字節(jié)數(shù) */
static final int SIZE = 8192;

/** Segment被共享時(shí)最小的字節(jié)數(shù) */
static final int SHARE_MINIMUM = 1024;

/** Segment中保存數(shù)據(jù)的字節(jié)數(shù)組 */
final byte[] data;

/** 字節(jié)數(shù)組data中被當(dāng)前Segment實(shí)例使用的區(qū)間的第一個(gè)字節(jié)的下標(biāo) */
int pos;

/** 字節(jié)數(shù)組data中被當(dāng)前Segment實(shí)例使用的區(qū)間之后的第一個(gè)字節(jié)的下標(biāo) */
int limit;

/** 代表字節(jié)數(shù)組data是否被 >=2 個(gè)Segment實(shí)例共用*/
boolean shared;

/** 代表字節(jié)數(shù)組data中最后一段被使用的區(qū)間是不是被當(dāng)前Segment實(shí)例占有*/
boolean owner;

/** 當(dāng)前Segment實(shí)例的后置節(jié)點(diǎn) */
Segment next;

/** 當(dāng)前Segment實(shí)例的前置節(jié)點(diǎn) */
Segment prev;

shared、owner的作用:
在向Segment中寫入數(shù)據(jù)時(shí),首先用owner判斷當(dāng)前Segment實(shí)例對(duì)應(yīng)的數(shù)據(jù)區(qū)間(字節(jié)數(shù)組data被使用的區(qū)間)之后是否可以寫入數(shù)據(jù),接著用shared判斷當(dāng)前Segment實(shí)例對(duì)應(yīng)的數(shù)據(jù)區(qū)間之前是否可以寫入數(shù)據(jù),體現(xiàn)在了Segment的writeTo方法中。

接下來(lái)依次分析Segment中的方法:

  /**
   * 從循環(huán)雙向鏈表中移除當(dāng)前Segment實(shí)例,返回當(dāng)前Segment實(shí)例的后置節(jié)點(diǎn)。
   */
  public @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  /**
   * 在循環(huán)雙向鏈表中的當(dāng)前Segment實(shí)例之后插入segment實(shí)例,返回被插入的segment實(shí)例。
   */
  public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

上面的兩個(gè)方法相信大家一看就明白了,就不再贅敘了。

  /**
   * 將當(dāng)前Segment實(shí)例中的字節(jié)數(shù)組data進(jìn)行分割,從而得到兩個(gè)Segment實(shí)例. 
   * 字節(jié)數(shù)組data中[pos..pos+byteCount)區(qū)間的數(shù)據(jù)屬于第一個(gè)segment. 
   * [pos+byteCount..limit)區(qū)間的數(shù)據(jù)屬于第二個(gè)segment.
   *
   * 返回第一個(gè)segment.
   */
  public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = new Segment(this);
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

  /**
   * 當(dāng)當(dāng)前Segment實(shí)例的前置節(jié)點(diǎn)中的空閑空間可以容納當(dāng)前Segment實(shí)例中的數(shù)據(jù). 
   * 則將當(dāng)前Segment實(shí)例中的數(shù)據(jù)拷貝到前置節(jié)點(diǎn)中并且將當(dāng)前Segment實(shí)例回收到SegmentPool中。
   */
  public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

  /** 將當(dāng)前Segment實(shí)例中的前byteCount個(gè)字節(jié)的數(shù)據(jù)復(fù)制放到sink中 */
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }

上面的注釋已經(jīng)非常清晰了,這里就不再解釋了。

上面的compact方法中用到了SegmentPool.recycle(this)來(lái)回收Segment實(shí)例,那下面就來(lái)講解SegmentPool類,該類的存在就是為了避免GC churn(高頻率的創(chuàng)建和回收Segment實(shí)例會(huì)導(dǎo)致GC churn)和zero-fill(創(chuàng)建Segment實(shí)例時(shí)字節(jié)數(shù)組data需要zero-fill),SegmentPool實(shí)例中用一個(gè)單向的鏈表來(lái)保存回收的Segment實(shí)例,首先來(lái)看看Segment的源代碼:

/**
 * 用于保存被回收的Segment實(shí)例,該類的存在就是為了避免GC churn和zero-fill
 * SegmentPool實(shí)例是線程安全的靜態(tài)單例
 */
final class SegmentPool {
  /** SegmentPool實(shí)例中保存的最大字節(jié)數(shù),因此SegmentPool中最多保存8個(gè)Segment實(shí)例 */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** SegmentPool實(shí)例中是通過(guò)單向非循環(huán)的鏈表來(lái)保存數(shù)據(jù)的,next代表鏈表中的第一個(gè)Segment實(shí)例 */
  static @Nullable Segment next;

  /** SegmentPool實(shí)例中的字節(jié)總數(shù). */
  static long byteCount;

  private SegmentPool() {
  }

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }
}

是不是很簡(jiǎn)單,一共也不到70行,一共提供了兩個(gè)方法:
Segment take():從SegmentPool實(shí)例中獲取被回收的Segment實(shí)例,如果SegmentPool實(shí)例是空的,則創(chuàng)建一個(gè)Segment實(shí)例返回。
void recycle(Segment segment):回收segment實(shí)例。

3 Buffer

Buffer內(nèi)部使用Segment的雙向鏈表來(lái)保存數(shù)據(jù),Segment內(nèi)部使用字節(jié)數(shù)組保存數(shù)據(jù)。 將數(shù)據(jù)從一個(gè)Buffer移動(dòng)到另一個(gè)Buffer時(shí),會(huì)通過(guò)轉(zhuǎn)讓Segment的所有權(quán),而不用拷貝數(shù)據(jù),從而節(jié)省性能上的開(kāi)銷。下面通過(guò)一張圖來(lái)描述一下Buffer中雙向循環(huán)鏈表和SegmentPool單向非循環(huán)鏈表:


左邊對(duì)應(yīng)Buffer,右邊對(duì)應(yīng)SegmentPool

下面通過(guò)一張類圖來(lái)整體的描述一下Buffer:


為了更加清晰的理解上圖,就需要簡(jiǎn)單的了解一下裝飾者模式
1> 定義:Attach additional responsibilities to an object dynamically keeping the same interface.Decorators provide a flexible alternative to subclassing for extending functionality. (動(dòng)態(tài)的給一個(gè)對(duì)象添加額外的職責(zé)。就增加功能來(lái)說(shuō),裝飾者模式相比生成子類更加靈活。)
2> 裝飾者模式通用類圖

說(shuō)明一下類圖中的四個(gè)角色:
Component抽象組件:Component是一個(gè)接口或者是抽象類,在裝飾者模式中,必然有一個(gè)最基本、最核心、最原始的接口或抽象類充當(dāng)Component抽象組件。對(duì)應(yīng)于在Okio框架中的BufferedSource和BufferedSink接口。
ConcreteComponent具體組件:對(duì)Component抽象組件的實(shí)現(xiàn),將要被裝飾的類。對(duì)應(yīng)于Okio框架中的Buffer。
Decorator裝飾者:一般是一個(gè)抽象類,繼承至Component抽象組件。一定擁有一個(gè)指向Component抽象組件的priavte字段。
ConcreteDecorator具體裝飾者:對(duì)Decorator裝飾者的實(shí)現(xiàn),用來(lái)裝飾ConcreteComponent具體組件。
在Okio框架中沒(méi)有細(xì)分Decorator和ConcreteDecorator,只有兩個(gè)具體裝飾類RealBufferedSource和RealBufferedSink。

Source是用來(lái)對(duì)數(shù)據(jù)來(lái)源的封裝,Sink是對(duì)數(shù)據(jù)消費(fèi)的封裝,在Okio工具類中,為Source提供了四種數(shù)據(jù)來(lái)源:Socket、InputStream、File和Path,同樣為Sink提供了四種數(shù)據(jù)消費(fèi):Socket、OutputStream、File和Path,接下來(lái)針對(duì)Socket舉例分析:

public void testSocket(Socket socket) {
    try {
        Source source = Okio.source(socket);
        BufferedSource bufferedSource = Okio.buffer(source);
        bufferedSource.timeout().timeout(500, TimeUnit.MILLISECONDS);
        String data = bufferedSource.readString(Charset.forName("UTF-8"));
    } catch (IOException e) {
        e.printStackTrace();
    }
}

上面就是使用Okio框架讀數(shù)據(jù)的過(guò)程,下面我們就來(lái)看看源碼中是如何實(shí)現(xiàn)的:

   // BufferedSource的方法:
  @Override public String readString(Charset charset) throws IOException {
    if (charset == null) throw new IllegalArgumentException("charset == null");

    buffer.writeAll(source);
    return buffer.readString(charset);
  }

  // Buffered的方法:
  @Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
    }
    return totalBytesRead;
  }

  @Override public String readString(Charset charset) {
    try {
      return readString(size, charset);
    } catch (EOFException e) {
      throw new AssertionError(e);
    }
  }

  @Override public String readString(long byteCount, Charset charset) throws EOFException {
    checkOffsetAndCount(size, 0, byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    if (byteCount > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
    }
    if (byteCount == 0) return "";

    Segment s = head;
    if (s.pos + byteCount > s.limit) {
      // If the string spans multiple segments, delegate to readBytes().
      return new String(readByteArray(byteCount), charset);
    }

    String result = new String(s.data, s.pos, (int) byteCount, charset);
    s.pos += byteCount;
    size -= byteCount;

    if (s.pos == s.limit) {
      head = s.pop();
      SegmentPool.recycle(s);
    }

    return result;
  }

  // Okio的方法:
  /**
   * Returns a new source that buffers reads from {@code source}. The returned
   * source will perform bulk reads into its in-memory buffer. Use this wherever
   * you read a source to get an ergonomic and efficient access to data.
   */
  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

  /**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

上面方法的流程可以概括如下:
1> 利用Source的public long read(Buffer sink, long byteCount)方法從Socket輸入流中讀取數(shù)據(jù)到Buffer實(shí)例中。
2> 接著調(diào)用Buffer的public String readString(Charset charset)方法將Buffer實(shí)例中的數(shù)據(jù)讀取到String對(duì)象中并且返回。
通過(guò)Okio框架寫數(shù)據(jù)的過(guò)程與讀數(shù)據(jù)的過(guò)程類似,只不過(guò)過(guò)程相反,就不再贅敘了。

下面給出Okio框架讀寫String數(shù)據(jù)的流程圖:


對(duì)于Okio框架讀寫其他類型數(shù)據(jù)也是類似的過(guò)程。

在上面的例子中還用到了TimeOut機(jī)制,其實(shí)Okio實(shí)現(xiàn)了兩種超時(shí)機(jī)制:
1> TimeOut 同步超時(shí)機(jī)制
利用throwIfReached方法在數(shù)據(jù)讀取過(guò)程中輪詢判斷是否超時(shí)。
2> AsyncTimeout 異步超時(shí)機(jī)制
由于通過(guò)Socket來(lái)讀寫數(shù)據(jù)會(huì)阻塞線程,所以用的是異步超時(shí)機(jī)制。
有興趣的同學(xué)可以自己閱讀源碼來(lái)分析超時(shí)機(jī)制。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容