關(guān)于OKio,之前在OKHttp中有提及到,事實上除去OKHttp以外,OKio也是一個非常好的庫,那么作為OKHttp底層IO操作庫究竟有什么特殊的本領(lǐng)呢?我們通過這篇文章做一個詳盡的了解。
OKio使用
Okio.sink(new File("***"));
Okio.sink(new FileOutputStream(new File("***")));
Okio.sink(new Socket("****",888));
Okio.source(new File("***"));
Okio.source(new FileInputStream(new File("***")));
Okio.source(new Socket("****",888));
BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));
bufferedSink.writeInt(1);
BufferedSource bufferedSource = Okio.buffer(Okio.source(new File("***")));
bufferedSource.readInt();
//網(wǎng)絡(luò)請求中可能會用到Gzip功能
GzipSink gzipSink = new GzipSink(Okio.sink(new File("***")));
BufferedSink bufferedSink1 = Okio.buffer(gzipSink);
bufferedSink1.writeInt(1);
使用就是如上了
寫入分析
BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));
bufferedSink.writeInt(1);
1.從這里分析首先是生成Sink對象
即
Okio.sink(new File("***"))
通過查看代碼發(fā)現(xiàn)最后調(diào)用了方法如下
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
可以看到這里返回了一個Sink類,當(dāng)然這里實現(xiàn)了Sink接口中的幾個方法,這些方法在后面都是有作用的
2.接下來是生成RealBufferedSink,并進行寫入操作
即
BufferedSink bufferedSink = Okio.buffer(Okio.sink(new File("***")));
這里調(diào)用了Okio的buffer方法如下
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
這里生成了RealBufferedSink對象,并將前面生成的sink對象傳進去了
RealBufferedSink類中有一個Buffer變量,而這個Buffer類則是整個寫入操作的實際操作部分
接下來我們看一下寫入操作,如下
bufferedSink.writeInt(1);
RealBufferedSink中writeInt實現(xiàn)方法如下
@Override public BufferedSink writeInt(int i) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeInt(i);
return emitCompleteSegments();
}
可以看到這里調(diào)用了Buffer的writeInt方法,如下
@Override public Buffer writeInt(int i) {
Segment tail = writableSegment(4);
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((i >>> 24) & 0xff);
data[limit++] = (byte) ((i >>> 16) & 0xff);
data[limit++] = (byte) ((i >>> 8) & 0xff);
data[limit++] = (byte) (i & 0xff);
tail.limit = limit;
size += 4;
return this;
}
這里實際是將數(shù)據(jù)寫入了Segment中,至于Segment是什么,我們后續(xù)在進行說明
我們看下調(diào)用的writableSegment方法,如下
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
可以看到這里的Segment是鏈表的結(jié)構(gòu),當(dāng)新寫入數(shù)據(jù),就通過鏈表加入數(shù)據(jù)
所以到這里,我們寫入的數(shù)據(jù)都是存儲在Segment中的
接下來看一下emitCompleteSegments方法,如下
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
可以看到這里我們寫入數(shù)據(jù)成功后,會調(diào)用sink的write方法,而這個write方法,就是前面Sink類中的方法,如下
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
可以看到這里獲取之前存儲在Segment中的數(shù)據(jù),然后用OutputStream的write方法,將數(shù)據(jù)寫入到需要寫入的地方,然后調(diào)用Segment的pop方法,釋放之前存儲的消息
其他的寫入方法與上述的方法類型,不概述了
讀取分析
同樣的流程就不分析了,就從讀取方法開始分析,如下
bufferedSource.readInt();
具體實現(xiàn)在RealBufferedSource中的readInt方法
@Override public int readInt() throws IOException {
require(4);
return buffer.readInt();
}
這里調(diào)用了require方法,而require方法調(diào)用request方法,這個方法調(diào)用了source.read(buffer, Segment.SIZE)
接下來看下這個read方法,如下
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
可以看到這里通過InputStream的read方法,將數(shù)據(jù)讀取到Segment數(shù)據(jù)中。
接下來看一下Buffer的readInt方法,如下
@Override public int readInt() {
if (size < 4) throw new IllegalStateException("size < 4: " + size);
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
// If the int is split across multiple segments, delegate to readByte().
if (limit - pos < 4) {
return (readByte() & 0xff) << 24
| (readByte() & 0xff) << 16
| (readByte() & 0xff) << 8
| (readByte() & 0xff);
}
byte[] data = segment.data;
int i = (data[pos++] & 0xff) << 24
| (data[pos++] & 0xff) << 16
| (data[pos++] & 0xff) << 8
| (data[pos++] & 0xff);
size -= 4;
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
return i;
}
可以看到這里從之前存儲在Segment中獲取數(shù)據(jù)并返回。
其他的讀取方法與上述的方法類型,不概述了
Segment
Segment字面翻譯就是片段,Okio將數(shù)據(jù)也就是Buffer分割成一塊塊的片段,同時segment擁有前置節(jié)點和后置節(jié)點,構(gòu)成一個雙向循環(huán)鏈表,就像下面這個圖的方式。

這樣采取分片使用鏈表連接,片中使用數(shù)組存儲,兼具讀的連續(xù)性,以及寫的可插入性,對比單一使用鏈表或者數(shù)組,是一種折中的方案,讀寫更快,而且有個好處根據(jù)需求改動分片的大小來權(quán)衡讀寫的業(yè)務(wù)操作,另外,segment也有一些內(nèi)置的優(yōu)化操作,綜合這些Okio才能大放異彩
Segment中成員變量如下
static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment pre;
Segment next;
SIZE就是一個segment的最大字節(jié)數(shù),其中還有一個SHARE_MINIMUM,這個涉及到segment優(yōu)化中的另一個技巧,共享內(nèi)存,然后data就是保存的字節(jié)數(shù)組,pos,limit就是開始和結(jié)束點的index,shared和owner用來設(shè)置狀態(tài)判斷是否可寫,一個有共享內(nèi)存的segment是不能寫入的,pre,next就是前置后置節(jié)點。
Segment方法分析
既然是雙向循環(huán)鏈表,其中也會有一些操作的方法,比如
public Segment pop(){
Segment result = next != this ? next : null;
pre.next = next;
next.pre = pre;
next = null;
pre = null;
return result;
}
pop方法移除了自己,首先將自己的前后兩個節(jié)點連接起來,然后將自己的前后引用置空,這樣就脫離了整個雙向鏈表,然后返回next
public Segment push(Segment segment){
segment.pre = this;
segment.next = next;
next.pre = segment;
next = segment;
return segment;
}
push方法就是在當(dāng)前和next引用中間插入一個segment進來,并且返回插入的segment,這兩個都是尋常的雙向鏈表的操作,我們再來看看如何寫入數(shù)據(jù)
public void writeTo(Segment sink , int byteCount){
if (!sink.owner)
throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE){ //limit和需要寫的字節(jié)總和大于SIZE
if (sink.shared) //共享無法寫
throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE){ //如果減去頭依然比SIZE大 那么就無法寫拋異常
throw new IllegalArgumentException();
}
//否則我們需要先移動要寫的文件地址 然后置limit pos的地址
System.arraycopy(sink.data,sink.pos,sink.data,0,sink.limit - sink.pos);
sink.limit = sink.limit - sink.pos;
sink.pos = 0;
}
//開始尾部寫入 寫完置limit地址
System.arraycopy(data,pos,sink.data,sink.limit,byteCount);
sink.limit = sink.limit + byteCount;
pos = pos + byteCount; //當(dāng)前索引后移
}
owner和Shared這兩個狀態(tài)目前看來是完全相反的,賦值都是同步賦值的,這里有點不明白存在兩個參數(shù)的意義,現(xiàn)在的功能主要是用來判斷如果是共享就無法寫,以免污染數(shù)據(jù),會拋出異常。當(dāng)然,如果要寫的字節(jié)大小加上原來的字節(jié)數(shù)大于單個segment的最大值也是會拋出異常,也存在一種情況就是雖然尾節(jié)點索引和寫入字節(jié)大小加起來超過,但是由于前面的pos索引可能因為read方法取出數(shù)據(jù),pos索引后移這樣導(dǎo)致可以容納數(shù)據(jù),這時就先執(zhí)行移動操作,使用系統(tǒng)的 System.arraycopy 方法來移動到pos為0的狀態(tài),更改pos和limit索引后再在尾部寫入byteCount數(shù)的數(shù)據(jù),寫完之后實際上原segment讀了byteCount的數(shù)據(jù),所以pos需要后移這么多。過程十分的清晰,比較好理解。
除了寫入數(shù)據(jù)之外,segment還有一個優(yōu)化的技巧,因為每個segment的片段size是固定的,為了防止經(jīng)過長時間的使用后,每個segment中的數(shù)據(jù)千瘡百孔,可能十分短的數(shù)據(jù)卻占據(jù)了一整個segment,所以有了一個壓縮機制。
public void compact(){
if (pre == this)
throw new IllegalStateException();
if (!pre.owner) // pre不可寫
return;
int byteCount = limit - pos;
int availableByteCount = SIZE - pre.limit + (pre.shared ? 0 : pre.pos); //前一個的剩余大小
if (byteCount > availableByteCount)
return;
writeTo(pre,byteCount); //將數(shù)據(jù)寫入到前一個的片段中
pop(); // 從雙向鏈表中移除當(dāng)前
SegmentPool.recycle(this); //加入到對象池中
}
照例如果前面是共享的那么不可寫,也就不能壓縮了,然后判斷前一個的剩余大小是否比當(dāng)前的大,有足夠的空間來容納數(shù)據(jù),調(diào)用前面的 writeTo 方法來寫數(shù)據(jù),寫完后移除當(dāng)前segment,然后通過 SegmentPool 來回收。
另一個技巧就是共享機制,為了減少數(shù)據(jù)復(fù)制帶來的性能開銷,segment存在一個共享機制。
public Segment split(int byteCount){
if (byteCount <= 0 || byteCount > limit - pos )
throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM){ //如果byteCount大于最小的共享要求大小
prefix = new Segment(this); //this這個構(gòu)造函數(shù)會
}else {
prefix = SegmentPool.take();
System.arraycopy(data,pos,prefix,0,byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos = pos + byteCount;
pre.push(prefix);
return prefix;
}
再回頭看剛才的 compact 中出現(xiàn)的 SegmentPool ,這個實際上是一個segment的對象池
static final long MAX_SIZE = 64 * 1024;
static Segment next;
static long byteCount;
同樣的有一個池子的上限,也就是64k,相當(dāng)于8個segment,next這個節(jié)點可以看出這個 SegmentPool 是按照單鏈表的方式進行存儲的,byteCount則是目前已有的大小。
SegmentPool方法分析
SegmentPool的方法十分的少,一個取,一個回收,十分簡潔。
/**
* take方法用來取數(shù)據(jù)
* 如果池子為空就創(chuàng)建一個空對象 owner true | share false
* next是鏈表的頭 就是一個簡單的取表頭的操作
* @return
*/
static Segment take(){
synchronized (SegmentPool.class){
if (next != null){
Segment result = next;
next = result.next;
result.next = null;
byteCount = byteCount - Segment.SIZE;
return result;
}
}
return new Segment();
}
為了防止多線程同時操作造成數(shù)據(jù)的錯亂,這里加了鎖,這里的next命名雖然是next,但是實際上是整個對象池的頭,但是next為空,表示池子為空,直接返回一個空對象,否則從里面拿出next,并將next的下一個節(jié)點賦為next,置一下狀態(tài),這個方法就結(jié)束了
/**
* 如果當(dāng)前要回收的segment有前后引用或者是共享的 那么就回收失敗
* 如果加入后的大小超過了最大大小 也會失敗
* 然后將新回收的next指向表頭 也就是加到的鏈表的頭 并且將回收的segment置為next也就是head
* @param segment
*/
static void recycle(Segment segment){
if (segment.next != null || segment.pre != null)
throw new IllegalArgumentException();
if (segment.shared)
return;
synchronized (SegmentPool.class){
if (byteCount + Segment.SIZE > MAX_SIZE){
return;
}
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
如果要回收的segment有前后引用或者是共享的,就不能被回收,所以要回收前先將引用置空,同樣這里也加了鎖,以免那個同時回收超過池子最大的大小,然后就是將回收的插到表頭的操作。
所以SegmentPool無論是回收和取對象都是在表頭操作。
裝飾流
1.Forwarding流
public abstract class ForwardingSink implements Sink {
private final Sink delegate;
public ForwardingSink(Sink delegate) {
if (delegate == null) throw new IllegalArgumentException("delegate == null");
this.delegate = delegate;
}
/** {@link Sink} to which this instance is delegating. */
public final Sink delegate() {
return delegate;
}
@Override public void write(Buffer source, long byteCount) throws IOException {
delegate.write(source, byteCount);
}
@Override public void flush() throws IOException {
delegate.flush();
}
@Override public Timeout timeout() {
return delegate.timeout();
}
@Override public void close() throws IOException {
delegate.close();
}
@Override public String toString() {
return getClass().getSimpleName() + "(" + delegate.toString() + ")";
}
}
Forwarding流乍看之下似乎沒有什么實際用處,但是在實際作用中,常常用來作為匿名的代理對象,由于傳入的sink本身我們無法繼承和復(fù)寫,這樣我們可以直接采用裝飾Forwarding流的方式來監(jiān)控和攔截一些操作。
2.DeflaterSink和InflaterSource流
這對流主要對應(yīng)于zip壓縮流,類似ZipInputStream和ZipOutputStream,而實際上再ZipInputStream中實際上也用到了相同的API。
//code ZipInputStream.java
public class ZipInputStream {
public void read(...) {
try {
read = inf.inflate(buffer, byteOffset, byteCount);//引用了一個protected Inflater inf;對象
} catch (DataFormatException e) {
throw new ZipException(e.getMessage());
}
}
}
我們看到在ZipInputStream的流讀取處理的時候,用到了一個Inflater類型的對象,這個對象是專門用來處理Zip格式編碼的工具類。而在InflaterSource這個源去讀取數(shù)據(jù)的時候,也一樣用到了這個類:
while (true) {
boolean sourceExhausted = refill();
// Decompress the inflater's compressed data into the sink.
try {
Segment tail = sink.writableSegment(1);
int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit);
if (bytesInflated > 0) {
tail.limit += bytesInflated;
sink.size += bytesInflated;
return bytesInflated;
}
if (inflater.finished() || inflater.needsDictionary()) {
releaseInflatedBytes();
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop();
SegmentPool.recycle(tail);
}
return -1;
}
if (sourceExhausted) throw new EOFException("source exhausted prematurely");
} catch (DataFormatException e) {
throw new IOException(e);
}
}
3.GzipSource和GzipSink流
Gzip和zip的主要區(qū)別在于平臺通用性和壓縮率,一般情況下,Gzip的壓縮率更高點,Gzip是基于zip算法上的再改造。因此,在Gzip流中必須包裝原始流為一個InflaterSource流:
public GzipSink(Sink sink) {
if (sink == null) throw new IllegalArgumentException("sink == null");
this.deflater = new Deflater(DEFAULT_COMPRESSION, true /* No wrap */);
this.sink = Okio.buffer(sink);
this.deflaterSink = new DeflaterSink(this.sink, deflater);
writeHeader();
}
4.Pipe流
Okio中的pipe流類似生產(chǎn)者消費者的模式,例如在管道流PipeSource讀取的時候,發(fā)現(xiàn)buffer.size,也就是緩沖池數(shù)據(jù)長度為0的時候,管道PipeSource流陷入等待。一直等到PipeSink流往Buffer中再輸出的時候,阻塞消失。并且,管道的流一定是成對出現(xiàn)的。
final class PipeSource implements Source {
final Timeout timeout = new Timeout();
@Override public long read(Buffer sink, long byteCount) throws IOException {
synchronized (buffer) {
if (sourceClosed) throw new IllegalStateException("closed");
while (buffer.size() == 0) {// buffer為空
if (sinkClosed) return -1L;
timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.//陷入等待
}
long result = buffer.read(sink, byteCount);
buffer.notifyAll(); // Notify the sink that it can resume writing.
return result;
}
}
@Override public void close() throws IOException {
synchronized (buffer) {
sourceClosed = true;
buffer.notifyAll(); // Notify the sink that no more bytes are desired.
}
}
@Override public Timeout timeout() {
return timeout;
}
}