歡迎訪問(wèn)我的GitHub
https://github.com/zq2599/blog_demos
內(nèi)容:所有原創(chuàng)文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《java版gRPC實(shí)戰(zhàn)》全系列鏈接
- 用proto生成代碼
- 服務(wù)發(fā)布和調(diào)用
- 服務(wù)端流
- 客戶端流
- 雙向流
- 客戶端動(dòng)態(tài)獲取服務(wù)端地址
- 基于eureka的注冊(cè)發(fā)現(xiàn)
客戶端為什么要?jiǎng)討B(tài)獲取服務(wù)端地址
本文是《java版gRPC實(shí)戰(zhàn)》系列的第六篇,前面咱們?cè)陂_(kāi)發(fā)客戶端應(yīng)用時(shí),所需的服務(wù)端地址都是按如下步驟設(shè)置的:
- 在application.yml中配置,如下圖:

- 在用到gRPC的bean中,使用注解<font color="blue">GrpcClient</font>即可將Stub類注入到成員變量中:

- 上述操作方式的優(yōu)點(diǎn)是簡(jiǎn)單易用好配置,缺點(diǎn)也很明顯:服務(wù)端的IP地址或者端口一旦有變化,就必須修改application.yml并重啟客戶端應(yīng)用;
為什么不用注冊(cè)中心
- 您一定會(huì)想到解決上述問(wèn)題最簡(jiǎn)單的方法就是使用注冊(cè)中心,如nacos、eureka等,其實(shí)我也是這么想的,直到有一天,由于工作原因,我要在一個(gè)已有的gRPC微服務(wù)環(huán)境部署自己的應(yīng)用,這個(gè)微服務(wù)環(huán)境并非java技術(shù)棧,而是基于golang的,他們都使用了go-zero框架( 老扎心了),這個(gè)go-zero框架沒(méi)有提供java語(yǔ)言的SDK,因此,我只能服從go-zero框架的規(guī)則,從etcd中取得其他微服務(wù)的地址信息,才能調(diào)用其他gRPC服務(wù)端,如下圖所示:

- 如此一來(lái),咱們之前那種在application.yml中配置服務(wù)端信息的方法就用不上了,本篇咱們來(lái)開(kāi)發(fā)一個(gè)新的gRPC客戶端應(yīng)用,滿足以下需求:
- 創(chuàng)建Stub對(duì)象的時(shí)候,服務(wù)端的信息不再來(lái)自注解<font color="blue">GrpcClient</font>,而是來(lái)自查詢etcd的結(jié)果;
- etcd上的服務(wù)端信息有變化的時(shí)候,客戶端可以及時(shí)更新,而不用重啟應(yīng)用;
本篇概覽
- 本篇要開(kāi)發(fā)名為<font color="blue">get-service-addr-from-etcd</font>的springboot應(yīng)用,該應(yīng)用從etcd取得<font color="blue">local-server</font>應(yīng)用的IP和端口,然后調(diào)用local-server的<font color="red">sayHello</font>接口,如下圖:

<font color="blue">local-server</font>應(yīng)用是個(gè)簡(jiǎn)單的gRPC服務(wù)端,其詳細(xì)信息請(qǐng)參考《java版gRPC實(shí)戰(zhàn)之二:服務(wù)發(fā)布和調(diào)用》
本篇由以下章節(jié)組成:
- 開(kāi)發(fā)客戶端應(yīng)用;
- 部署gRPC服務(wù)端應(yīng)用;
- 部署etcd;
- 模擬go-zero的規(guī)則,將服務(wù)端應(yīng)用的IP地址和端口寫入etcd;
- 啟動(dòng)客戶端應(yīng)用,驗(yàn)證能否正常調(diào)用服務(wù)端的服務(wù);
- 重啟服務(wù)端,重啟的時(shí)候修改端口;
- 修改etcd中服務(wù)端的端口信息;
- 調(diào)用接口觸發(fā)客戶端重新實(shí)例化Stub對(duì)象;
- 驗(yàn)證客戶端能否正常調(diào)用修改了端口的服務(wù)端服務(wù);
源碼下載
- 本篇實(shí)戰(zhàn)中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 鏈接 | 備注 |
|---|---|---|
| 項(xiàng)目主頁(yè) | https://github.com/zq2599/blog_demos | 該項(xiàng)目在GitHub上的主頁(yè) |
| git倉(cāng)庫(kù)地址(https) | https://github.com/zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,https協(xié)議 |
| git倉(cāng)庫(kù)地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,ssh協(xié)議 |
- 這個(gè)git項(xiàng)目中有多個(gè)文件夾,《java版gRPC實(shí)戰(zhàn)》系列的源碼在<font color="blue">grpc-tutorials</font>文件夾下,如下圖紅框所示:

- <font color="blue">grpc-tutorials</font>文件夾下有多個(gè)目錄,本篇文章對(duì)應(yīng)的客戶端代碼在<font color="blue">get-service-addr-from-etcd</font>目錄下,如下圖:

開(kāi)發(fā)客戶端應(yīng)用
- 在父工程的build.gradle文件中新增一行,這是etcd相關(guān)的庫(kù),如下圖紅框所示:

- 在父工程grpc-turtorials下面新建名為<font color="blue">get-service-addr-from-etcd</font>的模塊,其build.gradle內(nèi)容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation 'io.etcd:jetcd-core'
implementation project(':grpc-lib')
}
- 配置文件application.yml,設(shè)置自己的web端口號(hào)和應(yīng)用名,另外grpc.etcdendpoints是etcd集群的地址信息:
server:
port: 8084
spring:
application:
name: get-service-addr-from-etcd
grpc:
# etcd的地址,從此處取得gRPC服務(wù)端的IP和端口
etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
啟動(dòng)類DynamicServerAddressDemoApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;
新增StubWrapper.java文件,這是個(gè)spring bean,要重點(diǎn)關(guān)注的是simpleBlockingStub方法,當(dāng)bean在spring注冊(cè)的時(shí)候simpleBlockingStub方法會(huì)被執(zhí)行,這樣每當(dāng)bean在spring注冊(cè)時(shí),都會(huì)從etcd查詢gRPC服務(wù)端信息,然后創(chuàng)建SimpleBlockingStub對(duì)象:
package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8;
/**
* @author will (zq2599@gmail.com)
* @version 1.0
* @description: 包裝了SimpleBlockingStub實(shí)例的類,發(fā)起gRPC請(qǐng)求時(shí)需要用到SimpleBlockingStub實(shí)例
* @date 2021/5/8 19:34
*/
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper {
/**
* 這是etcd中的一個(gè)key,該key對(duì)應(yīng)的值是grpc服務(wù)端的地址信息
*/
private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server";
/**
* 配置文件中寫好的etcd地址
*/
private String etcdendpoints;
private SimpleGrpc.SimpleBlockingStub simpleBlockingStub;
/**
* 從etcd查詢gRPC服務(wù)端的地址
* @return
*/
public String[] getGrpcServerInfo() {
// 創(chuàng)建client類
KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient();
GetResponse response = null;
// 去etcd查詢/grpc/local-server這個(gè)key的值
try {
response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
} catch (Exception exception) {
log.error("get grpc key from etcd error", exception);
}
if (null==response || response.getKvs().isEmpty()) {
log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
return null;
}
// 從response中取得值
String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8);
// rawAddrInfo是“192.169.0.1:8080”這樣的字符串,即一個(gè)IP和一個(gè)端口,用":"分割,
// 這里用":"分割成數(shù)組返回
return null==rawAddrInfo ? null : rawAddrInfo.split(":");
}
/**
* 每次注冊(cè)bean都會(huì)執(zhí)行的方法,
* 該方法從etcd取得gRPC服務(wù)端地址,
* 用于實(shí)例化成員變量SimpleBlockingStub
*/
@PostConstruct
public void simpleBlockingStub() {
// 從etcd獲取地址信息
String[] array = getGrpcServerInfo();
log.info("create stub bean, array info from etcd {}", Arrays.toString(array));
// 數(shù)組的第一個(gè)元素是gRPC服務(wù)端的IP地址,第二個(gè)元素是端口
if (null==array || array.length<2) {
log.error("can not get valid grpc address from etcd");
return;
}
// 數(shù)組的第一個(gè)元素是gRPC服務(wù)端的IP地址
String addr = array[0];
// 數(shù)組的第二個(gè)元素是端口
int port = Integer.parseInt(array[1]);
// 根據(jù)剛才獲取的gRPC服務(wù)端的地址和端口,創(chuàng)建channel
Channel channel = ManagedChannelBuilder
.forAddress(addr, port)
.usePlaintext()
.build();
// 根據(jù)channel創(chuàng)建stub
simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
}
}
- GrpcClientService是封裝了StubWrapper的服務(wù)類:
package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class GrpcClientService {
@Autowired(required = false)
@Setter
private StubWrapper stubWrapper;
public String sendMessage(final String name) {
// 很有可能simpleStub對(duì)象為null
if (null==stubWrapper) {
return "invalid SimpleBlockingStub, please check etcd configuration";
}
try {
final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
return response.getMessage();
} catch (final StatusRuntimeException e) {
return "FAILED with " + e.getStatus().getCode().name();
}
}
}
- 新增一個(gè)controller類GrpcClientController,提供一個(gè)http接口,里面會(huì)調(diào)用GrpcClientService的方法,最終完成遠(yuǎn)程gRPC調(diào)用:
package com.bolingcavalry.dynamicrpcaddr;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "will") String name) {
return grpcClientService.sendMessage(name);
}
}
- 接下來(lái)新增一個(gè)controller類RefreshStubInstanceController,對(duì)外提供一個(gè)http接口refreshstub,作用是刪掉stubWrapper這個(gè)bean,再重新注冊(cè)一次,這樣每當(dāng)外部調(diào)用refreshstub接口,就可以從etcd取得服務(wù)端信息再重新實(shí)例化SimpleBlockingStub成員變量,這樣就達(dá)到了客戶端動(dòng)態(tài)獲取服務(wù)端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RefreshStubInstanceController implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Autowired
private GrpcClientService grpcClientService;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@RequestMapping("/refreshstub")
public String refreshstub() {
String beanName = "stubWrapper";
//獲取BeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
// 刪除已有bean
defaultListableBeanFactory.removeBeanDefinition(beanName);
//創(chuàng)建bean信息.
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class);
//動(dòng)態(tài)注冊(cè)bean.
defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
// 更新引用關(guān)系(注意,applicationContext.getBean方法很重要,會(huì)觸發(fā)StubWrapper實(shí)例化操作)
grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class));
return "Refresh success";
}
}
- 編碼完成,開(kāi)始驗(yàn)證;
部署gRPC服務(wù)端應(yīng)用
部署gRPC服務(wù)端應(yīng)用很簡(jiǎn)單,啟動(dòng)<font color="blue">local-server</font>應(yīng)用即可:

部署etcd
- 為了簡(jiǎn)化操作,我這里的etcd集群是用docker部署的,對(duì)應(yīng)的docker-compose.yml文件內(nèi)容如下:
version: '3'
services:
etcd1:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd1'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd1:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd1:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2379:2379
volumes:
- ./store/etcd1/data:/etcd_data
etcd2:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd2'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd2:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd2:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2380:2379
volumes:
- ./store/etcd2/data:/etcd_data
etcd3:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd3'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd3:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd3:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2381:2379
volumes:
- ./store/etcd3/data:/etcd_data
- 準(zhǔn)備好上述文件后,執(zhí)行<font color="blue">docker-compose up -d</font>即可創(chuàng)建集群;
將服務(wù)端應(yīng)用的IP地址和端口寫入etcd
- 我這邊<font color="blue">local-server</font>所在服務(wù)器IP是<font color="red">192.168.50.5</font>,端口9898,所以執(zhí)行以下命令將local-server信息寫入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898
啟動(dòng)客戶端應(yīng)用
- 打開(kāi)DynamicServerAddressDemoApplication.java,點(diǎn)擊下圖紅框位置,即可啟動(dòng)客戶端應(yīng)用:

- 注意下圖紅框中的日志,該日志證明客戶端應(yīng)用從etcd獲取服務(wù)端信息成功:

- 瀏覽器訪問(wèn)應(yīng)用<font color="blue">get-service-addr-from-etcd</font>的http接口,成功收到響應(yīng),證明gRPC調(diào)用成功:

- 去看local-server的控制臺(tái),如下圖紅框,證明遠(yuǎn)程調(diào)用確實(shí)執(zhí)行了:

重啟服務(wù)端,重啟的時(shí)候修改端口
- 為了驗(yàn)證動(dòng)態(tài)獲取服務(wù)端信息是否有效,咱們先把local-server應(yīng)用的端口改一下,如下圖紅框,改成<font color="red">9899</font>:

- 改完重啟local-server,如下圖紅框,可見(jiàn)gRPC端口已經(jīng)改為9899:

- 這時(shí)候再訪問(wèn)<font color="blue">get-service-addr-from-etcd</font>的http接口,由于get-service-addr-from-etcd不知道local-server的監(jiān)聽(tīng)端口發(fā)生了改變,因此還是去訪問(wèn)9898端口,毫無(wú)意外的返回了失?。?/li>

修改etcd中服務(wù)端的端口信息
現(xiàn)在執(zhí)行以下命令,將etcd中的服務(wù)端信息改為正確的:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899
調(diào)用接口觸發(fā)客戶端重新實(shí)例化Stub對(duì)象
- 聰明的您一定知道接下來(lái)要做的事情了:讓StubWrapper的bean重新在spring環(huán)境注冊(cè),也就是調(diào)用RefreshStubInstanceController提供的http接口refreshstub:

- 查看<font color="blue">get-service-addr-from-etcd</font>應(yīng)用的控制臺(tái),如下圖紅框,StubWrapper已經(jīng)重新注冊(cè)了,并且從etcd取得了最新的服務(wù)端信息:

驗(yàn)證客戶端能否正常調(diào)用修改了端口的服務(wù)端服務(wù)
- 再次訪問(wèn)<font color="blue">get-service-addr-from-etcd</font>應(yīng)用的web接口,如下圖,gRPC調(diào)用成功:

至此,在不修改配置不重啟服務(wù)的情況下,客戶端也可以適應(yīng)服務(wù)端的變化了,當(dāng)然了,本文只是提供基本的操作參考,實(shí)際上的微服務(wù)環(huán)境會(huì)更復(fù)雜,例如refreshstub接口可能被其他服務(wù)調(diào)用,這樣服務(wù)端有了變化可以更加及時(shí)地被更新,還有客戶端本身也肯能是gRPC服務(wù)提供方,那也要把自己注冊(cè)到etcd上去,還有利用etcd的watch功能監(jiān)控指定的服務(wù)端是否一直存活,以及同一個(gè)gRPC服務(wù)的多個(gè)實(shí)例如何做負(fù)載均衡,等等,這些都要根據(jù)您的實(shí)際情況來(lái)定制;
本篇內(nèi)容過(guò)多,可見(jiàn)對(duì)于這些官方不支持的微服務(wù)環(huán)境,咱們自己去做注冊(cè)發(fā)現(xiàn)的適配很費(fèi)時(shí)費(fèi)力的,如果設(shè)計(jì)和選型能自己做主,我們更傾向于使用現(xiàn)成的注冊(cè)中心,接下來(lái)的文章,咱們就一起嘗試使用eureka為gRPC提供注冊(cè)發(fā)現(xiàn)服務(wù);
你不孤單,欣宸原創(chuàng)一路相伴
歡迎關(guān)注公眾號(hào):程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos