Flink Source Code: AsyncFunction

This article is part of a series. For the other articles, see the Flink Source Code Analysis Series index.

Introduction

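Flink is known for high throughput and low latency. But suppose some step of a Flink job has to talk to an external system. The latency of those calls is outside our control and can significantly degrade the cluster's performance. What can we do?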

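To solve this, Flink introduced the AsyncFunction family of interfaces. When an external service is called through these asynchronous interfaces, the operator does not wait synchronously for the result. It only puts the element into a queue, and when the external service responds, the state of that element's queue entry is updated. Right after issuing a call, the operator moves on to the next element and its asynchronous call. Completed results are then taken from the queue and emitted downstream.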

Usage

Before explaining how to use AsyncFunction, let's "fake" a slow external system call. Calling pullData returns a CompletableFuture immediately. The generated data is delivered through that CompletableFuture about 5 seconds later.

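import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncIODemo implements Serializable {

    // Thread pool simulating the external system's workers.
    // It is static, so it is not part of the serialized state of this class.
    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);

    // Returns immediately; the future is completed about 5 seconds later.
    public CompletableFuture<String> pullData(final String source) {

        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        executorService.submit(() -> {
            try {
                // Simulate a slow external call
                Thread.sleep(5000);
                completableFuture.complete("Output value: " + source);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report the failure to the caller
                Thread.currentThread().interrupt();
                completableFuture.completeExceptionally(e);
            }
        });

        return completableFuture;
    }
}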

Next, write the Flink job (using the Scala DataStream API):

import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

object AsyncIOJob {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")

        // The last two arguments set the maximum time per async call to 10 seconds
        val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String] {
            override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
                // Call the simulated external system defined above to fetch data
                val future = new AsyncIODemo().pullData(input)
                // Registering the callback does not block; it runs once the data is available
                future.whenCompleteAsync(new BiConsumer[String, Throwable] {
                    override def accept(t: String, u: Throwable): Unit = {
                        if (u != null) resultFuture.completeExceptionally(u)
                        else resultFuture.complete(Iterable(t))
                    }
                })
            }
        }, 10, TimeUnit.SECONDS)

        asyncStream.print()
        env.execute()
    }
}

Run the Flink job. The external system is called 4 times, yet we do not wait 20 seconds for all 4 results. Everything is printed after only about 5 seconds, which confirms that AsyncFunction works as intended.
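You can reproduce the "about 5 seconds instead of 20" effect without Flink. The sketch below is plain Java and uses no Flink classes. It is modelled on pullData, but the class ConcurrentCallsDemo, the runConcurrently helper and the delayMillis parameter are hypothetical names introduced only for this illustration. It starts every call before waiting on any of them. The final join exists only so we can measure elapsed time. The real operator never blocks like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ConcurrentCallsDemo {

    // Simulated slow external call, modelled on pullData above but with a configurable delay.
    static CompletableFuture<String> pullData(ExecutorService pool, String source, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "Output value: " + source;
        }, pool);
    }

    // Starts every call first and only then waits; returns the elapsed wall-clock time in ms.
    static long runConcurrently(List<String> inputs, long delayMillis, List<String> out) {
        ExecutorService pool = Executors.newFixedThreadPool(inputs.size());
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                futures.add(pullData(pool, input, delayMillis)); // returns immediately
            }
            for (CompletableFuture<String> future : futures) {
                out.add(future.join()); // blocking happens only here, after all calls are in flight
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```

With four inputs and a 500 ms delay, the elapsed time should be close to 500 ms rather than 2000 ms, because all four calls overlap.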

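Note: despite the name AsyncFunction, the asyncInvoke method itself is called synchronously, on the operator's task thread, once per element. Never block inside this method while waiting for the result of the call, because that defeats its whole purpose. Instead, register an asynchronous callback here that notifies Flink through the ResultFuture once the data is available.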
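The difference between blocking and non-blocking can be shown with a small stand-in. In the sketch below, SimpleResultFuture is a hypothetical interface with the same two methods as Flink's ResultFuture, so the code runs without Flink. asyncInvoke only registers a callback on the external call's CompletableFuture and returns at once. The blocking anti-pattern appears only as a comment.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class NonBlockingInvokeDemo {

    // Stand-in with the same two methods as Flink's ResultFuture (not the Flink interface itself).
    interface SimpleResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    // Correct pattern: the external call is already running; register a callback and return.
    static void asyncInvoke(CompletableFuture<String> call, SimpleResultFuture<String> resultFuture) {
        call.whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
        // Anti-pattern (do NOT do this): resultFuture.complete(List.of(call.join()));
        // join() would block the calling thread until the external system answers.
    }

    // Records the outcome; the latch is released once either method has been called.
    static final class Recorder implements SimpleResultFuture<String> {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Object> outcome = new AtomicReference<>();

        @Override
        public void complete(Collection<String> result) {
            outcome.set(result);
            done.countDown();
        }

        @Override
        public void completeExceptionally(Throwable error) {
            outcome.set(error);
            done.countDown();
        }

        boolean await(long millis) {
            try {
                return done.await(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```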

AsyncFunction

From here on we move to the source code. The AsyncFunction interface looks like this:

@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     *     trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
     * exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    }
}

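The AsyncFunction interface has two methods:

  • asyncInvoke: triggers the asynchronous operation for each input element of the stream. The first parameter, input, is the stream element. The second, resultFuture, receives either the result of the asynchronous processing or an error. Do not wait synchronously for the processing logic inside this method: it blocks the thread and lowers the job's throughput.
  • timeout: defines what happens when a call times out. It takes the same parameters as asyncInvoke. AsyncFunction already provides a default implementation, which completes the result future exceptionally with a TimeoutException. Override this method to customize the timeout behavior.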
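A common reason to override timeout is to emit a fallback value instead of failing the job. The sketch below shows the idea with hypothetical stand-ins. SimpleAsyncFunction and SimpleResultFuture mirror the shape of the Flink interfaces shown above but are not the real types. The fallback string is made up. With Flink, you would override timeout on your AsyncFunction in the same way.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;

class TimeoutOverrideDemo {

    // Simplified stand-ins shaped like Flink's ResultFuture and AsyncFunction (not the real types).
    interface SimpleResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    interface SimpleAsyncFunction<IN, OUT> {
        void asyncInvoke(IN input, SimpleResultFuture<OUT> resultFuture) throws Exception;

        // Same behaviour as the default shown above: fail with a TimeoutException.
        default void timeout(IN input, SimpleResultFuture<OUT> resultFuture) throws Exception {
            resultFuture.completeExceptionally(
                    new TimeoutException("Async function call has timed out."));
        }
    }

    // Hypothetical user function that emits a fallback value instead of failing on timeout.
    static final class LookupWithFallback implements SimpleAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, SimpleResultFuture<String> resultFuture) {
            // A real implementation would start its non-blocking lookup here (omitted).
        }

        @Override
        public void timeout(String input, SimpleResultFuture<String> resultFuture) {
            resultFuture.complete(Collections.singletonList("fallback for " + input));
        }
    }

    // Records every call so the default and the overridden behaviour can be compared.
    static final class Recorder implements SimpleResultFuture<String> {
        final List<Object> events = new ArrayList<>();

        @Override
        public void complete(Collection<String> result) {
            events.add(new ArrayList<>(result));
        }

        @Override
        public void completeExceptionally(Throwable error) {
            events.add(error);
        }
    }
}
```

Whether a fallback is appropriate depends on the job. The default, which fails the task and triggers fail-over, is the safer choice when a missing enrichment would make the result wrong.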

ResultFuture

ResultFuture collects the result, or the error, of an asynchronous operation.

@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Completes the result future with a collection of result objects.
     *
     * <p>Note that it should be called for exactly one time in the user code. Calling this function
     * for multiple times will cause data lose.
     *
     * <p>Put all results in a {@link Collection} and then emit output.
     *
     * @param result A list of results.
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the result future exceptionally with an exception.
     *
     * @param error A Throwable object.
     */
    void completeExceptionally(Throwable error);
}

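It has two methods:

  • complete: if the asynchronous logic succeeds, call complete with a collection of result records to pass them downstream. According to its Javadoc, it should be called exactly once.
  • completeExceptionally: if the asynchronous logic fails, call this method and pass in the error.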
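Because complete takes a Collection, a single input element can produce zero, one or several output records. The operator code shown later rejects a null collection with the message "use empty collection to emit nothing". The sketch below uses a hypothetical stand-in, SimpleResultFuture, which has the same two methods as ResultFuture. The hypothetical onResponse helper takes the comma-separated response and calls exactly one completion method on every path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class ResultCollectionDemo {

    // Stand-in with the same two methods as Flink's ResultFuture (simplified, not the real type).
    interface SimpleResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    // Hypothetical callback body: turns one external response into 0..n output records.
    static void onResponse(String response, Throwable error, SimpleResultFuture<String> resultFuture) {
        if (error != null) {
            resultFuture.completeExceptionally(error); // propagate the failure
        } else if (response == null || response.isEmpty()) {
            resultFuture.complete(Collections.emptyList()); // emit nothing (never pass null)
        } else {
            resultFuture.complete(Arrays.asList(response.split(","))); // emit several records
        }
    }

    // Counts calls so the "exactly once" rule can be checked.
    static final class Recorder implements SimpleResultFuture<String> {
        final List<String> emitted = new ArrayList<>();
        Throwable failure;
        int calls;

        @Override
        public void complete(Collection<String> result) {
            calls++;
            emitted.addAll(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            calls++;
            failure = error;
        }
    }
}
```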

AsyncDataStream

This is a utility class for creating asynchronous operators. It offers two kinds of methods:

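  • unorderedWait: does not guarantee that output elements are emitted in the order in which they were read.
  • orderedWait: guarantees that output elements are emitted in the order in which they were read.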

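Each of them also has two overloads, and the parameters mean the same thing in all of them:

  • DataStream<IN> in: the stream to which the asynchronous processing is added. Note that AsyncDataStream is just a utility class, not a kind of stream.
  • AsyncFunction<IN, OUT> func: the user-defined asynchronous logic.
  • long timeout: timeout of each asynchronous operation.
  • TimeUnit timeUnit: unit of the timeout.
  • int capacity: capacity of the operator's queue, i.e. the maximum number of asynchronous operations that may be in progress at the same time. Only some overloads take this parameter. The default is 100.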

Below is the code of one of the orderedWait overloads.

public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity) {
    return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}

It calls addOperator, which adds a OneInputTransformation containing the AsyncWaitOperator to the DataStream.

The other unorderedWait and orderedWait overloads also delegate to addOperator, so we won't repeat them here.

Next comes the addOperator method:

private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    int bufSize,
    OutputMode mode) {

    TypeInformation<OUT> outTypeInfo =
        TypeExtractor.getUnaryOperatorReturnType(
            func,
            AsyncFunction.class,
            0,
            1,
            new int[] {1, 0},
            in.getType(),
            Utils.getCallLocationName(),
            true);

    // create transform
    AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
        new AsyncWaitOperatorFactory<>(
            in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

    return in.transform("async wait operator", outTypeInfo, operatorFactory);
}

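This method creates an AsyncWaitOperatorFactory and wraps it in a transformation. The factory creates the actual AsyncWaitOperator later, at runtime, when the task builds its operator chain. The operator needs the task's MailboxExecutor, which only exists at that point. In the next section we analyze AsyncWaitOperator, the core of asynchronous processing.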

AsyncWaitOperator

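Let's start with the AsyncWaitOperator constructor. Its most important parameter is outputMode. It determines the type of the queue that holds the asynchronous tasks, and therefore whether results are emitted strictly in input order.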

public AsyncWaitOperator(
    @Nonnull AsyncFunction<IN, OUT> asyncFunction,
    long timeout,
    int capacity,
    @Nonnull AsyncDataStream.OutputMode outputMode,
    @Nonnull ProcessingTimeService processingTimeService,
    @Nonnull MailboxExecutor mailboxExecutor) {
    super(asyncFunction);

    // Allow this operator to be chained with other operators into an OperatorChain
    setChainingStrategy(ChainingStrategy.ALWAYS);

    Preconditions.checkArgument(
        capacity > 0, "The number of concurrent async operation should be greater than 0.");
    // Queue capacity (maximum number of in-flight async operations)
    this.capacity = capacity;

    // Enum that decides whether results are emitted strictly in input order
    this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

    // Timeout of each async operation
    this.timeout = timeout;

    // Processing-time service, used to register timers, e.g. for timeout detection
    this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

    // Mailbox executor: runs actions in the task's main (mailbox) thread
    this.mailboxExecutor = mailboxExecutor;
}

Right after the operator is created, its setup method is called to initialize it.

@Override
public void setup(
    StreamTask<?, ?> containingTask,
    StreamConfig config,
    Output<StreamRecord<OUT>> output) {
    // Run the parent class's initialization
    super.setup(containingTask, config, output);

    // Create the element serializer
    this.inStreamElementSerializer =
        new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

    switch (outputMode) {
        case ORDERED:
            // Output must keep the input order:
            // use an OrderedStreamElementQueue
            queue = new OrderedStreamElementQueue<>(capacity);
            break;
        case UNORDERED:
            // Output order does not need to be preserved:
            // use an UnorderedStreamElementQueue
            queue = new UnorderedStreamElementQueue<>(capacity);
            break;
        default:
            throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
    }

    this.timestampedCollector = new TimestampedCollector<>(output);
}

Depending on outputMode, that is, on whether the output order must be preserved, setup creates the corresponding StreamElementQueue.

Next is processElement, which is called for every element that arrives from upstream.

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // add element first to the queue
    // (the returned queue entry implements ResultFuture; covered later)
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    // Create a ResultHandler wrapping the timeout timer, the input record and the entry;
    // it is used to complete the entry and to manage the timer
    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    if (timeout > 0L) {
        // Compute the processing time at which the call times out
        final long timeoutTimestamp =
            timeout + getProcessingTimeService().getCurrentProcessingTime();

        // Register a timer that calls AsyncFunction#timeout at that time
        final ScheduledFuture<?> timeoutTimer =
            getProcessingTimeService()
                .registerTimer(
                    timeoutTimestamp,
                    timestamp -> userFunction.timeout(element.getValue(), resultHandler));

        // Hand the timer to the resultHandler so it can be cancelled on completion
        resultHandler.setTimeoutTimer(timeoutTimer);
    }

    // Call AsyncFunction#asyncInvoke
    userFunction.asyncInvoke(element.getValue(), resultHandler);
}
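The timer handling in processElement follows a general pattern. It registers the timeout timer first, then invokes the user logic, and cancels the timer once the entry completes. The sketch below shows this pattern with a JDK ScheduledExecutorService standing in for Flink's ProcessingTimeService. The class and method names are hypothetical. A CompletableFuture plays the role of the queue entry and ignores every completion after the first. For simplicity, the timer completes the entry with a marker string. Flink would call userFunction.timeout instead, and by default that completes the entry exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class TimeoutTimerDemo {

    // Analogue of processElement: register a timeout timer, then run the user's invoke logic.
    // 'entry' plays the queue entry; CompletableFuture ignores every completion after the first.
    static CompletableFuture<String> process(
            ScheduledExecutorService timers,
            String input,
            long timeoutMillis,
            Consumer<CompletableFuture<String>> userInvoke) {
        CompletableFuture<String> entry = new CompletableFuture<>();
        if (timeoutMillis > 0L) {
            // Stands in for registerTimer(...) calling userFunction.timeout(...)
            ScheduledFuture<?> timer = timers.schedule(
                    () -> {
                        entry.complete("timeout: " + input);
                    },
                    timeoutMillis,
                    TimeUnit.MILLISECONDS);
            // Cancel the timer once the entry is done, as processResults does
            entry.whenComplete((result, error) -> timer.cancel(false));
        }
        // Stands in for userFunction.asyncInvoke(...)
        userInvoke.accept(entry);
        return entry;
    }
}
```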

Now let's look at addToWorkQueue, which puts the element into the work queue.

private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
    throws InterruptedException {

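    Optional<ResultFuture<OUT>> queueEntry;

    // If the element cannot be added, the queue is full.
    // The current (mailbox) thread then yields to the mailboxExecutor, which runs other pending
    // mails, such as processing completed results, until space is freed in the queue
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
        mailboxExecutor.yield();
    }

    // Added successfully: return the entry as a ResultFuture
    return queueEntry.get();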
}
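The loop relies on yielding: while the queue is full, the thread runs other pending mails. Some of those mails complete and emit entries, which frees space. The sketch below models this with a hypothetical single-threaded Mailbox (a deque of Runnable) and a bounded queue whose tryPut returns Optional.empty() when full. These are not Flink classes. Flink's MailboxExecutor#yield blocks when no mail is available. The sketch throws instead so that it cannot hang.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

class BoundedQueueYieldDemo {

    // Hypothetical single-threaded mailbox; yieldOnce() runs the oldest pending mail.
    static final class Mailbox {
        final Deque<Runnable> mails = new ArrayDeque<>();

        void execute(Runnable mail) {
            mails.addLast(mail);
        }

        void yieldOnce() {
            Runnable mail = mails.pollFirst();
            if (mail == null) {
                // Flink's yield would block until a mail arrives; this sketch fails fast instead.
                throw new IllegalStateException("queue is full and there is no mail to run");
            }
            mail.run();
        }
    }

    // Bounded queue whose tryPut returns Optional.empty() when it is full.
    static final class BoundedQueue {
        final Deque<String> items = new ArrayDeque<>();
        private final int capacity;

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        Optional<String> tryPut(String element) {
            if (items.size() >= capacity) {
                return Optional.empty();
            }
            items.addLast(element);
            return Optional.of(element);
        }
    }

    // Same loop shape as addToWorkQueue: keep yielding to the mailbox until there is room.
    static String addToWorkQueue(BoundedQueue queue, Mailbox mailbox, String element) {
        Optional<String> entry;
        while (!(entry = queue.tryPut(element)).isPresent()) {
            mailbox.yieldOnce();
        }
        return entry.get();
    }
}
```

Because everything runs on one thread, the state of the queue needs no locks. The same reasoning applies to Flink's mailbox model.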

We'll come back to the work queue later. Next, let's analyze ResultHandler.

ResultHandler

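ResultHandler is the implementation of ResultFuture that the user receives as the resultFuture argument of both AsyncFunction methods. It handles the two outcomes: the asynchronous processing completed successfully (complete) or failed (completeExceptionally).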

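ResultHandler has 4 member fields:

  • timeoutTimer: the timeout timer. It must be cancelled once the data has been processed (when complete is called), so the handler keeps a reference to it.
  • inputRecord: the original record from the stream.
  • resultFuture: actually the entry in the element queue; covered later.
  • completed: flags whether the asynchronous computation has already completed.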

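The user's asynchronous logic lives in the AsyncFunction. When it finishes, it must call ResultHandler's complete method. This method atomically switches completed from false to true and returns early if it was already true, so only the first call takes effect. It then calls processInMailbox.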

@Override
public void complete(Collection<OUT> results) {
    Preconditions.checkNotNull(
        results, "Results must not be null, use empty collection to emit nothing");

    // already completed (exceptionally or with previous complete call from ill-written
    // AsyncFunction), so
    // ignore additional result
    if (!completed.compareAndSet(false, true)) {
        return;
    }

    processInMailbox(results);
}
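The compareAndSet guard is what makes a second complete call harmless, whether it comes from a buggy AsyncFunction or from a late result. The sketch below isolates that guard in a hypothetical GuardedHandler class. It models only complete. Forwarding to a list stands in for processInMailbox. completeExceptionally and the timer are not modelled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

class CompleteOnceDemo {

    // Simplified version of the guard in ResultHandler#complete.
    static final class GuardedHandler {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        final List<String> forwarded = new ArrayList<>();

        void complete(Collection<String> results) {
            Objects.requireNonNull(
                    results, "Results must not be null, use empty collection to emit nothing");
            // Only the thread that flips false -> true proceeds; later calls are ignored
            if (!completed.compareAndSet(false, true)) {
                return;
            }
            forwarded.addAll(results); // stands in for processInMailbox(results)
        }
    }
}
```

An AtomicBoolean is used instead of a plain boolean because complete is usually called from a callback thread of the external client, not from the task thread.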

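processInMailbox hands the rest of the work to the MailboxExecutor, so processResults runs in the task's mailbox thread. processResults first cancels the timeout timer. It then calls complete on resultFuture, the queue entry, to tell the queue holding the element that it has been processed. Finally, it calls outputCompletedElement to emit completed elements downstream. The corresponding code is as follows: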

private void processInMailbox(Collection<OUT> results) {
    // move further processing into the mailbox thread
    mailboxExecutor.execute(
        () -> processResults(results),
        "Result in AsyncWaitOperator of input %s",
        results);
}

private void processResults(Collection<OUT> results) {
    // Cancel the timer once we've completed the stream record buffer entry. This will
    // remove the registered
    // timer task
    if (timeoutTimer != null) {
        // canceling in mailbox thread avoids
        // https://issues.apache.org/jira/browse/FLINK-13635
        timeoutTimer.cancel(true);
    }

    // update the queue entry with the result
    resultFuture.complete(results);
    // now output all elements from the queue that have been completed (in the correct
    // order)
    outputCompletedElement();
}

private void outputCompletedElement() {
    if (queue.hasCompletedElements()) {
        // emit only one element to not block the mailbox thread unnecessarily
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            mailboxExecutor.execute(
                this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
        }
    }
}
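outputCompletedElement emits only one element per mail and then re-enqueues itself as a new mail. Other mails, such as new input or timers, can therefore run in between. The sketch below models this with a hypothetical FIFO deque of Runnable standing in for the mailbox. It is not Flink's mailbox implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class EmitOnePerMailDemo {

    final Deque<Runnable> mailbox = new ArrayDeque<>();  // hypothetical single-threaded mailbox
    final Deque<String> completed = new ArrayDeque<>();  // completed results waiting to be emitted
    final List<String> log = new ArrayList<>();          // what the "mailbox thread" did, in order

    // Same shape as outputCompletedElement: emit one element, reschedule itself if more remain.
    void outputCompletedElement() {
        if (!completed.isEmpty()) {
            log.add("emit " + completed.pollFirst());
            if (!completed.isEmpty()) {
                mailbox.addLast(this::outputCompletedElement);
            }
        }
    }

    // Runs mails in FIFO order until the mailbox is empty.
    void runMailbox() {
        Runnable mail;
        while ((mail = mailbox.pollFirst()) != null) {
            mail.run();
        }
    }
}
```

With three completed results and one unrelated mail queued behind the first emit, the log reads: emit a, other mail, emit b, emit c.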

StreamElementQueue

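In this section we analyze the core of asynchronous processing: StreamElementQueue. All data awaiting asynchronous processing waits in this queue.

The queue has to support both preserving and not preserving the output order, so it has two implementations:

  • OrderedStreamElementQueue: elements are output strictly in input order.
  • UnorderedStreamElementQueue: does not guarantee that elements are output in input order.

The interface has the following methods: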

@Internal
public interface StreamElementQueue<OUT> {

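    // Tries to put an element into the queue. Returns Optional.empty() if the queue is full,
    // otherwise an Optional holding the entry as a ResultFuture
    Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);

    // Emits one completed element (for the ordered queue: the head) to the output collector
    void emitCompletedElement(TimestampedCollector<OUT> output);

    // Checks whether a completed element is ready to be emitted (for the ordered queue: the head)
    boolean hasCompletedElements();

    // Other methods omitted
    // ...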
}

We now look at the two implementations in turn.

OrderedStreamElementQueue

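This queue guarantees that the output order strictly matches the input order. It stores input data in a Queue<StreamElementQueueEntry<OUT>>, implemented as an ArrayDeque.

The tryPut method for adding elements is shown below. If the element is added (the queue's capacity has not been reached), it returns the entry as a ResultFuture<OUT> wrapped in an Optional. Otherwise it returns Optional.empty().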

@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
    if (queue.size() < capacity) {
        // Only enqueue if the queue still has room.
        // Build the queue entry matching the element type (record or watermark)
        StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

        // Add the entry to the queue
        queue.add(queueEntry);

        LOG.debug(
            "Put element into ordered stream element queue. New filling degree "
            + "({}/{}).",
            queue.size(),
            capacity);

        return Optional.of(queueEntry);
    } else {
        LOG.debug(
            "Failed to put element into ordered stream element queue because it "
            + "was full ({}/{}).",
            queue.size(),
            capacity);

        // The queue is full: return an empty Optional
        return Optional.empty();
    }
}

createEntry creates a different kind of queue entry (StreamElementQueueEntry) depending on the element type: a StreamRecordQueueEntry for a data record, and a WatermarkQueueEntry for a watermark.

private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
    if (streamElement.isRecord()) {
        return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
    }
    if (streamElement.isWatermark()) {
        return new WatermarkQueueEntry<>((Watermark) streamElement);
    }
    throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}

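Elements are taken out of the queue by emitCompletedElement. OrderedStreamElementQueue takes one element from the head of the queue and sends it to the output collector. hasCompletedElements likewise checks only whether the head element has finished its asynchronous processing. That is how OrderedStreamElementQueue guarantees that the output order strictly matches the input order. The price is head-of-line blocking: one slow request at the head delays every result behind it, including those that have already completed.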

@Override
public boolean hasCompletedElements() {
    return !queue.isEmpty() && queue.peek().isDone();
}

@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    if (hasCompletedElements()) {
        final StreamElementQueueEntry<OUT> head = queue.poll();
        head.emitResult(output);
    }
}
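The head-of-line behavior is easy to reproduce. The sketch below is a simplified, hypothetical OrderedQueueDemo class. It handles records only, with no watermarks, and uses a CompletableFuture in place of StreamRecordQueueEntry. Like hasCompletedElements above, it checks only the head of an ArrayDeque.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class OrderedQueueDemo<OUT> {

    private final Queue<CompletableFuture<OUT>> queue = new ArrayDeque<>();
    private final int capacity;

    OrderedQueueDemo(int capacity) {
        this.capacity = capacity;
    }

    // Like tryPut: add an entry only if there is room, otherwise return Optional.empty().
    Optional<CompletableFuture<OUT>> tryPut() {
        if (queue.size() >= capacity) {
            return Optional.empty();
        }
        CompletableFuture<OUT> entry = new CompletableFuture<>();
        queue.add(entry);
        return Optional.of(entry);
    }

    // Only the head counts: a finished entry behind an unfinished head cannot be emitted yet.
    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    // Emits everything that is ready, strictly in insertion order.
    void emitAll(List<OUT> output) {
        while (hasCompletedElements()) {
            output.add(queue.poll().join());
        }
    }
}
```

If the second request finishes first, nothing is emitted. Both results come out, in order, only once the first request finishes.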

UnorderedStreamElementQueue

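Unlike OrderedStreamElementQueue, UnorderedStreamElementQueue stores its input data in a Deque<Segment<OUT>>, a double-ended queue whose items are Segments. Note that the number of items in this deque is not the number of elements, because one Segment can hold many elements.

A Segment contains two collections, incompleteElements and completedElements. They hold the elements that are still being processed and the elements whose processing has completed.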

/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;

Segment(int initialCapacity) {
    incompleteElements = new HashSet<>(initialCapacity);
    completedElements = new ArrayDeque<>(initialCapacity);
}

When an element is added, the Segment checks whether its entry has already finished asynchronous processing and puts it into the corresponding collection.

void add(StreamElementQueueEntry<OUT> queueEntry) {
    if (queueEntry.isDone()) {
        completedElements.add(queueEntry);
    } else {
        incompleteElements.add(queueEntry);
    }
}

When an entry's data has been processed, the Segment's completed method is called to move that entry into the collection of completed elements.

void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
    // adding only to completed queue if not completed before
    // there may be a real result coming after a timeout result, which is updated in the
    // queue entry but
    // the entry is not re-added to the complete queue
    if (incompleteElements.remove(elementQueueEntry)) {
        completedElements.add(elementQueueEntry);
    }
}

When emitting, the operator needs the elements that have finished processing. emitCompleted polls one entry from completedElements and hands it to the output collector.

int emitCompleted(TimestampedCollector<OUT> output) {
    final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
    if (completedEntry == null) {
        return 0;
    }
    completedEntry.emitResult(output);
    return 1;
}

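It is now clear that a Segment gives up ordering guarantees. Completed elements are picked out and placed in completedElements. Emitting downstream is therefore never blocked by a long-running, not-yet-completed element in the middle of the queue. This lowers latency and also reduces latency jitter.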
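A minimal sketch of a Segment follows. It is simplified and hypothetical: entries are plain String ids, so two entries with the same id would collide in the HashSet, whereas Flink's entries are distinct objects. Completion is signalled by calling completed directly. Elements come out in completion order, and a duplicate completion is ignored because the entry is no longer in incompleteElements.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

class SegmentDemo {

    // Simplified Segment: pending entries in a set, finished ones queued in completion order.
    static final class Segment {
        private final Set<String> incompleteElements = new HashSet<>();
        private final Queue<String> completedElements = new ArrayDeque<>();

        void add(String entry, boolean alreadyDone) {
            if (alreadyDone) {
                completedElements.add(entry);
            } else {
                incompleteElements.add(entry);
            }
        }

        // Moves an entry to the completed queue at most once; repeated completions are ignored.
        void completed(String entry) {
            if (incompleteElements.remove(entry)) {
                completedElements.add(entry);
            }
        }

        // Emits one completed entry; returns how many were emitted (0 or 1).
        int emitCompleted(List<String> output) {
            String entry = completedElements.poll();
            if (entry == null) {
                return 0;
            }
            output.add(entry);
            return 1;
        }

        boolean isEmpty() {
            return incompleteElements.isEmpty() && completedElements.isEmpty();
        }
    }
}
```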

That raises some questions. If a single Segment seems to solve the problem, why is a queue of Segments needed? When are Segments created? And how is it decided which Segment an element goes into? We'll address these questions next.

Let's start with tryPut.

@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
    // Check whether the queue capacity has been reached
    if (size() < capacity) {
        StreamElementQueueEntry<OUT> queueEntry;
        // Build a different queue entry depending on the element type
        if (streamElement.isRecord()) {
            queueEntry = addRecord((StreamRecord<?>) streamElement);
        } else if (streamElement.isWatermark()) {
            queueEntry = addWatermark((Watermark) streamElement);
        } else {
            throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
        }

        numberOfEntries++;

        LOG.debug(
            "Put element into unordered stream element queue. New filling degree "
            + "({}/{}).",
            size(),
            capacity);

        return Optional.of(queueEntry);
    } else {
        LOG.debug(
            "Failed to put element into unordered stream element queue because it "
            + "was full ({}/{}).",
            size(),
            capacity);

        return Optional.empty();
    }
}

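Comparing addRecord and addWatermark reveals the answer. When a record is added, a new Segment is created if the queue has none; otherwise the record goes into the last Segment. addWatermark works differently. It checks whether the last Segment is empty. If it is, that Segment is reused for the watermark; otherwise a new Segment (of capacity 1) is created to hold it. Either way, a fresh Segment is then appended for the records that follow. This answers how Segments are created and which Segment an element joins: each time a watermark appears in the stream, the records after it go into a new Segment.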

private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
    // ensure that there is at least one segment
    Segment<OUT> lastSegment;
    if (segments.isEmpty()) {
        lastSegment = addSegment(capacity);
    } else {
        lastSegment = segments.getLast();
    }

    // entry is bound to segment to notify it easily upon completion
    StreamElementQueueEntry<OUT> queueEntry =
        new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
    lastSegment.add(queueEntry);
    return queueEntry;
}

private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
    Segment<OUT> watermarkSegment;
    if (!segments.isEmpty() && segments.getLast().isEmpty()) {
        // reuse already existing segment if possible (completely drained) or the new segment
        // added at the end of
        // this method for two succeeding watermarks
        watermarkSegment = segments.getLast();
    } else {
        watermarkSegment = addSegment(1);
    }

    StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
    watermarkSegment.add(watermarkEntry);

    // add a new segment for actual elements
    addSegment(capacity);
    return watermarkEntry;
}

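Next, let's look at the method that emits completed data. In contrast to insertion, it takes the first Segment in the queue and emits one completed element from it. It then checks whether that Segment still holds any elements. If it is empty and is not the only Segment, it is popped from the queue and can be garbage-collected. At least one Segment is always kept.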

@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    if (segments.isEmpty()) {
        return;
    }
    final Segment currentSegment = segments.getFirst();
    numberOfEntries -= currentSegment.emitCompleted(output);

    // remove any segment if there are further segments, if not leave it as an optimization even
    // if empty
    if (segments.size() > 1 && currentSegment.isEmpty()) {
        segments.pop();
    }
}

With this design, UnorderedStreamElementQueue places a run of data into different Segments, separated by watermarks. As emitCompletedElement shows, data in the next Segment can be read only after every element of the head Segment has been emitted, whether it completed normally or timed out. Some out-of-order output is allowed, but the reordering never crosses a watermark. This keeps watermark semantics correct: tolerating out-of-order results never causes some records to be unexpectedly treated as "late".
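The segment logic can be condensed into a small runnable model. The sketch below is simplified and hypothetical. UnorderedQueueDemo, Entry and drain are illustrative names. Capacity and entry counting are left out. Watermark entries count as complete from the start. The method bodies follow addRecord, addWatermark and emitCompletedElement as quoted above. In the example, a record that finishes early after a watermark is still held back until everything before the watermark has been emitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

class UnorderedQueueDemo {

    // A queue entry: records complete later, watermarks are complete from the start.
    static final class Entry {
        final String label;
        final Segment segment;
        private boolean done;

        Entry(String label, Segment segment, boolean done) {
            this.label = label;
            this.segment = segment;
            this.done = done;
        }

        // Marks the entry done and notifies its segment (only the first call has an effect).
        void complete() {
            if (!done) {
                done = true;
                segment.completed(this);
            }
        }
    }

    // Simplified Segment: pending entries in a set, finished ones in completion order.
    static final class Segment {
        final Set<Entry> incomplete = new HashSet<>();
        final Queue<Entry> completed = new ArrayDeque<>();

        void add(Entry e) {
            if (e.done) {
                completed.add(e);
            } else {
                incomplete.add(e);
            }
        }

        void completed(Entry e) {
            if (incomplete.remove(e)) {
                completed.add(e);
            }
        }

        boolean isEmpty() {
            return incomplete.isEmpty() && completed.isEmpty();
        }
    }

    private final Deque<Segment> segments = new ArrayDeque<>();

    // Like addRecord: append to the last segment, creating one if there is none.
    Entry addRecord(String label) {
        if (segments.isEmpty()) {
            segments.addLast(new Segment());
        }
        Segment last = segments.getLast();
        Entry e = new Entry(label, last, false);
        last.add(e);
        return e;
    }

    // Like addWatermark: reuse an empty last segment or open a new one, then open a fresh one.
    Entry addWatermark(String label) {
        Segment wmSegment = segments.isEmpty() ? null : segments.getLast();
        if (wmSegment == null || !wmSegment.isEmpty()) {
            wmSegment = new Segment();
            segments.addLast(wmSegment);
        }
        Entry e = new Entry(label, wmSegment, true);
        wmSegment.add(e);
        segments.addLast(new Segment()); // records after the watermark go here
        return e;
    }

    // Like emitCompletedElement: emit one finished entry of the head segment, drop it once drained.
    void emitCompletedElement(List<String> out) {
        if (segments.isEmpty()) {
            return;
        }
        Segment head = segments.getFirst();
        Entry e = head.completed.poll();
        if (e != null) {
            out.add(e.label);
        }
        if (segments.size() > 1 && head.isEmpty()) {
            segments.pop();
        }
    }

    // Emits everything that is currently emittable.
    void drain(List<String> out) {
        while (!segments.isEmpty() && !segments.getFirst().completed.isEmpty()) {
            emitCompletedElement(out);
        }
    }
}
```

For example, with records r1 and r2, then watermark W, then record r3: completing r3 and r2 emits only r2. Completing r1 afterwards emits r1, W and r3, in that order.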

This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
