【gRPC】基礎(chǔ)教程 | GO

  • 在 .proto 文件定義一個 service
  • 使用 protocol buffer 編譯器生成服務(wù)端、客戶端代碼
  • 使用 Go gRPC API 為你的 service 編寫一個簡單的客戶端、服務(wù)端

獲取示例代碼

$ git clone -b v1.46.0 --depth 1 https://github.com/grpc/grpc-go
$ cd grpc-go/examples/route_guide

定義 service、message

  1. 我們的第一步是使用 protocol buffers 定義 gRPC 服務(wù)以及方法的請求和響應(yīng)類型。
    要定義服務(wù),請在 .proto 文件中指定命名 service
service RouteGuide {
  ...
}
  1. 然后在服務(wù)定義中定義 rpc 方法,指定它們的請求和響應(yīng)類型。
    gRPC 允許您定義四種服務(wù)方法,它們都在 RouteGuide 服務(wù)中使用:

    • 一個簡單的 RPC,其中客戶端使用存根向服務(wù)器發(fā)送請求并等待響應(yīng)返回,就像正常的函數(shù)調(diào)用一樣。

      // 獲取給定位置的特征
      rpc GetFeature(Point) returns (Feature) {}
      
    • 服務(wù)器端流式 RPC??蛻舳讼蚍?wù)器發(fā)送請求,并獲取流以讀回一系列消息。
      客戶端從返回的流中讀取,直到?jīng)]有更多消息為止。
      正如您在我們的示例中所看到的,您可以通過將 stream 關(guān)鍵字放在響應(yīng)類型之前,來指定服務(wù)器端流方法。

      // 獲取給定矩形內(nèi)可用的特征。 
      // 結(jié)果是流式傳輸而不是立即返回(例如,在具有重復(fù)字段的響應(yīng)消息中),因為矩形可能覆蓋大面積并包含大量特征。
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
      
    • 客戶端流式 RPC,其中客戶端寫入一系列消息并將它們發(fā)送到服務(wù)器,再次使用提供的流。
      一旦客戶端完成了消息的寫入,它會等待服務(wù)器讀取所有消息并返回其響應(yīng)。
      可以通過將 stream 關(guān)鍵字放在請求類型之前來指定客戶端流式處理方法。

      // Accepts a stream of Points on a route being traversed, returning a
      // RouteSummary when traversal is completed.
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
      
    • 雙向流式 RPC,雙方使用讀寫流發(fā)送一系列消息。
      這兩個流獨立運行,因此客戶端和服務(wù)器可以按照他們喜歡的任何順序讀取和寫入。
      例如,服務(wù)器可以在寫入響應(yīng)之前等待接收所有客戶端消息,或者它可以交替讀取消息然后寫入消息, 或其他一些讀取和寫入的組合。 保留每個流中消息的順序。
      您可以通過在請求和響應(yīng)之前放置 stream 關(guān)鍵字來指定這種類型的方法。

      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
      
  2. 我們的 .proto 文件還包含我們 service 方法中使用的 所有請求和響應(yīng)類型的 protocol buffer message 類型定義。
    例如,這里是 Point message 類型:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客戶端、服務(wù)端代碼

在 examples/route_guide 目錄中,運行以下命令:

$ protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    routeguide/route_guide.proto

運行此命令會在 routeguide 目錄中生成以下文件:

  • route_guide.pb.go
    其中包含用于填充、序列化、檢索請求和響應(yīng)消息類型 的所有 protocol buffer 代碼
  • route_guide_grpc.pb.go
    • 一個接口類型(或存根),包含客戶端可以調(diào)用的 RouteGuide 服務(wù)中定義的方法。
    • 服務(wù)端要實現(xiàn)的接口類型,包含 RouteGuide 服務(wù)中定義的方法。

創(chuàng)建 server

讓我們的 RouteGuide 服務(wù)完成它的工作,有兩個部分:

  • 實現(xiàn) proto 服務(wù)定義中生成的服務(wù)接口:doing the actual “work” of our service.
  • 運行 gRPC 服務(wù)器 以偵聽來自客戶端的請求,并將它們分派到正確的服務(wù)實現(xiàn)。

可以在 server/server.go 中找到示例 RouteGuide 服務(wù)器。 讓我們仔細(xì)看看它是如何工作的。

實現(xiàn) RouteGuide

如您所見,我們的服務(wù)器有一個 routeGuideServer 結(jié)構(gòu)類型,它實現(xiàn)了生成的 RouteGuideServer 接口:

type routeGuideServer struct {
        ...
}
...

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
        ...
}
...

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}
...

簡單 RPC

routeGuideServer 實現(xiàn)了我們所有的服務(wù)方法。
我們先來看最簡單的類型:GetFeature,它只是從客戶端獲取一個Point,并從其數(shù)據(jù)庫中返回對應(yīng)的特征信息,通過 Feature 返回。

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
  for _, feature := range s.savedFeatures {
    if proto.Equal(feature.Location, point) {
      return feature, nil
    }
  }
  // No feature was found, return an unnamed feature
  return &pb.Feature{Location: point}, nil
}

服務(wù)端流 RPC

現(xiàn)在讓我們看看我們的一個流式 RPC。
ListFeatures 是一個服務(wù)器端流式 RPC,因此我們需要將多個 Feature 發(fā)送回我們的客戶端。

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

正如你所看到的,這次我們沒有在我們的方法參數(shù)中獲取簡單的請求和響應(yīng)對象,而是獲取了一個請求對象(Rectangle,我們的客戶想要在其中找到特征)和一個特殊的 RouteGuide_ListFeaturesServer 對象來寫我們的響應(yīng)。

在該方法中,我們根據(jù)需要返回填充盡可能多的 Feature 對象,并使用其 Send() 方法將它們寫入 RouteGuide_ListFeaturesServer。 最后,就像在我們的簡單 RPC 中一樣,我們返回一個 nil 錯誤來告訴 gRPC 我們已經(jīng)完成了響應(yīng)的編寫。
如果此調(diào)用發(fā)生任何錯誤,我們將返回一個非nil錯誤; gRPC 層會將其轉(zhuǎn)換為適當(dāng)?shù)?RPC 狀態(tài)進(jìn)行發(fā)送。

客戶端流 RPC

現(xiàn)在讓我們看一些更復(fù)雜的東西:客戶端流式傳輸方法 RecordRoute。
我們從客戶端獲取 Point 流,并返回一個包含旅行信息的 RouteSummary。

如您所見,這一次該方法根本沒有請求參數(shù)。 相反,它獲取一個 RouteGuide_RecordRouteServer 流,服務(wù)器可以使用它來讀取和寫入消息。它可以使用其 Recv() 方法接收客戶端消息,并使用其 SendAndClose() 方法返回其單個響應(yīng)。

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

在方法體中,我們使用 RouteGuide_RecordRouteServer 的 Recv() 方法將客戶端的請求重復(fù)讀入到請求對象(在本例中為 Point),直到?jīng)]有更多消息:每次調(diào)用后,服務(wù)器需要檢查從 Recv() 返回的錯誤 。

  • 如果這是 nil,流仍然是好的,它可以繼續(xù)閱讀;
  • 如果是 io.EOF,則消息流已經(jīng)結(jié)束,服務(wù)器可以返回其 RouteSummary;
  • 如果它有任何其他值,我們會“按原樣”返回錯誤,以便 gRPC 層將其轉(zhuǎn)換為 RPC 狀態(tài)。

雙向流 RPC

最后,讓我們看看我們的雙向流式 RPC RouteChat()。

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)
                ... // look for notes to be sent to client
    for _, note := range s.routeNotes[key] {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

這次我們得到一個 RouteGuide_RouteChatServer 流,就像在我們的客戶端流示例中一樣,可用于讀取和寫入消息。 但是,這一次我們通過方法的流返回值,而客戶端仍在將消息寫入其消息流。

這里的讀寫語法與我們的客戶端流方法非常相似,除了服務(wù)器使用流的 Send() 方法而不是 SendAndClose() 方法,因為它正在寫入多個響應(yīng)。
盡管每一方總是按照寫入的順序獲取對方的消息,但客戶端和服務(wù)器都可以按任何順序讀取和寫入——流完全獨立運行。

啟動服務(wù)端

一旦我們實現(xiàn)了所有方法,我們還需要啟動一個 gRPC 服務(wù)器,以便客戶端可以實際使用我們的服務(wù)。 以下片段顯示了我們?nèi)绾螢?RouteGuide 服務(wù)執(zhí)行此操作:

flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
  1. 指定我們要用來監(jiān)聽客戶端請求的端口:
    lis, err := net.Listen(...).

  2. 使用 grpc.NewServer(...) 創(chuàng)建一個 gRPC 服務(wù)器實例。

  3. 向 gRPC 服務(wù)器注冊我們的服務(wù)實現(xiàn)

  4. 在服務(wù)器上調(diào)用 Serve() 以進(jìn)行阻塞等待,直到進(jìn)程被殺死或調(diào)用 Stop()

創(chuàng)建 client

在本節(jié)中,我們將著眼于為我們的 RouteGuide 服務(wù)創(chuàng)建一個 Go 客戶端。
可以在 grpc-go/examples/route_guide/client/client.go 中看到完整的客戶端示例代碼

創(chuàng)建 stub

要調(diào)用服務(wù)方法,我們首先需要創(chuàng)建一個 gRPC 通道來與服務(wù)器通信。
我們通過將服務(wù)器地址和端口號傳遞給 grpc.Dial() 來創(chuàng)建它,如下所示:

var opts []grpc.DialOption
...
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
  ...
}
defer conn.Close()

當(dāng)服務(wù)需要時,您可以使用 DialOptions 在 grpc.Dial 中設(shè)置身份驗證憑據(jù)(例如,TLS、GCE 憑據(jù)或 JWT 憑據(jù))。 RouteGuide 服務(wù)不需要任何憑據(jù)。

設(shè)置 gRPC 通道后,我們需要一個客戶端存根來執(zhí)行 RPC。
我們使用從示例 .proto 文件生成的 pb 包 提供的 NewRouteGuideClient 方法獲取它。

client := pb.NewRouteGuideClient(conn)

調(diào)用服務(wù)方法

現(xiàn)在讓我們看看我們?nèi)绾握{(diào)用我們的服務(wù)方法。
請注意,在 gRPC-Go 中,RPC 以 阻塞/同步 模式運行,這意味著 RPC 調(diào)用等待服務(wù)器響應(yīng),并且將返回響應(yīng)或錯誤。

簡單 RPC

調(diào)用簡單的 RPC GetFeature 幾乎與調(diào)用本地方法一樣簡單。

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
  ...
}

如您所見,我們在之前獲得的 存根 上調(diào)用該方法。
在我們的方法參數(shù)中,我們創(chuàng)建并填充了一個請求 protocol buffer 對象(在我們的例子中是 Point)。 我們還傳遞了一個 context.Context 對象,它允許我們在必要時更改 RPC 的行為,例如 timeout / cancel 運行中的 RPC。
如果調(diào)用沒有返回錯誤,那么我們可以從第一個返回值中讀取服務(wù)器的響應(yīng)信息。

log.Println(feature)

服務(wù)端流 RPC

這里是我們調(diào)用服務(wù)器端流方法 ListFeatures 的地方,它返回地理特征流。

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}

就像在簡單的 RPC 中一樣,我們向方法傳遞一個上下文和一個請求。 但是,我們沒有返回響應(yīng)對象,而是返回 RouteGuide_ListFeaturesClient 的實例。 客戶端可以使用 RouteGuide_ListFeaturesClient 流來讀取服務(wù)器的響應(yīng)。

我們使用 RouteGuide_ListFeaturesClient 的 Recv() 方法重復(fù)讀入服務(wù)器的響應(yīng)(在本例中為 Feature),直到?jīng)]有更多消息:客戶端需要檢查每次從 Recv() 返回的錯誤 err 。

  • 如果為 nil,則流仍然是好的,它可以繼續(xù)讀;
  • 如果是 io.EOF 則消息流結(jié)束;
  • 否則一定有RPC錯誤,通過err傳遞過來。

客戶端流 RPC

客戶端流方法 RecordRoute 類似于服務(wù)器端方法,不同之處在于我們只向該方法傳遞一個上下文并返回一個 RouteGuide_RecordRouteClient 流,我們可以使用它來寫入和讀取消息。

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
  log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
  if err := stream.Send(point); err != nil {
    log.Fatalf("%v.Send(%v) = %v", stream, point, err)
  }
}
reply, err := stream.CloseAndRecv()
if err != nil {
  log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)

RouteGuide_RecordRouteClient 有一個 Send() 方法,我們可以使用它向服務(wù)器發(fā)送請求。
一旦我們使用 Send() 將客戶端的請求寫入流中,我們需要在流上調(diào)用 CloseAndRecv() 來讓 gRPC 知道我們已經(jīng)完成了寫入并期待收到響應(yīng)。
我們從 CloseAndRecv() 返回的錯誤中獲取 RPC 狀態(tài)。 如果狀態(tài)為 nil,則 CloseAndRecv() 的第一個返回值將是有效的服務(wù)器響應(yīng)。

雙向流 RPC

最后,讓我們看看我們的雙向流式 RPC RouteChat()。
與 RecordRoute 的情況一樣,我們只向該方法傳遞一個上下文對象,并返回一個我們可以用來寫入和讀取消息的流。 但是,這一次我們通過方法的流 返回值,而服務(wù)器仍在將消息寫入其消息流。

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      // read done.
      close(waitc)
      return
    }
    if err != nil {
      log.Fatalf("Failed to receive a note : %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
for _, note := range notes {
  if err := stream.Send(note); err != nil {
    log.Fatalf("Failed to send a note: %v", err)
  }
}
stream.CloseSend()
<-waitc

這里的讀寫語法與我們的客戶端流方法非常相似,只是我們在完成調(diào)用后使用流的 CloseSend() 方法。
盡管每一方總是按照寫入的順序獲取對方的消息,但客戶端和服務(wù)器都可以按任何順序讀取和寫入——流完全獨立運行。

測試

  1. 運行server
go run server/server.go
  1. 運行客戶端
go run client/client.go
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容