Eino中Pipe實現(xiàn)

Pipe 的reader 和 writer 封裝的為同一個Steam, Steam基于channel 進行數(shù)據(jù)的recv和send, 并通過一個獨立的channel 來控制closed狀態(tài), 代碼實現(xiàn)如下:

package schema

import "io"

func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
    stm := newStream[T](cap)
    return stm.asReader(), &StreamWriter[T]{stm: stm}
}

type StreamReader[T any] struct {
    st *stream[T]
}

type StreamWriter[T any] struct {
    stm *stream[T]
}

// stream is a channel-based stream with 1 sender and 1 receiver.
// The sender calls closeSend() to notify the receiver that the stream sender has finished.
// The receiver calls closeRecv() to notify the sender that the receiver stop receiving.
type stream[T any] struct {
    items chan streamItem[T]

    closed chan struct{}
}

func (s *stream[T]) asReader() *StreamReader[T] {
    return &StreamReader[T]{st: s}
}

func (s *stream[T]) recv() (chunk T, err error) {
    item, ok := <-s.items

    if !ok {
        item.err = io.EOF
    }

    return item.chunk, item.err
}

func (s *stream[T]) send(chunk T, err error) (closed bool) {
    // if the stream is closed, return immediately
    select {
    case <-s.closed:
        return true
    default:
    }

    item := streamItem[T]{chunk, err}

    select {
    case <-s.closed:
        return true
    case s.items <- item:
        return false
    }
}

func (s *stream[T]) closeSend() {
    close(s.items)
}

func (s *stream[T]) closeRecv() {
    close(s.closed)
}

type streamItem[T any] struct {
    chunk T
    err   error
}

func newStream[T any](cap int) *stream[T] {
    return &stream[T]{
        items:  make(chan streamItem[T], cap),
        closed: make(chan struct{}),
    }
}

注意stream 的Send方法實現(xiàn), 首先判斷stream是否closed, 不同于使用if判斷, 這里直接使用select來讀取closed通道中數(shù)據(jù), 如果有數(shù)據(jù)方法返回closed狀態(tài), 否則default分支什么都不執(zhí)行, 程序繼續(xù)執(zhí)行下面send操作,

select {
    case <-s.closed:
        return true
    default:
    }

send發(fā)送操作同樣在select 中進行, 第一個case分支讀取closed通道數(shù)據(jù)在非closed狀態(tài)下會阻塞,select 會選擇第二個case分支的數(shù)據(jù)發(fā)送操作。

select {
    case <-s.closed:
        return true
    case s.items <- item:
        return false
    }

測試用例:

func TestPipe(t *testing.T) {
    sr, sw := Pipe[int](10)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            closed := sr.st.send(i, nil)
            if closed {
                break
            }
        }
        sr.st.closeSend()
    }()

    i := 0
    for {
        i++
        if i > 5 {
            sw.stm.closeRecv()
            break
        }
        v, err := sw.stm.recv()
        if err != nil {
            assert.ErrorIs(t, err, io.EOF)
            break
        }
        t.Log(v)
    }

    wg.Wait()

}
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容