概述
管道流是用來在多個線程之間進(jìn)行信息傳遞的Java流。
管道流分為字節(jié)流管道流和字符管道流。
字節(jié)管道流:PipedOutputStream 和 PipedInputStream。
字符管道流:PipedWriter 和 PipedReader。
PipedOutputStream、PipedWriter 是寫入者/生產(chǎn)者/發(fā)送者;
PipedInputStream、PipedReader 是讀取者/消費(fèi)者/接收者。
字節(jié)管道流
這里我們只分析字節(jié)管道流,字符管道流原理跟字節(jié)管道流一樣,只不過底層一個是 byte 數(shù)組存儲 一個是 char 數(shù)組存儲的。
java的管道輸入與輸出實際上使用的是一個循環(huán)緩沖數(shù)來實現(xiàn)的。輸入流PipedInputStream從這個循環(huán)緩沖數(shù)組中讀數(shù)據(jù),輸出流PipedOutputStream往這個循環(huán)緩沖數(shù)組中寫入數(shù)據(jù)。當(dāng)這個緩沖數(shù)組已滿的時候,輸出流PipedOutputStream所在的線程將阻塞;當(dāng)這個緩沖數(shù)組為空的時候,輸入流PipedInputStream所在的線程將阻塞。
注意事項
在使用管道流之前,需要注意以下要點:
- 管道流僅用于多個線程之間傳遞信息,若用在同一個線程中可能會造成死鎖;
- 管道流的輸入輸出是成對的,一個輸出流只能對應(yīng)一個輸入流,使用構(gòu)造函數(shù)或者connect函數(shù)進(jìn)行連接;
- 一對管道流包含一個緩沖區(qū),其默認(rèn)值為1024個字節(jié),若要改變緩沖區(qū)大小,可以使用帶有參數(shù)的構(gòu)造函數(shù);
- 管道的讀寫操作是互相阻塞的,當(dāng)緩沖區(qū)為空時,讀操作阻塞;當(dāng)緩沖區(qū)滿時,寫操作阻塞;
- 管道依附于線程,因此若線程結(jié)束,則雖然管道流對象還在,仍然會報錯“read dead end”;
- 管道流的讀取方法與普通流不同,只有輸出流正確close時,輸出流才能讀到-1值。
示例
public class PipedStreamDemo {
public static void main(String[] args) {
//創(chuàng)建一個線程池
ExecutorService executorService = Executors.newCachedThreadPool();
try {
//創(chuàng)建輸入和輸出管道流
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
//創(chuàng)建發(fā)送線程和接收線程
Sender sender = new Sender(pos);
Reciever reciever = new Reciever(pis);
//提交給線程池運(yùn)行發(fā)送線程和接收線程
executorService.execute(sender);
executorService.execute(reciever);
} catch (IOException e) {
e.printStackTrace();
}
//通知線程池,不再接受新的任務(wù),并執(zhí)行完成當(dāng)前正在運(yùn)行的線程后關(guān)閉線程池。
executorService.shutdown();
try {
//shutdown 后可能正在運(yùn)行的線程很長時間都運(yùn)行不完成,這里設(shè)置超過1小時,強(qiáng)制執(zhí)行 Interruptor 結(jié)束線程。
executorService.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Sender extends Thread {
private PipedOutputStream pos;
public Sender(PipedOutputStream pos) {
super();
this.pos = pos;
}
@Override
public void run() {
try {
String s = "hello world, amazing java !";
System.out.println("Sender:" + s);
byte[] buf = s.getBytes();
pos.write(buf, 0, buf.length);
pos.close();
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class Reciever extends Thread {
private PipedInputStream pis;
public Reciever(PipedInputStream pis) {
super();
this.pis = pis;
}
@Override
public void run() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int len = 0;
while ((len = pis.read(buf)) != -1) {
baos.write(buf, 0, len);
}
byte[] result = baos.toByteArray();
String s = new String(result, 0, result.length);
System.out.println("Reciever:" + s);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
輸出結(jié)果:

源碼分析
因為數(shù)據(jù)是從 PipedOutputStream 寫入,然后通過 PipedInputStream 讀取的,所以下面我們先來分析下 生產(chǎn)者 PipedOutputStream 的源碼。
PipedOutputStream 源碼分析
初始化

1、定義了一個 PipedInputStream 成員變量 sink。用來保存需要寫入到的目標(biāo)管道流中。
2、一個代參數(shù)的構(gòu)造,一個無參的構(gòu)造。
有參的構(gòu)造調(diào)用 connect() 方法把兩個管道流連接在一起,
-
無參的構(gòu)造函數(shù)更靈活,不必在創(chuàng)建一個 PipedOutputStream 的對象時指定 PipedInputStream 對象,可以在后面代碼,自己調(diào)用 connect() 自己指定。使用方式如下:
write 方法

write 方法就是調(diào)用 PipedInputStream的 receive 的方法,把要寫入的數(shù)據(jù)寫入進(jìn)去。
PipedOutputStream 總結(jié)
通過源碼分析,發(fā)現(xiàn)該類沒有什么特別的,通過構(gòu)造或者 connect() 方法接收一個 PipedInputStream對象,然后把要輸出信息,交給 PipedInputStream.receive() 方法去接收。
PipedInputStream 源碼分析
打開該類后發(fā)現(xiàn)比 PipedInputStream 類復(fù)雜了好多。
類結(jié)構(gòu)

PipedInputStream 中定義了很多成員變量
1、closedByWriter 是否關(guān)閉 PipedOutputStream 流。
2、closedByReader 是否關(guān)閉 PipedInputStream 流。
3、connected 輸入輸出管道流是否成功連接了。
4、readSide、writeSide 讀線程和寫線程
5、DEFAULT_PIPE_SIZE 默認(rèn)讀寫的緩沖區(qū)大小為 1024.
6、PIPE_SIZE 對外暴露管道流的讀寫緩沖區(qū)大?。ó?dāng)前包可見)
7、buffer 緩沖區(qū)大小
8、in 寫入緩沖區(qū)下標(biāo)
9、out 寫出緩沖區(qū)下標(biāo)
PipedInputStream 構(gòu)造及初始化

PipedInputStream 支持有4種構(gòu)造方法。
1、public PipedInputStream(PipedOutputStream src)
傳入一個 PipedOutputStream 參數(shù),并調(diào)用 initPipe() 方法創(chuàng)建默認(rèn)大?。?024)的 buffer。
2、public PipedInputStream(PipedOutputStream src, int pipeSize)
傳入一個 PipedOutputStream 參數(shù)和 pipeSize參數(shù),調(diào)用 initPipe() 方法創(chuàng)建指定大小的 buffer
3、public PipedInputStream()
調(diào)用 initPipe() 方法,創(chuàng)建一個默認(rèn)大小的buffer
4、public PipedInputStream(int pipeSize)
調(diào)用 initPipe() 方法,創(chuàng)建一個指定大小的bufferinitPipe 方法
private void initPipe(int pipeSize)
根據(jù) pipeSize 創(chuàng)建 buffer 。-
connect 方法
public void connect(PipedOutputStream src)
connect方法其實還是調(diào)用的 PipedOutputStream 類種的 connect 方法。
所以下面這樣寫法,是等價的,都是調(diào)用 PipedOutputStream 類種的 connect 方法。
receive 方法

通過分析 PipedOutputStream 的源碼,我們知道,該方法是在 PipedOutputStream.write() 方法種調(diào)用的。
- 1、checkStateForReceive()檢查是否可以接受數(shù)據(jù)。(是否可向 buffer 種寫入數(shù)據(jù));
- 2、獲取寫線程。PipedOutputStream.write() 中調(diào)用的,所以獲取的是PipedOutStream 所在的線程;
- 3、判斷 in==out。如果相等說明,已經(jīng)緩沖區(qū)已經(jīng)被填充滿數(shù)據(jù)了。這時調(diào)用 awaitSpace() 方法,喚醒讀線程(讀線程可能 wait 狀態(tài)),讓當(dāng)前線程 wait ,如果沒有讀線程喚醒寫線程,那么寫線程會在 awaitSpace() 方法種每隔1秒檢查一次是否可寫;
為什么 in == out 的時候就是寫滿緩沖區(qū)呢?
比如: buffer 長度為10,現(xiàn)在寫了5個字節(jié),又讀了5個字節(jié),是不是 in 也等于 out?
其實不會的,為什么?
因為讀的時候如果 in==out時,他把 in 的值置為了 -1。詳見 read() 方法。
- 4、如果 in<0,就是第一次寫或者已經(jīng)讀完 buffer 中已寫的數(shù)據(jù),這是,把 in 和 out 置為0;
- 5、向buffer 種寫入數(shù)據(jù)。
- 6、如果 in 達(dá)到 buffer 的最大長度,則把in 置為 0, 下次開始從0 開始填充。(這里,可以把 buffer 當(dāng)成一個環(huán)形隊列)。
awaitSpace() 源碼

read() 方法

1、執(zhí)行各種檢查,是否可讀。
2、獲取讀線程并賦值給 readSide 變量。
3、while 循環(huán)監(jiān)聽判斷是否有寫線程寫數(shù)據(jù),如果沒有則等待(每秒檢查一次),并喚醒寫線程(寫線程可能 wait )。
4、讀取 buffer 中的數(shù)據(jù)。 如果讀到 buffer 的最后一個元素,則把 out 置為0,下次從下標(biāo)0開始繼續(xù)讀(循環(huán)隊列表)。
5、如果 in == out,則把 in 置為 -1 。置為初始狀態(tài)。相當(dāng)于清空了緩沖區(qū),從緩沖區(qū)的下標(biāo) 0 開始讀寫。
available() 方法
獲取當(dāng)前可讀的字節(jié)數(shù)

1、如果 in<0; 說明當(dāng)前沒有可讀的數(shù)據(jù)
2、如果 in == out; 說明數(shù)據(jù)已經(jīng)填充滿了。
3、如果 in > out; 那么in - out 就是 可寫的字節(jié)數(shù)。
4、否則,就是 in < out 的情況。因為它是環(huán)形寫入的,可能出現(xiàn) in < out 的情況,所以需要 in + buffer.length - out,才能獲取可讀字節(jié)長度。
PipedInputStream 總結(jié)
PipedInputStream 原理其實也很簡單,但代碼看起來有點懵,它就是通過 wait() 和 notifyAll() 來控制 buffer 是否可讀,或可寫的。
管道流,做開發(fā)這么多年,現(xiàn)在都沒有遇到可用的場景。管道流能用到的場景,在并發(fā)包種,很多方式都可以實現(xiàn)或代替。比如 java.util.concurrent.Exchanger 類。
java.util.concurrent.Exchanger 的使用場景比管道流使用場景更廣泛些。
喜歡本文的朋友們,歡迎長按下圖關(guān)注訂閱號 java404,收聽更多精彩的內(nèi)容

