場(chǎng)景
假設(shè)我們需要上傳一組動(dòng)態(tài)增加的數(shù)據(jù), 輸入端可以看作inputSteam, 輸入端是outputSteam, 但是輸入和輸出端不能直接對(duì)接, 那么我們要怎樣實(shí)現(xiàn)呢?
我希望的解決方案時(shí), 輸入和輸出通過一個(gè)"數(shù)據(jù)池"間接連接, 輸入端把數(shù)據(jù)寫到數(shù)據(jù)池中, 輸出端從數(shù)據(jù)池中讀數(shù)據(jù), 這里要求數(shù)據(jù)池有"阻塞"功能, 即數(shù)據(jù)池滿了阻塞輸入端, 數(shù)據(jù)池空了, 阻塞輸出端.
以上效果可以使用PipedInputStream和PipedOutputStream實(shí)現(xiàn).
前言
- 這兩個(gè)類需要配套使用, 可以實(shí)現(xiàn)管道(pipe)傳輸數(shù)據(jù)
- 默認(rèn)的使用方式是, 通過管道連接線程A和B, 在A線程使用
PipedOutputStream寫數(shù)據(jù), 數(shù)據(jù)緩存到"管道"后, B線程使用PipedInputStream讀取數(shù)據(jù), 以此完成數(shù)據(jù)傳輸, 如果在同一個(gè)線程使用這兩個(gè)類, 可能導(dǎo)致死鎖
PipedOutputStream
PipedOutputStream是管道的發(fā)送端. 寫線程通過它來往"管道"填充數(shù)據(jù).
我們先看看它有哪幾個(gè)方法, 從命名和注釋基本就能知道每個(gè)方法的作用
// 關(guān)聯(lián)PipedInputStream
public void connect(PipedInputStream snk)
// 寫一個(gè)數(shù)據(jù)
public void write(int b)
// 寫一段數(shù)據(jù)
public void write(byte b[], int off, int len)
// 通知讀線程, 管道中有數(shù)據(jù)等待讀取
public void flush()
// 關(guān)閉發(fā)送端, 不再發(fā)送數(shù)據(jù)
public void close()
以上注釋已經(jīng)大致說明了這個(gè)類的功能了, 接著我們逐個(gè)方法分析
connect
public synchronized void connect(PipedInputStream snk) throws IOException {
// 先確保
// 1. 連接對(duì)象(輸入的snk)不能為空
// 2. 不能重復(fù)連接
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
從上可以看出, connect方法就是修改連接的PipedInputStream的成員變量, 使其處于已連接狀態(tài).
write
public void write(int b) throws IOException {
// 確保sink不為空, 即確保已經(jīng)連接
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
// 先確保
// 1. 已經(jīng)連接
// 2. 輸出數(shù)組b不為空
// 3. off和len不會(huì)導(dǎo)致數(shù)組越界
if (sink == null) {
// ...
} else if (len == 0) {
// 如果len == 0, 表示不讀取數(shù)據(jù), 所以可以直接返回
return;
}
sink.receive(b, off, len);
}
從上可以看出, 兩個(gè)write方法, 最后都調(diào)用了響應(yīng)的PipedInputStream#receive方法, 這表明
數(shù)據(jù)存儲(chǔ)的地方和寫數(shù)據(jù)的具體邏輯都在
PipedInputStream中
后面我們?cè)僭敿?xì)分析.
flush
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
這個(gè)方法先嘗試獲取sink的鎖, 然后通過notifyAll()來調(diào)度線程, 在這里, 具體就是使讀線程開始讀取數(shù)據(jù), 這里涉及讀寫線程間的溝通調(diào)度問題, 在了解完PipedInputStream之后我們?cè)僦匦驴催@個(gè)問題.
close
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
這個(gè)方法就是簡單的調(diào)用了PipedInputStream#receivedLast()方法, 從方法名可以判斷出, 這個(gè)方法就是通知PipedInputStream, 數(shù)據(jù)已經(jīng)填充完畢.
總結(jié)
從上面的分析可以看出, PipedOutputStream基本就是一個(gè)"接口"類, 不會(huì)對(duì)數(shù)據(jù)進(jìn)行實(shí)際的操作, 也不承擔(dān)具體的職責(zé), 只負(fù)責(zé)把數(shù)據(jù)交給PipedInputStream處理.
下面我們接著分析最關(guān)鍵的PipedInputStream的源碼
PipedInputStream
成員變量
我們先看下關(guān)鍵的幾個(gè)變量
// 緩存數(shù)組, "管道"數(shù)據(jù)的存儲(chǔ)的地方
protected byte buffer[];
// 寫下一個(gè)數(shù)據(jù)時(shí), 保存到緩存數(shù)組的位置
// 小于0表示無可讀數(shù)據(jù), 緩存數(shù)組為空
// in == out時(shí)表示緩存數(shù)組已滿
protected int in = -1;
// 下一個(gè)被讀數(shù)據(jù)在緩存數(shù)組的位置
protected int out = 0;
看上面3個(gè)成員變量我們基本可以知道
"管道"內(nèi)部使用了數(shù)組來緩存寫入的數(shù)據(jù), 等待讀取. 通過
in和out兩個(gè)值來記錄數(shù)組的寫位置和讀位置
其余變量都是一些狀態(tài)標(biāo)識(shí)
// 寫數(shù)據(jù)端(輸入端)是否已經(jīng)關(guān)閉
boolean closedByWriter = false;
// 讀數(shù)據(jù)端(輸出端)是否已經(jīng)關(guān)閉
volatile boolean closedByReader = false;
// 是否處于已連接狀態(tài)
boolean connected = false;
// 記錄讀線程
Thread readSide;
// 記錄寫線程
Thread writeSide;
這些變量都是用于判斷當(dāng)前"管道"的狀態(tài)
其中readSide和writeSide是一種簡單的標(biāo)記讀寫線程的方式, 源碼注釋中也有說明這種方式并不可靠, 這種方式針對(duì)的應(yīng)該是兩條線程的情況, 所以我們使用的時(shí)候應(yīng)該盡量按照設(shè)計(jì)意圖來使用
在兩條線程中建立"管道"傳遞數(shù)據(jù), 寫線程寫數(shù)據(jù), 讀線程讀數(shù)據(jù).
構(gòu)造函數(shù)
它包含了好幾個(gè)構(gòu)造函數(shù), 我們只看參數(shù)最多的那個(gè)
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
最終都會(huì)要求我們調(diào)用上面的兩個(gè)方法, 都比較簡單就不貼代碼了
-
initPipe()里面對(duì)byte數(shù)組buffer變量進(jìn)行賦值, 也就是初始化緩沖區(qū)域 -
connect()方法直接調(diào)用了PipedOutputStream#connect, 上面已經(jīng)分析過了, 最終效果就是指明PipedOutputStream的連接對(duì)象, 改變connected變量的值, 使得PipedInputStream處于連接狀態(tài).
receive
通過上面PipedOutputStream的分析可以知道, 寫數(shù)據(jù)的方法會(huì)調(diào)用PipedInputStream的reveive方法, 所以我們首先分析這個(gè)方法, 了解寫數(shù)據(jù)的邏輯. 注意閱讀注釋!
// 寫單個(gè)數(shù)據(jù)
protected synchronized void receive(int b) throws IOException {
// 檢查當(dāng)前"管道"狀態(tài), 確保能夠讀寫數(shù)據(jù)
checkStateForReceive();
// 本方法由PipedOutputStream調(diào)用, 所以線程是寫線程, 記錄該線程
writeSide = Thread.currentThread();
if (in == out)
// in == out表示緩存數(shù)組已經(jīng)滿了, 阻塞線程等待
// 這里確保了未讀的緩存數(shù)據(jù)不會(huì)丟失
awaitSpace();
// 當(dāng)檢測(cè)到緩存數(shù)組有空間, 等待結(jié)束后, 會(huì)繼續(xù)執(zhí)行以下代碼
if (in < 0) {
// in小于0表示當(dāng)前無數(shù)據(jù), 設(shè)置讀, 寫位置都是0
in = 0;
out = 0;
}
// 寫操作
// 1. 把數(shù)據(jù)寫到目標(biāo)位置(in)
// 2. 后移in, 指明下一個(gè)寫數(shù)據(jù)的位置
buffer[in++] = (byte)(b & 0xFF);
// 如果in超出緩存長度, 回到0, 循環(huán)利用緩存數(shù)組
if (in >= buffer.length) {
in = 0;
}
}
注意該方法帶有synchronized關(guān)鍵字, 表明在該方法內(nèi), 會(huì)持有對(duì)象鎖, 我們留到最后再分析各個(gè)環(huán)節(jié)中, 對(duì)象鎖的歸屬問題.
在寫數(shù)據(jù)前會(huì)先通過checkStateForReceive檢查"管道"狀態(tài), 確保
- 當(dāng)前處于連接狀態(tài)
- 管道讀寫兩端都沒有被關(guān)閉
- 讀線程狀態(tài)正常
接著用writeSide記錄當(dāng)前線程為寫線程, 用來后續(xù)判斷線程狀態(tài);
然后判斷目標(biāo)位置(in), 如果in == out表明當(dāng)前緩存數(shù)組已經(jīng)滿了, 不能再寫數(shù)據(jù)了, 所以會(huì)通過awaitSpace()方法阻塞寫線程;
// 此時(shí)寫線程持有鎖
private void awaitSpace() throws IOException {
// 只有緩存數(shù)組已滿才需要等待
while (in == out) {
// 檢查管道狀態(tài), 防止在等待的過程中狀態(tài)發(fā)生變化
checkStateForReceive();
// 標(biāo)準(zhǔn)用法中僅涉及兩條線程, 所以這里可以認(rèn)為是通知讀線程讀數(shù)據(jù)
notifyAll();
try {
// 釋放對(duì)象鎖, 等待讀線程讀數(shù)據(jù), 調(diào)用后就會(huì)阻塞寫線程
// 1s后取消等待是為了再次檢查管道狀態(tài)
// 注意等待結(jié)束后, 鎖仍然在寫線程
wait(1000);
} catch (InterruptedException ex) {
// 直接拋出異常
IoUtils.throwInterruptedIoException();
}
}
}
以上基本可以概括為
緩存數(shù)組有空間時(shí)直接寫數(shù)據(jù), 無空間時(shí)阻塞寫線程, 直至有空間可以寫數(shù)據(jù)
接著分析寫一段數(shù)據(jù)的receive(byte[], int, int)方法, 注意閱讀注釋!
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
// len是需要寫進(jìn)緩存數(shù)據(jù)的總長度
// bytesToTransfer用來記錄剩余個(gè)數(shù)
int bytesToTransfer = len;
// 循環(huán)寫數(shù)據(jù)過程, 直至需要寫的數(shù)據(jù)全部處理完畢
while (bytesToTransfer > 0) {
if (in == out)
// in == out表示緩存區(qū)域已經(jīng)滿了, 阻塞線程等待
awaitSpace();
// nextTransferAmount用來記錄本次過程寫進(jìn)緩存的個(gè)數(shù)
int nextTransferAmount = 0;
if (out < in) {
// 因?yàn)閛ut必然大于等于0, 所以這里 0 <= out < int
// out < in 表示[in, buffer.length)和[0, out)兩個(gè)區(qū)間可以寫數(shù)據(jù)
// 先寫數(shù)據(jù)進(jìn)[in, buffer.length)區(qū)間, 避免處理頭尾連接的邏輯, 如果還有數(shù)據(jù)剩余, 留到下一個(gè)循環(huán)處理
nextTransferAmount = buffer.length - in;
} else if (in < out) {
// 注意in有可能為-1, 所以特殊判斷下
if (in == -1) {
// in == -1表示緩存數(shù)組為空, 整個(gè)數(shù)組都可以寫數(shù)據(jù)
// 從這里可知, 單次寫數(shù)據(jù)最大長度就是緩存數(shù)組的長度
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
// in < out 表示[in, out)區(qū)間可以寫數(shù)據(jù)
nextTransferAmount = out - in;
}
}
// 到這里nextTransferAmount表示本次過程**最多**可以寫的數(shù)據(jù)
if (nextTransferAmount > bytesToTransfer)
// 位置比需要的多, 所以修改nextTransferAmount
nextTransferAmount = bytesToTransfer;
// 經(jīng)過上面的判斷, nextTransferAmount表示本次過程可以寫進(jìn)緩存的個(gè)數(shù)
assert(nextTransferAmount > 0);
// 把數(shù)據(jù)寫進(jìn)緩存
System.arraycopy(b, off, buffer, in, nextTransferAmount);
// 計(jì)算剩余個(gè)數(shù)
bytesToTransfer -= nextTransferAmount;
// 移動(dòng)數(shù)據(jù)起點(diǎn)
off += nextTransferAmount;
// 后移in
in += nextTransferAmount;
// 如果in超出緩存長度, 回到0
if (in >= buffer.length) {
in = 0;
}
}
}
代碼邏輯注釋已經(jīng)說明得很清楚了, 當(dāng)你需要處理頭尾相連的數(shù)組時(shí), 可以學(xué)習(xí)上面循環(huán)處理數(shù)據(jù)的方法, 邏輯清晰, 不需要太多的邊界判斷.
receiveLast
當(dāng)輸入端關(guān)閉時(shí)(調(diào)用PipedOutputStream#close()), 會(huì)調(diào)用receivedLast()
synchronized void receivedLast() {
// 標(biāo)記輸入端關(guān)閉
closedByWriter = true;
// 通知讀線程讀數(shù)據(jù)
notifyAll();
}
該方法使用變量標(biāo)記輸入端已經(jīng)關(guān)閉, 表示不會(huì)有新數(shù)據(jù)寫入了.
read
分析完寫數(shù)據(jù), 接下來該分析讀數(shù)據(jù)了.
public synchronized int read() throws IOException {
// synchronized關(guān)鍵字, 讀線程需要持有鎖才能讀數(shù)據(jù)
// 先檢查管道狀態(tài)
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
// 只要in >= 0, 表示還有數(shù)據(jù)沒有讀, 所以不拋出異常
// 這個(gè)判斷表明了, 即使輸入端已經(jīng)調(diào)用了close, 也能繼續(xù)讀已經(jīng)寫入的數(shù)據(jù)
throw new IOException("Write end dead");
}
// 記錄讀線程
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
// in<0表示緩存區(qū)域?yàn)榭? 只要輸入端沒有被關(guān)閉, 阻塞線程等待數(shù)據(jù)寫入, 即等待in >= 0
if (closedByWriter) {
// 輸入端關(guān)閉了, 同時(shí)in < 0, 表示數(shù)據(jù)傳輸完畢了, 返回-1
return -1;
}
// 檢查寫線程的狀態(tài), 線程狀態(tài)異常則認(rèn)為管道異常, 檢查2次
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
// 這里可以認(rèn)為是通知寫線程寫數(shù)據(jù)
notifyAll();
try {
// 阻塞線程, 等待1s, 這里會(huì)釋放鎖, 給機(jī)會(huì)寫線程獲取鎖, 寫數(shù)據(jù)
wait(1000);
} catch (InterruptedException ex) {
IoUtils.throwInterruptedIoException();
}
}
// 執(zhí)行到這里證明in >= 0, 即緩存數(shù)組中有數(shù)據(jù)
// 關(guān)鍵的讀操作
// 1. 讀取out指向的byte數(shù)據(jù)
// 2. 后移out
// 3. 把byte轉(zhuǎn)成int, 高位補(bǔ)0
int ret = buffer[out++] & 0xFF;
// out超出長度則回到位置0
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// 讀取的數(shù)據(jù)追上了輸入的數(shù)據(jù), 則當(dāng)前緩存區(qū)域?yàn)榭? 所以設(shè)置in = -1
in = -1;
}
return ret;
}
從上面的注釋分析可以知道
- 即使調(diào)用了
PipedOutputStream#close(), 只要管道中還有數(shù)據(jù), 仍可以讀數(shù)據(jù), 所以實(shí)際使用時(shí), 輸入端輸入完畢后可以直接close輸入端. - 當(dāng)管道中沒有數(shù)據(jù)時(shí), 會(huì)阻塞讀線程, 直至管道被關(guān)閉, 線程異常或者數(shù)據(jù)被寫入到管道中.
接著看看讀取一段數(shù)據(jù)的方法
public synchronized int read(byte b[], int off, int len) throws IOException {
// 參數(shù)byte[](下面稱輸出數(shù)組)是數(shù)據(jù)讀取后存放的地方, 所以要先檢查該數(shù)組
if (b == null) {
// 確保輸出數(shù)組不為null, 否則讀出的數(shù)據(jù)不能寫入
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
// 確保下標(biāo)不會(huì)越界
throw new IndexOutOfBoundsException();
} else if (len == 0) {
// len參數(shù)表示需要讀取的長度, 等于0時(shí)相當(dāng)于不讀數(shù)據(jù), 所以直接返回
return 0;
}
// 先單獨(dú)讀一個(gè)數(shù)據(jù)是為了確保已經(jīng)有數(shù)據(jù)寫入, 因?yàn)槿绻?dāng)前無數(shù)據(jù), 則會(huì)阻塞當(dāng)前的讀線程
int c = read();
// 返回值小于0(實(shí)際上只能是-1), 表示管道已經(jīng)沒有數(shù)據(jù)了, 所以這里也直接返回-1
if (c < 0) {
return -1;
}
// 把讀取到的第一個(gè)數(shù)據(jù)放到輸出數(shù)組, 看后面的代碼時(shí)緊記這里已經(jīng)讀了1個(gè)數(shù)據(jù)
b[off] = (byte) c;
// 記錄讀取到的數(shù)據(jù)長度
int rlen = 1;
// 循環(huán)條件:
// in >= 0確保還有數(shù)據(jù)可以讀
// len > 1確保只讀取外部請(qǐng)求的數(shù)據(jù)長度, 因?yàn)樯厦嬉呀?jīng)讀了1個(gè)數(shù)據(jù), 所以是大于1, 而不是大于0
while ((in >= 0) && (len > 1)) {
// available用來記錄當(dāng)前可以讀取的數(shù)據(jù)
int available;
if (in > out) {
// in > out表示[out, in)區(qū)間數(shù)據(jù)可讀
// in的值正常情況下是不會(huì)大于buffer.length的, 因?yàn)楫?dāng) in == buffer.length時(shí), in就會(huì)賦值0
// 這里的Math.min顯得有點(diǎn)多余, 可能是為了以防萬一吧
available = Math.min((buffer.length - out), (in - out));
} else {
// 首先in是不會(huì)等于out的, 因?yàn)槿绻嗟? 在上面讀第一個(gè)數(shù)據(jù)的時(shí)候就會(huì)把in賦值-1, 也就不會(huì)進(jìn)入這個(gè)循環(huán)
// 當(dāng)in < out表示[out, buffer.length)和[0, in)兩個(gè)區(qū)間的數(shù)據(jù)可讀
// 和receive方法類似, 為了不處理跨邊界的情況, 先讀[out, buffer.length)區(qū)間數(shù)據(jù)
available = buffer.length - out;
}
// 外部已經(jīng)讀了一個(gè)數(shù)據(jù), 所以只需要讀(len - 1)個(gè)數(shù)據(jù)了
if (available > (len - 1)) {
available = len - 1;
}
// 經(jīng)過上面的判斷, available表示本次需要讀的數(shù)據(jù)長度
// 復(fù)制數(shù)據(jù)到輸出數(shù)組
System.arraycopy(buffer, out, b, off + rlen, available);
// 后移out變量
out += available;
// 記錄已經(jīng)讀到的數(shù)據(jù)量
rlen += available;
// 計(jì)算剩余需要讀的數(shù)據(jù)
len -= available;
// 如果已經(jīng)讀到緩存數(shù)組的尾部, 回到開頭
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// in == out表示已經(jīng)沒有數(shù)據(jù)可以讀了, 所以in賦值-1
in = -1;
}
}
return rlen;
}
上面的方法我們需要注意:
while方法體內(nèi)是不會(huì)阻塞讀線程的!while方法體內(nèi)是不會(huì)阻塞讀線程的!while方法體內(nèi)是不會(huì)阻塞讀線程的! 重要的事情說3遍~ 所以如果管道內(nèi)只有1個(gè)數(shù)據(jù), 那么讀取到輸出數(shù)組的就只有這1個(gè)數(shù)據(jù),read方法返回值會(huì)是1, 在讀取數(shù)據(jù)后處理輸出數(shù)組時(shí)需要特別注意這點(diǎn).
available
我們?cè)谧x數(shù)據(jù)前可以利用available()先看看管道中的數(shù)據(jù)個(gè)數(shù).
public synchronized int available() throws IOException {
if(in < 0)
// 管道中無數(shù)據(jù)
return 0;
else if(in == out)
// 緩存數(shù)組已滿
return buffer.length;
else if (in > out)
// [out, in)區(qū)間內(nèi)為有效數(shù)據(jù)
return in - out;
else
// in < out
// [in, out)區(qū)間為無效數(shù)據(jù), 其余為有效數(shù)據(jù), 所以長度為 buffer.length - (out - in)
return in + buffer.length - out;
}
到這里我們已經(jīng)把所有PipedOutputStream和PipedInputStream的所有方法分析完畢了~ 接著我們?cè)俜治鱿伦x寫過程中對(duì)象鎖的歸屬問題.
鎖
分析這部分我們先要了解下wait和notifyAll的作用, 可以參考知乎上這個(gè)回答java中的notify和notifyAll有什么區(qū)別? - 文龍的回答 - 知乎, 本文不再說明了, 重點(diǎn)理解鎖池和等待池概念
鎖池:假設(shè)線程A已經(jīng)擁有了某個(gè)對(duì)象(注意:不是類)的鎖,而其它的線程想要調(diào)用這個(gè)對(duì)象的某個(gè)synchronized方法(或者synchronized塊),由于這些線程在進(jìn)入對(duì)象的synchronized方法之前必須先獲得該對(duì)象的鎖的擁有權(quán),但是該對(duì)象的鎖目前正被線程A擁有,所以這些線程就進(jìn)入了該對(duì)象的鎖池中。
等待池:假設(shè)一個(gè)線程A調(diào)用了某個(gè)對(duì)象的wait()方法,線程A就會(huì)釋放該對(duì)象的鎖后,進(jìn)入到了該對(duì)象的等待池中
首先, 需要注意, PipedOutputStream中, 兩個(gè)write方法都沒有synchronized關(guān)鍵字, 所以我們不需要關(guān)心PipedOutputStream的對(duì)象鎖.
我們重點(diǎn)分析PipedInputStream里面, read和receive方法.
假設(shè)我們先調(diào)用receive寫數(shù)據(jù), 后調(diào)用read讀數(shù)據(jù)
當(dāng)我們寫數(shù)據(jù)時(shí), 進(jìn)入了receive方法, 因?yàn)?code>synchronized關(guān)鍵字, 此時(shí)寫線程會(huì)獲取到了對(duì)象鎖, 然后寫數(shù)據(jù)到管道中, 注意, 在這個(gè)過程中, 讀線程是不能通過read方法讀取數(shù)據(jù)的, 因?yàn)樽x線程獲取不了對(duì)象鎖, 如果這次寫操作中, 管道中的緩存數(shù)組滿了, 此時(shí)寫線程會(huì)進(jìn)入awaitSpace()方法, 在該方法內(nèi), 寫線程先調(diào)用了notifyAll方法, 使讀線程進(jìn)入鎖池準(zhǔn)備競(jìng)爭對(duì)象鎖, 然后調(diào)用wait(1000)方法, 在這1s內(nèi), 寫線程釋放了對(duì)象鎖, 然后進(jìn)入等待池.
寫線程釋放對(duì)象鎖后, 讀線程就能夠獲取對(duì)象鎖, 進(jìn)入read方法內(nèi)了, 然后讀數(shù)據(jù), 只要管道中存在至少一個(gè)數(shù)據(jù), 就不會(huì)阻塞線程, 讀取數(shù)據(jù)后直接退出方法, 釋放對(duì)象鎖, 如果這次讀操作中, 管道中的緩存數(shù)組沒有任何數(shù)據(jù), 此時(shí)讀線程就會(huì)調(diào)用notifyAll方法, 使寫線程從等待池移到鎖池, 準(zhǔn)備競(jìng)爭對(duì)象鎖, 然后再調(diào)用wait(1000)方法, 在這1s內(nèi), 讀線程釋放對(duì)象鎖, 自己進(jìn)入等待池.
以上就是一次讀寫中, 對(duì)象鎖的轉(zhuǎn)移過程, 但是在實(shí)際過程中, 我們都是兩個(gè)線程在各自的循環(huán)體內(nèi)一直讀數(shù)據(jù)和一直寫數(shù)據(jù)的, 所以每一次循環(huán)的時(shí)候都會(huì)競(jìng)爭鎖, 可能先讀后寫, 或者先寫后讀.
總結(jié)
分析這兩個(gè)類的源碼我們應(yīng)該可以學(xué)習(xí)到
-
InputSteam和OutputSteam的接口含義 - 使用數(shù)組緩存數(shù)據(jù)的方法, 使用
while循環(huán)避免處理邊界問題 -
wait和notifyAll協(xié)調(diào)讀寫線程的邏輯 - 使用這兩個(gè)類實(shí)現(xiàn)傳輸數(shù)據(jù)流