java版gRPC實(shí)戰(zhàn)之六:客戶端動(dòng)態(tài)獲取服務(wù)端地址

歡迎訪問(wèn)我的GitHub

https://github.com/zq2599/blog_demos

內(nèi)容:所有原創(chuàng)文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC實(shí)戰(zhàn)》全系列鏈接

  1. 用proto生成代碼
  2. 服務(wù)發(fā)布和調(diào)用
  3. 服務(wù)端流
  4. 客戶端流
  5. 雙向流
  6. 客戶端動(dòng)態(tài)獲取服務(wù)端地址
  7. 基于eureka的注冊(cè)發(fā)現(xiàn)

客戶端為什么要?jiǎng)討B(tài)獲取服務(wù)端地址

本文是《java版gRPC實(shí)戰(zhàn)》系列的第六篇,前面咱們?cè)陂_(kāi)發(fā)客戶端應(yīng)用時(shí),所需的服務(wù)端地址都是按如下步驟設(shè)置的:

  • 在application.yml中配置,如下圖:
在這里插入圖片描述
  • 在用到gRPC的bean中,使用注解<font color="blue">GrpcClient</font>即可將Stub類注入到成員變量中:
在這里插入圖片描述
  • 上述操作方式的優(yōu)點(diǎn)是簡(jiǎn)單易用好配置,缺點(diǎn)也很明顯:服務(wù)端的IP地址或者端口一旦有變化,就必須修改application.yml并重啟客戶端應(yīng)用;

為什么不用注冊(cè)中心

  • 您一定會(huì)想到解決上述問(wèn)題最簡(jiǎn)單的方法就是使用注冊(cè)中心,如nacos、eureka等,其實(shí)我也是這么想的,直到有一天,由于工作原因,我要在一個(gè)已有的gRPC微服務(wù)環(huán)境部署自己的應(yīng)用,這個(gè)微服務(wù)環(huán)境并非java技術(shù)棧,而是基于golang的,他們都使用了go-zero框架( 老扎心了),這個(gè)go-zero框架沒(méi)有提供java語(yǔ)言的SDK,因此,我只能服從go-zero框架的規(guī)則,從etcd中取得其他微服務(wù)的地址信息,才能調(diào)用其他gRPC服務(wù)端,如下圖所示:
在這里插入圖片描述
  • 如此一來(lái),咱們之前那種在application.yml中配置服務(wù)端信息的方法就用不上了,本篇咱們來(lái)開(kāi)發(fā)一個(gè)新的gRPC客戶端應(yīng)用,滿足以下需求:
  1. 創(chuàng)建Stub對(duì)象的時(shí)候,服務(wù)端的信息不再來(lái)自注解<font color="blue">GrpcClient</font>,而是來(lái)自查詢etcd的結(jié)果;
  2. etcd上的服務(wù)端信息有變化的時(shí)候,客戶端可以及時(shí)更新,而不用重啟應(yīng)用;

本篇概覽

  • 本篇要開(kāi)發(fā)名為<font color="blue">get-service-addr-from-etcd</font>的springboot應(yīng)用,該應(yīng)用從etcd取得<font color="blue">local-server</font>應(yīng)用的IP和端口,然后調(diào)用local-server的<font color="red">sayHello</font>接口,如下圖:
在這里插入圖片描述
  1. 開(kāi)發(fā)客戶端應(yīng)用;
  2. 部署gRPC服務(wù)端應(yīng)用;
  3. 部署etcd;
  4. 模擬go-zero的規(guī)則,將服務(wù)端應(yīng)用的IP地址和端口寫入etcd;
  5. 啟動(dòng)客戶端應(yīng)用,驗(yàn)證能否正常調(diào)用服務(wù)端的服務(wù);
  6. 重啟服務(wù)端,重啟的時(shí)候修改端口;
  7. 修改etcd中服務(wù)端的端口信息;
  8. 調(diào)用接口觸發(fā)客戶端重新實(shí)例化Stub對(duì)象;
  9. 驗(yàn)證客戶端能否正常調(diào)用修改了端口的服務(wù)端服務(wù);

源碼下載

名稱 鏈接 備注
項(xiàng)目主頁(yè) https://github.com/zq2599/blog_demos 該項(xiàng)目在GitHub上的主頁(yè)
git倉(cāng)庫(kù)地址(https) https://github.com/zq2599/blog_demos.git 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,https協(xié)議
git倉(cāng)庫(kù)地址(ssh) git@github.com:zq2599/blog_demos.git 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,ssh協(xié)議
  • 這個(gè)git項(xiàng)目中有多個(gè)文件夾,《java版gRPC實(shí)戰(zhàn)》系列的源碼在<font color="blue">grpc-tutorials</font>文件夾下,如下圖紅框所示:
在這里插入圖片描述
  • <font color="blue">grpc-tutorials</font>文件夾下有多個(gè)目錄,本篇文章對(duì)應(yīng)的客戶端代碼在<font color="blue">get-service-addr-from-etcd</font>目錄下,如下圖:
在這里插入圖片描述

開(kāi)發(fā)客戶端應(yīng)用

  • 在父工程的build.gradle文件中新增一行,這是etcd相關(guān)的庫(kù),如下圖紅框所示:
在這里插入圖片描述
  • 在父工程grpc-turtorials下面新建名為<font color="blue">get-service-addr-from-etcd</font>的模塊,其build.gradle內(nèi)容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation 'io.etcd:jetcd-core'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,設(shè)置自己的web端口號(hào)和應(yīng)用名,另外grpc.etcdendpoints是etcd集群的地址信息:
server:
  port: 8084
spring:
  application:
    name: get-service-addr-from-etcd

grpc:
  # etcd的地址,從此處取得gRPC服務(wù)端的IP和端口
  etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
  • 啟動(dòng)類DynamicServerAddressDemoApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;

  • 新增StubWrapper.java文件,這是個(gè)spring bean,要重點(diǎn)關(guān)注的是simpleBlockingStub方法,當(dāng)bean在spring注冊(cè)的時(shí)候simpleBlockingStub方法會(huì)被執(zhí)行,這樣每當(dāng)bean在spring注冊(cè)時(shí),都會(huì)從etcd查詢gRPC服務(wù)端信息,然后創(chuàng)建SimpleBlockingStub對(duì)象:

package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8;

/**
 * @author will (zq2599@gmail.com)
 * @version 1.0
 * @description: 包裝了SimpleBlockingStub實(shí)例的類,發(fā)起gRPC請(qǐng)求時(shí)需要用到SimpleBlockingStub實(shí)例
 * @date 2021/5/8 19:34
 */
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper {

    /**
     * 這是etcd中的一個(gè)key,該key對(duì)應(yīng)的值是grpc服務(wù)端的地址信息
     */
    private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server";

    /**
     * 配置文件中寫好的etcd地址
     */
    private String etcdendpoints;

    private SimpleGrpc.SimpleBlockingStub simpleBlockingStub;

    /**
     * 從etcd查詢gRPC服務(wù)端的地址
     * @return
     */
    public String[] getGrpcServerInfo() {
        // 創(chuàng)建client類
        KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient();

        GetResponse response = null;

        // 去etcd查詢/grpc/local-server這個(gè)key的值
        try {
            response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
        } catch (Exception exception) {
            log.error("get grpc key from etcd error", exception);
        }

        if (null==response || response.getKvs().isEmpty()) {
            log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
            return null;
        }

        // 從response中取得值
        String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8);

        // rawAddrInfo是“192.169.0.1:8080”這樣的字符串,即一個(gè)IP和一個(gè)端口,用":"分割,
        // 這里用":"分割成數(shù)組返回
        return null==rawAddrInfo ? null : rawAddrInfo.split(":");
    }

    /**
     * 每次注冊(cè)bean都會(huì)執(zhí)行的方法,
     * 該方法從etcd取得gRPC服務(wù)端地址,
     * 用于實(shí)例化成員變量SimpleBlockingStub
     */
    @PostConstruct
    public void simpleBlockingStub() {
        // 從etcd獲取地址信息
        String[] array = getGrpcServerInfo();

        log.info("create stub bean, array info from etcd {}", Arrays.toString(array));

        // 數(shù)組的第一個(gè)元素是gRPC服務(wù)端的IP地址,第二個(gè)元素是端口
        if (null==array || array.length<2) {
            log.error("can not get valid grpc address from etcd");
            return;
        }

        // 數(shù)組的第一個(gè)元素是gRPC服務(wù)端的IP地址
        String addr = array[0];
        // 數(shù)組的第二個(gè)元素是端口
        int port = Integer.parseInt(array[1]);

        // 根據(jù)剛才獲取的gRPC服務(wù)端的地址和端口,創(chuàng)建channel
        Channel channel = ManagedChannelBuilder
                .forAddress(addr, port)
                .usePlaintext()
                .build();

        // 根據(jù)channel創(chuàng)建stub
        simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
    }
}
  • GrpcClientService是封裝了StubWrapper的服務(wù)類:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class GrpcClientService {

    @Autowired(required = false)
    @Setter
    private StubWrapper stubWrapper;

    public String sendMessage(final String name) {
        // 很有可能simpleStub對(duì)象為null
        if (null==stubWrapper) {
            return "invalid SimpleBlockingStub, please check etcd configuration";
        }

        try {
            final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
            return response.getMessage();
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }
}
  • 新增一個(gè)controller類GrpcClientController,提供一個(gè)http接口,里面會(huì)調(diào)用GrpcClientService的方法,最終完成遠(yuǎn)程gRPC調(diào)用:
package com.bolingcavalry.dynamicrpcaddr;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "will") String name) {
        return grpcClientService.sendMessage(name);
    }
}
  • 接下來(lái)新增一個(gè)controller類RefreshStubInstanceController,對(duì)外提供一個(gè)http接口refreshstub,作用是刪掉stubWrapper這個(gè)bean,再重新注冊(cè)一次,這樣每當(dāng)外部調(diào)用refreshstub接口,就可以從etcd取得服務(wù)端信息再重新實(shí)例化SimpleBlockingStub成員變量,這樣就達(dá)到了客戶端動(dòng)態(tài)獲取服務(wù)端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RefreshStubInstanceController implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Autowired
    private GrpcClientService grpcClientService;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @RequestMapping("/refreshstub")
    public String refreshstub() {

        String beanName = "stubWrapper";

        //獲取BeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

        // 刪除已有bean
        defaultListableBeanFactory.removeBeanDefinition(beanName);

        //創(chuàng)建bean信息.
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class);

        //動(dòng)態(tài)注冊(cè)bean.
        defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());

        // 更新引用關(guān)系(注意,applicationContext.getBean方法很重要,會(huì)觸發(fā)StubWrapper實(shí)例化操作)
        grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class));

        return "Refresh success";
    }
}
  • 編碼完成,開(kāi)始驗(yàn)證;

部署gRPC服務(wù)端應(yīng)用

部署gRPC服務(wù)端應(yīng)用很簡(jiǎn)單,啟動(dòng)<font color="blue">local-server</font>應(yīng)用即可:

在這里插入圖片描述

部署etcd

  • 為了簡(jiǎn)化操作,我這里的etcd集群是用docker部署的,對(duì)應(yīng)的docker-compose.yml文件內(nèi)容如下:
version: '3'
services:
  etcd1:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd1'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd1:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd1:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2379:2379
    volumes:
      - ./store/etcd1/data:/etcd_data
  etcd2:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd2'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd2:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd2:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2380:2379
    volumes:
      - ./store/etcd2/data:/etcd_data
  etcd3:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd3'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd3:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd3:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2381:2379
    volumes:
      - ./store/etcd3/data:/etcd_data
  • 準(zhǔn)備好上述文件后,執(zhí)行<font color="blue">docker-compose up -d</font>即可創(chuàng)建集群;

將服務(wù)端應(yīng)用的IP地址和端口寫入etcd

  • 我這邊<font color="blue">local-server</font>所在服務(wù)器IP是<font color="red">192.168.50.5</font>,端口9898,所以執(zhí)行以下命令將local-server信息寫入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898

啟動(dòng)客戶端應(yīng)用

  • 打開(kāi)DynamicServerAddressDemoApplication.java,點(diǎn)擊下圖紅框位置,即可啟動(dòng)客戶端應(yīng)用:
在這里插入圖片描述
  • 注意下圖紅框中的日志,該日志證明客戶端應(yīng)用從etcd獲取服務(wù)端信息成功:
在這里插入圖片描述
  • 瀏覽器訪問(wèn)應(yīng)用<font color="blue">get-service-addr-from-etcd</font>的http接口,成功收到響應(yīng),證明gRPC調(diào)用成功:
在這里插入圖片描述
  • 去看local-server的控制臺(tái),如下圖紅框,證明遠(yuǎn)程調(diào)用確實(shí)執(zhí)行了:
在這里插入圖片描述

重啟服務(wù)端,重啟的時(shí)候修改端口

  • 為了驗(yàn)證動(dòng)態(tài)獲取服務(wù)端信息是否有效,咱們先把local-server應(yīng)用的端口改一下,如下圖紅框,改成<font color="red">9899</font>:
在這里插入圖片描述
  • 改完重啟local-server,如下圖紅框,可見(jiàn)gRPC端口已經(jīng)改為9899:
在這里插入圖片描述
  • 這時(shí)候再訪問(wèn)<font color="blue">get-service-addr-from-etcd</font>的http接口,由于get-service-addr-from-etcd不知道local-server的監(jiān)聽(tīng)端口發(fā)生了改變,因此還是去訪問(wèn)9898端口,毫無(wú)意外的返回了失?。?/li>
在這里插入圖片描述

修改etcd中服務(wù)端的端口信息

現(xiàn)在執(zhí)行以下命令,將etcd中的服務(wù)端信息改為正確的:

docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899

調(diào)用接口觸發(fā)客戶端重新實(shí)例化Stub對(duì)象

  • 聰明的您一定知道接下來(lái)要做的事情了:讓StubWrapper的bean重新在spring環(huán)境注冊(cè),也就是調(diào)用RefreshStubInstanceController提供的http接口refreshstub:
在這里插入圖片描述
  • 查看<font color="blue">get-service-addr-from-etcd</font>應(yīng)用的控制臺(tái),如下圖紅框,StubWrapper已經(jīng)重新注冊(cè)了,并且從etcd取得了最新的服務(wù)端信息:
在這里插入圖片描述

驗(yàn)證客戶端能否正常調(diào)用修改了端口的服務(wù)端服務(wù)

  • 再次訪問(wèn)<font color="blue">get-service-addr-from-etcd</font>應(yīng)用的web接口,如下圖,gRPC調(diào)用成功:
在這里插入圖片描述
  • 至此,在不修改配置不重啟服務(wù)的情況下,客戶端也可以適應(yīng)服務(wù)端的變化了,當(dāng)然了,本文只是提供基本的操作參考,實(shí)際上的微服務(wù)環(huán)境會(huì)更復(fù)雜,例如refreshstub接口可能被其他服務(wù)調(diào)用,這樣服務(wù)端有了變化可以更加及時(shí)地被更新,還有客戶端本身也肯能是gRPC服務(wù)提供方,那也要把自己注冊(cè)到etcd上去,還有利用etcd的watch功能監(jiān)控指定的服務(wù)端是否一直存活,以及同一個(gè)gRPC服務(wù)的多個(gè)實(shí)例如何做負(fù)載均衡,等等,這些都要根據(jù)您的實(shí)際情況來(lái)定制;

  • 本篇內(nèi)容過(guò)多,可見(jiàn)對(duì)于這些官方不支持的微服務(wù)環(huán)境,咱們自己去做注冊(cè)發(fā)現(xiàn)的適配很費(fèi)時(shí)費(fèi)力的,如果設(shè)計(jì)和選型能自己做主,我們更傾向于使用現(xiàn)成的注冊(cè)中心,接下來(lái)的文章,咱們就一起嘗試使用eureka為gRPC提供注冊(cè)發(fā)現(xiàn)服務(wù);

你不孤單,欣宸原創(chuàng)一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數(shù)據(jù)庫(kù)+中間件系列
  6. DevOps系列

歡迎關(guān)注公眾號(hào):程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容