Flink源碼分析系列文檔目錄
請(qǐng)點(diǎn)擊:Flink 源碼分析系列文檔目錄
前言
本篇我們一起分析下Flink中流處理作業(yè)的初始化和執(zhí)行邏輯。
AbstractInvokable
AbstractInvokable是TaskManager中運(yùn)行的所有任務(wù)的父類。所有的讀取上游數(shù)據(jù),用戶數(shù)據(jù)處理邏輯(map,filter算子以及用戶自己編寫的processFunction等等)和發(fā)送處理過的數(shù)據(jù)到下游相關(guān)邏輯都在該類的invoke方法中得到執(zhí)行。
AbstractInvokable中與任務(wù)執(zhí)行相關(guān)的2個(gè)方法為:
- invoke方法:?jiǎn)?dòng)任務(wù)執(zhí)行的入口方法。實(shí)現(xiàn)類必須重寫這個(gè)方法。
- cancel方法:任務(wù)被取消或者是用戶終止任務(wù)的時(shí)候被調(diào)用
它有兩個(gè)實(shí)現(xiàn)類:
- BatchTask:所有批處理類型Task的基類。
- StreamTask:所有流處理類型Task的基類。
我們以流處理為重點(diǎn),下面詳細(xì)介紹下StreamTask這個(gè)類。
AbstractInvokable的創(chuàng)建
在開始分析StreamTask之前我們需要了解下它是在何處,如何被創(chuàng)建出來的。
翻閱Task線程的處理邏輯,不難發(fā)現(xiàn)它的invoke變量初始化位于Task的doRun方法。
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
這一行代碼使用用戶代碼類加載器(userCodeClassLoader),調(diào)用目標(biāo)類唯一參數(shù)為Environment類型的構(gòu)造方法,創(chuàng)建出invokable對(duì)象。
private static AbstractInvokable loadAndInstantiateInvokable(
ClassLoader classLoader, String className, Environment environment) throws Throwable {
final Class<? extends AbstractInvokable> invokableClass;
try {
// 使用指定的classloader加載className對(duì)應(yīng)的class,并轉(zhuǎn)換為AbstractInvokable類型
invokableClass =
Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
} catch (Throwable t) {
throw new Exception("Could not load the task's invokable class.", t);
}
Constructor<? extends AbstractInvokable> statelessCtor;
try {
// 獲取構(gòu)造函數(shù)
statelessCtor = invokableClass.getConstructor(Environment.class);
} catch (NoSuchMethodException ee) {
throw new FlinkException("Task misses proper constructor", ee);
}
// instantiate the class
try {
//noinspection ConstantConditions --> cannot happen
// 傳入environment變量,創(chuàng)建出新的對(duì)象
return statelessCtor.newInstance(environment);
} catch (InvocationTargetException e) {
// directly forward exceptions from the eager initialization
throw e.getTargetException();
} catch (Exception e) {
throw new FlinkException("Could not instantiate the task's invokable class.", e);
}
}
StreamTask
StreamTask類是所有流處理任務(wù)的基類。Task由TaskManager部署和執(zhí)行。Task是本地運(yùn)行單元。每一個(gè)Task包含了一個(gè)或多個(gè)operator。這些operator在同一個(gè)OperatorChain中。
StreamTask任務(wù)執(zhí)行生命周期包含:
- setInitialState:設(shè)置各個(gè)operator的初始狀態(tài)。對(duì)應(yīng)
initializeState方法。 - 調(diào)用
invoke方法。
其中invoke方法包含的邏輯可細(xì)分為:
- 創(chuàng)建出task相關(guān)配置,創(chuàng)建OperatorChain。
- 執(zhí)行operator的setup邏輯。
- 執(zhí)行task相關(guān)的初始化邏輯。
- 加載并初始化operator的狀態(tài)。
- 調(diào)用各個(gè)operator的open方法。
- 執(zhí)行各個(gè)operator內(nèi)的數(shù)據(jù)處理邏輯。
- 關(guān)閉operator。
- 銷毀operator。
- 任務(wù)清理操作。
下面我們從代碼層面詳細(xì)分析下invoke方法的處理流程。
invoke方法
本節(jié)我們分析StreamTask核心執(zhí)行邏輯invoke方法。invoke方法如下所示:
@Override
public final void invoke() throws Exception {
try {
// 調(diào)用作業(yè)執(zhí)行前相關(guān)準(zhǔn)備邏輯
beforeInvoke();
// final check to exit early before starting to run
// 如果任務(wù)被取消,拋出異常退出
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
// 執(zhí)行用戶編寫的task邏輯
runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
// 再次檢查如果任務(wù)被取消,拋出異常退出
if (canceled) {
throw new CancelTaskException();
}
// 執(zhí)行調(diào)用后相關(guān)邏輯
afterInvoke();
} catch (Throwable invokeException) {
failing = !canceled;
try {
cleanUpInvoke();
}
// TODO: investigate why Throwable instead of Exception is used here.
catch (Throwable cleanUpException) {
Throwable throwable =
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
ExceptionUtils.rethrowException(throwable);
}
ExceptionUtils.rethrowException(invokeException);
}
// 執(zhí)行invoke后清理操作
cleanUpInvoke();
}
beforeInvoke方法
beforeInvoke方法主要為task的初始化操作,包含創(chuàng)建OperatorChain,讀取上游數(shù)據(jù)和下游數(shù)據(jù)輸出配置等。詳細(xì)內(nèi)容如下:
protected void beforeInvoke() throws Exception {
disposedOperators = false;
LOG.debug("Initializing {}.", getName());
// 創(chuàng)建出OperatorChain
// OperatorChain是JobGraph生成時(shí)的一箱優(yōu)化措施
// 將復(fù)合條件的多個(gè)StreamNode(對(duì)應(yīng)數(shù)據(jù)變換操作)合并到一個(gè)chain中
// 他們會(huì)被調(diào)度到同一個(gè)StreamTask中執(zhí)行
operatorChain = new OperatorChain<>(this, recordWriter);
// 獲取OperatorChain中第一個(gè)operator
mainOperator = operatorChain.getMainOperator();
// task specific initialization
// 執(zhí)行task專屬的初始化工作
// 這個(gè)是抽象方法
// 具體邏輯需要在子類中實(shí)現(xiàn)
init();
// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
// task動(dòng)作必須在StreamTaskActionExecutor中執(zhí)行,防止出現(xiàn)并發(fā)執(zhí)行問題,影響checkpoint
// 該executor實(shí)際為StreamTaskActionExecutor.IMMEDIATE,即在當(dāng)前線程直接運(yùn)行
actionExecutor.runThrowing(
() -> {
// 創(chuàng)建SequentialChannelStateReader,用于讀取checkpoint時(shí)保存的channel狀態(tài)
SequentialChannelStateReader reader =
getEnvironment()
.getTaskStateManager()
.getSequentialChannelStateReader();
// 獲取ResultPartitionWriter狀態(tài)
reader.readOutputData(
getEnvironment().getAllWriters(),
!configuration.isGraphContainingLoops());
// 初始化OperatorChain中所有的operator
// 調(diào)用他們的initializeState(初始化狀態(tài))和open(包含初始化動(dòng)作)方法
operatorChain.initializeStateAndOpenOperators(
createStreamTaskStateInitializer());
channelIOExecutor.execute(
() -> {
try {
// 獲取InputGate狀態(tài)
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
for (InputGate inputGate : getEnvironment().getAllInputGates()) {
// 在inputGate狀態(tài)被讀取之后執(zhí)行
inputGate
.getStateConsumedFuture()
.thenRun(
() ->
// 在task線程中執(zhí)行
mainMailboxExecutor.execute(
// 執(zhí)行請(qǐng)求partition方法
inputGate::requestPartitions,
"Input gate request partitions"));
}
});
// 水池狀態(tài)為正在執(zhí)行
isRunning = true;
}
runMailboxLoop方法
runMailboxLoop方法啟動(dòng)task的數(shù)據(jù)輸入和處理邏輯:
public void runMailboxLoop() throws Exception {
mailboxProcessor.runMailboxLoop();
}
MailBoxProcessor在StreamTask的構(gòu)造函數(shù)中創(chuàng)建出來:
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
mailboxProcessor.runMailboxLoop()方法可以理解為在actionExecutor線程池執(zhí)行processInput方法。
processInput方法從上游(StreamTaskNetworkInput,InputGate)讀取數(shù)據(jù)。這部分邏輯參見Flink 源碼之節(jié)點(diǎn)間通信。
afterInvoke
afterInvoke方法內(nèi)容如下,概括起來為task執(zhí)行完畢后的清理工作,關(guān)閉operator等。
protected void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
getCompletionFuture().exceptionally(unused -> null).join();
final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
// close all operators in a chain effect way
// 關(guān)閉OperatorChain中所有的operator
// 從前向后依次調(diào)用各個(gè)operator的close方法
operatorChain.closeOperators(actionExecutor);
// make sure no further checkpoint and notification actions happen.
// at the same time, this makes sure that during any "regular" exit where still
actionExecutor.runThrowing(
() -> {
// make sure no new timers can come
// 停止timer服務(wù)
FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);
// let mailbox execution reject all new letters from this point
// 準(zhǔn)備關(guān)閉mailboxProcessor,不再接受新的事件
mailboxProcessor.prepareClose();
// only set the StreamTask to not running after all operators have been closed!
// See FLINK-7430
// 設(shè)置task狀態(tài)為停止
isRunning = false;
});
// processes the remaining mails; no new mails can be enqueued
// 處理積壓的事件
mailboxProcessor.drain();
// make sure all timers finish
// 等待所有的time都停止
timersFinishedFuture.get();
LOG.debug("Closed operators for task {}", getName());
// make sure all buffered data is flushed
// 處理掉buffer中的所有數(shù)據(jù)
operatorChain.flushOutputs();
// make an attempt to dispose the operators such that failures in the dispose call
// still let the computation fail
// 依次廢棄掉OperatorChain中的所有operator(順序?yàn)閺念^到尾)
disposeAllOperators();
}
StreamTask的子類
StreamTask是所有流處理計(jì)算任務(wù)的父類,它本身是一個(gè)抽象類。為了處理不同類型的StreamOperator,StreamTask有多種不同的實(shí)現(xiàn)。幾個(gè)典型的實(shí)現(xiàn)如下:
- OneInputStreamTask:處理
OneInputStreamOperator,即只有一個(gè)輸入流的StreamOperator。 - TwoInputStreamTask:處理
TwoInputStreamOperator,具有2個(gè)輸入流。 - MultipleInputStreamTask:處理
MultipleInputStreamOperator,具有多個(gè)輸入流。 - SourceStreamTask:處理
StreamSource,即數(shù)據(jù)源。
接下來我們重點(diǎn)關(guān)注這些類實(shí)現(xiàn)的抽象方法。
OneInputStreamTask的init方法
它的init方法主要流程為創(chuàng)建網(wǎng)絡(luò)輸入與輸出,創(chuàng)建inputProcessor用于從網(wǎng)絡(luò)輸入讀取數(shù)據(jù),反序列化之后傳遞給網(wǎng)絡(luò)輸出。最后初始化數(shù)據(jù)流監(jiān)控。代碼和分析如下:
@Override
public void init() throws Exception {
// 獲取流作業(yè)配置
StreamConfig configuration = getConfiguration();
// 獲取網(wǎng)絡(luò)輸入流數(shù)量
int numberOfInputs = configuration.getNumberOfNetworkInputs();
if (numberOfInputs > 0) {
// 創(chuàng)建一個(gè)CheckpointedInputGate
// 該類型InputGate擁有一個(gè)CheckpointBarrierHandler,用來處理接收到的CheckpointBarrier
CheckpointedInputGate inputGate = createCheckpointedInputGate();
// 監(jiān)控相關(guān),設(shè)置流入數(shù)據(jù)條數(shù)計(jì)數(shù)器
Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
// 創(chuàng)建StreamTaskNetworkOutput
// 發(fā)送反序列化后的數(shù)據(jù)給task處理流程
DataOutput<IN> output = createDataOutput(numRecordsIn);
// 創(chuàng)建StreamTaskNetworkInput
// 包裝了CheckpointedInputGate,從中讀取網(wǎng)絡(luò)接收到的原始數(shù)據(jù)并發(fā)給反序列化器
StreamTaskInput<IN> input = createTaskInput(inputGate);
// 讀取輸入流配置
StreamConfig.InputConfig[] inputConfigs =
configuration.getInputs(getUserCodeClassLoader());
StreamConfig.InputConfig inputConfig = inputConfigs[0];
// 如果要求對(duì)數(shù)據(jù)排序
// 含義為數(shù)據(jù)按照key字段分組
// 在一段時(shí)間內(nèi)只會(huì)給task提供同一分組的數(shù)據(jù)
// 不同組的數(shù)據(jù)不會(huì)頻繁交替出現(xiàn)
if (requiresSorting(inputConfig)) {
checkState(
!configuration.isCheckpointingEnabled(),
"Checkpointing is not allowed with sorted inputs.");
input = wrapWithSorted(input);
}
// 注冊(cè)流入數(shù)據(jù)條數(shù)計(jì)數(shù)器監(jiān)控
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.reuseRecordsInputCounter(numRecordsIn);
// 創(chuàng)建inputProcessor
// 從網(wǎng)絡(luò)讀取數(shù)據(jù),反序列化后給output,然后把反序列化后的數(shù)據(jù)交給OperatorChain
inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
}
// 創(chuàng)建watermark監(jiān)控
mainOperator
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
getEnvironment()
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
}
其中創(chuàng)建CheckpointedInputGate的過程在 Flink 源碼之分布式快照 有介紹,請(qǐng)大家查閱。
TwoInputStreamTask的init方法
它的初始化方法和OneInputStreamTask的類似,只不過需要?jiǎng)?chuàng)建兩個(gè)InputGate。TwoInputStreamTask對(duì)應(yīng)CoOperator,即有兩個(gè)輸入流的operator(比如CoFlatmap)。
@Override
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
ClassLoader userClassLoader = getUserCodeClassLoader();
int numberOfInputs = configuration.getNumberOfNetworkInputs();
ArrayList<IndexedInputGate> inputList1 = new ArrayList<>();
ArrayList<IndexedInputGate> inputList2 = new ArrayList<>();
List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
for (int i = 0; i < numberOfInputs; i++) {
int inputType = inEdges.get(i).getTypeNumber();
IndexedInputGate reader = getEnvironment().getInputGate(i);
switch (inputType) {
case 1:
// 如果是輸入流1,加入到inputList1中
inputList1.add(reader);
break;
case 2:
// 如果是輸入流2,加入到inputList2中
inputList2.add(reader);
break;
default:
throw new RuntimeException("Invalid input type number: " + inputType);
}
}
// 創(chuàng)建CheckpointedInputGate,包裝了UnionInputGate
// 包裝了多個(gè)InputGate,ID相同的channel會(huì)被合并
// 這里創(chuàng)建出兩個(gè)UnionInputGate,每個(gè)UnionInputGate合并了多個(gè)inputType相同的InputGate
// 最后根據(jù)這個(gè)InputGate,創(chuàng)建出StreamTwoInputProcessor
createInputProcessor(
inputList1, inputList2, gateIndex -> inEdges.get(gateIndex).getPartitioner());
// 監(jiān)控相關(guān)部分,這里省略
// ...
}
MultipleInputStreamTask和上面的邏輯類似,不再贅述。
SourceStreamTask的init方法
@Override
protected void init() {
// we check if the source is actually inducing the checkpoints, rather
// than the trigger
// 獲取數(shù)據(jù)源數(shù)據(jù)產(chǎn)生邏輯SourceFunction
SourceFunction<?> source = mainOperator.getUserFunction();
// 如果source實(shí)現(xiàn)了這個(gè)接口,說明接收到CheckpointCoordinator發(fā)來的觸發(fā)checkpoint消息之時(shí)source不觸發(fā)checkpoint
// checkpoint的觸發(fā)由輸入數(shù)據(jù)控制
if (source instanceof ExternallyInducedSource) {
externallyInducedCheckpoints = true;
// 創(chuàng)建checkpoint觸發(fā)鉤子
ExternallyInducedSource.CheckpointTrigger triggerHook =
new ExternallyInducedSource.CheckpointTrigger() {
@Override
public void triggerCheckpoint(long checkpointId) throws FlinkException {
// TODO - we need to see how to derive those. We should probably not
// encode this in the
// TODO - source's trigger message, but do a handshake in this task
// between the trigger
// TODO - message from the master, and the source's trigger
// notification
final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault(),
configuration.isExactlyOnceCheckpointMode(),
configuration.isUnalignedCheckpointsEnabled(),
configuration.getAlignmentTimeout());
final long timestamp = System.currentTimeMillis();
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointId, timestamp);
try {
// 調(diào)用StreamTask的異步觸發(fā)checkpoint方法
SourceStreamTask.super
.triggerCheckpointAsync(
checkpointMetaData, checkpointOptions)
.get();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new FlinkException(e.getMessage(), e);
}
}
};
((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
}
// 配置checkpoint啟動(dòng)延遲時(shí)間監(jiān)控
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.gauge(
MetricNames.CHECKPOINT_START_DELAY_TIME,
this::getAsyncCheckpointStartDelayNanos);
}
StreamTask從上游獲取數(shù)據(jù)
StreamTask從上游獲取數(shù)據(jù)的調(diào)用鏈為:
- StreamTask.processInput
- inputProcessor.processInput
- StreamTaskNetworkInput.emitNext
- inputGate.pollNext
- inputChannel.getNextBuffer
StreamTask通過InputGate從上游其他Task獲取到數(shù)據(jù)。每個(gè)InputGate包含一個(gè)或多個(gè)InputChannel,根據(jù)數(shù)據(jù)是否走網(wǎng)絡(luò)通信,這些InputChannel分為RemoteInputChannel和LocalInputChannel。其中RemoteInputChannel使用Netty通過網(wǎng)絡(luò)從上游task的ResultSubPartition獲取數(shù)據(jù),適用與本task和上游task運(yùn)行在不同集群節(jié)點(diǎn)的情況。和它相反的是LocalInputChannel,適用于本task和上游task運(yùn)行在同一節(jié)點(diǎn)的情況,從上游task獲取數(shù)據(jù)不需要走網(wǎng)絡(luò)通信。
這部分邏輯的詳細(xì)分析,參見 Flink 源碼之節(jié)點(diǎn)間通信。
數(shù)據(jù)傳遞給OperatorChain
這一段邏輯我們從StreamTaskNetworkInput的processElement方法開始分析。
StreamTask的processInput方法為處理數(shù)據(jù)邏輯的入口。這個(gè)方法調(diào)用了StreamOneInputProcessor的同名方法,命令StreamTaskNetworkInput一直循環(huán)不停的從InputGate中獲取數(shù)據(jù)。對(duì)于獲取到的數(shù)據(jù),需要先交給反序列化器,將二進(jìn)制數(shù)據(jù)反序列化為StreamRecord對(duì)象。接著交給processElement方法處理。
上面邏輯的分析請(qǐng)參見 Flink 源碼之節(jié)點(diǎn)間通信 讀取數(shù)據(jù)章節(jié)。
下面是processElement方法。該方法位于AbstractStreamTaskNetworkInput。參數(shù)中的output實(shí)際上就是StreamTaskNetworkOutput`對(duì)象。
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
// 首先判斷元素的類型,可能是數(shù)據(jù),watermark,延遲標(biāo)記或者是流狀態(tài)
if (recordOrMark.isRecord()) {
output.emitRecord(recordOrMark.asRecord());
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(
recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(
recordOrMark.asStreamStatus(),
flattenedChannelIndices.get(lastChannel),
output);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
StreamTaskNetworkOutput接收反序列化處理過的數(shù)據(jù),發(fā)送給OperatorChain的第一個(gè)operator。
private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {
// 創(chuàng)建的時(shí)候傳入的是OperatorChain的mainOperator,即第一個(gè)operator
private final OneInputStreamOperator<IN, ?> operator;
private final WatermarkGauge watermarkGauge;
private final Counter numRecordsIn;
private StreamTaskNetworkOutput(
OneInputStreamOperator<IN, ?> operator,
StreamStatusMaintainer streamStatusMaintainer,
WatermarkGauge watermarkGauge,
Counter numRecordsIn) {
super(streamStatusMaintainer);
this.operator = checkNotNull(operator);
this.watermarkGauge = checkNotNull(watermarkGauge);
this.numRecordsIn = checkNotNull(numRecordsIn);
}
// 發(fā)送數(shù)據(jù)
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
// 調(diào)用processElement方法,處理數(shù)據(jù)
operator.processElement(record);
}
// 發(fā)送watermark
@Override
public void emitWatermark(Watermark watermark) throws Exception {
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
// 調(diào)用processWatermark方法,處理watermark
operator.processWatermark(watermark);
}
// 發(fā)送延遲標(biāo)記,被用于統(tǒng)計(jì)數(shù)據(jù)在整個(gè)Flink處理流程中的耗時(shí)
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
operator.processLatencyMarker(latencyMarker);
}
}
OperatorChain的邏輯在后續(xù)博客中單獨(dú)分析。
本博客為作者原創(chuàng),歡迎大家參與討論和批評(píng)指正。如需轉(zhuǎn)載請(qǐng)注明出處。