Pipe 的 reader 和 writer 封装的是同一个 stream。stream 基于 channel 进行数据的 recv 和 send，并通过一个独立的 channel 来控制 closed 状态，代码实现如下：
package schema
import "io"
// Pipe builds a single stream via newStream(cap) and returns the two handles
// that wrap it: a StreamReader and a StreamWriter. Both handles refer to the
// same underlying stream value.
func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
	shared := newStream[T](cap)
	reader := shared.asReader()
	writer := &StreamWriter[T]{stm: shared}
	return reader, writer
}
// StreamReader is the reader handle returned by Pipe. It wraps the stream
// that is shared with the matching StreamWriter.
type StreamReader[T any] struct {
// st is the underlying stream this reader wraps.
st *stream[T]
}
// StreamWriter is the writer handle returned by Pipe. It wraps the stream
// that is shared with the matching StreamReader.
type StreamWriter[T any] struct {
// stm is the underlying stream this writer wraps.
stm *stream[T]
}
// stream is a channel-based stream with 1 sender and 1 receiver.
// The sender calls closeSend() to notify the receiver that the sender has finished.
// The receiver calls closeRecv() to notify the sender that the receiver has stopped receiving.
type stream[T any] struct {
// items carries the (chunk, err) pairs from send to recv; closeSend closes it.
items chan streamItem[T]
// closed is a signal-only channel: nothing is ever sent on it, and closeRecv
// closes it so that send can observe that the receiver is gone.
closed chan struct{}
}
// asReader returns a new StreamReader whose st field points at s.
func (s *stream[T]) asReader() *StreamReader[T] {
	reader := new(StreamReader[T])
	reader.st = s
	return reader
}
// recv blocks until an item is available on s.items and returns its chunk and
// error. Once s.items has been closed and drained, it returns the zero value
// of T together with io.EOF.
func (s *stream[T]) recv() (chunk T, err error) {
	item, open := <-s.items
	if open {
		return item.chunk, item.err
	}
	// The channel is closed and empty: chunk is still the zero value of T.
	return chunk, io.EOF
}
// send delivers (chunk, err) to the receiver as a single streamItem.
//
// It returns true when s.closed has been closed (the item was not delivered)
// and false when the item was placed on s.items.
func (s *stream[T]) send(chunk T, err error) (closed bool) {
	// Non-blocking pre-check. When several select cases are ready Go picks one
	// at random, so without this step a send could still succeed into a
	// non-full buffer after the receiver has already closed the stream.
	select {
	case <-s.closed:
		return true
	default:
		// Not closed yet; continue to the blocking send below.
	}

	// Block until either the item is accepted or the receiver closes the stream.
	select {
	case s.items <- streamItem[T]{chunk: chunk, err: err}:
		return false
	case <-s.closed:
		return true
	}
}
// closeSend closes s.items. After the remaining buffered items are drained,
// recv reports io.EOF. Closing an already-closed channel panics in Go, so this
// must not be called more than once per stream.
func (s *stream[T]) closeSend() {
close(s.items)
}
// closeRecv closes s.closed, which makes every current and future send return
// true instead of delivering its item. Closing an already-closed channel
// panics in Go, so this must not be called more than once per stream.
func (s *stream[T]) closeRecv() {
close(s.closed)
}
// streamItem is the unit carried on stream.items: one chunk plus the error
// that was passed to send alongside it.
type streamItem[T any] struct {
// chunk is the payload value given to send.
chunk T
// err is the error given to send; recv returns it unchanged.
err error
}
// newStream allocates a stream whose items channel is buffered with the given
// capacity and whose closed channel is unbuffered.
func newStream[T any](cap int) *stream[T] {
	s := new(stream[T])
	s.items = make(chan streamItem[T], cap)
	s.closed = make(chan struct{})
	return s
}
注意 stream 的 send 方法实现：首先判断 stream 是否已经 closed。不同于使用 if 判断，这里直接使用 select 来读取 closed 通道：closed 通道从不写入数据，只会被 closeRecv 关闭；一旦被关闭，读取会立即返回，方法返回 closed 状态；否则 default 分支什么都不执行，程序继续执行下面的 send 操作。
select {
case <-s.closed:
return true
default:
}
send 发送操作同样在 select 中进行：第一个 case 分支读取 closed 通道，在非 closed 状态下会阻塞，select 会选择第二个 case 分支的数据发送操作；如果 items 缓冲区已满，则两个分支都会阻塞，直到接收方取走数据或调用 closeRecv 为止。
select {
case <-s.closed:
return true
case s.items <- item:
return false
}
测试用例：
// TestPipe checks the sender/receiver handshake of a Pipe:
// the producer writes ten ints through the writer handle, the consumer reads
// the first five through the reader handle and then stops receiving.
//
// The buffer capacity (10) equals the number of items sent, so the producer
// never blocks regardless of how the goroutines are scheduled.
func TestPipe(t *testing.T) {
	sr, sw := Pipe[int](10)

	var wg sync.WaitGroup
	wg.Add(1)

	// Producer side: only the sender closes the items channel, and it does so
	// on every exit path via defer.
	go func() {
		defer wg.Done()
		defer sw.stm.closeSend()
		for i := 0; i < 10; i++ {
			if closed := sw.stm.send(i, nil); closed {
				return
			}
		}
	}()

	// Consumer side: a single sender on a FIFO channel means values arrive in
	// the order they were sent, so the i-th received value must equal i.
	for i := 0; i < 5; i++ {
		v, err := sr.st.recv()
		if err != nil {
			assert.ErrorIs(t, err, io.EOF)
			break
		}
		assert.Equal(t, i, v)
		t.Log(v)
	}

	// Tell the producer that nobody is reading any more, then wait for it.
	sr.st.closeRecv()
	wg.Wait()
}

image.png