Presto Source Code Analysis: The Fantastic Journey of a SQL Query (Part 2)

Because this Presto technical summary is long, it is split into two parts.

2.4.5 The stage scheduler starts scheduling

There are three main kinds of stage scheduler, grouped by task type:

(1) Source task

  • SourcePartitionedScheduler

(2) Fixed task

  • FixedCountScheduler
  • FixedSourcePartitionedScheduler

There are two main split placement policies:

(1) DynamicSplitPlacementPolicy

(2) FixedSplitPlacementPolicy

The query scheduler invokes each stage's scheduler.

The call is stageSchedulers.get(stage.getStageId()).schedule();
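The call above is made repeatedly from a loop in the query scheduler that keeps driving every stage that is not yet fully scheduled. The sketch below is a simplified, synchronous model of that loop, not Presto's code: SimpleStageScheduler, QuerySchedulerSketch and CountdownScheduler are hypothetical names, and schedule() is assumed to return only a "finished" flag instead of a full ScheduleResult.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stage scheduler: schedule() returns true once the stage is fully scheduled.
// (Presto's real schedule() returns a ScheduleResult with new tasks, a blocked future and a reason.)
interface SimpleStageScheduler {
    boolean schedule();
}

final class QuerySchedulerSketch {
    // Calls schedule() on every unfinished stage scheduler, round after round,
    // until all of them report finished. Returns the number of rounds taken.
    static int driveUntilFinished(Map<Integer, SimpleStageScheduler> stageSchedulers) {
        Map<Integer, SimpleStageScheduler> active = new LinkedHashMap<>(stageSchedulers);
        int rounds = 0;
        while (!active.isEmpty()) {
            rounds++;
            List<Integer> finished = new ArrayList<>();
            for (Map.Entry<Integer, SimpleStageScheduler> entry : active.entrySet()) {
                if (entry.getValue().schedule()) {
                    finished.add(entry.getKey());
                }
            }
            finished.forEach(active::remove);
            // The real query scheduler waits on the stages' blocked futures here instead of looping immediately.
        }
        return rounds;
    }
}

// Hypothetical test double: reports finished on its n-th call.
final class CountdownScheduler implements SimpleStageScheduler {
    private int remainingCalls;

    CountdownScheduler(int calls) {
        this.remainingCalls = calls;
    }

    @Override
    public boolean schedule() {
        remainingCalls--;
        return remainingCalls <= 0;
    }
}
```

With one stage that finishes on its first call and another that needs three calls, the loop runs three rounds.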

The first is SourcePartitionedScheduler:

public synchronized ScheduleResult schedule()
{
    int overallSplitAssignmentCount = 0;
    ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
    List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
    boolean anyBlockedOnPlacements = false;
    boolean anyBlockedOnNextSplitBatch = false;
    boolean anyNotBlocked = false;

    for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
        Lifespan lifespan = entry.getKey();
        ScheduleGroup scheduleGroup = entry.getValue();
        Set<Split> pendingSplits = scheduleGroup.pendingSplits;

        if (scheduleGroup.state != ScheduleGroupState.DISCOVERING_SPLITS) {
            verify(scheduleGroup.nextSplitBatchFuture == null);
        }
        else if (pendingSplits.isEmpty()) {
            // try to get the next batch: if no splits are pending, start fetching the next batch of splits
            if (scheduleGroup.nextSplitBatchFuture == null) {
                scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());

                long start = System.nanoTime();
                Futures.addCallback(scheduleGroup.nextSplitBatchFuture, new FutureCallback<SplitBatch>()
                {
                    @Override
                    public void onSuccess(SplitBatch result)
                    {
                        stage.recordGetSplitTime(start);
                    }

                    @Override
                    public void onFailure(Throwable t)
                    {
                    }
                });
            }

            if (scheduleGroup.nextSplitBatchFuture.isDone()) {
                SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
                scheduleGroup.nextSplitBatchFuture = null;
                pendingSplits.addAll(nextSplits.getSplits());
                if (nextSplits.isLastBatch() && scheduleGroup.state == ScheduleGroupState.DISCOVERING_SPLITS) {
                    // If this was the last batch and the group is still discovering splits, move it to NO_MORE_SPLITS
                    scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
                }
            }
            else {
                overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
                anyBlockedOnNextSplitBatch = true;
                continue;
            }
        }

        Multimap<Node, Split> splitAssignment = ImmutableMultimap.of();
        if (!pendingSplits.isEmpty()) {
            if (!scheduleGroup.placementFuture.isDone()) {
                continue;
            }

            if (state == State.INITIALIZED) {
                state = State.SPLITS_ADDED;
            }

            // Compute split placement using the policy created earlier; this step effectively assigns splits to nodes
            SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
            splitAssignment = splitPlacementResult.getAssignments();

            // remove splits with successful placements
            splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
            overallSplitAssignmentCount += splitAssignment.size();

            // if not completed placed, mark scheduleGroup as blocked on placement
            if (!pendingSplits.isEmpty()) {
                scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
                overallBlockedFutures.add(scheduleGroup.placementFuture);
                anyBlockedOnPlacements = true;
            }
        }

        // if no new splits will be assigned, update state and attach completion event
        Multimap<Node, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
        if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
            scheduleGroup.state = ScheduleGroupState.DONE;
            if (!lifespan.isTaskWide()) {
                Node node = ((FixedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
            }
        }

        // Send the splits to their assigned nodes; returns any RemoteTasks newly created in the process, which then execute the splits
        overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

        // Assert that "placement future is not done" implies "pendingSplits is not empty".
        // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
        // However, there are other reasons that could lead to this.
        // Note that `computeAssignments` is quite broken:
        // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
        // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
        // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
        if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
            anyNotBlocked = true;
        }
    }

    if (autoDropCompletedLifespans) {
        drainCompletedLifespans();
    }

    // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
    //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
    // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
    //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
    //     which may contain recently published splits. We must not ignore those.
    //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
    //     Next time it invokes getNextBatch, it will realize that. However, the invocation will fail if we tear down splitSource now.
    if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (scheduleGroups.isEmpty() && splitSource.isFinished())) {
        switch (state) {
            case INITIALIZED:
                // we have not scheduled a single split so far
                state = State.SPLITS_ADDED;
                ScheduleResult emptySplitScheduleResult = scheduleEmptySplit();
                overallNewTasks.addAll(emptySplitScheduleResult.getNewTasks());
                overallSplitAssignmentCount++;
                // fall through
            case SPLITS_ADDED:
                state = State.NO_MORE_SPLITS;
                splitSource.close();
                // fall through
            case NO_MORE_SPLITS:
                if (!scheduleGroups.isEmpty()) {
                    // we are blocked on split assignment
                    break;
                }
                state = State.FINISHED;
                whenFinishedOrNewLifespanAdded.set(null);
                // fall through
            case FINISHED:
                return new ScheduleResult(
                        true,
                        overallNewTasks.build(),
                        overallSplitAssignmentCount);
            default:
                throw new IllegalStateException("Unknown state");
        }
    }

    if (anyNotBlocked) {
        return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
    }

    // Only try to finalize task creation when scheduling would block
    overallNewTasks.addAll(finalizeTaskCreationIfNecessary());

    ScheduleResult.BlockedReason blockedReason;
    if (anyBlockedOnNextSplitBatch) {
        blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
    }
    else {
        blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
    }

    overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
    return new ScheduleResult(
            false,
            overallNewTasks.build(),
            nonCancellationPropagating(whenAnyComplete(overallBlockedFutures)),
            blockedReason,
            overallSplitAssignmentCount);
}
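The per-group state machine above (DISCOVERING_SPLITS, then NO_MORE_SPLITS, then DONE) can be modeled in isolation. This is a minimal synchronous sketch under stated assumptions: the split source is a queue of prepared batches and the "last batch" is simply the one that empties the queue, futures are replaced by immediate results, and placement just takes up to maxPlacedPerCall splits per call as a stand-in for split-queue capacity. ScheduleGroupSketch and GroupState are hypothetical names; only the state names mirror Presto's ScheduleGroupState.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// State names mirror Presto's ScheduleGroupState; everything else is a simplified model.
enum GroupState { DISCOVERING_SPLITS, NO_MORE_SPLITS, DONE }

final class ScheduleGroupSketch {
    private final Deque<List<String>> batches;   // hypothetical split source: queued batches
    private final List<String> pendingSplits = new ArrayList<>();
    private final List<String> assigned = new ArrayList<>();
    private final int maxPlacedPerCall;          // hypothetical stand-in for split queue capacity
    GroupState state = GroupState.DISCOVERING_SPLITS;

    ScheduleGroupSketch(List<List<String>> batches, int maxPlacedPerCall) {
        this.batches = new ArrayDeque<>(batches);
        this.maxPlacedPerCall = maxPlacedPerCall;
    }

    // One call corresponds to one pass of the for-loop body in schedule().
    void scheduleOnce() {
        if (state == GroupState.DISCOVERING_SPLITS && pendingSplits.isEmpty()) {
            // "getNextBatch": completes immediately here instead of returning a future
            List<String> batch = batches.poll();
            if (batch != null) {
                pendingSplits.addAll(batch);
            }
            if (batches.isEmpty()) {
                // last batch seen: no more splits will be discovered
                state = GroupState.NO_MORE_SPLITS;
            }
        }
        // "computeAssignments": place as many pending splits as capacity allows
        int n = Math.min(maxPlacedPerCall, pendingSplits.size());
        assigned.addAll(new ArrayList<>(pendingSplits.subList(0, n)));
        pendingSplits.subList(0, n).clear();
        // Everything placed and nothing left to discover: the group is done
        if (pendingSplits.isEmpty() && state == GroupState.NO_MORE_SPLITS) {
            state = GroupState.DONE;
        }
    }

    List<String> assigned() {
        return assigned;
    }
}
```

With batches [a, b, c] and [d] and a capacity of 2, the group is still discovering after the first call and reaches DONE on the third call, after all four splits have been assigned.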

The splitPlacementPolicy.computeAssignments() method

DynamicSplitPlacementPolicy class: the implementation of the dynamic placement logic

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
{
    // Delegates to the nodeSelector's computeAssignments method
    return nodeSelector.computeAssignments(splits, remoteTasks.get());
}

As mentioned earlier, the NodeSelector implementation used here is TopologyAwareNodeSelector. Below is how TopologyAwareNodeSelector assigns splits:

    @Override
    public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
    {
        // Take the NodeMap held by the selector
        NodeMap nodeMap = this.nodeMap.get().get();
        // Create the assignment multimap
        Multimap<Node, Split> assignment = HashMultimap.create();
        NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);

        int[] topologicCounters = new int[topologicalSplitCounters.size()];
        Set<NetworkLocation> filledLocations = new HashSet<>();
        Set<Node> blockedExactNodes = new HashSet<>();
        boolean splitWaitingForAnyNode = false;
        for (Split split : splits) {
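            // First check whether the split is remotely accessible. If not, look up the nodes matching the split's host addresses (including the coordinator when includeCoordinator is set) as candidates; if none is found, throw an exception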
            if (!split.isRemotelyAccessible()) {
                List<Node> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
                if (candidateNodes.isEmpty()) {
                    log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
                    throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
                }
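                // The selection policy balances the minimum number of candidate nodes (minCandidates) against the maximum number of pending splits per task (maxPendingSplitsPerTask)
                Node chosenNode = bestNodeSplitCount(candidateNodes.iterator(), minCandidates, maxPendingSplitsPerTask, assignmentStats);
                // If a node could be chosen
                if (chosenNode != null) {
                    // Record the chosen node and its split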
                    assignment.put(chosenNode, split);
                    assignmentStats.addAssignedSplit(chosenNode);
                }
                // Exact node set won't matter, if a split is waiting for any node
                else if (!splitWaitingForAnyNode) {
                    // If the policy finds no node, add all candidate nodes to the set of blocked nodes
                    blockedExactNodes.addAll(candidateNodes);
                }
                continue;
            }
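            // The split is remotely accessible, so choose a node based on network topology as follows
            Node chosenNode = null;
            // Depth of the network hierarchy; the loop below walks from this depth up to the root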
            int depth = networkLocationSegmentNames.size();
            int chosenDepth = 0;
            Set<NetworkLocation> locations = new HashSet<>();
            // Collect the network location of each of the split's addresses
            for (HostAddress host : split.getAddresses()) {
                locations.add(networkLocationCache.get(host));
            }
            // If no location was collected (the split lists no addresses), use the root location and set the depth to 0
            if (locations.isEmpty()) {
                // Add the root location
                locations.add(ROOT_LOCATION);
                depth = 0;
            }
            // Try each address at progressively shallower network locations
            for (int i = depth; i >= 0 && chosenNode == null; i--) {
                for (NetworkLocation location : locations) {
                    // Skip locations which are only shallower than this level
                    // For example, locations which couldn't be located will be at the "root" location
                    if (location.getSegments().size() < i) {
                        continue;
                    }
                    location = location.subLocation(0, i);
                    if (filledLocations.contains(location)) {
                        continue;
                    }
                    Set<Node> nodes = nodeMap.getWorkersByNetworkPath().get(location);
                    chosenNode = bestNodeSplitCount(new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplits(i, depth), assignmentStats);
                    if (chosenNode != null) {
                        chosenDepth = i;
                        break;
                    }
                    filledLocations.add(location);
                }
            }
            if (chosenNode != null) {
                // Record the chosen node and its split
                assignment.put(chosenNode, split);
                assignmentStats.addAssignedSplit(chosenNode);
                topologicCounters[chosenDepth]++;
            }
            else {
                splitWaitingForAnyNode = true;
            }
        }
        for (int i = 0; i < topologicCounters.length; i++) {
            if (topologicCounters[i] > 0) {
                topologicalSplitCounters.get(i).update(topologicCounters[i]);
            }
        }

        ListenableFuture<?> blocked;
        int maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplits(0, networkLocationSegmentNames.size());
        if (splitWaitingForAnyNode) {
            blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        else {
            blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        return new SplitPlacementResult(blocked, assignment);
    }
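The core of TopologyAwareNodeSelector is the loop that tries a split's network location at progressively shallower depths. The sketch below keeps only that idea, assuming a hypothetical topology in which each node has a list of location segments such as [rack, host]. It replaces bestNodeSplitCount with a simple "least-loaded node below maxSplitsPerNode" rule and drops the randomized iteration, the filledLocations cache and the per-depth pending limits. TopologySketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopologySketch {
    // node name -> network location segments, e.g. ["rack1", "host1"] (hypothetical topology)
    private final Map<String, List<String>> nodeLocations;
    private final Map<String, Integer> assignedSplits = new HashMap<>();
    private final int maxSplitsPerNode;

    TopologySketch(Map<String, List<String>> nodeLocations, int maxSplitsPerNode) {
        this.nodeLocations = new LinkedHashMap<>(nodeLocations);
        this.maxSplitsPerNode = maxSplitsPerNode;
    }

    // Tries the split's location at progressively shallower levels (depth .. 0) and returns
    // the least-loaded node under that network path that still has room, or null if none has.
    String choose(List<String> splitLocation) {
        for (int depth = splitLocation.size(); depth >= 0; depth--) {
            List<String> path = splitLocation.subList(0, depth);
            String best = null;
            int bestCount = Integer.MAX_VALUE;
            for (Map.Entry<String, List<String>> node : nodeLocations.entrySet()) {
                List<String> location = node.getValue();
                // A node lies under this path if its location starts with the path's segments
                if (location.size() < depth || !location.subList(0, depth).equals(path)) {
                    continue;
                }
                int count = assignedSplits.getOrDefault(node.getKey(), 0);
                if (count < maxSplitsPerNode && count < bestCount) {
                    best = node.getKey();
                    bestCount = count;
                }
            }
            if (best != null) {
                assignedSplits.merge(best, 1, Integer::sum);
                return best;
            }
        }
        return null; // every level is full: the caller would mark the split as waiting for any node
    }
}
```

With a limit of one split per node, repeated splits located at [rack1, host1] go first to the node on host1, then to another node on rack1, then to any node via the root location, and finally get no node at all.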

The assignSplits method of SourcePartitionedScheduler

// Iterate over the splitAssignment argument; for each node, look up the task on that node:
// if there is none, create a new task, otherwise hand the node's splits to the task already running there
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment, Multimap<Node, Lifespan> noMoreSplitsNotification)
{
    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();

    ImmutableSet<Node> nodes = ImmutableSet.<Node>builder()
            .addAll(splitAssignment.keySet())
            .addAll(noMoreSplitsNotification.keySet())
            .build();
    for (Node node : nodes) {
        // source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
        ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                .putAll(partitionedNode, splitAssignment.get(node))
                .build();
        ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
        if (noMoreSplitsNotification.containsKey(node)) {
            noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
        }
        newTasks.addAll(stage.scheduleSplits(
                node,
                splits,
                noMoreSplits.build()));
    }
    return newTasks.build();
}

The scheduleSplits method of SqlStageExecution

public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
    requireNonNull(node, "node is null");
    requireNonNull(splits, "splits is null");

    splitsScheduled.set(true);

    checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");

    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
    Collection<RemoteTask> tasks = this.tasks.get(node);
    RemoteTask task;
    if (tasks == null) {
        // The output buffer depends on the task id starting from 0 and being sequential, since each
        // task is assigned a private buffer based on task id.
        TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
        task = scheduleTask(node, taskId, splits);
        newTasks.add(task);
    }
    else {
        task = tasks.iterator().next();
        task.addSplits(splits);
    }
    if (noMoreSplitsNotification.size() > 1) {
        // The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
        // If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
        // These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
        throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() <= 1");
    }
    for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
        task.noMoreSplits(entry.getKey(), entry.getValue());
    }
    return newTasks.build();
}

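The "create a task if the node has none, otherwise add the splits to the existing task" logic of scheduleSplits is sketched below. FakeTask and StageSketch are hypothetical stand-ins for RemoteTask and SqlStageExecution; nodes and splits are plain strings, and noMoreSplits handling is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RemoteTask: an id plus the splits it has received.
final class FakeTask {
    final int taskId;
    final List<String> splits = new ArrayList<>();

    FakeTask(int taskId) {
        this.taskId = taskId;
    }
}

final class StageSketch {
    private final Map<String, FakeTask> tasksByNode = new HashMap<>();
    private final AtomicInteger nextTaskId = new AtomicInteger();

    // Returns the newly created task, or null if an existing task on the node received the splits.
    synchronized FakeTask scheduleSplits(String node, List<String> splits) {
        FakeTask existing = tasksByNode.get(node);
        if (existing == null) {
            // Task ids start at 0 and are sequential, as the output buffer expects
            FakeTask task = new FakeTask(nextTaskId.getAndIncrement());
            task.splits.addAll(splits);
            tasksByNode.put(node, task);
            return task;
        }
        existing.splits.addAll(splits);
        return null;
    }

    FakeTask taskOn(String node) {
        return tasksByNode.get(node);
    }
}
```

Scheduling splits for w1, w2 and then w1 again creates tasks 0 and 1; the third call only adds its split to task 0.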
The second is FixedSourcePartitionedScheduler:

public ScheduleResult schedule()
{
    // schedule a task on every node in the distribution
    List<RemoteTask> newTasks = ImmutableList.of();
    if (!scheduledTasks) {
        newTasks = partitioning.getPartitionToNode().entrySet().stream()
                .map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
                .collect(toImmutableList());
        scheduledTasks = true;
    }

    boolean allBlocked = true;
    List<ListenableFuture<?>> blocked = new ArrayList<>();
    BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;
    int splitsScheduled = 0;

    Iterator<SourcePartitionedScheduler> schedulerIterator = sourcePartitionedSchedulers.iterator();
    List<Lifespan> driverGroupsToStart = ImmutableList.of();
    while (schedulerIterator.hasNext()) {
        SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();

        for (Lifespan lifespan : driverGroupsToStart) {
            sourcePartitionedScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
        }

        ScheduleResult schedule = sourcePartitionedScheduler.schedule();
        splitsScheduled += schedule.getSplitsScheduled();
        if (schedule.getBlockedReason().isPresent()) {
            blocked.add(schedule.getBlocked());
            blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
        }
        else {
            verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
            allBlocked = false;
        }

        driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();

        if (schedule.isFinished()) {
            schedulerIterator.remove();
            sourcePartitionedScheduler.close();
        }
    }

    if (allBlocked) {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
    }
    else {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, splitsScheduled);
    }
}
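FixedSourcePartitionedScheduler first creates one task per partition, then polls each SourcePartitionedScheduler and merges their results. The merge rule (the stage is blocked only if every source scheduler is blocked, and finished only once all of them have finished and been removed) is sketched below with hypothetical types SubResult and Summary in place of ScheduleResult; the lifespan hand-off between schedulers is left out.

```java
import java.util.List;
import java.util.Optional;

final class FixedSourceSketch {
    // Hypothetical per-source-scheduler result: finished flag, optional blocked reason, splits scheduled.
    static final class SubResult {
        final boolean finished;
        final Optional<String> blockedReason;
        final int splitsScheduled;

        SubResult(boolean finished, Optional<String> blockedReason, int splitsScheduled) {
            this.finished = finished;
            this.blockedReason = blockedReason;
            this.splitsScheduled = splitsScheduled;
        }
    }

    // Hypothetical combined result for the whole stage.
    static final class Summary {
        final boolean finished;
        final boolean blocked;
        final int splitsScheduled;

        Summary(boolean finished, boolean blocked, int splitsScheduled) {
            this.finished = finished;
            this.blocked = blocked;
            this.splitsScheduled = splitsScheduled;
        }
    }

    // Finished only when every source scheduler finished; blocked only when every one of them was blocked.
    static Summary combine(List<SubResult> results) {
        boolean allBlocked = true;
        boolean allFinished = true;
        int splitsScheduled = 0;
        for (SubResult result : results) {
            splitsScheduled += result.splitsScheduled;
            if (!result.blockedReason.isPresent()) {
                allBlocked = false; // one runnable source scheduler keeps the stage runnable
            }
            if (!result.finished) {
                allFinished = false;
            }
        }
        return new Summary(allFinished, allBlocked, splitsScheduled);
    }
}
```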

The third is FixedCountScheduler:

public ScheduleResult schedule()
{
    List<RemoteTask> newTasks = partitionToNode.entrySet().stream()
            .map(entry -> taskScheduler.apply(entry.getValue(), entry.getKey()))
            .collect(toImmutableList());

    return new ScheduleResult(true, newTasks, 0);
}

2.5 Creating RemoteTasks

In Presto's architecture, stage scheduling produces tasks, which are dispatched to workers for execution.

The scheduleTask method of SqlStageExecution

private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits)
{
    ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
    // Collect all sourceSplits: in a source stage, sourceSplits carries values;
    // in fixed and single stages, sourceSplits is empty
    initialSplits.putAll(sourceSplits);

    // Add a remote split for every unfinished upstream task, so this task can pull its output
    sourceTasks.forEach((planNodeId, task) -> {
        TaskStatus status = task.getTaskStatus();
        if (status.getState() != TaskState.FINISHED) {
            initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
        }
    });
    OutputBuffers outputBuffers = this.outputBuffers.get();
    checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");

    // Create the remote task
    RemoteTask task = remoteTaskFactory.createRemoteTask(
            stateMachine.getSession(),
            taskId,
            node,
            stateMachine.getFragment(),
            initialSplits.build(),
            outputBuffers,
            nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
            summarizeTaskInfo);

    completeSources.forEach(task::noMoreSplits);
    allTasks.add(taskId);
    tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
    nodeTaskMap.addTask(node, task);
    task.addStateChangeListener(new StageTaskListener());
    if (!stateMachine.getState().isDone()) {
        task.start();
    }
    else {
        task.abort();
    }
    return task;
}
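scheduleTask builds the task's initial splits from two sources: the splits passed in by the source scheduler, and one remote split for every upstream task that has not finished yet. A minimal sketch, assuming plan node ids, splits and task locations are plain strings; InitialSplitsSketch and UpstreamTask are hypothetical, and the "/results/{taskId}" suffix is only an approximation of the location that createRemoteSplitFor builds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InitialSplitsSketch {
    enum UpstreamState { RUNNING, FINISHED }

    // Hypothetical upstream task: the exchange plan node it feeds, its state, and its location URI.
    static final class UpstreamTask {
        final String planNodeId;
        final UpstreamState state;
        final String location;

        UpstreamTask(String planNodeId, UpstreamState state, String location) {
            this.planNodeId = planNodeId;
            this.state = state;
            this.location = location;
        }
    }

    static Map<String, List<String>> initialSplits(int taskId, Map<String, List<String>> sourceSplits, List<UpstreamTask> upstreamTasks) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        // 1. Splits from the source scheduler (empty for fixed and single stages)
        sourceSplits.forEach((planNodeId, splits) ->
                result.computeIfAbsent(planNodeId, key -> new ArrayList<>()).addAll(splits));
        // 2. One remote split per unfinished upstream task, pointing at this task's buffer there
        for (UpstreamTask upstream : upstreamTasks) {
            if (upstream.state != UpstreamState.FINISHED) {
                result.computeIfAbsent(upstream.planNodeId, key -> new ArrayList<>())
                        .add(upstream.location + "/results/" + taskId); // hypothetical URI layout
            }
        }
        return result;
    }
}
```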

HttpRemoteTask, the implementation class of the RemoteTask interface

    public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeId, Split> initialSplits,
            OutputBuffers outputBuffers,
            HttpClient httpClient,
            Executor executor,
            ScheduledExecutorService updateScheduledExecutor,
            ScheduledExecutorService errorScheduledExecutor,
            Duration minErrorDuration,
            Duration maxErrorDuration,
            Duration taskStatusRefreshMaxWait,
            Duration taskInfoUpdateInterval,
            boolean summarizeTaskInfo,
            JsonCodec<TaskStatus> taskStatusCodec,
            JsonCodec<TaskInfo> taskInfoCodec,
            JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
            PartitionedSplitCountTracker partitionedSplitCountTracker,
            RemoteTaskStats stats)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(nodeId, "nodeId is null");
        requireNonNull(location, "location is null");
        requireNonNull(planFragment, "planFragment is null");
        requireNonNull(outputBuffers, "outputBuffers is null");
        requireNonNull(httpClient, "httpClient is null");
        requireNonNull(executor, "executor is null");
        requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        requireNonNull(stats, "stats is null");

        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            this.taskId = taskId;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, minErrorDuration, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.stats = stats;

            for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
                pendingSplits.put(entry.getKey(), scheduledSplit);
            }
            pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
                    .filter(initialSplits::containsKey)
                    .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
                    .sum();

            List<BufferInfo> bufferStates = outputBuffers.getBuffers()
                    .keySet().stream()
                    .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
                    .collect(toImmutableList());

            TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));

            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
                    this::failTask,
                    initialTask.getTaskStatus(),
                    taskStatusRefreshMaxWait,
                    taskStatusCodec,
                    executor,
                    httpClient,
                    minErrorDuration,
                    maxErrorDuration,
                    errorScheduledExecutor,
                    stats);

            this.taskInfoFetcher = new TaskInfoFetcher(
                    this::failTask,
                    initialTask,
                    httpClient,
                    taskInfoUpdateInterval,
                    taskInfoCodec,
                    minErrorDuration,
                    maxErrorDuration,
                    summarizeTaskInfo,
                    executor,
                    updateScheduledExecutor,
                    errorScheduledExecutor,
                    stats);

            taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    cleanUpTask();
                }
                else {
                    partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
                    updateSplitQueueSpace();
                }
            });

            long timeout = minErrorDuration.toMillis() / MIN_RETRIES;
            this.requestTimeout = new Duration(timeout + taskStatusRefreshMaxWait.toMillis(), MILLISECONDS);
            partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
            updateSplitQueueSpace();
        }
    }
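The constructor wraps each initial split in a ScheduledSplit with an increasing id and counts how many of the initial splits belong to partitioned sources. Both steps are sketched below with plain strings; PendingSplitsSketch and its nested ScheduledSplit are hypothetical simplifications of the real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PendingSplitsSketch {
    // Hypothetical mirror of ScheduledSplit: a sequence id plus the plan node and split it wraps.
    static final class ScheduledSplit {
        final long sequenceId;
        final String planNodeId;
        final String split;

        ScheduledSplit(long sequenceId, String planNodeId, String split) {
            this.sequenceId = sequenceId;
            this.planNodeId = planNodeId;
            this.split = split;
        }
    }

    private long nextSplitId;

    // Wraps every initial split with an increasing sequence id, like nextSplitId.getAndIncrement().
    List<ScheduledSplit> schedule(Map<String, List<String>> initialSplits) {
        List<ScheduledSplit> scheduled = new ArrayList<>();
        initialSplits.forEach((planNodeId, splits) -> {
            for (String split : splits) {
                scheduled.add(new ScheduledSplit(nextSplitId++, planNodeId, split));
            }
        });
        return scheduled;
    }

    // Counts only splits of partitioned sources; remote splits feeding exchanges are not included.
    static int pendingSourceSplitCount(Map<String, List<String>> initialSplits, Set<String> partitionedSources) {
        return partitionedSources.stream()
                .filter(initialSplits::containsKey)
                .mapToInt(source -> initialSplits.get(source).size())
                .sum();
    }
}
```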

The task's start method, which begins polling the task's status

public void start()
{
    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        scheduleUpdate();

        taskStatusFetcher.start();
        taskInfoFetcher.start();
    }
}
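start() triggers the first update and then starts the status and info fetchers, which keep polling the worker. The sketch below models only the polling idea with a ScheduledExecutorService at a fixed interval; the real fetchers obtain the status over HTTP rather than from a local supplier. StatusPollerSketch, the string states and the periodMillis parameter are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class StatusPollerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private String lastState; // only touched by the single executor thread

    // Polls fetchState every periodMillis, calls listener whenever the state changes,
    // and stops polling once a terminal state (FINISHED or FAILED) is seen.
    void start(Supplier<String> fetchState, Consumer<String> listener, long periodMillis) {
        executor.scheduleWithFixedDelay(() -> {
            String state = fetchState.get();
            if (!state.equals(lastState)) {
                lastState = state;
                listener.accept(state);
            }
            if (state.equals("FINISHED") || state.equals("FAILED")) {
                executor.shutdown(); // also cancels the periodic task
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

Repeated states (RUNNING, RUNNING) are reported to the listener only once, mirroring the idea of a state-change listener.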

The exchange fetches data from upstream stages, while the outputBuffer delivers the current stage's data to downstream stages.
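The exchange/output buffer relationship is a producer-consumer pair. A minimal in-memory sketch, assuming a bounded queue stands in for the output buffer and a string marker signals the end of data; in Presto the downstream exchange pulls pages from the upstream task's buffer over HTTP. ExchangeSketch and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ExchangeSketch {
    private static final String END = "<end>"; // hypothetical end-of-stream marker

    // Upstream stage side: the output buffer holds pages for the downstream consumer.
    static final class OutputBuffer {
        private final BlockingQueue<String> pages;

        OutputBuffer(int capacity) {
            pages = new LinkedBlockingQueue<>(capacity);
        }

        void enqueue(String page) throws InterruptedException { pages.put(page); }
        void setNoMorePages() throws InterruptedException { pages.put(END); }
        String take() throws InterruptedException { return pages.take(); }
    }

    // Downstream stage side: the exchange pulls pages from the upstream buffer until it ends.
    static List<String> exchange(OutputBuffer upstream) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (String page = upstream.take(); !END.equals(page); page = upstream.take()) {
            received.add(page);
        }
        return received;
    }

    static List<String> run(List<String> pages) throws InterruptedException {
        OutputBuffer buffer = new OutputBuffer(2); // small capacity forces back-pressure on the producer
        Thread producer = new Thread(() -> {
            try {
                for (String page : pages) {
                    buffer.enqueue(page);
                }
                buffer.setNoMorePages();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> result = exchange(buffer);
        producer.join();
        return result;
    }
}
```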

2.6 Task execution

2.6.1 The worker receives the task

前面創(chuàng)建RemoteTask后,通過http rest請求將task任務下放到對應的worker上去

// Declared on the TaskResource class: @Path("/v1/task"), so this method handles POST /v1/task/{taskId}
    @POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

        Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager);
        TaskInfo taskInfo = taskManager.updateTask(session,
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());

        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }

        return Response.ok().entity(taskInfo).build();
    }
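On the coordinator side this request is sent by HttpRemoteTask; on the worker it is served by createOrUpdateTask above. The sketch below reproduces only the shape of that exchange with JDK classes (com.sun.net.httpserver.HttpServer as a toy worker, java.net.http.HttpClient as the coordinator). The JSON payload and response are hypothetical; the real request body is a serialized TaskUpdateRequest and the real response a TaskInfo. TaskRestSketch is a hypothetical name.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class TaskRestSketch {
    private static final String TASK_PATH = "/v1/task/";

    // Toy "worker": answers POST /v1/task/{taskId} with a small JSON document naming the task.
    static HttpServer startWorker() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext(TASK_PATH, exchange -> {
            String taskId = exchange.getRequestURI().getPath().substring(TASK_PATH.length());
            exchange.getRequestBody().readAllBytes(); // hypothetical payload, ignored here
            boolean isPost = "POST".equals(exchange.getRequestMethod());
            byte[] body = ("{\"taskId\":\"" + taskId + "\",\"state\":\"RUNNING\"}").getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(isPost ? 200 : 405, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Coordinator side: POST the JSON update to the worker and return the response body.
    static String createOrUpdateTask(URI worker, String taskId, String json) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(worker.resolve(TASK_PATH + taskId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected status " + response.statusCode());
        }
        return response.body();
    }
}
```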

The updateTask method of SqlTaskManager

@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    requireNonNull(session, "session is null");
    requireNonNull(taskId, "taskId is null");
    requireNonNull(fragment, "fragment is null");
    requireNonNull(sources, "sources is null");
    requireNonNull(outputBuffers, "outputBuffers is null");

    if (resourceOvercommit(session)) {
        // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
        queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
    }

    SqlTask sqlTask = tasks.getUnchecked(taskId);
    sqlTask.recordHeartbeat();
    return sqlTask.updateTask(session, fragment, sources, outputBuffers);
}

The updateTask method of SqlTask

public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    try {
        // The LazyOutput buffer does not support write methods, so the actual
        // output buffer must be established before drivers are created (e.g.
        // a VALUES query).
        outputBuffer.setOutputBuffers(outputBuffers);

        // assure the task execution is only created once
        SqlTaskExecution taskExecution;
        synchronized (this) {
            // is task already complete?
            TaskHolder taskHolder = taskHolderReference.get();
            if (taskHolder.isFinished()) {
                return taskHolder.getFinalTaskInfo();
            }
            taskExecution = taskHolder.getTaskExecution();
            if (taskExecution == null) {
                checkState(fragment.isPresent(), "fragment must be present");
                // On the first call, create a new task execution
                taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources);
                taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                needsPlan.set(false);
            }
        }

        if (taskExecution != null) {
            taskExecution.addSources(sources);
        }
    }
    catch (Error e) {
        failed(e);
        throw e;
    }
    catch (RuntimeException e) {
        failed(e);
    }

    return getTaskInfo();
}
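SqlTask.updateTask creates the SqlTaskExecution only once, inside a synchronized block, and then adds the new sources on every call, outside the lock. The sketch below isolates that pattern; CreateOnceSketch and Execution are hypothetical, and sources are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

final class CreateOnceSketch {
    // Hypothetical stand-in for SqlTaskExecution: it records the sources it was given.
    static final class Execution {
        final List<String> sources = new ArrayList<>();

        synchronized void addSources(List<String> newSources) {
            sources.addAll(newSources);
        }
    }

    private Execution execution; // guarded by this
    private int creations;       // guarded by this; counts how many executions were created

    // The first call creates the execution (the fragment must be present then); every call adds sources.
    Execution update(boolean fragmentPresent, List<String> sources) {
        Execution current;
        synchronized (this) {
            if (execution == null) {
                if (!fragmentPresent) {
                    throw new IllegalStateException("fragment must be present");
                }
                execution = new Execution();
                creations++;
            }
            current = execution;
        }
        // Sources are added outside the lock, as in SqlTask.updateTask
        current.addSources(sources);
        return current;
    }

    synchronized int creations() {
        return creations;
    }
}
```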

The create method of SqlTaskExecutionFactory

public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
{
    boolean verboseStats = getVerboseStats(session);
    TaskContext taskContext = queryContext.addTaskContext(
            taskStateMachine,
            session,
            verboseStats,
            cpuTimerEnabled);

    LocalExecutionPlan localExecutionPlan;
    try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
        try {
            localExecutionPlan = planner.plan(
                    taskContext,
                    fragment.getRoot(),
                    fragment.getSymbols(),
                    fragment.getPartitioningScheme(),
                    fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
                    fragment.getPartitionedSources(),
                    outputBuffer);

            for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
                Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
                if (sourceId.isPresent() && fragment.isPartitionedSources(sourceId.get())) {
                    checkArgument(fragment.getPipelineExecutionStrategy() == driverFactory.getPipelineExecutionStrategy(),
                            "Partitioned pipelines are expected to have the same execution strategy as the fragment");
                }
                else {
                    checkArgument(fragment.getPipelineExecutionStrategy() != UNGROUPED_EXECUTION || driverFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION,
                            "When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
                }
            }
        }
        catch (Throwable e) {
            // planning failed
            taskStateMachine.failed(e);
            throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }
    return createSqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            sources,
            localExecutionPlan,
            taskExecutor,
            taskNotificationExecutor,
            queryMonitor);
}
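The two `checkArgument` calls inside the loop encode a single rule about execution strategies. Here is a minimal sketch of that rule in isolation. `Strategy`, `Pipeline` and `StrategyRules` are hypothetical simplified stand-ins for Presto's `PipelineExecutionStrategy` and `DriverFactory`, not the real classes.

```java
import java.util.List;

// Hypothetical stand-in for Presto's PipelineExecutionStrategy
enum Strategy { GROUPED_EXECUTION, UNGROUPED_EXECUTION }

// Hypothetical simplified pipeline: does it read a partitioned source, and how is it executed?
final class Pipeline
{
    final boolean partitionedSource;
    final Strategy strategy;

    Pipeline(boolean partitionedSource, Strategy strategy)
    {
        this.partitionedSource = partitionedSource;
        this.strategy = strategy;
    }
}

final class StrategyRules
{
    // Returns normally if every pipeline is consistent with the fragment, otherwise throws
    static void validate(Strategy fragmentStrategy, List<Pipeline> pipelines)
    {
        for (Pipeline p : pipelines) {
            if (p.partitionedSource) {
                // A pipeline fed by a partitioned source must use exactly the fragment's strategy
                if (p.strategy != fragmentStrategy) {
                    throw new IllegalArgumentException("Partitioned pipelines are expected to have the same execution strategy as the fragment");
                }
            }
            else if (fragmentStrategy == Strategy.UNGROUPED_EXECUTION && p.strategy != Strategy.UNGROUPED_EXECUTION) {
                // In an ungrouped fragment, every other pipeline must be ungrouped as well
                throw new IllegalArgumentException("When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
            }
        }
    }
}
```

In a grouped fragment, a pipeline that does not read a partitioned source may use either strategy. This is why the second check only fires when the fragment itself is ungrouped.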

The createSqlTaskExecution method of the SqlTaskExecution class

static SqlTaskExecution createSqlTaskExecution(
        TaskStateMachine taskStateMachine,
        TaskContext taskContext,
        OutputBuffer outputBuffer,
        List<TaskSource> sources,
        LocalExecutionPlan localExecutionPlan,
        TaskExecutor taskExecutor,
        Executor notificationExecutor,
        QueryMonitor queryMonitor)
{
    SqlTaskExecution task = new SqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            localExecutionPlan,
            taskExecutor,
            queryMonitor,
            notificationExecutor);
    try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
        // The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
        // The call back is accessed from another thread, so this code can not be placed in the constructor.
        // Schedule the drivers whose lifecycle is tied to the task, then hand the initial splits to the new task
        task.scheduleDriversForTaskLifeCycle();
        task.addSources(sources);
        return task;
    }
}
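The comment above explains why scheduling happens in a static factory rather than in the constructor. Registering a callback that another thread may run would publish `this` before construction has finished. The following is a minimal sketch of that pattern. `CallbackRegistry` and `Task` are hypothetical classes, not Presto types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical registry whose callbacks run on another thread
final class CallbackRegistry
{
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void register(Runnable callback)
    {
        executor.execute(callback);
    }

    void shutdown()
    {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical task that must register a callback at creation time
final class Task
{
    private final List<String> sources;

    // The constructor only initializes fields; it never hands `this` to another thread
    private Task(List<String> sources)
    {
        this.sources = new ArrayList<>(sources);
    }

    // Static factory: the object is fully constructed before `this` escapes via the callback
    static Task create(List<String> sources, CallbackRegistry registry)
    {
        Task task = new Task(sources);
        registry.register(task::onSplitReady);
        return task;
    }

    private void onSplitReady()
    {
        // Safe: `sources` is fully initialized by the time any thread can run this
        System.out.println("sources=" + sources.size());
    }

    int sourceCount()
    {
        return sources.size();
    }
}
```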

2.6.2 Worker Startup and Execution

When a Worker starts, it calls the start method of the TaskExecutor class. This method's main job is to process the splits of all tasks running on that Worker.

@PostConstruct
public synchronized void start()
{
    // runnerThreads is configured via task.max-worker-threads; by default it is twice the number of CPU cores
    checkState(!closed, "TaskExecutor is closed");
    for (int i = 0; i < runnerThreads; i++) {
        addRunnerThread();
    }
    splitMonitorExecutor.scheduleWithFixedDelay(this::monitorActiveSplits, 1, 1, TimeUnit.MINUTES);
}

The addRunnerThread method of the TaskExecutor class

private synchronized void addRunnerThread()
{
    try {
        // TaskRunner is an inner class of TaskExecutor
        executor.execute(new TaskRunner());
    }
    catch (RejectedExecutionException ignored) {
    }
}

The TaskRunner class

private class TaskRunner
        implements Runnable
{
    private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

    @Override
    public void run()
    {
        try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
            while (!closed && !Thread.currentThread().isInterrupted()) {
                // select next worker
                // Take the next PrioritizedSplitRunner to process. A PrioritizedSplitRunner is a priority-aware wrapper around
                // all the work applied to one split, i.e. the chain of Operators that act on that split
                final PrioritizedSplitRunner split;
                try {
                    // take a split from the waiting queue (blocks until one is available)
                    split = waitingSplits.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
                try (SetThreadName splitName = new SetThreadName(threadId)) {
                    RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
                    runningSplitInfos.add(splitInfo);

                    // add the split to runningSplits, which holds every split currently being processed
                    runningSplits.add(split);

                    ListenableFuture<?> blocked;
                    try {
                        // run the split's process() method for one quantum
                        blocked = split.process();
                    }
                    finally {
                        runningSplitInfos.remove(splitInfo);
                        // once process() returns, remove the split from runningSplits
                        runningSplits.remove(split);
                    }
                    // isFinished() tells whether the whole split has been fully processed
                    if (split.isFinished()) {
                        log.debug("%s is finished", split.getInfo());
                        splitFinished(split);
                    }
                    else {
                        // blocked.isDone() means this quantum ended without the split waiting on anything
                        if (blocked.isDone()) {
                            // not blocked but not finished either, so put the split back on the waiting queue
                            waitingSplits.offer(split);
                        }
                        else {
                            // otherwise park the split in blockedSplits until its future completes
                            blockedSplits.put(split, blocked);
                            blocked.addListener(() -> {
                                // once the blocking future completes, remove the split from blockedSplits
                                blockedSplits.remove(split);
                                // reset its level priority
                                split.resetLevelPriority();
                                // and put it back on the waiting queue
                                waitingSplits.offer(split);
                            }, executor);
                        }
                    }
                }
                catch (Throwable t) {
                    // ignore random errors due to driver thread interruption
                    if (!split.isDestroyed()) {
                        if (t instanceof PrestoException) {
                            PrestoException e = (PrestoException) t;
                            log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
                        }
                        else {
                            log.error(t, "Error processing %s", split.getInfo());
                        }
                    }
                    splitFinished(split);
                }
            }
        }
        finally {
            // reached when the thread is interrupted or the TaskExecutor is closed
            if (!closed) {
                // the thread was interrupted but the TaskExecutor is still open, so start a replacement runner thread
                addRunnerThread();
            }
        }
    }
}
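The runner loop takes a split from a waiting queue and runs it for one quantum. Afterwards, the split is finished, requeued, or parked until its blocking future completes. Here is a simplified, single-threaded sketch of one iteration of that cycle. `SplitTask` and `MiniTaskExecutor` are hypothetical, and `CompletableFuture` stands in for Guava's `ListenableFuture`. Priority levels, split-info bookkeeping and error handling are left out.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PrioritizedSplitRunner
interface SplitTask
{
    // Runs for one quantum; an already completed future means "not blocked"
    CompletableFuture<?> process();

    boolean isFinished();
}

// Hypothetical, heavily simplified version of the TaskExecutor runner loop
final class MiniTaskExecutor
{
    final BlockingQueue<SplitTask> waitingSplits = new LinkedBlockingQueue<>();
    final Map<SplitTask, CompletableFuture<?>> blockedSplits = new ConcurrentHashMap<>();
    int finishedCount;

    // One iteration of the runner loop
    void runOnce()
    {
        final SplitTask split;
        try {
            split = waitingSplits.take();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        CompletableFuture<?> blocked = split.process();
        if (split.isFinished()) {
            finishedCount++;
        }
        else if (blocked.isDone()) {
            // not blocked: go straight back to the waiting queue
            waitingSplits.offer(split);
        }
        else {
            // blocked: park the split and requeue it once the future completes
            blockedSplits.put(split, blocked);
            blocked.whenComplete((ignored, error) -> {
                blockedSplits.remove(split);
                waitingSplits.offer(split);
            });
        }
    }
}
```

The real TaskRunner attaches the listener with an executor and also resets the split's level priority before requeueing. The sketch keeps only the queue/blocked-map transitions.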

All processing of a split goes through split.process(). Here, split is an instance of PrioritizedSplitRunner.

public ListenableFuture<?> process()
        throws Exception
{
    try {
        long startNanos = ticker.read();
        start.compareAndSet(0, startNanos);
        lastReady.compareAndSet(0, startNanos);
        processCalls.incrementAndGet();

        waitNanos.getAndAdd(startNanos - lastReady.get());

        CpuTimer timer = new CpuTimer();

        // call processFor(Duration) on the wrapped split to do the actual work; here split is a SplitRunner, in practice mostly a DriverSplitRunner, and SPLIT_RUN_QUANTA is the time slice (1 second by default)
        ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);

        CpuTimer.CpuDuration elapsed = timer.elapsedTime();

        long quantaScheduledNanos = ticker.read() - startNanos;
        scheduledNanos.addAndGet(quantaScheduledNanos);

        priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
        lastRun.set(ticker.read());

        if (blocked == NOT_BLOCKED) {
            unblockedQuantaWallTime.add(elapsed.getWall());
        }
        else {
            blockedQuantaWallTime.add(elapsed.getWall());
        }

        long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
        cpuTimeNanos.addAndGet(quantaCpuNanos);

        globalCpuTimeMicros.update(quantaCpuNanos / 1000);
        globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);

        return blocked;
    }
    catch (Throwable e) {
        finishedFuture.setException(e);
        throw e;
    }
}
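process() wraps each quantum in wall-clock and CPU-time measurement and adds the results to running totals. Here is a minimal sketch of that bookkeeping using the JDK's `ThreadMXBean`. Presto's own `CpuTimer` is not reproduced here, and `QuantumStats` is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-split statistics holder
final class QuantumStats
{
    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    final AtomicLong scheduledNanos = new AtomicLong();
    final AtomicLong cpuNanos = new AtomicLong();
    final AtomicLong processCalls = new AtomicLong();

    // Runs one quantum of work and adds its wall time and (if available) CPU time to the totals
    void runQuantum(Runnable work)
    {
        processCalls.incrementAndGet();
        boolean cpuAvailable = THREADS.isCurrentThreadCpuTimeSupported() && THREADS.isThreadCpuTimeEnabled();
        long startWall = System.nanoTime();
        long startCpu = cpuAvailable ? THREADS.getCurrentThreadCpuTime() : 0;

        work.run();

        scheduledNanos.addAndGet(System.nanoTime() - startWall);
        if (cpuAvailable) {
            cpuNanos.addAndGet(THREADS.getCurrentThreadCpuTime() - startCpu);
        }
    }
}
```

The accumulated scheduled time is what the original feeds into `taskHandle.addScheduledNanos(...)` to update the split's priority.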

2.6.3 Creating the Driver

The processFor method of the DriverSplitRunner class (DriverSplitRunner is an inner class of SqlTaskExecution)

@Override
public ListenableFuture<?> processFor(Duration duration)
{
    // a driver wraps the chain of operators applied to a split; the splits it still has to process are kept in its newSources field
    Driver driver;
    synchronized (this) {
        // if this DriverSplitRunner was already closed before the call, there is nothing left to do: return a future whose value is null
        if (closed) {
            return Futures.immediateFuture(null);
        }
        // if no driver exists yet, create one for this runner's split first; partitionedSplit is a DriverSplitRunner field of type ScheduledSplit, a wrapper around Split
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }
        // keep a local reference so the driver can run outside the lock
        driver = this.driver;
    }

    return driver.processFor(duration);
}
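DriverSplitRunner.processFor does the cheap bookkeeping inside `synchronized (this)`: it checks whether the runner is closed and creates the driver lazily. The potentially long `driver.processFor` call then runs outside the lock. The following is a minimal sketch of that shape. `Work` and `LazyRunner` are hypothetical stand-ins for Driver and DriverSplitRunner.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for Driver
interface Work
{
    CompletableFuture<?> processFor(long nanos);
}

// Hypothetical stand-in for DriverSplitRunner
final class LazyRunner
{
    private final Supplier<Work> factory;
    private Work work;          // guarded by this
    private boolean closed;     // guarded by this
    int created;                // how many times the factory ran (for illustration)

    LazyRunner(Supplier<Work> factory)
    {
        this.factory = factory;
    }

    CompletableFuture<?> processFor(long nanos)
    {
        Work current;
        synchronized (this) {
            // already closed: nothing to do, report "not blocked"
            if (closed) {
                return CompletableFuture.completedFuture(null);
            }
            // create the driver on first use only
            if (work == null) {
                work = factory.get();
                created++;
            }
            current = work;
        }
        // the long-running part executes without holding the lock
        return current.processFor(nanos);
    }

    synchronized void close()
    {
        closed = true;
    }
}
```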

2.6.4 Driver Execution

The processFor method of the Driver class

public ListenableFuture<?> processFor(Duration duration)
{
    checkLockNotHeld("Can not process for a duration while holding the driver lock");

    requireNonNull(duration, "duration is null");

    // if the driver is blocked we don't need to continue
    SettableFuture<?> blockedFuture = driverBlockedFuture.get();
    if (!blockedFuture.isDone()) {
        return blockedFuture;
    }

    // maximum time this call may run
    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);

    // acquire the driver lock; if another thread holds it, wait at most 100 ms
    Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
        driverContext.startProcessTimer();
        driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
        try {
            long start = System.nanoTime();
            do {
                // the actual processing of the split happens in processInternal
                ListenableFuture<?> future = processInternal();
                if (!future.isDone()) {
                    return updateDriverBlockedFuture(future);
                }
            }
            while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
        }
        finally {
            driverContext.getYieldSignal().reset();
            driverContext.recordProcessed();
        }
        return NOT_BLOCKED;
    });
    // if the lock could not be acquired in time, report "not blocked" so the split is simply rescheduled
    return result.orElse(NOT_BLOCKED);
}
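At its core, Driver.processFor keeps calling processInternal until one of three things happens: the time budget is used up, the driver finishes, or a step returns an unfinished future. Here is a simplified sketch of that time-bounded loop without the lock and yield-signal handling. `Stepper` and `TimeSlice` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for one processInternal() call plus isFinishedInternal()
interface Stepper
{
    // One unit of work; an incomplete future means the stepper is blocked
    CompletableFuture<?> step();

    boolean isFinished();
}

final class TimeSlice
{
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    // Runs steps for at most maxRuntimeNanos; like the original do-while, the budget is checked after each step
    static CompletableFuture<?> processFor(Stepper stepper, long maxRuntimeNanos)
    {
        long start = System.nanoTime();
        do {
            CompletableFuture<?> future = stepper.step();
            if (!future.isDone()) {
                // blocked: hand the future back so the caller can park this split
                return future;
            }
        }
        while (System.nanoTime() - start < maxRuntimeNanos && !stepper.isFinished());
        return NOT_BLOCKED;
    }
}
```

Because the loop only checks the clock between steps, a single long step can overrun the quantum. In the original, the yield signal is what asks long-running operators to give up the thread early.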

The processInternal method of the Driver class

@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal()
{
    checkLockHeld("Lock must be held to call processInternal");

    handleMemoryRevoke();

    try {
        // if there are new splits that have not been handed over yet, add them to the source operator
        processNewSources();

        // special case: the driver has only one operator
        if (operators.size() == 1) {
            // if this driver is already done, return NOT_BLOCKED
            if (driverContext.isDone()) {
                return NOT_BLOCKED;
            }

            // get the only operator
            Operator current = operators.get(0);
            // check whether it is blocked
            Optional<ListenableFuture<?>> blocked = getBlockedFuture(current);
            if (blocked.isPresent()) {
                current.getOperatorContext().recordBlocked(blocked.get());
                return blocked.get();
            }

            // not blocked, and there is only one operator, so just finish it
            current.getOperatorContext().startIntervalTimer();
            current.finish();
            current.getOperatorContext().recordFinish();
            return NOT_BLOCKED;
        }

        boolean movedPage = false;
        // with more than one operator, walk adjacent pairs: the output of the earlier operator becomes the input of the later one
        for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
            // take two adjacent operators
            Operator current = operators.get(i);
            Operator next = operators.get(i + 1);

            // skip blocked operator
            if (getBlockedFuture(current).isPresent()) {
                continue;
            }

            // if the current operator is not finished, and the next operator is not blocked and needs input
            if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
                // take an output page from the current operator and feed it to the next operator
                current.getOperatorContext().startIntervalTimer();
                // getOutput() and addInput() hold each operator's core page-processing logic, which differs per operator; LimitOperator is used as the example below
                Page page = current.getOutput();
                current.getOperatorContext().recordGetOutput(page);

                // pass a non-empty output page on to the next operator
                if (page != null && page.getPositionCount() != 0) {
                    next.getOperatorContext().startIntervalTimer();
                    // addInput() is the other half of the operator's page-processing logic
                    next.addInput(page);
                    next.getOperatorContext().recordAddInput(page);
                    // record that a page was moved
                    movedPage = true;
                }

                if (current instanceof SourceOperator) {
                    movedPage = true;
                }
            }

            // if the current operator has finished, tell the next operator no more input is coming so it can finish processing and flush its results
            if (current.isFinished()) {
                // let next operator know there will be no more data
                next.getOperatorContext().startIntervalTimer();
                next.finish();
                next.getOperatorContext().recordFinish();
            }
        }

        // if a full pass over the operators moved no page, check whether any operator is blocked
        if (!movedPage) {
            List<Operator> blockedOperators = new ArrayList<>();
            List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
            // collect every operator that is blocked: revoking memory, blocked by its own isBlocked() future, or waiting for (revocable) memory
            for (Operator operator : operators) {
                Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
                if (blocked.isPresent()) {
                    blockedOperators.add(operator);
                    blockedFutures.add(blocked.get());
                }
            }

            // at least one operator is blocked
            if (!blockedFutures.isEmpty()) {
                // unblock when the first future is complete, i.e. as soon as any blocked operator can make progress
                ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
                // driver records serial blocked time
                // recordBlocked registers a listener on the future to measure how long the driver stays blocked
                driverContext.recordBlocked(blocked);
                // each blocked operator is responsible for blocking the execution
                // until one of the operators can continue
                // record the blocked time on every blocked operator as well
                for (Operator operator : blockedOperators) {
                    operator.getOperatorContext().recordBlocked(blocked);
                }
                return blocked;
            }
        }

        return NOT_BLOCKED;
    }
    catch (Throwable t) {
        List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
        if (interrupterStack == null) {
            driverContext.failed(t);
            throw t;
        }

        // Driver thread was interrupted which should only happen if the task is already finished.
        // If this becomes the actual cause of a failed query there is a bug in the task state machine.
        Exception exception = new Exception("Interrupted By");
        exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
        PrestoException newException = new PrestoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
        newException.addSuppressed(t);
        driverContext.failed(newException);
        throw newException;
    }
}
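The multi-operator branch of processInternal, once timing, memory revocation and error handling are removed, is a loop over adjacent operator pairs. It moves one page from `current` to `next` and propagates `finish()` downstream. Here is a minimal sketch of that loop. `Op`, `ListSource`, `CollectSink` and `MiniDriver` are hypothetical, and a page is modeled simply as an `int[]` of row values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified operator: a page is just an int[]
interface Op
{
    boolean needsInput();
    void addInput(int[] page);
    int[] getOutput();      // may return null when no page is ready
    void finish();
    boolean isFinished();
}

// Emits the given pages one at a time, then reports finished
final class ListSource implements Op
{
    private final List<int[]> pages;
    ListSource(List<int[]> pages) { this.pages = new ArrayList<>(pages); }
    public boolean needsInput() { return false; }
    public void addInput(int[] page) { throw new UnsupportedOperationException(); }
    public int[] getOutput() { return pages.isEmpty() ? null : pages.remove(0); }
    public void finish() { pages.clear(); }
    public boolean isFinished() { return pages.isEmpty(); }
}

// Collects every row it receives
final class CollectSink implements Op
{
    final List<Integer> rows = new ArrayList<>();
    private boolean finished;
    public boolean needsInput() { return !finished; }
    public void addInput(int[] page) { for (int v : page) { rows.add(v); } }
    public int[] getOutput() { return null; }
    public void finish() { finished = true; }
    public boolean isFinished() { return finished; }
}

final class MiniDriver
{
    // One pass over adjacent operator pairs; returns true if any page moved
    static boolean processOnce(List<Op> ops)
    {
        boolean movedPage = false;
        for (int i = 0; i < ops.size() - 1; i++) {
            Op current = ops.get(i);
            Op next = ops.get(i + 1);
            if (!current.isFinished() && next.needsInput()) {
                int[] page = current.getOutput();
                if (page != null && page.length != 0) {
                    next.addInput(page);
                    movedPage = true;
                }
            }
            // no more data from upstream: tell the downstream operator to finish
            if (current.isFinished()) {
                next.finish();
            }
        }
        return movedPage;
    }
}
```

Calling `processOnce` repeatedly until it returns `false` drains a `ListSource` into a `CollectSink` and leaves the sink finished.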

The getBlockedFuture method of the Driver class checks whether a given Operator is blocked

private Optional<ListenableFuture<?>> getBlockedFuture(Operator operator)
{
    ListenableFuture<?> blocked = revokingOperators.get(operator);
    if (blocked != null) {
        // We mark operator as blocked regardless of blocked.isDone(), because finishMemoryRevoke has not been called yet.
        return Optional.of(blocked);
    }
    blocked = operator.isBlocked();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForRevocableMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    return Optional.empty();
}
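getBlockedFuture checks several blocking sources in a fixed order and returns the first unfinished future. processInternal then combines the futures of all blocked operators so the driver wakes up as soon as any one of them completes. Here is a sketch of both ideas using `CompletableFuture`. `CompletableFuture.anyOf` plays the role of the `firstFinishedFuture` helper here. `BlockingSources` is a hypothetical name, and the sketch omits the special case where a revoking operator counts as blocked even if its future is already done.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class BlockingSources
{
    // Returns the first candidate future that is not yet done, checked in order
    static Optional<CompletableFuture<?>> firstBlocked(List<CompletableFuture<?>> candidatesInOrder)
    {
        for (CompletableFuture<?> candidate : candidatesInOrder) {
            if (!candidate.isDone()) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    // Completes as soon as any of the given futures completes
    static CompletableFuture<Object> firstFinished(List<CompletableFuture<?>> futures)
    {
        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```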

2.6.5 Operator Execution

The getOutput() and addInput() methods of the Operator interface are the core of how an Operator processes Pages. LimitOperator is used as the example here.

@Override
public void addInput(Page page)
{
    checkState(needsInput());

    if (page.getPositionCount() <= remainingLimit) {
        remainingLimit -= page.getPositionCount();
        nextPage = page;
    }
    else {
        Block[] blocks = new Block[page.getChannelCount()];
        for (int channel = 0; channel < page.getChannelCount(); channel++) {
            Block block = page.getBlock(channel);
            blocks[channel] = block.getRegion(0, (int) remainingLimit);
        }
        nextPage = new Page((int) remainingLimit, blocks);
        remainingLimit = 0;
    }
}

@Override
public Page getOutput()
{
    Page page = nextPage;
    nextPage = null;
    return page;
}
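LimitOperator passes whole pages through while they fit within `remainingLimit`. The first page that would exceed the limit is truncated to its first `remainingLimit` rows. Here is a minimal sketch of the same logic, assuming a page is modeled as an array of columns (`int[][]`) instead of Presto's `Page`/`Block`. `MiniLimit` is a hypothetical name, and its `needsInput()` is an assumption modeled on the `checkState(needsInput())` at the top of `addInput`.

```java
import java.util.Arrays;

// Hypothetical simplified LimitOperator over column-array pages
final class MiniLimit
{
    private long remainingLimit;
    private int[][] nextPage;   // columns; each column holds one value per row

    MiniLimit(long limit)
    {
        this.remainingLimit = limit;
    }

    boolean needsInput()
    {
        // accept a new page only when the previous one was consumed and rows are still wanted
        return nextPage == null && remainingLimit > 0;
    }

    void addInput(int[][] page)
    {
        if (!needsInput()) {
            throw new IllegalStateException("operator does not need input");
        }
        int positions = page.length == 0 ? 0 : page[0].length;
        if (positions <= remainingLimit) {
            // the whole page fits under the limit
            remainingLimit -= positions;
            nextPage = page;
        }
        else {
            // keep only the first remainingLimit rows of every column
            int keep = (int) remainingLimit;
            int[][] truncated = new int[page.length][];
            for (int channel = 0; channel < page.length; channel++) {
                truncated[channel] = Arrays.copyOfRange(page[channel], 0, keep);
            }
            nextPage = truncated;
            remainingLimit = 0;
        }
    }

    int[][] getOutput()
    {
        int[][] page = nextPage;
        nextPage = null;
        return page;
    }

    boolean isFinished()
    {
        return remainingLimit == 0 && nextPage == null;
    }
}
```

One difference from the original: `Arrays.copyOfRange` copies the values, whereas `Block.getRegion` returns a view over the existing block without copying.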

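3. Technical Modifications

3.1 Supporting Hive Views

3.2 Custom Connectors

3.3 Implicit Type Conversion

3.4 UDF Support

3.5 Performance Tuning

Not finished yet, to be continued…

If you spot any mistakes, please point them out so we can improve together~

Updated every evening~

If you repost this article, please include a link to it. Original writing takes effort, thank you~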

© Copyright belongs to the author. For reposting or content collaboration, please contact the author.