gRPC-go服務(wù)發(fā)現(xiàn)&負(fù)載均衡

前言

以下示例基于 https://github.com/grpc/grpc-go v1.30.0,關(guān)于proto文件定義,代碼生成參考gRPC 官方文檔中文版

client

grpc使用的是客戶端負(fù)載均衡模式,每次新建連接的時候會根據(jù)負(fù)載均衡算法選出服務(wù)端的IP然后建立連接?,F(xiàn)在grpc默認(rèn)支持兩種算法pick_first(第一次地址) 和 round_robin(輪詢)
pick_first:pick_first每次都是嘗試連接第一個地址,如果連接失敗就會嘗試下一個,直到連接成功為止,之后的RPC請求都會使用這個連接
round_robin:round_robin會對每個地址建立連接,之后的RPC請求會依次通過這些連接發(fā)送到后端

客戶端新建一個連接

conn, err := grpc.Dial(
        fmt.Sprintf("%s:///%s", "game", baseService),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        //grpc.WithUnaryInterceptor(unaryClientInterceptor),
        //grpc.WithBlock(),
               //grpc.WithCompressor  Deprecated
    )

客戶端每次發(fā)起請求都需要通過grpc.dail創(chuàng)建一個ClientConn,然后通過ClientConn.XXXX發(fā)送請求。

建立連接的各項參數(shù):
grpc.WithInsecure:禁用傳輸認(rèn)證,沒有這個選項必須設(shè)置一種認(rèn)證方式
grpc.WithCompressor:在grpc.Dial參數(shù)中設(shè)置壓縮的方式將要被廢棄,推薦使用UseCompressor

        grpc.UseCompressor(gzip.Name)
        conn, err := grpc.Dial(
              //...
        )

PS:壓縮方式客戶端應(yīng)該和服務(wù)端對應(yīng)

grpc.WithBlock():grpc.Dial默認(rèn)建立連接是異步的,加了這個參數(shù)后會等待所有連接建立成功后再返回

grpc.WithUnaryInterceptor:一元攔截器,適用于普通rpc連接,相應(yīng)的還有流攔截器。攔截器只有第一個生效,所以一般設(shè)置一個。攔截器是對請求的一次封裝,客戶端和服務(wù)端都可以設(shè)置攔截器,請求的發(fā)送/執(zhí)行都是在攔截器內(nèi)操作的,所以在請求的前后都可以嵌入用戶自定義的代碼,類似hook

//客戶端攔截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    var credsConfigured bool
    for _, o := range opts {
        _, ok := o.(grpc.PerRPCCredsCallOption)
        if ok {
            credsConfigured = true
            break
        }
    }
    if !credsConfigured {
        opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
            AccessToken: fallbackToken,
        })))
    }
    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    end := time.Now()
    logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err)
    return err
}
//服務(wù)端攔截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // authentication (token verification)
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, errMissingMetadata
    }
    if !valid(md["authorization"]) {
        return nil, errInvalidToken
    }
    m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }
    return m, err
}

grpc.WithDefaultServiceConfig: 舊的版本可以通過grpc.RoundRobin(),和grpc.WithBalancer()來設(shè)置負(fù)載均衡,這個版本grpc.RoundRobin()已經(jīng)取消了,grpc.WithBalancer()和grpc. 也WithBalancerName()標(biāo)記為廢棄。現(xiàn)在改為讀取外部配置,主要是方便服務(wù)啟動后動態(tài)更新(設(shè)計初衷應(yīng)該是主要用在服務(wù)端)

//service config example
{
  "loadBalancingConfig": [ { "round_robin": {} } ],
  "methodConfig": [
    {
      "name": [
        { "service": "foo", "method": "bar" },
        { "service": "baz" }
      ],
      "timeout": "1.0000000001s"
    }
  ]
}
   grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name))

可以這樣設(shè)置BalancingPolicy

target: grpc.Dial:的第一個參數(shù),這個參數(shù)的主要作用的通過它來找到對應(yīng)的服務(wù)端地址,target傳入是一個字符串,統(tǒng)一格式為scheme://authority/endpoint,然后通過以下方式解析為Target struct

type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
}

func parseTarget(target string) (ret resolver.Target) {
    var ok bool
    ret.Scheme, ret.Endpoint, ok = split2(target, "://")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    return ret
}

解析target的時候有以下幾種情況:

  • 當(dāng)前參數(shù)有沒有直接設(shè)置resolverBuilder,如果設(shè)置了,直接設(shè)置Endpoint=target
  • 如果未直接設(shè)置resolverBuilder,則通過Scheme來找到resolverBuilder
  • 如果通過Scheme沒有找到resolverBuilder,resolverBuilder為默認(rèn)的dns builder,設(shè)置
    Endpoint=target

所以,真正獲取IP地址是通過resolverBuilder這個接口

type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string
}

Build():為給定目標(biāo)創(chuàng)建一個新的resolver,當(dāng)調(diào)用grpc.Dial()時執(zhí)行。
Scheme():返回此resolver方案的名稱

type Resolver interface {
    ResolveNow(ResolveNowOptions)
    Close()
}

ResolveNow():被 gRPC 調(diào)用,以嘗試再次解析目標(biāo)名稱。只用于提示,可忽略該方法。
Close方法:關(guān)閉resolver

下面我們看一個示例


func init() {
    resolver.Register(&exampleResolverBuilder{})  
/*
//注冊的時候?qū)cheme => builder保存到m
func Register(b Builder) {
    m[b.Scheme()] = b
}
*/
}

const (
    exampleScheme      = "example"
    exampleServiceName = "lb.example.grpc.io"
)

var addrs = []string{"localhost:50051", "localhost:50052"}

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    r := &exampleResolver{
        target: target,
        cc:     cc,
        addrsStore: map[string][]string{
            exampleServiceName: addrs,
        },
    }
    r.start()
    return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) start() {
    addrStrs := r.addrsStore[r.target.Endpoint]
    addrs := make([]resolver.Address, len(addrStrs))
    for i, s := range addrStrs {
        addrs[i] = resolver.Address{Addr: s}
    }
    r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

func main() {
//...
roundrobinConn, err := grpc.Dial(
        // Target{Scheme:exampleScheme,Endpoint:exampleServiceName}
        fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
//...
}

grpc.Dial() 會調(diào)用Scheme=>builder 的Build() 方法,之后調(diào)用r.start()

        r.cc.UpdateState(resolver.State{Addresses: addrs})

UpdateState()將addr更新到cc,也就是外部的連接中,供其他接口使用。

server

server相對來說啟動比較簡單,一般都會加攔截器來獲取matedata或者去recover() panic,又或者打印一些日志

        grpc.UseCompressor(gzip.Name)
        s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor))
//...

matedata: matedata是一個map[string][]string的結(jié)構(gòu),用來在客戶端和服務(wù)器之間傳輸數(shù)據(jù)。其中的一個作用是可以傳遞分布式調(diào)用環(huán)境中的鏈路id,方便跟蹤調(diào)試。另外也可以傳一些業(yè)務(wù)相關(guān)的數(shù)據(jù)

客戶端攔截器中設(shè)置metedata

        md := metadata.Pairs("XXX_id",xxxID, "YYY_id", yyyID)
        mdOld, _ := metadata.FromIncomingContext(ctx)
        md = metadata.Join(mdOld, md)
        ctx = metadata.NewOutgoingContext(ctx, md)
          //...
       invoker(ctx, method, req, reply, cc, opts...)

服務(wù)端攔截器獲取metadata

    var xxxID,yyyID
    md, _ := metadata.FromIncomingContext(ctx)
    if arr := md["XXX_id"]; len(arr) > 0 {
        xxxID = arr[0]
    }
    if arr := md["YYY_id"]; len(arr) > 0 {
        yyyID = arr[0]
    }
        m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }

在server啟動之后,需要將這個服務(wù)注冊到etcd 。
用etcd3在編譯的時候出現(xiàn)了和groc-go版本不兼容的問題

首先當(dāng)前用的etcd 版本是 3.4.9,支持的grpc-go最高版本是v1.26.0,于是需要將grpc-go降級
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
降級之后之前生成的proto.pb.go 又出現(xiàn)了錯誤,于是將protobuf降級
replace github.com/golang/protobuf => github.com/golang/protobuf v1.2.0
以上的問題網(wǎng)上其他人也遇到過,下面的這個不清楚是我本地環(huán)境有問題還是其他原因
報錯原因是 google.golang.org/genproto這個包下面生成的proto.pb.go里面指定了protobuf1.4的版本變量,解決辦法還是降級,版本號是在$GOPATH/pkg/mod/... 下面找到的
replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8

關(guān)于etcd的內(nèi)容之后再整理吧。

小結(jié)

結(jié)合etcd 的watch功能,很容易檢測某一個路徑節(jié)點的變化,如果,server端注冊兩個服務(wù)到etcd
key = /project/service/user/1 val = 127.0.0.1:9999
key = /project/service/user/2 val = 127.0.0.1:9998

在客戶端,如果我們自定義了一個名叫example的resolverBuilder,
同時開啟一個watch協(xié)程 ,監(jiān)測/project/service下面的節(jié)點,動態(tài)維護Build()中addrsStore,這個時候我們設(shè)置addrsStore[user] = {127.0.0.1:9999,127.0.0.1:9998}。

然后在客戶端grpc.Dai中令target = example:///user
那么在r.start()中就可以獲取到 {127.0.0.1:9999,127.0.0.1:9998}(具體可以看上面示例中r.start()方法)

server注冊的key,Build()中addrsStore中的key,以及target 后面的endPoint 的不同選擇可以實現(xiàn)不通粒度的服務(wù)劃分。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容