ribbon源碼閱讀--IPing源碼分析

本人小白,這個(gè)文章是本人的閱讀筆記,不是權(quán)威解讀,需要自己甄別對(duì)錯(cuò)

最后一個(gè)部分了,就是IPing組件了,接著看看這個(gè)

IPing機(jī)制在我的理解當(dāng)中可能就是就是彌補(bǔ)Eureka的服務(wù)信息變更的延時(shí)效應(yīng)的。

我們算算,假設(shè)一個(gè)服務(wù)宕機(jī)了,其他服務(wù)Eureka Client列表最慢獲取到服務(wù)變更消息的時(shí)間,也是復(fù)習(xí)下Eureka的相關(guān)知識(shí)

  • 服務(wù)故障移除 30S執(zhí)行一次,但是的但是,服務(wù)租約的過期是需要2 * 90S,就是3分鐘,取最長的,那么需要6次檢測(cè)才能感知故障進(jìn)行移除。
  • 故障實(shí)例移除了,但是只是清空了注冊(cè)表和讀寫緩存,只讀緩存需要花費(fèi)30S進(jìn)行同步,才能從最近變更隊(duì)列中獲取。
  • Eureka Client是花費(fèi)30S進(jìn)行一次增量獲取

假設(shè)啊,時(shí)間都錯(cuò)開了,那么SumTime = 180 + 30 + 30 大概需要4分鐘,并且Ribbon的AllList從Eureka Client本地注冊(cè)表拉取也是30S進(jìn)行一次,那么感知一個(gè)服務(wù)宕機(jī)了,需要花費(fèi)好幾分鐘,這不是有點(diǎn)小坑?

這種情況下,在并發(fā)比較高的場景,那么請(qǐng)求宕機(jī)服務(wù)的線程都會(huì)超時(shí),超時(shí)造成的當(dāng)前線程資源耗盡會(huì)引發(fā)服務(wù)雪崩效應(yīng),不知道是不是基于這個(gè)考慮,Hystrix中資源隔離和服務(wù)熔斷降級(jí)很好的解決了這個(gè)情況!

IPing機(jī)制在一定情況下也緩解了這種情況發(fā)生。

說說主要的IPing實(shí)現(xiàn)吧

首先IPing是怎么實(shí)例化的呢?

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
            return this.propertiesFactory.get(IPing.class, config, serviceId);
        }
        NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
        ping.initWithNiwsConfig(config);
        return ping;
    }

如果配置了自定義IPing機(jī)制就用自己那一套,沒配就走NIWSDiscoveryPing

那么在哪進(jìn)行使用的呢,在ZoneAwareLoadBalancer進(jìn)行初始化的時(shí)候傳入了,向上追蹤父類構(gòu)造,你會(huì)發(fā)現(xiàn)在BaseLoadBalancer初始化時(shí)候使用了,順便說一嘴,BaseLoadBalancer是原生的Ribbon調(diào)用接口類。

        String clientName = clientConfig.getClientName();
        this.name = clientName;
        // 默認(rèn)定時(shí)調(diào)度30S
        int pingIntervalTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerPingInterval,
                        Integer.parseInt("30")));
        int maxTotalPingTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                        Integer.parseInt("2")));
        //啟動(dòng)IPing機(jī)制
        setPingInterval(pingIntervalTime);
        setMaxTotalPingTime(maxTotalPingTime);

        // cross associate with each other
        // i.e. Rule,Ping meet your container LB
        // LB, these are your Ping and Rule guys ...
        setRule(rule);
        setPing(ping);

接著會(huì)觸發(fā)一個(gè)定時(shí)任務(wù)

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
        //30S執(zhí)行一次        
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

任務(wù)核心執(zhí)行方法

    public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }
            
            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                //使用IPing的isAlive()方法判斷服務(wù)是否可用
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();
                    //重置服務(wù)狀態(tài)
                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        //加入服務(wù)狀態(tài)改變列表
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();
                //通知服務(wù)狀態(tài)變更
                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

就是這一塊調(diào)用了,目的也是重置服務(wù)狀態(tài)

具體的實(shí)現(xiàn)類我們看幾個(gè)主要的,不常用的就懶得看了

NIWSDiscoveryPing

默認(rèn)實(shí)現(xiàn)

    public boolean isAlive(Server server) {
            boolean isAlive = true;
            if (server!=null && server instanceof DiscoveryEnabledServer){
                DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
                InstanceInfo instanceInfo = dServer.getInstanceInfo();
                if (instanceInfo!=null){                    
                    InstanceStatus status = instanceInfo.getStatus();
                    if (status!=null){
                        isAlive = status.equals(InstanceStatus.UP);
                    }
                }
            }
            return isAlive;
        }

從本地注冊(cè)表拉取判斷服務(wù)實(shí)例狀態(tài),但是有個(gè)疑問,Ribbon的AllList拉取,即enableAndInitLearnNewServersFeature()啟動(dòng)的定時(shí)更新AllList任務(wù)也是30S執(zhí)行一次,這個(gè)任務(wù)是全量更新服務(wù)列表,那么這個(gè)IPing的作用是???不懂,待大佬解答啊

PingUrl

    public boolean isAlive(Server server) {
                String urlStr   = "";
                if (isSecure){
                    urlStr = "https://";
                }else{
                    urlStr = "http://";
                }
                urlStr += server.getId();
                urlStr += getPingAppendString();

                boolean isAlive = false;

                HttpClient httpClient = new DefaultHttpClient();
                HttpUriRequest getRequest = new HttpGet(urlStr);
                String content=null;
                try {
                    HttpResponse response = httpClient.execute(getRequest);
                    content = EntityUtils.toString(response.getEntity());
                    isAlive = (response.getStatusLine().getStatusCode() == 200);
                    if (getExpectedContent()!=null){
                        LOGGER.debug("content:" + content);
                        if (content == null){
                            isAlive = false;
                        }else{
                            if (content.equals(getExpectedContent())){
                                isAlive = true;
                            }else{
                                isAlive = false;
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }finally{
                    // Release the connection.
                    getRequest.abort();
                }

                return isAlive;
        }

就是直接請(qǐng)求服務(wù)了,默認(rèn)的就是http://192.168.31.128:8080,默認(rèn)情況下返回200就任務(wù)服務(wù)可用,當(dāng)然可配置接口和返回值進(jìn)行二次驗(yàn)證

這個(gè)機(jī)制對(duì)Eureka Client服務(wù)感知延遲做了一定的保障,但是這個(gè)是不是有點(diǎn)耗費(fèi)性能

好了,IPing機(jī)制就說這點(diǎn),沒啥好說的

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容