在上一個版本中利用netty實現(xiàn)了簡單的一對一的RPC,需要手動設(shè)置服務(wù)地址,限制性較大。
在本文中,利用zookeeper作為服務(wù)注冊中心,在服務(wù)端啟動時將本地的服務(wù)信息注冊到zookeeper中,當客戶端發(fā)起遠程服務(wù)調(diào)用時,先從zookeeper中獲取該服務(wù)的地址,然后根據(jù)獲得的這個地址來利用netty進行網(wǎng)絡(luò)傳送。
在服務(wù)端和注冊中心之間需要建立監(jiān)聽,當服務(wù)信息發(fā)生變化或網(wǎng)絡(luò)連接等問題時需要對注冊中心的服務(wù)信息進行修改。在本文中創(chuàng)建了服務(wù)注冊監(jiān)控中心,利用心跳機制來判斷與服務(wù)端是否有較穩(wěn)定的連接,當出現(xiàn)網(wǎng)絡(luò)不穩(wěn)定時,則從注冊中心中刪除屬于該服務(wù)端的服務(wù)信息。在本項目中設(shè)定在5分鐘內(nèi)3次以上沒有發(fā)送心跳包為不穩(wěn)定狀態(tài)。
關(guān)于心跳機制,之前有一篇文章介紹過:Dubbo心跳機制
zookeeper注冊中心
zookeeper是hadoop中的一個重要組件,其主要是作為分布式協(xié)調(diào)服務(wù)
zookeeper采用節(jié)點樹的數(shù)據(jù)模型,類似linux文件系統(tǒng)。
每個節(jié)點稱做一個ZNode,每個ZNode都可以通過路徑唯一標識,同時每個節(jié)點還可以存儲少量數(shù)據(jù)。
本項目借鑒dubbo的注冊中心模型來設(shè)計本文的注冊中心。
總體上設(shè)計了四級節(jié)點,在一個節(jié)點是一個持久節(jié)點/register,表示是記錄注冊服務(wù)的區(qū)域。二級節(jié)點是服務(wù)接口名,三級節(jié)點是遠程服務(wù)ip地址,該節(jié)點是臨時節(jié)點,節(jié)點存儲的數(shù)據(jù)是具體的實現(xiàn)類名。

在客戶端會根據(jù)服務(wù)接口名在注冊中心進行查找,得到遠程服務(wù)ip地址,并根據(jù)節(jié)點中存儲的具體實現(xiàn)類名進行反射。
首先進行zookeeper初始化,利用了CuratorFramework有關(guān)類
private static void init() {
RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
client = CuratorFrameworkFactory.builder().connectString(ZKConsts.ZK_SERVER_PATH)
.sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
.namespace(ZKConsts.WORK_SPACE).build();
client.start();
}
服務(wù)的注冊代碼
public static void register(URL url) {
try {
String interfaceName = url.getInterfaceName();
String implClassName = url.getImplClassName();
Stat stat = client.checkExists().forPath(getPath(interfaceName, url.toString()));
if (stat != null) {
System.out.println("該節(jié)點已存在!");
client.delete().forPath(getPath(interfaceName, url.toString()));
}
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
//權(quán)限控制,任何連接的客戶端都可以操作該屬性znode
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(getPath(interfaceName, url.toString()), implClassName.getBytes());
System.out.println(getPath(interfaceName, url.toString()));
} catch (Exception e) {
e.printStackTrace();
}
}
根據(jù)服務(wù)接口名來獲取遠程服務(wù)連接地址
public static URL random(String interfaceName) {
try {
System.out.println("開始查找服務(wù)節(jié)點:" + getPath(interfaceName));
List<String> urlList = client.getChildren().forPath("/" + interfaceName);
System.out.println("結(jié)果:" + urlList);
String serviceUrl = urlList.get(0);
String[] urls = serviceUrl.split(":");
String implClassName = get(interfaceName, serviceUrl);
return new URL(urls[0], Integer.valueOf(urls[1]), interfaceName, implClassName);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
return null;
}
注冊中心與服務(wù)端進行連接時需要判斷是否維持了穩(wěn)定的連接,如果服務(wù)端出現(xiàn)宕機等情況時需要從注冊中心中刪除這些服務(wù)。
以前的一些處理機制,有session機制和wacher機制。
session機制
每個zookeeper注冊中心與服務(wù)端進行連接時會創(chuàng)建一個session,在設(shè)置的sessionTimeout內(nèi),服務(wù)端會與注冊中心進行心跳包的定時發(fā)送,從而感知每個客戶端是否宕機,如果創(chuàng)建某個臨時Znode節(jié)點對應的session銷毀時,相應的臨時節(jié)點也會被注冊中心刪除。
watcher機制
針對每個節(jié)點的操作,都有要給監(jiān)督者進行watcher,當監(jiān)控的某個節(jié)點發(fā)生了變化,則會觸發(fā)watcher事件。注冊中心的watcher是一次性的,觸發(fā)后會被銷毀。父節(jié)點,子節(jié)點增刪改都能夠觸發(fā)watcher。觸發(fā)銷毀后,下次需要監(jiān)聽時還需要再注冊一次。
本文心跳機制
服務(wù)端定時向注冊中心發(fā)送本機地址,看作心跳數(shù)據(jù)包,而注冊中心監(jiān)控則維持一個channelId和具體地址的map,并且通過IdleHandler監(jiān)聽空閑事件,到達一定的空閑次數(shù)則認為不活躍,當不活躍時,zookeeper刪除對應的url節(jié)點。該版本實現(xiàn)了上面的內(nèi)容,后續(xù)的步驟在以后的版本實現(xiàn)。
如果10s內(nèi)沒有觸發(fā)讀,就會執(zhí)行userEventTriggered方法。如果5分鐘中出現(xiàn)兩次不活躍次數(shù),就認定該連接不穩(wěn)定,注冊中心會移除屬于該服務(wù)端的服務(wù)。你也可以根據(jù)實際情況設(shè)定不穩(wěn)定標準。
service.scheduleAtFixedRate(() -> {
if (future.channel().isActive()) {
int time = new Random().nextInt(5);
log.info("本次定時任務(wù)獲取的隨機數(shù):{}", time);
if (time > 3) {
log.info("發(fā)送本地地址到注冊中心:{}", url);
future.channel().writeAndFlush(url);
}
}
}, 60, 60, TimeUnit.SECONDS);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent state = (IdleStateEvent)evt;
if (state.state().equals(IdleState.READER_IDLE)) {
log.info("讀空閑");
} else if (state.state().equals(IdleState.WRITER_IDLE)) {
log.info("寫空閑");
}
//在一定時間內(nèi)讀寫空閑才會關(guān)閉鏈接
else if (state.state().equals(IdleState.ALL_IDLE)) {
if (++inActiveCount == 1) {
start = System.currentTimeMillis();
}
int minute = (int)((System.currentTimeMillis() - start) / (60 * 1000)) + 1;
log.info("第{}次讀寫都空閑,計時分鐘數(shù){}", inActiveCount, minute);
if (inActiveCount > 2 && minute < 5) {
log.info("移除不活躍ip");
removeAndClose(ctx);
} else {
if (minute >= 5) {
log.info("新周期開始");
start = 0;
inActiveCount = 0;
}
}
}
}
}
具體實現(xiàn)代碼:RPC第二版