Golang grpc server响应请求

朋友偶然问了一句,golang的grpc在接到请求后是如何调到对应的实现函数的?

当时对着代码讲了一通。后来想想觉得这是个好问题,写下来记录一下。

注册:

// main starts the example gRPC server: it listens on TCP port 2008,
// registers the service implementation with a new grpc.Server, and then
// serves connections from that listener.
func main() {
	listen, err := net.Listen("tcp", ":2008")
	if err != nil {
		fmt.Println("net.Listen tcp :2008 err", err)
		return
	}
	s := grpc.NewServer()
	hServer := xxx.Server{}
	xxxxxx.RegisterXXXXXXServer(s, &hServer)
	// Serve runs the accept loop and only comes back with an error, so
	// report why the server stopped instead of silently dropping it.
	if err := s.Serve(listen); err != nil {
		fmt.Println("grpc Serve err", err)
	}
}

golang起grpc server的代码很简单:先net.Listen监听端口,准备一个proto接口的实现xxx.Server{},然后register server,最后调用s.Serve(listen)就可以了。

// RegisterService is the registration entry point. It checks, via
// reflection, that the implementation ss satisfies the interface type that
// sd.HandlerType points to, aborts through grpclog.Fatalf when it does not,
// and otherwise hands both values to s.register.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	// sd.HandlerType is a pointer to the service interface; Elem yields the
	// interface type itself.
	wantIface := reflect.TypeOf(sd.HandlerType).Elem()
	if implType := reflect.TypeOf(ss); !implType.Implements(wantIface) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", implType, wantIface)
	}
	s.register(sd, ss)
}

这是grpc server register的入口:首先对服务接口定义ServiceDesc和接口实现ss取type,判断ss是否实现了ServiceDesc中的接口。

// service is the per-service record that Server.register stores in s.m,
// keyed by service name. It pairs the user-supplied implementation with
// lookup tables built from the ServiceDesc.
type service struct {
   // server is the implementation value passed to RegisterService; it is
   // handed to the method handler when a unary RPC is dispatched.
   server interface{} // the server for service methods
   // md maps MethodName to the descriptor of a unary method.
   md     map[string]*MethodDesc
   // sd maps StreamName to the descriptor of a streaming method.
   sd     map[string]*StreamDesc
   // mdata is copied from ServiceDesc.Metadata at registration time.
   mdata  interface{}
}

// register stores the service described by sd, backed by the implementation
// ss, in the server's service table under sd.ServiceName.
//
// It holds s.mu for the whole call. Registration is fatal (grpclog.Fatalf)
// if the server is already serving or if a service with the same name has
// been registered before.
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	name := sd.ServiceName
	s.printf("RegisterService(%q)", name)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", name)
	}
	if _, dup := s.m[name]; dup {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", name)
	}

	// Index the descriptors by name. The maps hold pointers into the
	// descriptor slices themselves rather than copies of the elements.
	methods := make(map[string]*MethodDesc)
	for i := range sd.Methods {
		methods[sd.Methods[i].MethodName] = &sd.Methods[i]
	}
	streams := make(map[string]*StreamDesc)
	for i := range sd.Streams {
		streams[sd.Streams[i].StreamName] = &sd.Streams[i]
	}

	s.m[name] = &service{
		server: ss,
		md:     methods,
		sd:     streams,
		mdata:  sd.Metadata,
	}
}

service 结构中的server就是实现了接口定义的实体,就是我们的响应服务。
md 保存了响应func的映射"MethodName -- func"
sd 保存了rpc流服务的映射"StreamName -- func"
mdata 保存了proto的位置(即ServiceDesc.Metadata)

响应:

// Serve accepts incoming connections on lis and handles each one on its own
// goroutine via handleRawConn. This is an abridged excerpt: the sections
// marked "......" are elided, including the handling of Accept errors.
func (s *Server) Serve(lis net.Listener) error {
    // ...... (elided in this excerpt)
    // Wrap the listener and record it in the server's listener set.
    ls := &listenSocket{Listener: lis}
    s.lis[ls] = true
    // ...... (elided in this excerpt)
    var tempDelay time.Duration // how long to sleep on accept failure

    for {
        rawConn, err := lis.Accept()
        if err != nil {
            // ...... (Accept error handling elided in this excerpt)
        }
        // Reached after Accept; the back-off delay is reset to zero here.
        tempDelay = 0
        // Start a new goroutine to deal with rawConn so we don't stall this Accept
        // loop goroutine.
        //
        // Make sure we account for the goroutine so GracefulStop doesn't nil out
        // s.conns before this conn can be added.
        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}
// handleRawConn takes a freshly accepted connection through authentication
// and the HTTP/2 handshake, registers the resulting transport with the
// server, and then serves its streams on a separate goroutine. This is an
// abridged excerpt: the authentication error handling is elided.
func (s *Server) handleRawConn(rawConn net.Conn) {
    // Bound the setup phase: the deadline is now + opts.connectionTimeout and
    // is cleared again below once the transport has been created.
    rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    if err != nil {
        // ...... (elided in this excerpt)
        return
    }

    // A nil s.conns means this connection must not be served, so close it
    // and bail out. NOTE(review): presumably the server has been stopped in
    // that case (see the GracefulStop remark in Serve) — confirm.
    s.mu.Lock()
    if s.conns == nil {
        s.mu.Unlock()
        conn.Close()
        return
    }
    s.mu.Unlock()

    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }

    // The zero time.Time removes the deadline set at the top.
    rawConn.SetDeadline(time.Time{})
    if !s.addConn(st) {
        return
    }
    // Serve streams in the background and unregister the transport once
    // serveStreams returns.
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()
}

server监听端口(listenSocket),Accept到连接后起goroutine处理,然后建立http2连接。这里的ServerTransport是指所有gRPC服务器端传输的通用接口实现。
然后起goroutine继续serveStreams。

// serveStreams drives one server transport: every stream the transport
// reports is handled on its own goroutine through s.handleStream. The call
// returns once HandleStreams has returned and all of those goroutines have
// finished; the transport is closed on the way out.
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()

	var inflight sync.WaitGroup

	// onStream is invoked by the transport for each new stream.
	onStream := func(stream *transport.Stream) {
		inflight.Add(1)
		go func() {
			defer inflight.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}

	// withTrace attaches a trace to the context when tracing is enabled and
	// returns the context untouched otherwise.
	withTrace := func(ctx context.Context, method string) context.Context {
		if EnableTracing {
			tr := trace.New("grpc.Recv."+methodFamily(method), method)
			return trace.NewContext(ctx, tr)
		}
		return ctx
	}

	st.HandleStreams(onStream, withTrace)
	inflight.Wait()
}

// HandleStreams is the transport's frame-reading loop. It reads HTTP/2
// frames until the connection fails or is closed and dispatches each frame
// by type. handle and traceCtx are forwarded to operateHeaders, which
// processes header frames. t.readerDone is closed when the loop exits.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		frame, err := t.framer.fr.ReadFrame()
		// Flag activity on every read attempt, successful or not.
		atomic.StoreUint32(&t.activity, 1)
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				// A stream-level error affects one stream only: reset that
				// stream and keep reading from the connection.
				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					// The stream is not tracked as active; queue a cleanup
					// item carrying the reset code for it.
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			// EOF variants close the transport without a warning.
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close()
				return
			}
			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			t.Close()
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if t.operateHeaders(frame, handle, traceCtx) {
				t.Close()
				// Leave the read loop. A bare break here would only exit
				// the switch and go on to read from the transport that was
				// just closed.
				return
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
		}
	}
}

st.HandleStreams中解析了ServerTransport中的frame,traceCtx将trace附加到ctx并返回新上下文,调用s.handleStream处理请求(s.handleStream中是真正调用服务响应函数的地方)。

// handleStream routes one incoming stream to its registered handler.
//
// The stream's method is expected to look like "/service/method". The part
// before the last '/' selects the service in s.m and the part after it
// selects the handler: a match in the service's md table is run through
// processUnaryRPC, a match in its sd table through processStreamingRPC. If
// nothing matches and opts.unknownStreamDesc is set, that descriptor handles
// the stream. Otherwise, and also when the method name contains no '/'
// separator at all, the stream is answered with codes.Unimplemented.
//
// trInfo may be nil; when it is non-nil, failures are recorded on the trace
// and the trace is finished on the error paths handled here.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	// Drop the single leading '/' if there is one.
	sm := strings.TrimPrefix(stream.Method(), "/")
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		// A name that cannot be split into service and method cannot refer
		// to anything this server implements, so report Unimplemented, the
		// same code used below for unknown services and methods.
		// ResourceExhausted would wrongly signal a quota or capacity
		// problem to the client.
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.m[service]
	if knownService {
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

server.handleStream中根据ServerTransport中带来的service name、method name,在最开始注册时记录的map中找到对应的handle func,执行processUnaryRPC(如果是流服务,那么会执行processStreamingRPC)。

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
    //...
      這里有一大段代碼,都是在執(zhí)行數(shù)據(jù)的comp/decomp操作
    //..
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
    if appErr != nil {
        appStatus, ok := status.FromError(appErr)
        if !ok {
            // Convert appErr if it is not a grpc status error.
            appErr = status.Error(codes.Unknown, appErr.Error())
            appStatus, _ = status.FromError(appErr)
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
            trInfo.tr.SetError()
        }
        if e := t.WriteStatus(stream, appStatus); e != nil {
            grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
        }
        if binlog != nil {
            if h, _ := stream.Header(); h.Len() > 0 {
                // Only log serverHeader if there was header. Otherwise it can
                // be trailer only.
                binlog.Log(&binarylog.ServerHeader{
                    Header: h,
                })
            }
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return appErr
    }
    if trInfo != nil {
        trInfo.tr.LazyLog(stringer("OK"), false)
    }
    opts := &transport.Options{Last: true}

    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        if err == io.EOF {
            // The entire stream is done (for unary RPC only).
            return err
        }
        if s, ok := status.FromError(err); ok {
            if e := t.WriteStatus(stream, s); e != nil {
                grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
            }
        } else {
            switch st := err.(type) {
            case transport.ConnectionError:
                // Nothing to do here.
            default:
                panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
            }
        }
        if binlog != nil {
            h, _ := stream.Header()
            binlog.Log(&binarylog.ServerHeader{
                Header: h,
            })
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return err
    }
    if binlog != nil {
        h, _ := stream.Header()
        binlog.Log(&binarylog.ServerHeader{
            Header: h,
        })
        binlog.Log(&binarylog.ServerMessage{
            Message: reply,
        })
    }
    if channelz.IsOn() {
        t.IncrMsgSent()
    }
    if trInfo != nil {
        trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
    }
    // TODO: Should we be logging if writing status failed here, like above?
    // Should the logging be in WriteStatus?  Should we ignore the WriteStatus
    // error or allow the stats handler to see it?
    err = t.WriteStatus(stream, status.New(codes.OK, ""))
    if binlog != nil {
        binlog.Log(&binarylog.ServerTrailer{
            Trailer: stream.Trailer(),
            Err:     appErr,
        })
    }
    return err
}

上面省略了一大段构造context、decomp的代码。
经过一系列context的构造、decomp,终于到了调请求对应实现方法的地方了:reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt),返回的reply就是给client的data。最下面WriteStatus codes.OK就是client _stub函数返回的status.ok()了。

本来想画个图的,但是最近时间有点紧,等有空了再补吧,就当又温习一遍。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容