gRPC Service Discovery & Load Balancing

Building highly available, high-performance communication services usually relies on mechanisms such as service registration and discovery, load balancing, and fault tolerance. Depending on where the load balancing is implemented, there are generally three kinds of solutions:

# 1. Centralized LB (Proxy Model)

![](https://upload-images.jianshu.io/upload_images/28338950-8a65c6dd35ec6f80.png)

An independent LB sits between service consumers and service providers, usually a dedicated hardware device such as F5, or software such as LVS or HAProxy. The LB holds an address mapping table for all services, typically registered and maintained by operations staff. When a consumer calls a target service, it sends the request to the LB, which applies some policy, for example Round-Robin, and forwards the request to the target service. The LB usually has health-checking capability and can automatically remove unhealthy service instances.

The main problems with this solution:

1. Single point of failure: all service-call traffic goes through the LB. When the number of services and the call volume grow, the LB easily becomes a bottleneck, and once the LB fails the whole system is affected.
2. An extra hop is added between consumer and provider, which introduces some performance overhead.

# 2. In-process LB (Balancing-aware Client)

![](https://upload-images.jianshu.io/upload_images/28338950-f1aa15d3f63ac68e.png)

To address the shortcomings of the first solution, this approach integrates the LB functionality into the service consumer's process; it is also known as the soft-load or client-side load-balancing approach. When a service provider starts, it first registers its address with the service registry, and then reports heartbeats periodically to indicate liveness, which effectively serves as a health check. When a consumer wants to call a service, its built-in LB component queries the registry, caches and periodically refreshes the target address list, picks an address using some load-balancing policy, and finally sends the request to the target service. LB and service-discovery capability are distributed into each consumer's process, and consumers call providers directly with no extra hop, so performance is good.

The main problems with this solution:

1. Development cost: the LB logic has to be integrated into the client process, so if there are multiple language stacks, a matching client must be developed for each, which carries development and maintenance cost.
2. In production, any later upgrade of the client library forces every consumer to change code and redeploy, so upgrades are complicated.
# 3. Independent LB Process (External Load Balancing Service)

![](https://upload-images.jianshu.io/upload_images/28338950-20ecc8b2d62cb6d9.png)

This solution is a compromise proposed to address the shortcomings of the second one, and its principle is basically similar. The difference is that LB and service discovery are moved out of the consumer's process into an independent process on the same host. When one or more services on a host need to call a target service, they all go through that host-local LB process for service discovery and load balancing. This is also a distributed solution with no single point of failure: if one LB process dies, only the callers on that host are affected. Callers and the LB communicate on the same host, so performance is good. The solution also simplifies the callers: no client library needs to be developed per language, and LB upgrades require no code changes on the caller side.

The main problem: deployment is more complex, there are more moving parts, and debugging and troubleshooting are less convenient.

# gRPC Service Discovery and Load Balancing Implementation

The official gRPC open-source project does not ship a service registration and discovery implementation directly, but its design documents describe how to build one, and the gRPC APIs in the various languages expose name-resolution and load-balancing interfaces for extension.

![](https://upload-images.jianshu.io/upload_images/28338950-c5b3804ae7f465e4.png)

The basic principle:

1. After startup, the gRPC client issues a name-resolution request to the name server. The name resolves to one or more IP addresses; each address indicates whether it is a server address or a load-balancer address, along with which client-side load-balancing policy or service config to use.
2. The client instantiates the load-balancing policy. If the resolver returned a load-balancer address, the client uses the grpclb policy; otherwise it uses the policy requested by the service config.
3. The load-balancing policy creates a subchannel for each server address.
4. For every RPC, the load-balancing policy decides which subchannel, i.e. which gRPC server, receives the request; when no server is available, the client's request blocks.
Following the design outlined by the gRPC team, a workable gRPC service discovery and load-balancing solution can be built on the in-process LB model (solution 2 above; Alibaba's open-source service framework Dubbo uses a similar mechanism), combined with a distributed consistent store such as Zookeeper, Consul, or Etcd. The following walks through the key pieces of a Go implementation based on Etcd3.

1) Name resolution: resolver.go

```
package etcdv3

import (
	"errors"
	"fmt"
	"strings"

	etcd3 "github.com/coreos/etcd/clientv3"
	"google.golang.org/grpc/naming"
)

// resolver is the implementation of grpc.naming.Resolver
type resolver struct {
	serviceName string // service name to resolve
}

// NewResolver returns a resolver for the given service name
func NewResolver(serviceName string) *resolver {
	return &resolver{serviceName: serviceName}
}

// Resolve resolves the service from etcd; target is the dial address of etcd,
// e.g. "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
	if re.serviceName == "" {
		return nil, errors.New("grpclb: no service name provided")
	}

	// create an etcd client
	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
	}

	// return the watcher
	return &watcher{re: re, client: *client}, nil
}
```
2) Service discovery: watcher.go

```
package etcdv3

import (
	"fmt"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"golang.org/x/net/context"
	"google.golang.org/grpc/naming"
)

// watcher is the implementation of grpc.naming.Watcher
type watcher struct {
	re            *resolver // re: etcd resolver
	client        etcd3.Client
	isInitialized bool
}

// Close does nothing
func (w *watcher) Close() {
}

// Next returns the updates
func (w *watcher) Next() ([]*naming.Update, error) {
	// prefix is the etcd directory to watch
	prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)

	// on the first call, return the full address list
	if !w.isInitialized {
		// query addresses from etcd
		resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
		w.isInitialized = true
		if err == nil {
			addrs := extractAddrs(resp)
			// if not empty, return the initial updates
			if l := len(addrs); l != 0 {
				updates := make([]*naming.Update, l)
				for i := range addrs {
					updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
				}
				return updates, nil
			}
		}
	}

	// afterwards, block on an etcd watch for changes
	rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
			case mvccpb.DELETE:
				return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
			}
		}
	}
	return nil, nil
}

func extractAddrs(resp *etcd3.GetResponse) []string {
	addrs := []string{}
	if resp == nil || resp.Kvs == nil {
		return addrs
	}
	for i := range resp.Kvs {
		if v := resp.Kvs[i].Value; v != nil {
			addrs = append(addrs, string(v))
		}
	}
	return addrs
}
```
3) Service registration: register.go

```
package etcdv3

import (
	"fmt"
	"log"
	"strings"
	"time"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"golang.org/x/net/context"
)

// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client *etcd3.Client
var serviceKey string

var stopSignal = make(chan bool, 1)

// Register registers the service address in etcd and keeps it alive with a TTL lease
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
	serviceValue := fmt.Sprintf("%s:%d", host, port)
	serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

	// get endpoints for the register dial address
	var err error
	client, err = etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
	}

	go func() {
		// invoke self-registration on a ticker
		ticker := time.NewTicker(interval)
		for {
			// the minimum lease TTL is ttl seconds
			resp, err := client.Grant(context.TODO(), int64(ttl))
			if err != nil {
				log.Printf("grpclb: grant lease for '%s' failed: %s", name, err.Error())
			} else if _, err = client.Get(context.Background(), serviceKey); err != nil {
				// get first; if the key does not exist, set it
				if err == rpctypes.ErrKeyNotFound {
					if _, err = client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
						log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
					}
				} else {
					log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
				}
			} else {
				// the key exists: re-put it to refresh the lease
				if _, err = client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
					log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
				}
			}
			select {
			case <-stopSignal:
				return
			case <-ticker.C:
			}
		}
	}()

	return nil
}

// UnRegister deletes the registered service from etcd
func UnRegister() error {
	stopSignal <- true
	stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
	var err error
	if _, err = client.Delete(context.Background(), serviceKey); err != nil {
		log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
	} else {
		log.Printf("grpclb: deregister '%s' ok.", serviceKey)
	}
	return err
}
```
4) Interface definition: helloworld.proto

```
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
```
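To call the service from Go, the stubs for the `pb` package are generated from this file with protoc. With the protoc-gen-go plugin of that era the invocation can be kept next to the code as a go:generate directive (a sketch; exact flags vary with your toolchain version and output paths):

```go
package pb

// Run `go generate` in this directory to (re)build the gRPC stubs.
// The plugins=grpc flag is the legacy protoc-gen-go form used at the
// time this article was written.
//go:generate protoc --go_out=plugins=grpc:. helloworld.proto
```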
5) Server implementation: helloworldserver.go

```
package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"

	grpclb "com.midea/jr/grpclb/naming/etcd/v3"
	"com.midea/jr/grpclb/example/pb"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	port = flag.Int("port", 50001, "listening port")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()

	lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
	if err != nil {
		panic(err)
	}

	// register with a 10s heartbeat interval and a 15s lease TTL
	err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
	if err != nil {
		panic(err)
	}

	// deregister on shutdown (SIGKILL cannot be caught, so it is not listed)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		log.Printf("receive signal '%v'", s)
		grpclb.UnRegister()
		os.Exit(1)
	}()

	log.Printf("starting hello service at %d", *port)
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	s.Serve(lis)
}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
```

6) Client implementation: helloworldclient.go

```
package main

import (
	"flag"
	"fmt"
	"strconv"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"

	grpclb "com.midea/jr/grpclb/naming/etcd/v3"
	"com.midea/jr/grpclb/example/pb"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()
	r := grpclb.NewResolver(*serv)
	b := grpc.RoundRobin(r)

	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
	conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
	if err != nil {
		panic(err)
	}

	ticker := time.NewTicker(1 * time.Second)
	for t := range ticker.C {
		client := pb.NewGreeterClient(conn)
		resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
		if err == nil {
			fmt.Printf("%v: Reply is %s\n", t, resp.Message)
		}
	}
}
```

7) Running the test

1. Start three servers S1, S2, S3 and one client C. Does each server receive an equal number of requests?

![](https://upload-images.jianshu.io/upload_images/28338950-15a0eb845537d0bd.png)

2. Shut down server S1. Do the requests shift to the other two servers?

![](https://upload-images.jianshu.io/upload_images/28338950-370ac7c531b4c6c9.png)

3. Restart S1. Are requests again distributed evenly across all three servers, including S1?

![](https://upload-images.jianshu.io/upload_images/28338950-b6e575fb3e7c57ef.png)

4. Shut down the Etcd3 server. Does client-server communication stay normal?
After the shutdown, communication continues to work normally, but new servers cannot register and servers that go offline cannot be removed.

5. Restart the Etcd3 server. Servers going on- and offline are handled normally again.

6. Shut down all servers. Client requests are blocked.

References

http://www.grpc.io/docs/
https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

> Original article: https://segmentfault.com/a/1190000008672912