Introduction
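Flink is known for high throughput and low latency. But when one stage of a job's processing logic has to call an external system, the unpredictable latency of those calls can noticeably degrade cluster performance. What can be done about it?
To solve this problem, Flink introduced the AsyncFunction family of interfaces. When an external service is called through these asynchronous interfaces, the operator no longer waits synchronously for the result. It stores the element in a queue and returns right away to process the next element; when the external service responds, the state of the queue entry is updated. Results for downstream are then taken directly from the queue.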
Usage
Before explaining how to use AsyncFunction, let's first fake a slow external system call. Calling pullData immediately returns a CompletableFuture, and the generated data is delivered through that CompletableFuture 5 seconds later.
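import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncIODemo implements Serializable {
    // Shared pool that simulates the external system's worker threads
    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> pullData(final String source) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        executorService.submit(() -> {
            try {
                // Simulate a slow external call
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report the failure to the caller
                Thread.currentThread().interrupt();
                completableFuture.completeExceptionally(e);
                return;
            }
            completableFuture.complete("Output value: " + source);
        });
        return completableFuture;
    }
}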
Next, write the Flink job:
import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")
val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    // Call the simulated external system defined above
    val future = new AsyncIODemo().pullData(input)
    // Non-blocking: the callback runs once the future completes
    future.whenCompleteAsync(new BiConsumer[String, Throwable] {
      override def accept(t: String, u: Throwable): Unit = {
        if (u != null) {
          // Propagate failures instead of emitting a null value
          resultFuture.completeExceptionally(u)
        } else {
          resultFuture.complete(Array(t))
        }
      }
    })
  }
}, 10, TimeUnit.SECONDS)
// The arguments above set the async call timeout to 10 seconds
asyncStream.print()
env.execute()
Run the Flink job. Although the external system is called 4 times, we do not have to wait 20 seconds for all 4 results; in practice they all arrive after about 5 seconds. This confirms that AsyncFunction works as intended.
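The "about 5 seconds instead of 20" observation follows from the calls overlapping rather than running one after another. A minimal JDK-only sketch of that effect is below. It is not Flink code: OverlappedCallsSketch, runAll and the delayMillis parameter are names assumed for illustration, and the delay is shortened so the example runs quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OverlappedCallsSketch {
    // Hypothetical stand-in for a slow external call; delayMillis replaces the 5 s sleep.
    static CompletableFuture<String> pullData(ExecutorService pool, String source, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Output value: " + source;
        }, pool);
    }

    // Starts every call before waiting for any of them, then collects results in input order.
    static List<String> runAll(List<String> inputs, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String input : inputs) {
                pending.add(pullData(pool, input, delayMillis));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = runAll(Arrays.asList("Alpha", "Beta", "Gamma", "Delta"), 200);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " in about " + elapsedMillis + " ms");
    }
}
```

With four inputs and a pool of four threads, the total time is close to one delay rather than four, which mirrors what the Flink job shows.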
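Note: although the name AsyncFunction suggests an asynchronous call, the asyncInvoke method itself is still invoked synchronously. Never block inside this method waiting for the call result, because that defeats its purpose. Instead, register an asynchronous callback here that notifies Flink once the data has been fetched.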
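The callback pattern described in the note can be sketched without Flink as follows. MiniResultFuture is a hypothetical stand-in that only mirrors the two methods of Flink's ResultFuture, and bridge and Recording are illustration names; the point is that bridge registers a callback and returns without ever waiting on the future.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CallbackBridgeSketch {
    /** Hypothetical stand-in mirroring the two methods of Flink's ResultFuture. */
    interface MiniResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    /** Registers a callback and returns immediately; it never waits on the future. */
    static <T> void bridge(CompletableFuture<T> call, MiniResultFuture<T> resultFuture) {
        call.whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    /** Records what was reported, for demonstration only. */
    static final class Recording implements MiniResultFuture<String> {
        final List<String> values = new ArrayList<>();
        Throwable failure;

        @Override
        public void complete(Collection<String> result) {
            values.addAll(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            failure = error;
        }
    }
}
```

Handling the error branch matters: a callback that only forwards the value would report a null result when the external call fails.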
AsyncFunction
The source code analysis starts here. The AsyncFunction interface is defined as follows:
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
/**
* {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
* exceptionally completed with a timeout exception.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
*/
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
resultFuture.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
}
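The AsyncFunction interface has two methods:
- asyncInvoke: triggers the asynchronous operation for each input element of the stream. The first parameter, input, is the stream element; the second, resultFuture, collects the result of the asynchronous processing or the error. Do not wait synchronously for the processing logic inside this method: that blocks the thread and reduces job throughput.
- timeout: defines how a timed-out element is handled. Its parameters are the same as those of asyncInvoke. AsyncFunction already provides a default implementation; override this method if you need custom timeout logic.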
ResultFuture
ResultFuture collects the result or the error of an asynchronous operation.
@PublicEvolving
public interface ResultFuture<OUT> {
/**
* Completes the result future with a collection of result objects.
*
* <p>Note that it should be called for exactly one time in the user code. Calling this function
* for multiple times will cause data lose.
*
* <p>Put all results in a {@link Collection} and then emit output.
*
* @param result A list of results.
*/
void complete(Collection<OUT> result);
/**
* Completes the result future exceptionally with an exception.
*
* @param error A Throwable object.
*/
void completeExceptionally(Throwable error);
}
It contains two methods:
- complete: when the asynchronous logic returns successfully, call complete with a collection of the result data to pass the data downstream.
- completeExceptionally: when the asynchronous logic fails, call this method to pass in the error.
AsyncDataStream
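This class is a utility for creating asynchronous operators. It offers two kinds of methods:
- unorderedWait: does not guarantee that output elements keep the order of the input elements.
- orderedWait: guarantees that output elements keep the order of the input elements.
Each of the two also has two overloads, whose parameters have the same meaning. The parameters are:
- DataStream<IN> in: the data stream to which the asynchronous processing logic is added. AsyncDataStream is only a utility class, not a kind of stream.
- AsyncFunction<IN, OUT> func: the user-defined asynchronous logic.
- long timeout: the timeout of an asynchronous task.
- TimeUnit timeUnit: the unit of the timeout.
- int capacity: the capacity of the asynchronous task queue, that is, the maximum number of elements in flight at once. Only some overloads take this parameter; the default value is 100.
Below is the code of one of the orderedWait overloads.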
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
TimeUnit timeUnit,
int capacity) {
return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}
It calls addOperator, which adds a OneInputTransformation containing an AsyncWaitOperator to the DataStream.
The other unorderedWait and orderedWait overloads all call addOperator as well, so they are not covered separately.
Next comes the addOperator method:
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
int bufSize,
OutputMode mode) {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncFunction.class,
0,
1,
new int[] {1, 0},
in.getType(),
Utils.getCallLocationName(),
true);
// create transform
AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);
return in.transform("async wait operator", outTypeInfo, operatorFactory);
}
This method creates an AsyncWaitOperatorFactory and wraps it in a transformation. The factory later creates the AsyncWaitOperator when the task builds its operator chain at runtime. The next section analyzes AsyncWaitOperator, the core of the asynchronous operation.
AsyncWaitOperator
We start with the constructor of AsyncWaitOperator. Its most important parameter is outputMode, which determines the type of the asynchronous task queue and therefore whether results are emitted strictly in input order.
public AsyncWaitOperator(
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull ProcessingTimeService processingTimeService,
@Nonnull MailboxExecutor mailboxExecutor) {
super(asyncFunction);
// ALWAYS: chain this operator with its predecessor whenever possible
setChainingStrategy(ChainingStrategy.ALWAYS);
Preconditions.checkArgument(
capacity > 0, "The number of concurrent async operation should be greater than 0.");
// queue capacity
this.capacity = capacity;
// enum that decides whether results are emitted strictly in input order
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
// timeout for the asynchronous operation
this.timeout = timeout;
// time service used to register timers, e.g. for timeout detection
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
// executor that runs actions in the task's mailbox thread
this.mailboxExecutor = mailboxExecutor;
}
Right after the operator is created, its setup method is executed to perform initialization.
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
// run the parent class's setup logic
super.setup(containingTask, config, output);
// create the serializer for stream elements
this.inStreamElementSerializer =
new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
switch (outputMode) {
case ORDERED:
// output order must be preserved:
// use an OrderedStreamElementQueue
queue = new OrderedStreamElementQueue<>(capacity);
break;
case UNORDERED:
// output order does not need to be preserved:
// use an UnorderedStreamElementQueue
queue = new UnorderedStreamElementQueue<>(capacity);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
this.timestampedCollector = new TimestampedCollector<>(output);
}
The setup method decides which StreamElementQueue to create based on whether outputMode requires output order to be preserved.
Next is the processElement method, which is called for every element arriving from upstream.
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// add element first to the queue
// the returned queue entry implements the ResultFuture interface (covered later)
final ResultFuture<OUT> entry = addToWorkQueue(element);
// create a ResultHandler wrapping the timeout timer, the input record and the resultFuture;
// it is used to operate on the resultFuture and the timeout timer
final ResultHandler resultHandler = new ResultHandler(element, entry);
// register a timeout for the entry if timeout is configured
if (timeout > 0L) {
// compute the timestamp at which the element times out
final long timeoutTimestamp =
timeout + getProcessingTimeService().getCurrentProcessingTime();
// register a timer that calls AsyncFunction's timeout method at that timestamp
final ScheduledFuture<?> timeoutTimer =
getProcessingTimeService()
.registerTimer(
timeoutTimestamp,
timestamp ->
userFunction.timeout(
element.getValue(), resultHandler));
// hand the timer to the resultHandler
resultHandler.setTimeoutTimer(timeoutTimer);
}
// call AsyncFunction's asyncInvoke method
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
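The timer handling in processElement can be modeled with plain JDK classes. The sketch below is a simplified analogy, not Flink's implementation: a ScheduledExecutorService stands in for the ProcessingTimeService, a CompletableFuture stands in for the ResultHandler, and registerTimeout and run are names assumed for illustration. It uses cancel(false) because the callback may run on the timer thread itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutTimerSketch {
    /**
     * Registers a timer that fails the result after timeoutMillis, and cancels the
     * timer as soon as the result completes for any reason.
     */
    static <T> ScheduledFuture<?> registerTimeout(
            ScheduledExecutorService timerService, CompletableFuture<T> result, long timeoutMillis) {
        ScheduledFuture<?> timer = timerService.schedule(
                () -> {
                    // Same default as AsyncFunction.timeout: complete exceptionally
                    result.completeExceptionally(
                            new TimeoutException("Async function call has timed out."));
                },
                timeoutMillis,
                TimeUnit.MILLISECONDS);
        result.whenComplete((value, error) -> timer.cancel(false));
        return timer;
    }

    /** Runs one scenario and reports the outcome as text, for demonstration only. */
    static String run(boolean completeInTime, long timeoutMillis) {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            ScheduledFuture<?> timer = registerTimeout(timerService, result, timeoutMillis);
            if (completeInTime) {
                result.complete("value");
            }
            try {
                return "completed with " + result.join() + ", timer cancelled: " + timer.isCancelled();
            } catch (CompletionException e) {
                return "failed with " + e.getCause().getClass().getSimpleName();
            }
        } finally {
            timerService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true, 60_000));
        System.out.println(run(false, 50));
    }
}
```

Whichever side finishes first decides the outcome: a result that arrives in time removes the timer, and a timer that fires first fails the element.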
Next, look at addToWorkQueue, which puts the element into the task queue.
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
throws InterruptedException {
Optional<ResultFuture<OUT>> queueEntry;
// if the element cannot be added, the queue is full:
// yield to the mailbox executor so that pending mails (such as completed results) can run and free a slot
while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
mailboxExecutor.yield();
}
// the element was added; return its ResultFuture
return queueEntry.get();
}
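The retry loop above relies on yielding to make progress while the queue is full. A single-threaded toy model of that interaction is sketched below; MailboxYieldSketch and all of its members are hypothetical names, and the mailbox here is just a deque of Runnables rather than Flink's MailboxExecutor. Unlike the real yield(), which waits until a mail is available, this model throws when nothing is pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class MailboxYieldSketch {
    private final Deque<Runnable> mailbox = new ArrayDeque<>(); // pending mails
    private final Deque<String> queue = new ArrayDeque<>();     // in-flight elements
    private final int capacity;
    private int yields = 0;

    MailboxYieldSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns false instead of blocking when the queue is full. */
    boolean tryPut(String element) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(element);
        return true;
    }

    /** Stand-in for an async result arriving: a mail that emits the oldest element. */
    void enqueueCompletionMail() {
        mailbox.add(() -> {
            queue.poll();
        });
    }

    /** Runs exactly one pending mail. */
    void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("no pending mail; a real executor would wait here");
        }
        yields++;
        mail.run();
    }

    /** Same shape as the loop above: retry tryPut, yielding between attempts. */
    void addToWorkQueue(String element) {
        while (!tryPut(element)) {
            yieldOnce();
        }
    }

    int yields() {
        return yields;
    }

    List<String> contents() {
        return new ArrayList<>(queue);
    }
}
```

With capacity 2, adding a third element succeeds only after one yield has run a completion mail and freed a slot, which is how a full queue applies backpressure without blocking the thread that must also process results.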
The work queue is discussed later. Next we analyze ResultHandler.
ResultHandler
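ResultHandler is the implementation of ResultFuture that is passed as the argument to both AsyncFunction methods for the user to call. It handles the two cases of normal completion (complete) and exceptional completion (completeExceptionally).
ResultHandler holds 4 member variables:
- timeoutTimer: the timer. It must be cleared once the computation finishes (when complete is called), so the handler keeps a reference to it.
- inputRecord: the original record from the data stream.
- resultFuture: actually the entry in the element queue, described later.
- completed: indicates whether the asynchronous computation has finished.
The user's asynchronous logic lives in AsyncFunction and must call ResultHandler's complete method when it finishes. This method sets the completed flag to true and then calls processInMailbox.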
@Override
public void complete(Collection<OUT> results) {
Preconditions.checkNotNull(
results, "Results must not be null, use empty collection to emit nothing");
// already completed (exceptionally or with previous complete call from ill-written
// AsyncFunction), so
// ignore additional result
if (!completed.compareAndSet(false, true)) {
return;
}
processInMailbox(results);
}
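The compareAndSet guard in complete is what makes "first outcome wins" safe when a timeout and a late result race each other. A minimal sketch of that guard is below. CompleteOnceSketch is a hypothetical class, its methods return boolean only so the effect is visible, and it assumes the exceptional path uses the same guard, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class CompleteOnceSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final List<String> delivered = new ArrayList<>();

    /** Delivers a normal result unless an outcome was already delivered. */
    boolean complete(String result) {
        if (!completed.compareAndSet(false, true)) {
            return false; // a previous call won; ignore this one
        }
        delivered.add("result:" + result);
        return true;
    }

    /** Delivers a failure unless an outcome was already delivered. */
    boolean completeExceptionally(Throwable error) {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        delivered.add("error:" + error.getMessage());
        return true;
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Because compareAndSet flips the flag atomically, exactly one caller sees it change from false to true, no matter how many threads report an outcome.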
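processInMailbox hands the work over to the mailbox thread through the MailboxExecutor. There, processResults first cancels the timeout timer, then calls complete on resultFuture to tell the queue holding the element that it has been processed, and finally calls outputCompletedElement to emit completed elements downstream. The corresponding code is shown below: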
private void processInMailbox(Collection<OUT> results) {
// move further processing into the mailbox thread
mailboxExecutor.execute(
() -> processResults(results),
"Result in AsyncWaitOperator of input %s",
results);
}
private void processResults(Collection<OUT> results) {
// Cancel the timer once we've completed the stream record buffer entry. This will
// remove the registered
// timer task
if (timeoutTimer != null) {
// canceling in mailbox thread avoids
// https://issues.apache.org/jira/browse/FLINK-13635
timeoutTimer.cancel(true);
}
// update the queue entry with the result
resultFuture.complete(results);
// now output all elements from the queue that have been completed (in the correct
// order)
outputCompletedElement();
}
private void outputCompletedElement() {
if (queue.hasCompletedElements()) {
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
mailboxExecutor.execute(
this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
}
}
}
StreamElementQueue
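This section analyzes the core of asynchronous processing: StreamElementQueue. All elements that need asynchronous processing wait in this queue.
The queue has to support both preserving and not preserving the output order, so it has two implementations:
- OrderedStreamElementQueue: elements are emitted strictly in input order.
- UnorderedStreamElementQueue: output order is not guaranteed to match input order.
The interface has the following methods: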
@Internal
public interface StreamElementQueue<OUT> {
// Tries to put the element into the queue; returns Optional.empty() if the queue is full,
// otherwise a ResultFuture for the new entry
Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);
// Emits one element that has completed asynchronous processing to the output collector
void emitCompletedElement(TimestampedCollector<OUT> output);
// Checks whether a completed element is ready to be emitted
boolean hasCompletedElements();
// Remaining methods omitted
// ...
}
The two queue implementations are described in turn below.
OrderedStreamElementQueue
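This queue guarantees that the output order strictly matches the input order. It stores the input in a Queue<StreamElementQueueEntry<OUT>>, backed by an ArrayDeque.
The tryPut method that adds elements is shown below. If the element is added successfully (the capacity limit is not exceeded), it returns an Optional containing a ResultFuture<OUT>; otherwise it returns Optional.empty().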
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
if (queue.size() < capacity) {
// add to the queue only if there is room left;
// build the matching queue entry for the element type (record or watermark)
StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);
// add the entry to the queue
queue.add(queueEntry);
LOG.debug(
"Put element into ordered stream element queue. New filling degree "
+ "({}/{}).",
queue.size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into ordered stream element queue because it "
+ "was full ({}/{}).",
queue.size(),
capacity);
// the queue is at capacity; return empty
return Optional.empty();
}
}
createEntry creates a different queue entry (StreamElementQueueEntry) depending on the element type: a StreamRecordQueueEntry for a data record, a WatermarkQueueEntry for a watermark.
private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
if (streamElement.isRecord()) {
return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
}
if (streamElement.isWatermark()) {
return new WatermarkQueueEntry<>((Watermark) streamElement);
}
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
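Elements leave the queue through emitCompletedElement. OrderedStreamElementQueue takes an element from the head of the queue and sends it to the output collector. hasCompletedElements likewise checks only whether the head element has finished asynchronous processing. This is why OrderedStreamElementQueue can guarantee that output order strictly matches input order. The cost is latency: a slow element at the head holds back every completed element behind it.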
@Override
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (hasCompletedElements()) {
final StreamElementQueueEntry<OUT> head = queue.poll();
head.emitResult(output);
}
}
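The head-of-line behaviour of the ordered queue can be reproduced in a few lines. This is a simplified model under assumed names (OrderedQueueSketch, Entry, put, drain), with strings in place of stream records and no capacity limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class OrderedQueueSketch {
    /** One queued element; result stays null until the async call finishes. */
    static final class Entry {
        final String input;
        private String result;

        Entry(String input) {
            this.input = input;
        }

        boolean isDone() {
            return result != null;
        }

        void complete(String value) {
            result = value;
        }
    }

    private final Queue<Entry> queue = new ArrayDeque<>();

    Entry put(String input) {
        Entry entry = new Entry(input);
        queue.add(entry);
        return entry;
    }

    /** Only the head is inspected, exactly like the method above. */
    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    /** Emits every element that is ready, stopping at the first unfinished head. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (hasCompletedElements()) {
            out.add(queue.poll().result);
        }
        return out;
    }
}
```

If the second element finishes first, drain returns nothing; once the first element finishes, both come out in input order.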
UnorderedStreamElementQueue
Unlike OrderedStreamElementQueue, UnorderedStreamElementQueue stores the input in a double-ended queue of type Deque<Segment<OUT>>, whose items are Segments. Note that the number of Segments in the deque is not the number of elements, because one Segment can hold several elements.
A Segment contains two collections, incompleteElements and completedElements, which hold the elements still being processed and the elements already processed.
/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;
/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;
Segment(int initialCapacity) {
incompleteElements = new HashSet<>(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}
When an entry is added, the Segment checks whether it has already finished asynchronous processing and puts it into the matching collection.
void add(StreamElementQueueEntry<OUT> queueEntry) {
if (queueEntry.isDone()) {
completedElements.add(queueEntry);
} else {
incompleteElements.add(queueEntry);
}
}
When the computation for an entry finishes, the completed method is called to move the entry into the collection of completed elements.
void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
// adding only to completed queue if not completed before
// there may be a real result coming after a timeout result, which is updated in the
// queue entry but
// the entry is not re-added to the complete queue
if (incompleteElements.remove(elementQueueEntry)) {
completedElements.add(elementQueueEntry);
}
}
When output is triggered, a completed element has to be fetched. This is done by polling one entry from completedElements and handing it to the output collector.
int emitCompleted(TimestampedCollector<OUT> output) {
final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
if (completedEntry == null) {
return 0;
}
completedEntry.emitResult(output);
return 1;
}
At this point it is clear that a Segment gives up ordering: completed elements are picked out and placed in completedElements, so emission is not blocked by an element in the middle that takes a long time to complete. This lowers latency and reduces latency jitter.
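A stripped-down model of a single Segment shows the effect. SegmentSketch is a hypothetical class that uses plain strings instead of queue entries, and it treats every added element as not yet finished.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

final class SegmentSketch {
    private final Set<String> incomplete = new HashSet<>();
    private final Queue<String> completed = new ArrayDeque<>();

    /** New elements start out unfinished in this model. */
    void add(String element) {
        incomplete.add(element);
    }

    /** Moves the element to the completed queue the first time it finishes. */
    void completed(String element) {
        if (incomplete.remove(element)) {
            completed.add(element);
        }
    }

    /** Returns the next finished element, or null if none is ready. */
    String emitCompleted() {
        return completed.poll();
    }

    boolean isEmpty() {
        return incomplete.isEmpty() && completed.isEmpty();
    }
}
```

Elements come out in completion order, so a slow element never delays a fast one inside the same Segment, and a second completion of the same element is ignored.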
This raises some questions. A single Segment seems enough to solve the problem, so why is a queue of Segments needed? When is a Segment created? How is it decided which Segment an element joins? We discuss these next.
First, the tryPut method.
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
// check whether the queue still has room
if (size() < capacity) {
StreamElementQueueEntry<OUT> queueEntry;
// create a different queue entry depending on the element type
if (streamElement.isRecord()) {
queueEntry = addRecord((StreamRecord<?>) streamElement);
} else if (streamElement.isWatermark()) {
queueEntry = addWatermark((Watermark) streamElement);
} else {
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
numberOfEntries++;
LOG.debug(
"Put element into unordered stream element queue. New filling degree "
+ "({}/{}).",
size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into unordered stream element queue because it "
+ "was full ({}/{}).",
size(),
capacity);
return Optional.empty();
}
}
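Comparing addRecord and addWatermark gives the answer. addRecord creates a new Segment only if the queue has none; otherwise it inserts the record into the last Segment. addWatermark is different. It checks whether the last Segment in the queue is empty: if so, it reuses that Segment for the watermark, otherwise it creates a new Segment for it. After adding the watermark it appends another new Segment for the records that follow. This answers how Segments are created and how elements are assigned to them: every watermark in the stream sits in a Segment of its own, and the records after it go into a new Segment.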
private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
// ensure that there is at least one segment
Segment<OUT> lastSegment;
if (segments.isEmpty()) {
lastSegment = addSegment(capacity);
} else {
lastSegment = segments.getLast();
}
// entry is bound to segment to notify it easily upon completion
StreamElementQueueEntry<OUT> queueEntry =
new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
lastSegment.add(queueEntry);
return queueEntry;
}
private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
Segment<OUT> watermarkSegment;
if (!segments.isEmpty() && segments.getLast().isEmpty()) {
// reuse already existing segment if possible (completely drained) or the new segment
// added at the end of
// this method for two succeeding watermarks
watermarkSegment = segments.getLast();
} else {
watermarkSegment = addSegment(1);
}
StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
watermarkSegment.add(watermarkEntry);
// add a new segment for actual elements
addSegment(capacity);
return watermarkEntry;
}
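Next is the method that emits completed elements. In contrast to adding, it takes the first Segment in the queue and emits one completed element from it. It then checks whether that Segment still holds any elements; if not, the Segment is popped from the queue and can be garbage collected. At least one Segment is always kept in the queue, though.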
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (segments.isEmpty()) {
return;
}
final Segment currentSegment = segments.getFirst();
numberOfEntries -= currentSegment.emitCompleted(output);
// remove any segment if there are further segments, if not leave it as an optimization even
// if empty
if (segments.size() > 1 && currentSegment.isEmpty()) {
segments.pop();
}
}
With this design, UnorderedStreamElementQueue splits a sequence of elements at each watermark and places the parts in different Segments. As emitCompletedElement shows, the next Segment is read only after every element in the Segment at the head of the queue has been emitted (after completing or timing out). The design allows output to be reordered to some degree, but never across a watermark. This keeps watermark semantics correct: tolerating reordering can never cause records to be unexpectedly treated as late.
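The watermark barrier can be checked with a small model of the whole queue. WatermarkBarrierSketch and its members are names assumed for illustration; elements are strings, there is no capacity limit, and drain emits everything currently emittable in one call instead of one element per mail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

final class WatermarkBarrierSketch {
    static final class Segment {
        final Set<String> incomplete = new HashSet<>();
        final Queue<String> completed = new ArrayDeque<>();

        boolean isEmpty() {
            return incomplete.isEmpty() && completed.isEmpty();
        }
    }

    private final Deque<Segment> segments = new ArrayDeque<>();

    /** Records join the last segment and start out incomplete. Returns the owning segment. */
    Segment addRecord(String record) {
        if (segments.isEmpty()) {
            segments.addLast(new Segment());
        }
        Segment last = segments.getLast();
        last.incomplete.add(record);
        return last;
    }

    /** A watermark is done on arrival, sits alone in a segment, and opens a new one after it. */
    void addWatermark(String watermark) {
        Segment target;
        if (!segments.isEmpty() && segments.getLast().isEmpty()) {
            target = segments.getLast(); // reuse a drained segment
        } else {
            target = new Segment();
            segments.addLast(target);
        }
        target.completed.add(watermark);
        segments.addLast(new Segment()); // segment for the records that follow
    }

    void complete(Segment owner, String record) {
        if (owner.incomplete.remove(record)) {
            owner.completed.add(record);
        }
    }

    /** Emits everything currently emittable, looking only at the first segment each time. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!segments.isEmpty()) {
            Segment first = segments.getFirst();
            String next = first.completed.poll();
            if (next != null) {
                out.add(next);
            }
            if (segments.size() > 1 && first.isEmpty()) {
                segments.removeFirst(); // fully drained: move on to the next segment
            } else if (next == null) {
                break; // nothing ready in the first segment, and it cannot be skipped
            }
        }
        return out;
    }
}
```

For the input A, B, W1, C with completion order C, B, A, the model emits nothing after C, then B, then A, W1, C: records are reordered within a Segment, but C never overtakes the watermark.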
This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.