淺談Flink對(duì)象重用(object reuse)

前言

今天是大年初一,祝各位虎年大吉大利~

近期受工作變動(dòng)影響,博客又荒廢了許久。今天難得有空,就前段時(shí)間內(nèi)部技術(shù)分享里提到的一個(gè)小知識(shí)點(diǎn)來(lái)寫幾筆。

對(duì)象重用(object reuse)在Flink文檔的Execution Configuration一節(jié)中并不起眼,并且關(guān)于它的說(shuō)明也語(yǔ)焉不詳,如下:

enableObjectReuse() / disableObjectReuse() By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

那么,"reuse"的具體操作是什么?為什么可能會(huì)造成bug?什么時(shí)候可以安全地啟用它呢?本文來(lái)簡(jiǎn)單聊一聊。

算子鏈與DataStream API對(duì)象重用

筆者之前講過(guò),算子鏈(operator chaining)是StreamGraph向JobGraph轉(zhuǎn)化過(guò)程中的主要優(yōu)化措施。經(jīng)過(guò)此優(yōu)化,所有chain在一起的sub-task都會(huì)在同一個(gè)TaskManager slot中執(zhí)行,能夠減少不必要的數(shù)據(jù)交換、序列化(注意這點(diǎn))和上下文切換,從而提高作業(yè)的執(zhí)行效率。

算子鏈內(nèi)部的簡(jiǎn)單示意圖如下。

但是,將chained operators連接在一起的ChainingOutput實(shí)際上有兩種,即ChainingOutputCopyingChainingOutput。查看OperatorChain類中對(duì)應(yīng)的代碼:

if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
    currentOperatorOutput = new ChainingOutput<>(operator, outputTag);
} else {
    TypeSerializer<IN> inSerializer =
            operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    currentOperatorOutput = new CopyingChainingOutput<>(operator, inSerializer, outputTag);
}

也就是說(shuō),如果啟用了對(duì)象重用,構(gòu)造算子鏈時(shí)采用的是ChainingOutput,反之則是CopyingChainingOutput。它們唯一的不同點(diǎn)就是將StreamRecord推到下游算子時(shí)的處理方式,做個(gè)對(duì)比:

// ChainingOutput#pushToOperator()
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        input.setKeyContextElement(castRecord);
        input.processElement(castRecord);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

// CopyingChainingOutput#pushToOperator()
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator (and Serializer) expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
        input.setKeyContextElement(copy);
        input.processElement(copy);
    } catch (ClassCastException e) {
        if (outputTag != null) {
            // Enrich error message
            ClassCastException replace =
                    new ClassCastException(
                            String.format(
                                    "%s. Failed to push OutputTag with id '%s' to operator. "
                                            + "This can occur when multiple OutputTags with different types "
                                            + "but identical names are being used.",
                                    e.getMessage(), outputTag.getId()));
            throw new ExceptionInChainedOperatorException(replace);
        } else {
            throw new ExceptionInChainedOperatorException(e);
        }
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

可見,對(duì)象重用的本質(zhì)就是在算子鏈中的下游算子使用上游對(duì)象的淺拷貝。若關(guān)閉對(duì)象重用,則必須經(jīng)過(guò)一輪序列化和反序列化,相當(dāng)于深拷貝,所以就不能100%地發(fā)揮算子鏈的優(yōu)化效果。

但正所謂魚與熊掌不可兼得,若啟用了對(duì)象重用,那么我們的業(yè)務(wù)代碼中必然不能出現(xiàn)以下兩種情況,以免造成混亂:

  • 在下游修改上游發(fā)射的對(duì)象,或者上游存入其State中的對(duì)象;
  • 同一條流直接對(duì)接多個(gè)處理邏輯(如stream.map(new AFunc())的同時(shí)還有stream.map(new BFunc()))。

總之,在enableObjectReuse()之前,需要謹(jǐn)慎評(píng)估業(yè)務(wù)代碼是否會(huì)帶來(lái)副作用。社區(qū)大佬David Anderson曾在Stack Overflow上給出了一個(gè)簡(jiǎn)單明晰的回答,可參見這里。

Flink SQL中的對(duì)象重用

另一位社區(qū)大佬Nico Kruber曾經(jīng)寫過(guò)一篇名為<<A Journey to Beating Flink's SQL Performance>>的文章,其中說(shuō)啟用對(duì)象重用可以為Blink Planner帶來(lái)可觀的性能收益,并且還相當(dāng)安全。為什么?

我們知道,F(xiàn)link SQL的類型系統(tǒng)與DataStream Runtime原生的類型系統(tǒng)有一定區(qū)別,故某些基礎(chǔ)數(shù)據(jù)類型的序列化器的實(shí)現(xiàn)也有不同。以最常見的字符串類型為例,DataStream原生的StringSerializercopy()方法如下。

@Override
public String copy(String from) {
    return from;
}

可見是能夠利用String類型本身的不可變性(immutability)來(lái)避免真正的復(fù)制。所以,若DataStream API程序中的復(fù)雜數(shù)據(jù)類型越少,序列化成本就越低,打開對(duì)象重用的收益也就越小。前述的文章也說(shuō)明了這一點(diǎn)。

Flink SQL體系中的StringDataSerializer#copy()方法則完全不一樣,如下(實(shí)際上是BinaryStringData#copy())。

public BinaryStringData copy() {
    ensureMaterialized();
    byte[] copy =
            BinarySegmentUtils.copyToBytes(
                    binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
    return new BinaryStringData(
            new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
            0,
            binarySection.sizeInBytes,
            javaObject);
}

可見是要實(shí)打?qū)嵉貜?fù)制底層的MemorySegment,此時(shí)對(duì)象重用的優(yōu)點(diǎn)就很明顯了。

如何保證這邊不會(huì)有像DataStream API同樣的隱患?答案在(之前講過(guò)的)代碼生成階段。例如,在查詢維表的CommonExecLookupJoin執(zhí)行節(jié)點(diǎn)中,生成訪問(wèn)輸入字段的代碼時(shí),會(huì)判斷是否要強(qiáng)制深拷貝(當(dāng)允許對(duì)象重用時(shí),deepCopy就為true):

  def generateFieldAccess(
    ctx: CodeGeneratorContext,
    inputType: LogicalType,
    inputTerm: String,
    index: Int,
    deepCopy: Boolean): GeneratedExpression = {
    val expr = generateFieldAccess(ctx, inputType, inputTerm, index)
    if (deepCopy) {    // 
      expr.deepCopy(ctx)
    } else {
      expr
    }
  }

如果結(jié)果類型是可變(mutable)類型的話,就會(huì)生成新的拷貝代碼,防止出問(wèn)題。

def deepCopy(ctx: CodeGeneratorContext): GeneratedExpression = {
  // only copy when type is mutable
  if (TypeCheckUtils.isMutable(resultType)) {
    // if the type need copy, it must be a boxed type
    val typeTerm = boxedTypeTermForType(resultType)
    val serTerm = ctx.addReusableTypeSerializer(resultType)
    val newResultTerm = ctx.addReusableLocalVariable(typeTerm, "field")
    val newCode =
      s"""
         |$code
         |$newResultTerm = $resultTerm;
         |if (!$nullTerm) {
         |  $newResultTerm = ($typeTerm) ($serTerm.copy($newResultTerm));
         |}
      """.stripMargin
    GeneratedExpression(newResultTerm, nullTerm, newCode, resultType, literalValue)
  } else {
    this
  }
}

The End

邊看《開端》邊寫的這一篇,三心二意,有錯(cuò)誤請(qǐng)批評(píng)指正(

京東物流人工智能與大數(shù)據(jù)部持續(xù)招人中,各位有意年后換工作的大佬盡管丟簡(jiǎn)歷過(guò)來(lái),JDL歡迎你~

民那晚安(

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容