OKio解析

關(guān)于OKio,之前在OKHttp中有提及到,事實上除去OKHttp以外,OKio也是一個非常好的庫,那么作為OKHttp底層IO操作庫究竟有什么特殊的本領(lǐng)呢?我們通過這篇文章做一個詳盡的了解。

OKio使用
Okio.sink(new File("***"));
Okio.sink(new FileOutputStream(new File("***")));
Okio.sink(new Socket("****",888));

Okio.source(new File("***"));
Okio.source(new FileInputStream(new File("***")));
Okio.source(new Socket("****",888));

BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));
bufferedSink.writeInt(1);

BufferedSource bufferedSource = Okio.buffer(Okio.source(new File("***")));
bufferedSource.readInt();

//網(wǎng)絡(luò)請求中可能會用到Gzip功能
GzipSink gzipSink = new GzipSink(Okio.sink(new File("***")));
BufferedSink bufferedSink1 = Okio.buffer(gzipSink);
bufferedSink1.writeInt(1);

使用就是如上了

寫入分析
BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));
bufferedSink.writeInt(1);
1.從這里分析首先是生成Sink對象

Okio.sink(new File("***"))

通過查看代碼發(fā)現(xiàn)最后調(diào)用了方法如下

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

可以看到這里返回了一個Sink類,當(dāng)然這里實現(xiàn)了Sink接口中的幾個方法,這些方法在后面都是有作用的

2.接下來是生成RealBufferedSink,并進行寫入操作

BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));

這里調(diào)用了Okio的buffer方法如下

  public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }

這里生成了RealBufferedSink對象,并將前面生成的sink對象傳進去了

RealBufferedSink類中有一個Buffer變量,而這個Buffer類則是整個寫入操作的實際操作部分

接下來我們看一下寫入操作,如下

bufferedSink.writeInt(1);

RealBufferedSink中writeInt實現(xiàn)方法如下

  @Override public BufferedSink writeInt(int i) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeInt(i);
    return emitCompleteSegments();
  }

可以看到這里調(diào)用了Buffer的writeInt方法,如下

@Override public Buffer writeInt(int i) {
    Segment tail = writableSegment(4);
    byte[] data = tail.data;
    int limit = tail.limit;
    data[limit++] = (byte) ((i >>> 24) & 0xff);
    data[limit++] = (byte) ((i >>> 16) & 0xff);
    data[limit++] = (byte) ((i >>>  8) & 0xff);
    data[limit++] = (byte)  (i         & 0xff);
    tail.limit = limit;
    size += 4;
    return this;
  }

這里實際是將數(shù)據(jù)寫入了Segment中,至于Segment是什么,我們后續(xù)在進行說明

我們看下調(diào)用的writableSegment方法,如下

  Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }

    Segment tail = head.prev;
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

可以看到這里的Segment是鏈表的結(jié)構(gòu),當(dāng)新寫入數(shù)據(jù),就通過鏈表加入數(shù)據(jù)

所以到這里,我們寫入的數(shù)據(jù)都是存儲在Segment中的

接下來看一下emitCompleteSegments方法,如下

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

可以看到這里我們寫入數(shù)據(jù)成功后,會調(diào)用sink的write方法,而這個write方法,就是前面Sink類中的方法,如下

@Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

可以看到這里獲取之前存儲在Segment中的數(shù)據(jù),然后用OutputStream的write方法,將數(shù)據(jù)寫入到需要寫入的地方,然后調(diào)用Segment的pop方法,釋放之前存儲的消息

其他的寫入方法與上述的方法類型,不概述了

讀取分析

同樣的流程就不分析了,就從讀取方法開始分析,如下

bufferedSource.readInt();

具體實現(xiàn)在RealBufferedSource中的readInt方法

@Override public int readInt() throws IOException {
    require(4);
    return buffer.readInt();
  }

這里調(diào)用了require方法,而require方法調(diào)用request方法,這個方法調(diào)用了source.read(buffer, Segment.SIZE)

接下來看下這個read方法,如下

@Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

可以看到這里通過InputStream的read方法,將數(shù)據(jù)讀取到Segment數(shù)據(jù)中。

接下來看一下Buffer的readInt方法,如下

  @Override public int readInt() {
    if (size < 4) throw new IllegalStateException("size < 4: " + size);

    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;

    // If the int is split across multiple segments, delegate to readByte().
    if (limit - pos < 4) {
      return (readByte() & 0xff) << 24
          |  (readByte() & 0xff) << 16
          |  (readByte() & 0xff) <<  8
          |  (readByte() & 0xff);
    }

    byte[] data = segment.data;
    int i = (data[pos++] & 0xff) << 24
        |   (data[pos++] & 0xff) << 16
        |   (data[pos++] & 0xff) <<  8
        |   (data[pos++] & 0xff);
    size -= 4;

    if (pos == limit) {
      head = segment.pop();
      SegmentPool.recycle(segment);
    } else {
      segment.pos = pos;
    }

    return i;
  }

可以看到這里從之前存儲在Segment中獲取數(shù)據(jù)并返回。

其他的讀取方法與上述的方法類型,不概述了

Segment

Segment字面翻譯就是片段,Okio將數(shù)據(jù)也就是Buffer分割成一塊塊的片段,同時segment擁有前置節(jié)點和后置節(jié)點,構(gòu)成一個雙向循環(huán)鏈表,就像下面這個圖的方式。

![](/Users/admin/Documents/Life/LibReadProgejct/2799272-acef05808a0c1e58.png =500x200)

這樣采取分片使用鏈表連接,片中使用數(shù)組存儲,兼具讀的連續(xù)性,以及寫的可插入性,對比單一使用鏈表或者數(shù)組,是一種折中的方案,讀寫更快,而且有個好處根據(jù)需求改動分片的大小來權(quán)衡讀寫的業(yè)務(wù)操作,另外,segment也有一些內(nèi)置的優(yōu)化操作,綜合這些Okio才能大放異彩

Segment中成員變量如下

static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment pre;
Segment next;

SIZE就是一個segment的最大字節(jié)數(shù),其中還有一個SHARE_MINIMUM,這個涉及到segment優(yōu)化中的另一個技巧,共享內(nèi)存,然后data就是保存的字節(jié)數(shù)組,pos,limit就是開始和結(jié)束點的index,shared和owner用來設(shè)置狀態(tài)判斷是否可寫,一個有共享內(nèi)存的segment是不能寫入的,pre,next就是前置后置節(jié)點。

Segment方法分析

既然是雙向循環(huán)鏈表,其中也會有一些操作的方法,比如

public Segment pop(){
        Segment result = next != this ? next : null;
        pre.next = next;
        next.pre = pre;
        next = null;
        pre = null;
        return result;
    }

pop方法移除了自己,首先將自己的前后兩個節(jié)點連接起來,然后將自己的前后引用置空,這樣就脫離了整個雙向鏈表,然后返回next

public Segment push(Segment segment){
    segment.pre = this;
    segment.next = next;
    next.pre = segment;
    next = segment;
    return segment;
}

push方法就是在當(dāng)前和next引用中間插入一個segment進來,并且返回插入的segment,這兩個都是尋常的雙向鏈表的操作,我們再來看看如何寫入數(shù)據(jù)

public void writeTo(Segment sink , int byteCount){
    if (!sink.owner)
        throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE){  //limit和需要寫的字節(jié)總和大于SIZE
        if (sink.shared)  //共享無法寫
            throw new IllegalArgumentException();
        if (sink.limit + byteCount - sink.pos > SIZE){  //如果減去頭依然比SIZE大 那么就無法寫拋異常
            throw new IllegalArgumentException();
        }
        //否則我們需要先移動要寫的文件地址  然后置limit pos的地址
        System.arraycopy(sink.data,sink.pos,sink.data,0,sink.limit - sink.pos);
        sink.limit = sink.limit - sink.pos;
        sink.pos = 0;
    }
    //開始尾部寫入 寫完置limit地址
    System.arraycopy(data,pos,sink.data,sink.limit,byteCount);
    sink.limit = sink.limit + byteCount;
    pos = pos + byteCount; //當(dāng)前索引后移
}

owner和Shared這兩個狀態(tài)目前看來是完全相反的,賦值都是同步賦值的,這里有點不明白存在兩個參數(shù)的意義,現(xiàn)在的功能主要是用來判斷如果是共享就無法寫,以免污染數(shù)據(jù),會拋出異常。當(dāng)然,如果要寫的字節(jié)大小加上原來的字節(jié)數(shù)大于單個segment的最大值也是會拋出異常,也存在一種情況就是雖然尾節(jié)點索引和寫入字節(jié)大小加起來超過,但是由于前面的pos索引可能因為read方法取出數(shù)據(jù),pos索引后移這樣導(dǎo)致可以容納數(shù)據(jù),這時就先執(zhí)行移動操作,使用系統(tǒng)的 System.arraycopy 方法來移動到pos為0的狀態(tài),更改pos和limit索引后再在尾部寫入byteCount數(shù)的數(shù)據(jù),寫完之后實際上原segment讀了byteCount的數(shù)據(jù),所以pos需要后移這么多。過程十分的清晰,比較好理解。

除了寫入數(shù)據(jù)之外,segment還有一個優(yōu)化的技巧,因為每個segment的片段size是固定的,為了防止經(jīng)過長時間的使用后,每個segment中的數(shù)據(jù)千瘡百孔,可能十分短的數(shù)據(jù)卻占據(jù)了一整個segment,所以有了一個壓縮機制。

public void compact(){
    if (pre == this)
        throw new IllegalStateException();
    if (!pre.owner) // pre不可寫 
        return;
    int byteCount = limit - pos;
    int availableByteCount = SIZE - pre.limit + (pre.shared ? 0 : pre.pos);  //前一個的剩余大小
    if (byteCount > availableByteCount)
        return;
    writeTo(pre,byteCount);   //將數(shù)據(jù)寫入到前一個的片段中
    pop();  // 從雙向鏈表中移除當(dāng)前
    SegmentPool.recycle(this);   //加入到對象池中
}

照例如果前面是共享的那么不可寫,也就不能壓縮了,然后判斷前一個的剩余大小是否比當(dāng)前的大,有足夠的空間來容納數(shù)據(jù),調(diào)用前面的 writeTo 方法來寫數(shù)據(jù),寫完后移除當(dāng)前segment,然后通過 SegmentPool 來回收。

另一個技巧就是共享機制,為了減少數(shù)據(jù)復(fù)制帶來的性能開銷,segment存在一個共享機制。

public Segment split(int byteCount){
    if (byteCount <= 0 || byteCount > limit - pos )
        throw new IllegalArgumentException();
    Segment prefix;
    if (byteCount >= SHARE_MINIMUM){  //如果byteCount大于最小的共享要求大小
        prefix = new Segment(this); //this這個構(gòu)造函數(shù)會
    }else {
        prefix = SegmentPool.take();
        System.arraycopy(data,pos,prefix,0,byteCount);
    }
    prefix.limit = prefix.pos + byteCount;
    pos = pos + byteCount;
    pre.push(prefix);
    return  prefix;
}

再回頭看剛才的 compact 中出現(xiàn)的 SegmentPool ,這個實際上是一個segment的對象池

    static final long MAX_SIZE = 64 * 1024;
    static Segment next;
    static long byteCount;

同樣的有一個池子的上限,也就是64k,相當(dāng)于8個segment,next這個節(jié)點可以看出這個 SegmentPool 是按照單鏈表的方式進行存儲的,byteCount則是目前已有的大小。

SegmentPool方法分析

SegmentPool的方法十分的少,一個取,一個回收,十分簡潔。

    /**
     * take方法用來取數(shù)據(jù)
     * 如果池子為空就創(chuàng)建一個空對象 owner true | share false
     * next是鏈表的頭 就是一個簡單的取表頭的操作
     * @return
     */
    static Segment take(){
        synchronized (SegmentPool.class){
            if (next != null){
                Segment result = next;
                next = result.next;
                result.next = null;
                byteCount = byteCount - Segment.SIZE;
                return result;
            }
        }
        return new Segment();
    }

為了防止多線程同時操作造成數(shù)據(jù)的錯亂,這里加了鎖,這里的next命名雖然是next,但是實際上是整個對象池的頭,但是next為空,表示池子為空,直接返回一個空對象,否則從里面拿出next,并將next的下一個節(jié)點賦為next,置一下狀態(tài),這個方法就結(jié)束了

    /**
     * 如果當(dāng)前要回收的segment有前后引用或者是共享的 那么就回收失敗
     * 如果加入后的大小超過了最大大小 也會失敗
     * 然后將新回收的next指向表頭 也就是加到的鏈表的頭 并且將回收的segment置為next也就是head
     * @param segment
     */
    static void recycle(Segment segment){
        if (segment.next != null || segment.pre != null)
            throw new IllegalArgumentException();
        if (segment.shared)
            return;
        synchronized (SegmentPool.class){
            if (byteCount + Segment.SIZE > MAX_SIZE){
                return;
            }
            byteCount += Segment.SIZE;
            segment.next = next;
            segment.pos = segment.limit = 0;
            next = segment;
        }
    }

如果要回收的segment有前后引用或者是共享的,就不能被回收,所以要回收前先將引用置空,同樣這里也加了鎖,以免那個同時回收超過池子最大的大小,然后就是將回收的插到表頭的操作。

所以SegmentPool無論是回收和取對象都是在表頭操作。

裝飾流

1.Forwarding流

public abstract class ForwardingSink implements Sink {
  private final Sink delegate;

  public ForwardingSink(Sink delegate) {
    if (delegate == null) throw new IllegalArgumentException("delegate == null");
    this.delegate = delegate;
  }

  /** {@link Sink} to which this instance is delegating. */
  public final Sink delegate() {
    return delegate;
  }

  @Override public void write(Buffer source, long byteCount) throws IOException {
    delegate.write(source, byteCount);
  }

  @Override public void flush() throws IOException {
    delegate.flush();
  }

  @Override public Timeout timeout() {
    return delegate.timeout();
  }

  @Override public void close() throws IOException {
    delegate.close();
  }

  @Override public String toString() {
    return getClass().getSimpleName() + "(" + delegate.toString() + ")";
  }
}

Forwarding流乍看之下似乎沒有什么實際用處,但是在實際作用中,常常用來作為匿名的代理對象,由于傳入的sink本身我們無法繼承和復(fù)寫,這樣我們可以直接采用裝飾Forwarding流的方式來監(jiān)控和攔截一些操作。

2.DeflaterSink和InflaterSource流

這對流主要對應(yīng)于zip壓縮流,類似ZipInputStream和ZipOutputStream,而實際上再ZipInputStream中實際上也用到了相同的API。

//code ZipInputStream.java
public class ZipInputStream {

    public void read(...) {
          try {
            read = inf.inflate(buffer, byteOffset, byteCount);//引用了一個protected Inflater inf;對象
        } catch (DataFormatException e) {
            throw new ZipException(e.getMessage());
        }
    }
}

我們看到在ZipInputStream的流讀取處理的時候,用到了一個Inflater類型的對象,這個對象是專門用來處理Zip格式編碼的工具類。而在InflaterSource這個源去讀取數(shù)據(jù)的時候,也一樣用到了這個類:

while (true) {
    boolean sourceExhausted = refill();

    // Decompress the inflater's compressed data into the sink.
    try {
      Segment tail = sink.writableSegment(1);
      int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit);
      if (bytesInflated > 0) {
        tail.limit += bytesInflated;
        sink.size += bytesInflated;
        return bytesInflated;
      }
      if (inflater.finished() || inflater.needsDictionary()) {
        releaseInflatedBytes();
        if (tail.pos == tail.limit) {
          // We allocated a tail segment, but didn't end up needing it. Recycle!
          sink.head = tail.pop();
          SegmentPool.recycle(tail);
        }
        return -1;
      }
      if (sourceExhausted) throw new EOFException("source exhausted prematurely");
    } catch (DataFormatException e) {
      throw new IOException(e);
    }
}

3.GzipSource和GzipSink流

Gzip和zip的主要區(qū)別在于平臺通用性和壓縮率,一般情況下,Gzip的壓縮率更高點,Gzip是基于zip算法上的再改造。因此,在Gzip流中必須包裝原始流為一個InflaterSource流:

public GzipSink(Sink sink) {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    this.deflater = new Deflater(DEFAULT_COMPRESSION, true /* No wrap */);
    this.sink = Okio.buffer(sink);
    this.deflaterSink = new DeflaterSink(this.sink, deflater);

    writeHeader();
}

4.Pipe流

Okio中的pipe流類似生產(chǎn)者消費者的模式,例如在管道流PipeSource讀取的時候,發(fā)現(xiàn)buffer.size,也就是緩沖池數(shù)據(jù)長度為0的時候,管道PipeSource流陷入等待。一直等到PipeSink流往Buffer中再輸出的時候,阻塞消失。并且,管道的流一定是成對出現(xiàn)的。

final class PipeSource implements Source {
  final Timeout timeout = new Timeout();

  @Override public long read(Buffer sink, long byteCount) throws IOException {
    synchronized (buffer) {
      if (sourceClosed) throw new IllegalStateException("closed");

      while (buffer.size() == 0) {// buffer為空
        if (sinkClosed) return -1L;
        timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.//陷入等待
      }

      long result = buffer.read(sink, byteCount);
      buffer.notifyAll(); // Notify the sink that it can resume writing.
      return result;
    }
  }

  @Override public void close() throws IOException {
    synchronized (buffer) {
      sourceClosed = true;
      buffer.notifyAll(); // Notify the sink that no more bytes are desired.
    }
  }

  @Override public Timeout timeout() {
    return timeout;
  }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容