一、首先看到入口地方:
@Path("/v1/cluster")
public class ClusterStatsResource
{
private final InternalNodeManager nodeManager;
private final QueryManager queryManager;
private final boolean isIncludeCoordinator;
有三個狀態(tài):節(jié)點管理器、查詢管理器、是否包含協(xié)調(diào)器
二、提供的服務(wù):
@GET
@Produces(MediaType.APPLICATION_JSON)
public ClusterStats getClusterStats()
{
long runningQueries = 0;
long blockedQueries = 0;
long queuedQueries = 0;
省略。。。。
返回的是ClusterStats信息
三、ClusterStats狀態(tài)有哪些?
public static class ClusterStats
{
private final long runningQueries;
private final long blockedQueries;
private final long queuedQueries;
private final long activeWorkers;
private final long runningDrivers;
private final double reservedMemory;
private final double rowInputRate;
private final double byteInputRate;
private final double cpuTimeRate;
此處我們可以知道此接口返回的是:運行查詢數(shù)、阻塞個數(shù)、進(jìn)入排隊數(shù)、活動的worker數(shù)、正在運行的drivers數(shù) 等信息。
四、ClusterStats狀態(tài)是如何獲取的呢?
由兩個最開始ClusterStatsResource中的兩個狀態(tài)提供:
InternalNodeManager
QueryManager
五、先看InternalNodeManager提供什么?
@Inject
public DiscoveryNodeManager(
@ServiceType("presto") ServiceSelector serviceSelector,
NodeInfo nodeInfo,
FailureDetector failureDetector,
NodeVersion expectedNodeVersion,
@ForNodeManager HttpClient httpClient)
{
省略。。。
this.currentNode = refreshNodesInternal();
}
看最后一行,refreshNodesInternal。
//獲取所有節(jié)點的狀態(tài)信息,通過/v1/service來獲取。包括location、節(jié)點狀態(tài)、節(jié)點ID、UUID等信息
Set<ServiceDescriptor> services = serviceSelector.selectAllServices().stream()
.filter(service -> !failureDetector.getFailed().contains(service))
.collect(toImmutableSet());
//獲取所有節(jié)點狀況
allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build());
activeNodesByConnectorId = byConnectorIdBuilder.build();
coordinators = coordinatorsBuilder.build();
每隔5秒來,由協(xié)調(diào)器節(jié)點主動去查詢workers狀態(tài)。而且在更新完成5s之后,就調(diào)用上面的refreshNodesInternal方法。通過/v1/service來獲取節(jié)點信息,把新節(jié)點加入到DiscoveryNodeManager的一個map中nodeStates。
@PostConstruct
public void startPollingNodeStates()
{
// 如果是協(xié)調(diào)器節(jié)點,就定時5s去刷新拉去worker節(jié)點數(shù)據(jù)
if (getCoordinators().contains(currentNode)) {
nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
AllNodes allNodes = getAllNodes();
<b>從上面我們已經(jīng)看出來ClusterStats的activeWorkers信息可以從InternalNodeManager的nodeStates中獲取。</b>
六、再看QueryManager提供什么?
他的實現(xiàn)類是:SqlQueryManager
QueryManager提供了getAllQueryInfo方法給ClusterStatsResource來獲取ClusterStats中的狀態(tài)信息。
@Override
public List<QueryInfo> getAllQueryInfo()
{
return queries.values().stream()
.map(queryExecution -> {
try {
return queryExecution.getQueryInfo();
}
catch (RuntimeException ignored) {
return null;
}
})
.filter(Objects::nonNull)
.collect(toImmutableList());
}
這個類在創(chuàng)建query時候,加入到 queries中
private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
同時對每一個query添加監(jiān)聽器,一旦執(zhí)行狀態(tài)改變,就更新狀態(tài)query狀態(tài)即:QueryInfo
七、回頭看看ClusterStats中的狀態(tài)
public ClusterStats getClusterStats()
{
long runningQueries = 0;
long blockedQueries = 0;
long queuedQueries = 0;
long activeNodes = nodeManager.getNodes(NodeState.ACTIVE).size();
if (!isIncludeCoordinator) {
activeNodes -= 1;
}
long runningDrivers = 0;
double memoryReservation = 0;
double rowInputRate = 0;
double byteInputRate = 0;
double cpuTimeRate = 0;
for (QueryInfo query : queryManager.getAllQueryInfo()) {
if (query.getState() == QueryState.QUEUED) {
queuedQueries++;
}
else if (query.getState() == QueryState.RUNNING) {
if (query.getQueryStats().isFullyBlocked()) {
blockedQueries++;
}
else {
runningQueries++;
}
}
if (!query.getState().isDone()) {
double totalExecutionTimeSeconds = query.getQueryStats().getElapsedTime().getValue(SECONDS);
if (totalExecutionTimeSeconds != 0) {
byteInputRate += query.getQueryStats().getProcessedInputDataSize().toBytes() / totalExecutionTimeSeconds;
rowInputRate += query.getQueryStats().getProcessedInputPositions() / totalExecutionTimeSeconds;
cpuTimeRate += (query.getQueryStats().getTotalCpuTime().getValue(SECONDS)) / totalExecutionTimeSeconds;
}
memoryReservation += query.getQueryStats().getTotalMemoryReservation().toBytes();
runningDrivers += query.getQueryStats().getRunningDrivers();
}
}
return new ClusterStats(runningQueries, blockedQueries, queuedQueries, activeNodes, runningDrivers, memoryReservation, rowInputRate, byteInputRate, cpuTimeRate);
}
我們看到大量的信息是來自query.QueryInfo中的信息。
八、狀態(tài)監(jiān)聽器實現(xiàn)
待續(xù)~~~