輕量級RPC設(shè)計與實現(xiàn)第二版

在上一個版本中利用netty實現(xiàn)了簡單的一對一的RPC,需要手動設(shè)置服務(wù)地址,限制性較大。
在本文中,利用zookeeper作為服務(wù)注冊中心,在服務(wù)端啟動時將本地的服務(wù)信息注冊到zookeeper中,當客戶端發(fā)起遠程服務(wù)調(diào)用時,先從zookeeper中獲取該服務(wù)的地址,然后根據(jù)獲得的這個地址來利用netty進行網(wǎng)絡(luò)傳送。
在服務(wù)端和注冊中心之間需要建立監(jiān)聽,當服務(wù)信息發(fā)生變化或網(wǎng)絡(luò)連接等問題時需要對注冊中心的服務(wù)信息進行修改。在本文中創(chuàng)建了服務(wù)注冊監(jiān)控中心,利用心跳機制來判斷與服務(wù)端是否有較穩(wěn)定的連接,當出現(xiàn)網(wǎng)絡(luò)不穩(wěn)定時,則從注冊中心中刪除屬于該服務(wù)端的服務(wù)信息。在本項目中設(shè)定在5分鐘內(nèi)3次以上沒有發(fā)送心跳包為不穩(wěn)定狀態(tài)。

關(guān)于心跳機制,之前有一篇文章介紹過:Dubbo心跳機制

zookeeper注冊中心

zookeeper是hadoop中的一個重要組件,其主要是作為分布式協(xié)調(diào)服務(wù)
zookeeper采用節(jié)點樹的數(shù)據(jù)模型,類似linux文件系統(tǒng)。
每個節(jié)點稱做一個ZNode,每個ZNode都可以通過路徑唯一標識,同時每個節(jié)點還可以存儲少量數(shù)據(jù)。
本項目借鑒dubbo的注冊中心模型來設(shè)計本文的注冊中心。
總體上設(shè)計了四級節(jié)點,在一個節(jié)點是一個持久節(jié)點/register,表示是記錄注冊服務(wù)的區(qū)域。二級節(jié)點是服務(wù)接口名,三級節(jié)點是遠程服務(wù)ip地址,該節(jié)點是臨時節(jié)點,節(jié)點存儲的數(shù)據(jù)是具體的實現(xiàn)類名。

1.png

在客戶端會根據(jù)服務(wù)接口名在注冊中心進行查找,得到遠程服務(wù)ip地址,并根據(jù)節(jié)點中存儲的具體實現(xiàn)類名進行反射。

首先進行zookeeper初始化,利用了CuratorFramework有關(guān)類

private static void init() {
        RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
        client = CuratorFrameworkFactory.builder().connectString(ZKConsts.ZK_SERVER_PATH)
                .sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
                .namespace(ZKConsts.WORK_SPACE).build();
        client.start();
    }

服務(wù)的注冊代碼

public static void register(URL url) {
        try {
            String interfaceName = url.getInterfaceName();
            String implClassName = url.getImplClassName();
            Stat stat = client.checkExists().forPath(getPath(interfaceName, url.toString()));
            if (stat != null) {
                System.out.println("該節(jié)點已存在!");
                client.delete().forPath(getPath(interfaceName, url.toString()));
            }
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    //權(quán)限控制,任何連接的客戶端都可以操作該屬性znode
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(getPath(interfaceName, url.toString()), implClassName.getBytes());
            System.out.println(getPath(interfaceName, url.toString()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

根據(jù)服務(wù)接口名來獲取遠程服務(wù)連接地址

public static URL random(String interfaceName) {
        try {

            System.out.println("開始查找服務(wù)節(jié)點:" + getPath(interfaceName));
            List<String> urlList = client.getChildren().forPath("/" + interfaceName);
            System.out.println("結(jié)果:" + urlList);
            String serviceUrl = urlList.get(0);
            String[] urls = serviceUrl.split(":");
            String implClassName = get(interfaceName, serviceUrl);
            return new URL(urls[0], Integer.valueOf(urls[1]), interfaceName, implClassName);
        } catch (Exception e) {
            System.out.println(e);
            e.printStackTrace();
        }
        return null;
    }

注冊中心與服務(wù)端進行連接時需要判斷是否維持了穩(wěn)定的連接,如果服務(wù)端出現(xiàn)宕機等情況時需要從注冊中心中刪除這些服務(wù)。
以前的一些處理機制,有session機制和wacher機制。
session機制
每個zookeeper注冊中心與服務(wù)端進行連接時會創(chuàng)建一個session,在設(shè)置的sessionTimeout內(nèi),服務(wù)端會與注冊中心進行心跳包的定時發(fā)送,從而感知每個客戶端是否宕機,如果創(chuàng)建某個臨時Znode節(jié)點對應的session銷毀時,相應的臨時節(jié)點也會被注冊中心刪除。
watcher機制
針對每個節(jié)點的操作,都有要給監(jiān)督者進行watcher,當監(jiān)控的某個節(jié)點發(fā)生了變化,則會觸發(fā)watcher事件。注冊中心的watcher是一次性的,觸發(fā)后會被銷毀。父節(jié)點,子節(jié)點增刪改都能夠觸發(fā)watcher。觸發(fā)銷毀后,下次需要監(jiān)聽時還需要再注冊一次。
本文心跳機制
服務(wù)端定時向注冊中心發(fā)送本機地址,看作心跳數(shù)據(jù)包,而注冊中心監(jiān)控則維持一個channelId和具體地址的map,并且通過IdleHandler監(jiān)聽空閑事件,到達一定的空閑次數(shù)則認為不活躍,當不活躍時,zookeeper刪除對應的url節(jié)點。該版本實現(xiàn)了上面的內(nèi)容,后續(xù)的步驟在以后的版本實現(xiàn)。
如果10s內(nèi)沒有觸發(fā)讀,就會執(zhí)行userEventTriggered方法。如果5分鐘中出現(xiàn)兩次不活躍次數(shù),就認定該連接不穩(wěn)定,注冊中心會移除屬于該服務(wù)端的服務(wù)。你也可以根據(jù)實際情況設(shè)定不穩(wěn)定標準。

 service.scheduleAtFixedRate(() -> {
                if (future.channel().isActive()) {
                    int time = new Random().nextInt(5);
                    log.info("本次定時任務(wù)獲取的隨機數(shù):{}", time);
                    if (time > 3) {
                        log.info("發(fā)送本地地址到注冊中心:{}", url);
                        future.channel().writeAndFlush(url);
                    }
                }
            }, 60, 60, TimeUnit.SECONDS);
 @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent state = (IdleStateEvent)evt;
            if (state.state().equals(IdleState.READER_IDLE)) {
                log.info("讀空閑");
            } else if (state.state().equals(IdleState.WRITER_IDLE)) {
                log.info("寫空閑");
            }
            //在一定時間內(nèi)讀寫空閑才會關(guān)閉鏈接
            else if (state.state().equals(IdleState.ALL_IDLE)) {
                if (++inActiveCount == 1) {
                    start = System.currentTimeMillis();
                }
                int minute = (int)((System.currentTimeMillis() - start) / (60 * 1000)) + 1;
                log.info("第{}次讀寫都空閑,計時分鐘數(shù){}", inActiveCount, minute);
                if (inActiveCount > 2 && minute < 5) {
                    log.info("移除不活躍ip");
                    removeAndClose(ctx);
                } else {
                    if (minute >= 5) {
                        log.info("新周期開始");
                        start = 0;
                        inActiveCount = 0;
                    }
                }
            }
        }
    }

具體實現(xiàn)代碼:RPC第二版

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容