gRPC是基于HTTP/2標(biāo)準(zhǔn)和proto協(xié)議開發(fā)的,gRPC的很多特性都依賴于HTTP/2標(biāo)準(zhǔn)提供。gRPC設(shè)計(jì)的四種模式是基于底層HTTP/2的流的概念。transport包是基于HTTP/2標(biāo)準(zhǔn)的實(shí)現(xiàn),提供了流控等特性。
流控
transport提供基于connection和stream的兩級(jí)流控。
-------------------------------------gRPC流控默認(rèn)值----------------------------------------------
defaultWindowSize = 65535 //64K
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
-------------------------------------流控?cái)?shù)據(jù)結(jié)構(gòu)------------------------------------------------
type inFlow struct {
//流控限制未處理的數(shù)據(jù)的數(shù)量
limit uint32
mu sync.Mutex
//pendingData包含所有收到但未被應(yīng)用消費(fèi)的數(shù)據(jù)
pendingData uint32
//pendingUpdate包含被消費(fèi)但為發(fā)送更新窗口的數(shù)量,減少窗口更新的頻率
pendingUpdate uint32
}
//真實(shí)的流控處理函數(shù),server在接收到client的請求后會(huì)先
//檢查pendingData+pendingUpdate是否超過limit限制
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
defer f.mu.Unlock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit {
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
}
return nil
}
//http2標(biāo)準(zhǔn)中規(guī)定:針對(duì)控制類的frame,為了確保能夠得到高優(yōu)先級(jí)的處理不做流控。DataFrame的流控處理在如下的函數(shù)中進(jìn)行處理。
----------------------------------server端處理流------------------------------------------------
//server端handleData用于接收dataFrame
func (t *http2Server) handleData(f *http2.DataFrame) {
size := len(f.Data())
//針對(duì)connection的流控,如果client和server在該connection的負(fù)載大于16 * 64K,server會(huì)主動(dòng)斷開與client之間的連接。
if err := t.fc.onData(uint32(size)); err != nil {
//onData函數(shù)實(shí)現(xiàn)見流控的數(shù)據(jù)結(jié)構(gòu)
grpclog.Printf("transport: http2Server %v", err)
//超過負(fù)載,直接關(guān)閉connection
t.Close()
return
}
// 選擇正確的流進(jìn)行處理
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
//更新流控窗口的大小
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
// stream已經(jīng)被關(guān)閉,需要更新流控窗口
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
//同一連接上的不同stream具有競爭關(guān)系,提供了strean級(jí)的流控
if err := s.fc.onData(uint32(size)); err != nil {
//onData()函數(shù)實(shí)現(xiàn)見流控?cái)?shù)據(jù)結(jié)構(gòu)
s.mu.Unlock()
//關(guān)閉超過流控限制的stream
t.closeStream(s)
//通知client再建立streamID相同的stream
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
s.mu.Unlock()
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock()
if s.state != streamDone {
s.state = streamReadDone
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
}
RPC調(diào)用的執(zhí)行過程
以u(píng)nary模式的rpc調(diào)用為例分析一次RPC請求在gRPC中的流轉(zhuǎn)過程,其他三種模式底層調(diào)用的函數(shù)與unary模式相同(四種模式從底層的HTTP/2分析都是stream,并且仍然是一套request和response的實(shí)現(xiàn))。
注: 以下源碼分析部分均是以grpc/example/route_guide為例進(jìn)行分析。對(duì)其他模式感興趣的讀者可自行分析。
unary模式的RPC請求在gRPC中的執(zhí)行過程
------------------------------------------proto的聲明-------------------------------------------
service RouteGuide {
rpc GetFeature(Point) returns (Feature) {}
}
------------------------------------------pb.go源碼---------------------------------------------
func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
out := new(Feature)
// -->/routeguide.RouteGuide/GetFeature ->/package/server/method
err := grpc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
//以下代碼去掉錯(cuò)誤處理和非關(guān)鍵函數(shù)的調(diào)用
//以下代碼分析的是grpc client端如何發(fā)送request到server
-----------------------------------------grpc-client代碼----------------------------------------
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
c := defaultCallInfo //構(gòu)造rpc調(diào)用的defaultCallInfo并根據(jù)用戶傳入的信息進(jìn)行填充
topts := &transport.Options{
Last: true,
Delay: false,
}
for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
put func()
)
//callHdr攜帶詳細(xì)的RPC調(diào)用信息,如Method->/routeguide.RouteGuide/GetFeature
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
if _, ok := err.(*rpcError); ok {
return err
}
//非failFast情況下,err為以下兩種情況會(huì)重試
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return Errorf(codes.Unavailable, "%v", err)
}
continue
}
return Errorf(codes.Internal, "%v", err)
}
//將client請求信息發(fā)送,并等待server返回
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
//在sendRequest創(chuàng)建的stream上等待server返回response
err = recvResponse(cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
//關(guān)閉創(chuàng)建的stream
t.CloseStream(stream, nil)
if put != nil {
put()
put = nil
}
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}
----------------------------------------------sendRequest()說明--------------------------------
func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
//根據(jù)callHdr中包含的host和method信息創(chuàng)建對(duì)應(yīng)的stream
//函數(shù)具體實(shí)現(xiàn)-transport/http2_client.go/http2Client.NewStream()
stream, err := t.NewStream(ctx, callHdr)
//序列化消息并定義消息頭
//消息頭=5yte=1byte(msg是否壓縮) + 4byte(msg長度)
//函數(shù)具體實(shí)現(xiàn)-rpc_util.go
outBuf, err := encode(codec, args, compressor, cbuf)
//將outBuf按照http2幀的大小分幀并發(fā)送到對(duì)端,下面會(huì)對(duì)該函數(shù)具體分析
err = t.Write(stream, outBuf, opts)
//發(fā)送成功,返回該stream,用于接收response
return stream, nil
}
------------------------------------ClientTransport.Write()說明---------------------------------
//真正將message分幀在指定的stream上傳輸?shù)暮瘮?shù)如下,將對(duì)該函數(shù)進(jìn)行詳細(xì)分析
func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
r := bytes.NewBuffer(data)
for {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// 等待stream的流控上有配額發(fā)送數(shù)據(jù),stream.sendQuotaPool=65535
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// 等待connection的流控有配額去發(fā)送數(shù)據(jù),t.sendQuotaPool= 65535 * 16
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
}
if sq < size {
size = sq
}
if tq < size {
size = tq
}
p = r.Next(size)
ps := len(p)
if ps < sq {
// 返回stream預(yù)留超額的配額數(shù)量
s.sendQuotaPool.add(sq - ps)
}
if ps < tq {
// 返回connection預(yù)留超額的配額數(shù)量
t.sendQuotaPool.add(tq - ps)
}
}
var (
endStream bool
forceFlush bool
)
//判斷是否為最后一幀l
if opts.Last && r.Len() == 0 {
endStream = true
}
// 表明這將有一個(gè)writer將要去寫data frame
t.framer.adjustNumWriters(1)
// 釋放t.writableChan上加的鎖,獲得在該transport上寫的權(quán)利,確保只有一個(gè)調(diào)用者可以調(diào)用t.framer.writeData()函數(shù)。
if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
// 釋放connection上預(yù)留的配額數(shù)量
t.sendQuotaPool.add(len(p))
}
if t.framer.adjustNumWriters(-1) == 0 {
// 如果該Writer是這一批的最后一個(gè)有責(zé)任去刷新http2.frames的緩存區(qū)
//將刷新的請求排入一個(gè)隊(duì)列而不是直接刷新合一避免和其他的Writer或者刷新請求的競爭
t.controlBuf.put(&flushIO{})
}
return err
}
select {
case <-s.ctx.Done():
t.sendQuotaPool.add(len(p))
if t.framer.adjustNumWriters(-1) == 0 {
t.controlBuf.put(&flushIO{})
}
//再次為該transport加鎖
t.writableChan <- 0
return ContextErr(s.ctx.Err())
default:
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
// 強(qiáng)制刷新因?yàn)檫@是grpc message的最后一個(gè)數(shù)據(jù)幀
//對(duì)于調(diào)用者來說此刻僅僅只有一個(gè)writer
forceFlush = true
}
//如果t.framer.writeData失敗,所有等待處理的stream將會(huì)在http2Clinet.Close()函數(shù)中進(jìn)行處理,此處不必顯示調(diào)用CloseStream()
//writeData()不會(huì)并發(fā)被調(diào)用,確保server端收到的frame不會(huì)亂序(不會(huì)出現(xiàn)dataframe早于headerframe先到)
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
//writeData()增加二進(jìn)制幀的頭部,函數(shù)實(shí)現(xiàn)-net/http2/frame.go
t.notifyError(err)
return connectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
}
//再次為該transport加鎖
t.writableChan <- 0
if r.Len() == 0 {
break
}
}
if !opts.Last {
return nil
}
s.mu.Lock()
if s.state != streamDone {
//更新stream的狀態(tài)
s.state = streamWriteDone
}
s.mu.Unlock()
return nil
}
//以下代碼是分析grpc-server接收client的請求后內(nèi)部的處理流程
---------------------------------------grpc-server代碼------------------------------------------
//serve函數(shù)在net.Listener接收客戶端的連接,創(chuàng)建一個(gè)新的ServerTransport和service goroutine為每個(gè)連接,服務(wù)goroutine讀取gRPC請求,然后調(diào)用server中注冊的函數(shù)。
func (s *Server) Serve(lis net.Listener) error {
s.lis[lis] = true
for {
rawConn, err := lis.Accept()
if err != nil {
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err
}
//開始一個(gè)單獨(dú)的goroutine處理client的連接-rawConn
//繼續(xù)for循環(huán)等待其他client的到來
go s.handleRawConn(rawConn)
}
}
//handleRawConn運(yùn)行在獨(dú)立的goroutine,并且處理已經(jīng)接收連接但未執(zhí)行任何I/O操作的連接
func (s *Server) handleRawConn(rawConn net.Conn) {
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
return
}
if s.opts.useHandlerImpl {
s.serveUsingHandler(conn)
} else {
s.serveNewHTTP2Transport(conn, authInfo)
}
}
//serveNewHTTP2Transport建立一個(gè)新的HTTP/2 tranport并且為在該transport上的流提供服務(wù)
func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
//調(diào)用transport/http2_server.go
st, err := transport.NewServerTransport("http2", c, 2, authInfo)
if !s.addConn(st) {
st.Close()
return
}
//在transport上接收client發(fā)送stream并進(jìn)行處理的函數(shù)
s.serveStreams(st)
}
func (s *Server) serveStreams(st transport.ServerTransport) {
defer s.removeConn(st)
defer st.Close()
var wg sync.WaitGroup
//transport.ServerTranport下的st.HandleStreams處理client發(fā)送的stream
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
})
wg.Wait()
}
----------------------------transport/http2Server.HanleStreams()分析----------------------------
func (t *http2Server) HandleStreams(handle func(*Stream)) {
// 檢查client 發(fā)送的preface是否合法
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
t.Close()
return
}
if !bytes.Equal(preface, clientPreface) {
grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
t.Close()
return
}
frame, err := t.framer.readFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
if err != nil {
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
//讀取client發(fā)送的SettingFrame
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
t.Close()
return
}
//根據(jù)SettingFrame的內(nèi)容進(jìn)行設(shè)置
t.handleSettings(sf)
//讀取client發(fā)送的request內(nèi)容
for {
frame, err := t.framer.readFrame()
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s)
}
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
//t.operateHeaders函數(shù)解碼headers內(nèi)容,并將傳輸該frame的stream進(jìn)行記錄
//函數(shù)實(shí)現(xiàn)包括根據(jù)stream攜帶的callHdr信息,如何路由到grpc.Server中注冊server具體實(shí)現(xiàn)method的過程
//函數(shù)實(shí)現(xiàn)-transport/http2_server.go operateHeader()函數(shù)
if t.operateHeaders(frame, handle) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
default:
grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
buf := newRecvBuffer()
//保存client傳輸?shù)膕tream信息
s := &Stream{
id: frame.Header().StreamID,
st: t,
buf: buf,
fc: &inFlow{limit: initialWindowSize},
}
var state decodeState
for _, hf := range frame.Fields {
state.processHeaderField(hf)
}
if err := state.err; err != nil {
if se, ok := err.(StreamError); ok {
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
}
return
}
if frame.StreamEnded() {
s.state = streamReadDone
}
s.recvCompress = state.encoding
if state.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(context.TODO())
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
//對(duì)stream的合法性進(jìn)行檢查
if s.id%2 != 1 || s.id <= t.maxStreamID {
t.mu.Unlock()
grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
return true
}
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
//調(diào)用server.go serveStreams()傳入的handle去處理server端接收的stream
//handle()會(huì)調(diào)用server.go handleStream()路由到server端真正實(shí)現(xiàn)的函數(shù)
handle(s)
return
}
//handleData處理server端接收到數(shù)據(jù)幀
func (t *http2Server) handleData(f *http2.DataFrame) {
size := len(f.Data())
//檢查transport的流控
if err := t.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
return
}
s, ok := t.getStream(f)
if !ok {
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
//檢查stream的流控
if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
return
}
if err := s.fc.onData(uint32(size)); err != nil {
s.mu.Unlock()
t.closeStream(s)
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
s.mu.Unlock()
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock()
if s.state != streamDone {
s.state = streamReadDone
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
}
以上源碼分析一次gRPC調(diào)用,從client端如何發(fā)送請求到grpc.server端如何路由到server端注冊函數(shù)的所有過程。
問題總結(jié):
1.grpc的http/2的stream流是如何變化的?
答:unary模式的stream的創(chuàng)建、刪除都是由gRPC控制的,剩下的三種模式是將stream的很多操作暴露給用戶層,由用戶自行控制,但sendRequset和recvResponse的流程和unary模式處理相同。筆者測試發(fā)現(xiàn)grpc用到的都是client端的stream,server端的stream在gRPC中并未使用。client端發(fā)起的stream都是基數(shù)開始的,并且最大值為2^31-1,如果client的streamID超過限制,server端會(huì)斷開與client的連接。測試結(jié)果如下:
//2^31的最大取值2147483648
client stream id 2147483649
2017/08/04 10:44:17 transport: http2Client.notifyError got notified that the client transport was broken invalid stream ID.
2017/08/04 10:44:17 &{0xc4201787e0}.RouteChat(_) = _, rpc error: code = 13 desc = transport: invalid stream ID
exit status 1