一、查詢帶有restful api的信息,我們發(fā)現(xiàn)都是以“Resource”結(jié)尾的類

首先我們先從NodeResource.java中的"/v1/node"下手,他有一個成員變量:HeartbeatFailureDetector,類中提供了所有節(jié)點的執(zhí)行狀態(tài)的封裝信息:
start
uri
recentRequests
recentFailures
recentSuccesses
lastRequestTime
lastResponseTime
if (coordinator) {
binder.install(new FailureDetectorModule());
jaxrsBinder(binder).bind(NodeResource.class);
}
其中綁定了:
HeartbeatFailureDetector
HeartbeatFailureDetector這個類在PostConstruct階段(也就是類注入完成之后的階段),開啟了定時器,每隔5s去監(jiān)聽worker的服務(wù)狀況:
@PostConstruct
public void start()
{
if (isEnabled && started.compareAndSet(false, true)) {
executor.scheduleWithFixedDelay(new Runnable()
{
@Override
public void run()
{
try {
updateMonitoredServices();
}
catch (Throwable e) {
// ignore to avoid getting unscheduled
log.warn(e, "Error updating services");
}
}
}, 0, 5, TimeUnit.SECONDS);
}
}
其中updateMonitoredServices使用httpclient的method為header方法去獲取信息,并把異常信息封裝起來。
private void ping()
{
try {
stats.recordStart();
//prepareHead為head http請求方法
//public static Builder prepareHead() {
// return new Builder().setMethod("HEAD");
// }
httpClient.executeAsync(prepareHead().setUri(uri).build(), new ResponseHandler<Object, Exception>()
{
@Override
public Exception handleException(Request request, Exception exception)
{
// ignore error
stats.recordFailure(exception);
// TODO: this will technically cause an NPE in httpClient, but it's not triggered because
// we never call get() on the response future. This behavior needs to be fixed in airlift
return null;
}
@Override
public Object handle(Request request, Response response)
throws Exception
{
stats.recordSuccess();
return null;
}
});
}
catch (RuntimeException e) {
log.warn(e, "Error scheduling request for %s", uri);
}
}
上面state實例記錄著狀態(tài)信息,與ServiceDescriptor(各個服務(wù)器的描述符:uuid(隨機(jī)生成)、nodeId、location等信息,從配置文件node.properties中讀取)
那,還有一個問題是,協(xié)調(diào)節(jié)點怎么獲取其他worker節(jié)點的信息的呢?
在updateMonitoredServices方法中:
// selector.selectAllServices()獲取所有節(jié)點信息
Set<ServiceDescriptor> online = selector.selectAllServices().stream()
.filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId()))
.collect(toImmutableSet());
上面的selector是CachingServiceSelector類,此類在PostConstruct有這樣的動作:
@PostConstruct
public void start()
{
if (started.compareAndSet(false, true)) {
Preconditions.checkState(!executor.isShutdown(), "CachingServiceSelector has been destroyed");
// if discovery is available, get the initial set of servers before starting
try {
refresh().get(30, TimeUnit.SECONDS);
}
catch (Exception ignored) {
}
}
}
其中的refresh是:
ServiceDescriptors oldDescriptors = this.serviceDescriptors.get();
ListenableFuture<ServiceDescriptors> future;
if (oldDescriptors == null) {
//DiscoveryLookupClient lookupClient
future = lookupClient.getServices(type, pool);
}
else {
future = lookupClient.refreshServices(oldDescriptors);
}
上面的DiscoveryLookupClient是HttpDiscoveryLookupClient,看其中方法:
private ListenableFuture<ServiceDescriptors> lookup(final String type, final String pool, final ServiceDescriptors serviceDescriptors)
{
Preconditions.checkNotNull(type, "type is null");
URI uri = discoveryServiceURI.get();
if (uri == null) {
return Futures.immediateFailedCheckedFuture(new DiscoveryException("No discovery servers are available"));
}
uri = URI.create(uri + "/v1/service/" + type + "/");
if (pool != null) {
uri = uri.resolve(pool);
}
Builder requestBuilder = prepareGet()
.setUri(uri)
.setHeader("User-Agent", nodeInfo.getNodeId());
從上面可以看出,他會去調(diào)用service api,通過get請求將nodeid做為header信息。