presto(六)——restful api之node

一、查詢帶有restful api的信息,我們發(fā)現(xiàn)都是以“Resource”結(jié)尾的類

帶有api的蹤跡

首先我們先從NodeResource.java中的"/v1/node"下手,他有一個成員變量:HeartbeatFailureDetector,類中提供了所有節(jié)點的執(zhí)行狀態(tài)的封裝信息:

start
uri
recentRequests
recentFailures
recentSuccesses
lastRequestTime
lastResponseTime

 if (coordinator) {
            binder.install(new FailureDetectorModule());
            jaxrsBinder(binder).bind(NodeResource.class);
}

其中綁定了:
HeartbeatFailureDetector

HeartbeatFailureDetector這個類在PostConstruct階段(也就是類注入完成之后的階段),開啟了定時器,每隔5s去監(jiān)聽worker的服務(wù)狀況:

@PostConstruct
    public void start()
    {
        if (isEnabled && started.compareAndSet(false, true)) {
            executor.scheduleWithFixedDelay(new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        updateMonitoredServices();
                    }
                    catch (Throwable e) {
                        // ignore to avoid getting unscheduled
                        log.warn(e, "Error updating services");
                    }
                }
            }, 0, 5, TimeUnit.SECONDS);
        }
    }

其中updateMonitoredServices使用httpclient的method為header方法去獲取信息,并把異常信息封裝起來。

 private void ping()
        {
            try {
                stats.recordStart();
                 //prepareHead為head http請求方法
                 //public static Builder prepareHead() {
                     // return new Builder().setMethod("HEAD");
                  // }
                httpClient.executeAsync(prepareHead().setUri(uri).build(), new ResponseHandler<Object, Exception>()
                {
                    @Override
                    public Exception handleException(Request request, Exception exception)
                    {
                        // ignore error
                        stats.recordFailure(exception);

                        // TODO: this will technically cause an NPE in httpClient, but it's not triggered because
                        // we never call get() on the response future. This behavior needs to be fixed in airlift
                        return null;
                    }

                    @Override
                    public Object handle(Request request, Response response)
                            throws Exception
                    {
                        stats.recordSuccess();
                        return null;
                    }
                });
            }
            catch (RuntimeException e) {
                log.warn(e, "Error scheduling request for %s", uri);
            }
        }

上面state實例記錄著狀態(tài)信息,與ServiceDescriptor(各個服務(wù)器的描述符:uuid(隨機(jī)生成)、nodeId、location等信息,從配置文件node.properties中讀取)

那,還有一個問題是,協(xié)調(diào)節(jié)點怎么獲取其他worker節(jié)點的信息的呢?

在updateMonitoredServices方法中:

 // selector.selectAllServices()獲取所有節(jié)點信息
        Set<ServiceDescriptor> online = selector.selectAllServices().stream()
                .filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId()))
                .collect(toImmutableSet());

上面的selector是CachingServiceSelector類,此類在PostConstruct有這樣的動作:

@PostConstruct
    public void start()
    {
        if (started.compareAndSet(false, true)) {
            Preconditions.checkState(!executor.isShutdown(), "CachingServiceSelector has been destroyed");

            // if discovery is available, get the initial set of servers before starting
            try {
                refresh().get(30, TimeUnit.SECONDS);
            }
            catch (Exception ignored) {
            }
        }
    }

其中的refresh是:

 ServiceDescriptors oldDescriptors = this.serviceDescriptors.get();

        ListenableFuture<ServiceDescriptors> future;
        if (oldDescriptors == null) {
          //DiscoveryLookupClient  lookupClient
            future = lookupClient.getServices(type, pool);
        }
        else {
            future = lookupClient.refreshServices(oldDescriptors);
        }

上面的DiscoveryLookupClient是HttpDiscoveryLookupClient,看其中方法:

private ListenableFuture<ServiceDescriptors> lookup(final String type, final String pool, final ServiceDescriptors serviceDescriptors)
    {
        Preconditions.checkNotNull(type, "type is null");

        URI uri = discoveryServiceURI.get();
        if (uri == null) {
            return Futures.immediateFailedCheckedFuture(new DiscoveryException("No discovery servers are available"));
        }

        uri = URI.create(uri + "/v1/service/" + type + "/");
        if (pool != null) {
            uri = uri.resolve(pool);
        }

  Builder requestBuilder = prepareGet()
                .setUri(uri)
                .setHeader("User-Agent", nodeInfo.getNodeId());

從上面可以看出,他會去調(diào)用service api,通過get請求將nodeid做為header信息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,578評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,094評論 25 709
  • 第一次嘗試水彩,色彩的運用還不是很上手。
    讀者1001號閱讀 145評論 0 0
  • 作為一名大一的新生,聽見大二學(xué)姐說“在大學(xué),一定要當(dāng)一個班干部什么的職位,這樣有什么好處才可以輪到你,...
    杜杜杜佳人閱讀 288評論 0 0
  • 心靜,才能聽見自己的內(nèi)心。 也許,你正狂熱追求的, 并非你真正想要的, 只是遷就了別人或社會; 也許,你正為之苦惱...
    Berlin_柏林閱讀 260評論 0 0

友情鏈接更多精彩內(nèi)容