源碼分析: PipedInputStream和PipedOutputStream

場(chǎng)景

假設(shè)我們需要上傳一組動(dòng)態(tài)增加的數(shù)據(jù), 輸入端可以看作inputSteam, 輸入端是outputSteam, 但是輸入和輸出端不能直接對(duì)接, 那么我們要怎樣實(shí)現(xiàn)呢?

我希望的解決方案時(shí), 輸入和輸出通過一個(gè)"數(shù)據(jù)池"間接連接, 輸入端把數(shù)據(jù)寫到數(shù)據(jù)池中, 輸出端從數(shù)據(jù)池中讀數(shù)據(jù), 這里要求數(shù)據(jù)池有"阻塞"功能, 即數(shù)據(jù)池滿了阻塞輸入端, 數(shù)據(jù)池空了, 阻塞輸出端.

以上效果可以使用PipedInputStreamPipedOutputStream實(shí)現(xiàn).

前言

  1. 這兩個(gè)類需要配套使用, 可以實(shí)現(xiàn)管道(pipe)傳輸數(shù)據(jù)
  2. 默認(rèn)的使用方式是, 通過管道連接線程A和B, 在A線程使用PipedOutputStream寫數(shù)據(jù), 數(shù)據(jù)緩存到"管道"后, B線程使用PipedInputStream讀取數(shù)據(jù), 以此完成數(shù)據(jù)傳輸, 如果在同一個(gè)線程使用這兩個(gè)類, 可能導(dǎo)致死鎖

PipedOutputStream

PipedOutputStream是管道的發(fā)送端. 寫線程通過它來往"管道"填充數(shù)據(jù).

我們先看看它有哪幾個(gè)方法, 從命名和注釋基本就能知道每個(gè)方法的作用

// 關(guān)聯(lián)PipedInputStream
public void connect(PipedInputStream snk)

// 寫一個(gè)數(shù)據(jù)
public void write(int b)

// 寫一段數(shù)據(jù)
public void write(byte b[], int off, int len)

// 通知讀線程, 管道中有數(shù)據(jù)等待讀取
public void flush()

// 關(guān)閉發(fā)送端, 不再發(fā)送數(shù)據(jù)
public void close()

以上注釋已經(jīng)大致說明了這個(gè)類的功能了, 接著我們逐個(gè)方法分析

connect

public synchronized void connect(PipedInputStream snk) throws IOException {
    // 先確保
    // 1. 連接對(duì)象(輸入的snk)不能為空
    // 2. 不能重復(fù)連接
    sink = snk;
    snk.in = -1;
    snk.out = 0;
    snk.connected = true;
}

從上可以看出, connect方法就是修改連接的PipedInputStream的成員變量, 使其處于已連接狀態(tài).

write

public void write(int b)  throws IOException {
    // 確保sink不為空, 即確保已經(jīng)連接
    sink.receive(b);
}

public void write(byte b[], int off, int len) throws IOException {
    // 先確保
    // 1. 已經(jīng)連接
    // 2. 輸出數(shù)組b不為空
    // 3. off和len不會(huì)導(dǎo)致數(shù)組越界
    if (sink == null) {
        // ...
    } else if (len == 0) {
        // 如果len == 0, 表示不讀取數(shù)據(jù), 所以可以直接返回
        return;
    }
        
    sink.receive(b, off, len);
}

從上可以看出, 兩個(gè)write方法, 最后都調(diào)用了響應(yīng)的PipedInputStream#receive方法, 這表明

數(shù)據(jù)存儲(chǔ)的地方寫數(shù)據(jù)的具體邏輯都在PipedInputStream

后面我們?cè)僭敿?xì)分析.

flush

public synchronized void flush() throws IOException {
    if (sink != null) {
        synchronized (sink) {
            sink.notifyAll();
        }
    }
}

這個(gè)方法先嘗試獲取sink的鎖, 然后通過notifyAll()來調(diào)度線程, 在這里, 具體就是使讀線程開始讀取數(shù)據(jù), 這里涉及讀寫線程間的溝通調(diào)度問題, 在了解完PipedInputStream之后我們?cè)僦匦驴催@個(gè)問題.

close

public void close() throws IOException {
    if (sink != null) {
        sink.receivedLast();
    }
}

這個(gè)方法就是簡單的調(diào)用了PipedInputStream#receivedLast()方法, 從方法名可以判斷出, 這個(gè)方法就是通知PipedInputStream, 數(shù)據(jù)已經(jīng)填充完畢.

總結(jié)

從上面的分析可以看出, PipedOutputStream基本就是一個(gè)"接口"類, 不會(huì)對(duì)數(shù)據(jù)進(jìn)行實(shí)際的操作, 也不承擔(dān)具體的職責(zé), 只負(fù)責(zé)把數(shù)據(jù)交給PipedInputStream處理.

下面我們接著分析最關(guān)鍵的PipedInputStream的源碼

PipedInputStream

成員變量

我們先看下關(guān)鍵的幾個(gè)變量

// 緩存數(shù)組, "管道"數(shù)據(jù)的存儲(chǔ)的地方
protected byte buffer[];

// 寫下一個(gè)數(shù)據(jù)時(shí), 保存到緩存數(shù)組的位置
// 小于0表示無可讀數(shù)據(jù), 緩存數(shù)組為空
// in == out時(shí)表示緩存數(shù)組已滿
protected int in = -1;

// 下一個(gè)被讀數(shù)據(jù)在緩存數(shù)組的位置
protected int out = 0;

看上面3個(gè)成員變量我們基本可以知道

"管道"內(nèi)部使用了數(shù)組來緩存寫入的數(shù)據(jù), 等待讀取. 通過inout兩個(gè)值來記錄數(shù)組的寫位置和讀位置

其余變量都是一些狀態(tài)標(biāo)識(shí)

// 寫數(shù)據(jù)端(輸入端)是否已經(jīng)關(guān)閉
boolean closedByWriter = false;
// 讀數(shù)據(jù)端(輸出端)是否已經(jīng)關(guān)閉
volatile boolean closedByReader = false;
// 是否處于已連接狀態(tài)
boolean connected = false;
// 記錄讀線程
Thread readSide;
// 記錄寫線程
Thread writeSide;

這些變量都是用于判斷當(dāng)前"管道"的狀態(tài)

其中readSidewriteSide是一種簡單的標(biāo)記讀寫線程的方式, 源碼注釋中也有說明這種方式并不可靠, 這種方式針對(duì)的應(yīng)該是兩條線程的情況, 所以我們使用的時(shí)候應(yīng)該盡量按照設(shè)計(jì)意圖來使用

在兩條線程中建立"管道"傳遞數(shù)據(jù), 寫線程寫數(shù)據(jù), 讀線程讀數(shù)據(jù).

構(gòu)造函數(shù)

它包含了好幾個(gè)構(gòu)造函數(shù), 我們只看參數(shù)最多的那個(gè)

public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
    initPipe(pipeSize);
    connect(src);
}

最終都會(huì)要求我們調(diào)用上面的兩個(gè)方法, 都比較簡單就不貼代碼了

  1. initPipe()里面對(duì)byte數(shù)組buffer變量進(jìn)行賦值, 也就是初始化緩沖區(qū)域
  2. connect()方法直接調(diào)用了PipedOutputStream#connect, 上面已經(jīng)分析過了, 最終效果就是指明PipedOutputStream的連接對(duì)象, 改變connected變量的值, 使得PipedInputStream處于連接狀態(tài).

receive

通過上面PipedOutputStream的分析可以知道, 寫數(shù)據(jù)的方法會(huì)調(diào)用PipedInputStreamreveive方法, 所以我們首先分析這個(gè)方法, 了解寫數(shù)據(jù)的邏輯. 注意閱讀注釋!

// 寫單個(gè)數(shù)據(jù)
protected synchronized void receive(int b) throws IOException {
    // 檢查當(dāng)前"管道"狀態(tài), 確保能夠讀寫數(shù)據(jù)
    checkStateForReceive();
    // 本方法由PipedOutputStream調(diào)用, 所以線程是寫線程, 記錄該線程
    writeSide = Thread.currentThread();
    if (in == out)
        // in == out表示緩存數(shù)組已經(jīng)滿了, 阻塞線程等待
        // 這里確保了未讀的緩存數(shù)據(jù)不會(huì)丟失
        awaitSpace();
    // 當(dāng)檢測(cè)到緩存數(shù)組有空間, 等待結(jié)束后, 會(huì)繼續(xù)執(zhí)行以下代碼
    if (in < 0) {
        // in小于0表示當(dāng)前無數(shù)據(jù), 設(shè)置讀, 寫位置都是0
        in = 0;
        out = 0;
    }
    // 寫操作
    // 1. 把數(shù)據(jù)寫到目標(biāo)位置(in)
    // 2. 后移in, 指明下一個(gè)寫數(shù)據(jù)的位置
    buffer[in++] = (byte)(b & 0xFF);
    // 如果in超出緩存長度, 回到0, 循環(huán)利用緩存數(shù)組
    if (in >= buffer.length) {
        in = 0;
    }
}

注意該方法帶有synchronized關(guān)鍵字, 表明在該方法內(nèi), 會(huì)持有對(duì)象鎖, 我們留到最后再分析各個(gè)環(huán)節(jié)中, 對(duì)象鎖的歸屬問題.

在寫數(shù)據(jù)前會(huì)先通過checkStateForReceive檢查"管道"狀態(tài), 確保

  1. 當(dāng)前處于連接狀態(tài)
  2. 管道讀寫兩端都沒有被關(guān)閉
  3. 讀線程狀態(tài)正常

接著用writeSide記錄當(dāng)前線程為寫線程, 用來后續(xù)判斷線程狀態(tài);

然后判斷目標(biāo)位置(in), 如果in == out表明當(dāng)前緩存數(shù)組已經(jīng)滿了, 不能再寫數(shù)據(jù)了, 所以會(huì)通過awaitSpace()方法阻塞寫線程;

// 此時(shí)寫線程持有鎖
private void awaitSpace() throws IOException {
    // 只有緩存數(shù)組已滿才需要等待
    while (in == out) {
        // 檢查管道狀態(tài), 防止在等待的過程中狀態(tài)發(fā)生變化 
        checkStateForReceive();
        // 標(biāo)準(zhǔn)用法中僅涉及兩條線程, 所以這里可以認(rèn)為是通知讀線程讀數(shù)據(jù)
        notifyAll();
        try {
            // 釋放對(duì)象鎖, 等待讀線程讀數(shù)據(jù), 調(diào)用后就會(huì)阻塞寫線程
            // 1s后取消等待是為了再次檢查管道狀態(tài)
            // 注意等待結(jié)束后, 鎖仍然在寫線程
            wait(1000);
        } catch (InterruptedException ex) {
            // 直接拋出異常
            IoUtils.throwInterruptedIoException();
        }
    }
}

以上基本可以概括為

緩存數(shù)組有空間時(shí)直接寫數(shù)據(jù), 無空間時(shí)阻塞寫線程, 直至有空間可以寫數(shù)據(jù)

接著分析寫一段數(shù)據(jù)的receive(byte[], int, int)方法, 注意閱讀注釋!

synchronized void receive(byte b[], int off, int len)  throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    // len是需要寫進(jìn)緩存數(shù)據(jù)的總長度
    // bytesToTransfer用來記錄剩余個(gè)數(shù)
    int bytesToTransfer = len;
    // 循環(huán)寫數(shù)據(jù)過程, 直至需要寫的數(shù)據(jù)全部處理完畢
    while (bytesToTransfer > 0) {
        if (in == out)
            // in == out表示緩存區(qū)域已經(jīng)滿了, 阻塞線程等待
            awaitSpace();
        // nextTransferAmount用來記錄本次過程寫進(jìn)緩存的個(gè)數(shù)
        int nextTransferAmount = 0;
        if (out < in) {
            // 因?yàn)閛ut必然大于等于0, 所以這里 0 <= out < int
            // out < in 表示[in, buffer.length)和[0, out)兩個(gè)區(qū)間可以寫數(shù)據(jù)
            // 先寫數(shù)據(jù)進(jìn)[in, buffer.length)區(qū)間, 避免處理頭尾連接的邏輯, 如果還有數(shù)據(jù)剩余, 留到下一個(gè)循環(huán)處理
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            // 注意in有可能為-1, 所以特殊判斷下
            if (in == -1) {
                // in == -1表示緩存數(shù)組為空, 整個(gè)數(shù)組都可以寫數(shù)據(jù)
                // 從這里可知, 單次寫數(shù)據(jù)最大長度就是緩存數(shù)組的長度
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                // in < out 表示[in, out)區(qū)間可以寫數(shù)據(jù)
                nextTransferAmount = out - in;
            }
        }
        // 到這里nextTransferAmount表示本次過程**最多**可以寫的數(shù)據(jù)
        if (nextTransferAmount > bytesToTransfer)
            // 位置比需要的多, 所以修改nextTransferAmount
            nextTransferAmount = bytesToTransfer;
        // 經(jīng)過上面的判斷, nextTransferAmount表示本次過程可以寫進(jìn)緩存的個(gè)數(shù)
        assert(nextTransferAmount > 0);
        // 把數(shù)據(jù)寫進(jìn)緩存
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        // 計(jì)算剩余個(gè)數(shù)
        bytesToTransfer -= nextTransferAmount;
        // 移動(dòng)數(shù)據(jù)起點(diǎn)
        off += nextTransferAmount;
        // 后移in
        in += nextTransferAmount;
        // 如果in超出緩存長度, 回到0
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

代碼邏輯注釋已經(jīng)說明得很清楚了, 當(dāng)你需要處理頭尾相連的數(shù)組時(shí), 可以學(xué)習(xí)上面循環(huán)處理數(shù)據(jù)的方法, 邏輯清晰, 不需要太多的邊界判斷.

receiveLast

當(dāng)輸入端關(guān)閉時(shí)(調(diào)用PipedOutputStream#close()), 會(huì)調(diào)用receivedLast()

synchronized void receivedLast() {
    // 標(biāo)記輸入端關(guān)閉
    closedByWriter = true;
    // 通知讀線程讀數(shù)據(jù)
    notifyAll();
}

該方法使用變量標(biāo)記輸入端已經(jīng)關(guān)閉, 表示不會(huì)有新數(shù)據(jù)寫入了.

read

分析完寫數(shù)據(jù), 接下來該分析讀數(shù)據(jù)了.

public synchronized int read()  throws IOException {
    // synchronized關(guān)鍵字, 讀線程需要持有鎖才能讀數(shù)據(jù)
    
    // 先檢查管道狀態(tài)
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByReader) {
        throw new IOException("Pipe closed");
    } else if (writeSide != null && !writeSide.isAlive()
        && !closedByWriter && (in < 0)) {
        // 只要in >= 0, 表示還有數(shù)據(jù)沒有讀, 所以不拋出異常
        // 這個(gè)判斷表明了, 即使輸入端已經(jīng)調(diào)用了close, 也能繼續(xù)讀已經(jīng)寫入的數(shù)據(jù)
        throw new IOException("Write end dead");
    }
    
    // 記錄讀線程
    readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        // in<0表示緩存區(qū)域?yàn)榭? 只要輸入端沒有被關(guān)閉, 阻塞線程等待數(shù)據(jù)寫入, 即等待in >= 0
        if (closedByWriter) {
            // 輸入端關(guān)閉了, 同時(shí)in < 0, 表示數(shù)據(jù)傳輸完畢了, 返回-1 
            return -1;
        }
        // 檢查寫線程的狀態(tài), 線程狀態(tài)異常則認(rèn)為管道異常, 檢查2次
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken");
        }
        // 這里可以認(rèn)為是通知寫線程寫數(shù)據(jù)
        notifyAll();
        try {
            // 阻塞線程, 等待1s, 這里會(huì)釋放鎖, 給機(jī)會(huì)寫線程獲取鎖, 寫數(shù)據(jù)
            wait(1000);
        } catch (InterruptedException ex) {
            IoUtils.throwInterruptedIoException();
        }
    }

    // 執(zhí)行到這里證明in >= 0, 即緩存數(shù)組中有數(shù)據(jù)
    // 關(guān)鍵的讀操作
    // 1. 讀取out指向的byte數(shù)據(jù)
    // 2. 后移out
    // 3. 把byte轉(zhuǎn)成int, 高位補(bǔ)0
    int ret = buffer[out++] & 0xFF;
    // out超出長度則回到位置0
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        // 讀取的數(shù)據(jù)追上了輸入的數(shù)據(jù), 則當(dāng)前緩存區(qū)域?yàn)榭? 所以設(shè)置in = -1
        in = -1;
    }

    return ret;
}

從上面的注釋分析可以知道

  1. 即使調(diào)用了PipedOutputStream#close(), 只要管道中還有數(shù)據(jù), 仍可以讀數(shù)據(jù), 所以實(shí)際使用時(shí), 輸入端輸入完畢后可以直接close輸入端.
  2. 當(dāng)管道中沒有數(shù)據(jù)時(shí), 會(huì)阻塞讀線程, 直至管道被關(guān)閉, 線程異常或者數(shù)據(jù)被寫入到管道中.

接著看看讀取一段數(shù)據(jù)的方法

public synchronized int read(byte b[], int off, int len)  throws IOException {
    // 參數(shù)byte[](下面稱輸出數(shù)組)是數(shù)據(jù)讀取后存放的地方, 所以要先檢查該數(shù)組
    if (b == null) {
        // 確保輸出數(shù)組不為null, 否則讀出的數(shù)據(jù)不能寫入
        throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
        // 確保下標(biāo)不會(huì)越界
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        // len參數(shù)表示需要讀取的長度, 等于0時(shí)相當(dāng)于不讀數(shù)據(jù), 所以直接返回
        return 0;
    }

    // 先單獨(dú)讀一個(gè)數(shù)據(jù)是為了確保已經(jīng)有數(shù)據(jù)寫入, 因?yàn)槿绻?dāng)前無數(shù)據(jù), 則會(huì)阻塞當(dāng)前的讀線程
    int c = read();
    // 返回值小于0(實(shí)際上只能是-1), 表示管道已經(jīng)沒有數(shù)據(jù)了, 所以這里也直接返回-1
    if (c < 0) {
        return -1;
    }
    // 把讀取到的第一個(gè)數(shù)據(jù)放到輸出數(shù)組, 看后面的代碼時(shí)緊記這里已經(jīng)讀了1個(gè)數(shù)據(jù)
    b[off] = (byte) c;
    // 記錄讀取到的數(shù)據(jù)長度
    int rlen = 1;
    // 循環(huán)條件:
    // in >= 0確保還有數(shù)據(jù)可以讀
    // len > 1確保只讀取外部請(qǐng)求的數(shù)據(jù)長度, 因?yàn)樯厦嬉呀?jīng)讀了1個(gè)數(shù)據(jù), 所以是大于1, 而不是大于0
    while ((in >= 0) && (len > 1)) {

        // available用來記錄當(dāng)前可以讀取的數(shù)據(jù)
        int available;

        if (in > out) {
            // in > out表示[out, in)區(qū)間數(shù)據(jù)可讀
            // in的值正常情況下是不會(huì)大于buffer.length的, 因?yàn)楫?dāng) in == buffer.length時(shí), in就會(huì)賦值0
            // 這里的Math.min顯得有點(diǎn)多余, 可能是為了以防萬一吧
            available = Math.min((buffer.length - out), (in - out));
        } else {
            // 首先in是不會(huì)等于out的, 因?yàn)槿绻嗟? 在上面讀第一個(gè)數(shù)據(jù)的時(shí)候就會(huì)把in賦值-1, 也就不會(huì)進(jìn)入這個(gè)循環(huán)
            // 當(dāng)in < out表示[out, buffer.length)和[0, in)兩個(gè)區(qū)間的數(shù)據(jù)可讀
            // 和receive方法類似, 為了不處理跨邊界的情況, 先讀[out, buffer.length)區(qū)間數(shù)據(jù)
            available = buffer.length - out;
        }

        // 外部已經(jīng)讀了一個(gè)數(shù)據(jù), 所以只需要讀(len - 1)個(gè)數(shù)據(jù)了
        if (available > (len - 1)) {
            available = len - 1;
        }
        // 經(jīng)過上面的判斷, available表示本次需要讀的數(shù)據(jù)長度
        // 復(fù)制數(shù)據(jù)到輸出數(shù)組
        System.arraycopy(buffer, out, b, off + rlen, available);
        // 后移out變量
        out += available;
        // 記錄已經(jīng)讀到的數(shù)據(jù)量
        rlen += available;
        // 計(jì)算剩余需要讀的數(shù)據(jù)
        len -= available;

        // 如果已經(jīng)讀到緩存數(shù)組的尾部, 回到開頭
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            // in == out表示已經(jīng)沒有數(shù)據(jù)可以讀了, 所以in賦值-1
            in = -1;
        }
    }
    return rlen;
}

上面的方法我們需要注意:

while方法體內(nèi)是不會(huì)阻塞讀線程的! while方法體內(nèi)是不會(huì)阻塞讀線程的! while方法體內(nèi)是不會(huì)阻塞讀線程的! 重要的事情說3遍~ 所以如果管道內(nèi)只有1個(gè)數(shù)據(jù), 那么讀取到輸出數(shù)組的就只有這1個(gè)數(shù)據(jù), read方法返回值會(huì)是1, 在讀取數(shù)據(jù)后處理輸出數(shù)組時(shí)需要特別注意這點(diǎn).

available

我們?cè)谧x數(shù)據(jù)前可以利用available()先看看管道中的數(shù)據(jù)個(gè)數(shù).

public synchronized int available() throws IOException {
    if(in < 0)
        // 管道中無數(shù)據(jù)
        return 0;
    else if(in == out)
        // 緩存數(shù)組已滿
        return buffer.length;
    else if (in > out)
        // [out, in)區(qū)間內(nèi)為有效數(shù)據(jù)
        return in - out;
    else
        // in < out
        // [in, out)區(qū)間為無效數(shù)據(jù), 其余為有效數(shù)據(jù), 所以長度為 buffer.length - (out - in)
        return in + buffer.length - out;
}

到這里我們已經(jīng)把所有PipedOutputStreamPipedInputStream的所有方法分析完畢了~ 接著我們?cè)俜治鱿伦x寫過程中對(duì)象鎖的歸屬問題.

分析這部分我們先要了解下waitnotifyAll的作用, 可以參考知乎上這個(gè)回答java中的notify和notifyAll有什么區(qū)別? - 文龍的回答 - 知乎, 本文不再說明了, 重點(diǎn)理解鎖池等待池概念

  • 鎖池:假設(shè)線程A已經(jīng)擁有了某個(gè)對(duì)象(注意:不是類)的鎖,而其它的線程想要調(diào)用這個(gè)對(duì)象的某個(gè)synchronized方法(或者synchronized塊),由于這些線程在進(jìn)入對(duì)象的synchronized方法之前必須先獲得該對(duì)象的鎖的擁有權(quán),但是該對(duì)象的鎖目前正被線程A擁有,所以這些線程就進(jìn)入了該對(duì)象的鎖池中。

  • 等待池:假設(shè)一個(gè)線程A調(diào)用了某個(gè)對(duì)象的wait()方法,線程A就會(huì)釋放該對(duì)象的鎖后,進(jìn)入到了該對(duì)象的等待池中

首先, 需要注意, PipedOutputStream中, 兩個(gè)write方法都沒有synchronized關(guān)鍵字, 所以我們不需要關(guān)心PipedOutputStream的對(duì)象鎖.

我們重點(diǎn)分析PipedInputStream里面, readreceive方法.

假設(shè)我們先調(diào)用receive寫數(shù)據(jù), 后調(diào)用read讀數(shù)據(jù)

當(dāng)我們寫數(shù)據(jù)時(shí), 進(jìn)入了receive方法, 因?yàn)?code>synchronized關(guān)鍵字, 此時(shí)寫線程會(huì)獲取到了對(duì)象鎖, 然后寫數(shù)據(jù)到管道中, 注意, 在這個(gè)過程中, 讀線程是不能通過read方法讀取數(shù)據(jù)的, 因?yàn)樽x線程獲取不了對(duì)象鎖, 如果這次寫操作中, 管道中的緩存數(shù)組滿了, 此時(shí)寫線程會(huì)進(jìn)入awaitSpace()方法, 在該方法內(nèi), 寫線程先調(diào)用了notifyAll方法, 使讀線程進(jìn)入鎖池準(zhǔn)備競(jìng)爭對(duì)象鎖, 然后調(diào)用wait(1000)方法, 在這1s內(nèi), 寫線程釋放了對(duì)象鎖, 然后進(jìn)入等待池.

寫線程釋放對(duì)象鎖后, 讀線程就能夠獲取對(duì)象鎖, 進(jìn)入read方法內(nèi)了, 然后讀數(shù)據(jù), 只要管道中存在至少一個(gè)數(shù)據(jù), 就不會(huì)阻塞線程, 讀取數(shù)據(jù)后直接退出方法, 釋放對(duì)象鎖, 如果這次讀操作中, 管道中的緩存數(shù)組沒有任何數(shù)據(jù), 此時(shí)讀線程就會(huì)調(diào)用notifyAll方法, 使寫線程從等待池移到鎖池, 準(zhǔn)備競(jìng)爭對(duì)象鎖, 然后再調(diào)用wait(1000)方法, 在這1s內(nèi), 讀線程釋放對(duì)象鎖, 自己進(jìn)入等待池.

以上就是一次讀寫中, 對(duì)象鎖的轉(zhuǎn)移過程, 但是在實(shí)際過程中, 我們都是兩個(gè)線程在各自的循環(huán)體內(nèi)一直讀數(shù)據(jù)和一直寫數(shù)據(jù)的, 所以每一次循環(huán)的時(shí)候都會(huì)競(jìng)爭鎖, 可能先讀后寫, 或者先寫后讀.

總結(jié)

分析這兩個(gè)類的源碼我們應(yīng)該可以學(xué)習(xí)到

  1. InputSteamOutputSteam的接口含義
  2. 使用數(shù)組緩存數(shù)據(jù)的方法, 使用while循環(huán)避免處理邊界問題
  3. waitnotifyAll協(xié)調(diào)讀寫線程的邏輯
  4. 使用這兩個(gè)類實(shí)現(xiàn)傳輸數(shù)據(jù)流
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容