Flink State Serialization, Seen Through a State-Induced Performance Problem

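Preface

Long time no see (bows).

I've been in a transition period lately and swamped every day, so I've had little energy left for specific technical details (and I broke the weekly-update promise I made in the last post = =). Last week I helped someone quickly resolve a performance problem caused by misusing Flink state types. Here are some quick notes on it, along with a brief introduction to the basics of Flink state serialization.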

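The Problem and Troubleshooting

A colleague from an upstream team reported that a multi-stream join job written with the DataStream API, whose computation logic was not complicated, frequently suffered from consumer lag and checkpoint failures (the screenshots from the scene have been lost). The job topology is shown in the figure below.

Shrunk very small for anonymization = =

Tuning the cluster parameters following the usual pattern for large-state jobs did not help.

The Flink Web UI located the problem at the second-to-last operator in the topology, where some sub-tasks could never complete their checkpoints. The Metrics panel showed a small amount of data skew, while the upstream and downstream backpressure metrics were all 0.

After continued observation, the skewed sub-tasks turned out to carry at most 10%~15% more data than the others, which by common sense should not cause such a severe performance problem. So I found the corresponding TaskManager pod and generated a flame graph, with the result below.

RocksDB state reads and writes were taking extremely long, with most of the time spent in Kryo serialization, which means the state held objects that Flink's serialization framework does not support natively. I asked the developers to show me the code, and the truth came out: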

private transient MapState<String, HashSet<String>> state1;
private transient MapState<String, HashSet<String>> state2;
private transient ValueState<Map<String, String>> state3;

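Flink's serialization framework has no serializer for HashSet, so it falls back to Kryo. Even when these Sets are not large, the cost of state operations rises sharply. The ValueState<Map<String, String>> usage is also wrong and should be changed to MapState<String, String>: with the RocksDB state backend, a ValueState holding a Map has to serialize or deserialize the entire map on every access, whereas MapState stores each entry as its own key-value pair.

The quickest temporary fix is simple: change every HashSet used in state to Map<String, Boolean>, which deduplicates just as well. It is not elegant, but with native MapSerializer support the efficiency improves greatly. The rest of this post briefly introduces Flink state serialization.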
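As a minimal sketch of the Map<String, Boolean> workaround, the plain-Java helper below shows how a map can stand in for a set when deduplicating. The class name MapBackedDedup and the method markSeen are hypothetical, and the code deliberately uses java.util only; in the real job the map would be the value type held in Flink state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: a Map<String, Boolean> used as a stand-in for HashSet<String>. */
final class MapBackedDedup {
    private MapBackedDedup() {}

    /**
     * Records the id in the map and reports whether it was seen for the first time.
     * Only the key matters; the Boolean value is a placeholder.
     */
    static boolean markSeen(Map<String, Boolean> seen, String id) {
        // putIfAbsent returns null only when the key had no mapping yet.
        return seen.putIfAbsent(id, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        Map<String, Boolean> seen = new HashMap<>();
        System.out.println(markSeen(seen, "order-1")); // first occurrence
        System.out.println(markSeen(seen, "order-1")); // duplicate
        System.out.println(seen.size());
    }
}
```

The point of the swap is only the declared type: Flink can pick MapSerializer with a String and a Boolean serializer for such a value, instead of handing the whole object to Kryo.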

TypeSerializer

When creating the StateDescriptor needed to obtain a state handle, we have to specify the type of the state data, for example:

ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("myState", Integer.class);
ValueState<Integer> state = this.getRuntimeContext().getState(stateDesc);

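Doing so also determines the Serializer for that data type. As we know, TypeSerializer is the underlying abstraction of the Flink runtime's serialization mechanism, and serialization of state data is no exception. Take MapSerializer, which handles the Map type, as an example; its code is shown below and is fairly clear.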

@Internal
public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {

    private static final long serialVersionUID = -6885593032367050078L;

    /** The serializer for the keys in the map */
    private final TypeSerializer<K> keySerializer;

    /** The serializer for the values in the map */
    private final TypeSerializer<V> valueSerializer;

    /**
     * Creates a map serializer that uses the given serializers to serialize the key-value pairs in
     * the map.
     *
     * @param keySerializer The serializer for the keys in the map
     * @param valueSerializer The serializer for the values in the map
     */
    public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
        this.keySerializer =
                Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
        this.valueSerializer =
                Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
    }

    // ------------------------------------------------------------------------
    //  MapSerializer specific properties
    // ------------------------------------------------------------------------

    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    public TypeSerializer<V> getValueSerializer() {
        return valueSerializer;
    }

    // ------------------------------------------------------------------------
    //  Type Serializer implementation
    // ------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<Map<K, V>> duplicate() {
        TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
        TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();

        return (duplicateKeySerializer == keySerializer)
                        && (duplicateValueSerializer == valueSerializer)
                ? this
                : new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
    }

    @Override
    public Map<K, V> createInstance() {
        return new HashMap<>();
    }

    @Override
    public Map<K, V> copy(Map<K, V> from) {
        Map<K, V> newMap = new HashMap<>(from.size());

        for (Map.Entry<K, V> entry : from.entrySet()) {
            K newKey = keySerializer.copy(entry.getKey());
            V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());

            newMap.put(newKey, newValue);
        }

        return newMap;
    }

    @Override
    public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1; // var length
    }

    @Override
    public void serialize(Map<K, V> map, DataOutputView target) throws IOException {
        final int size = map.size();
        target.writeInt(size);

        for (Map.Entry<K, V> entry : map.entrySet()) {
            keySerializer.serialize(entry.getKey(), target);

            if (entry.getValue() == null) {
                target.writeBoolean(true);
            } else {
                target.writeBoolean(false);
                valueSerializer.serialize(entry.getValue(), target);
            }
        }
    }

    @Override
    public Map<K, V> deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();

        final Map<K, V> map = new HashMap<>(size);
        for (int i = 0; i < size; ++i) {
            K key = keySerializer.deserialize(source);

            boolean isNull = source.readBoolean();
            V value = isNull ? null : valueSerializer.deserialize(source);

            map.put(key, value);
        }

        return map;
    }

    @Override
    public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        final int size = source.readInt();
        target.writeInt(size);

        for (int i = 0; i < size; ++i) {
            keySerializer.copy(source, target);

            boolean isNull = source.readBoolean();
            target.writeBoolean(isNull);

            if (!isNull) {
                valueSerializer.copy(source, target);
            }
        }
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this
                || (obj != null
                        && obj.getClass() == getClass()
                        && keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer())
                        && valueSerializer.equals(
                                ((MapSerializer<?, ?>) obj).getValueSerializer()));
    }

    @Override
    public int hashCode() {
        return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
    }

    // --------------------------------------------------------------------------------------------
    // Serializer configuration snapshotting
    // --------------------------------------------------------------------------------------------

    @Override
    public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
        return new MapSerializerSnapshot<>(this);
    }
}

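Summary:

  • Serialization and deserialization are essentially operations on MemorySegment: binary data is written out through DataOutputView and read in through DataInputView;
  • For composite data types, the TypeSerializers of the inner element types should be defined and invoked in a nested fashion;
  • There must be a corresponding TypeSerializerSnapshot. This component defines how the TypeSerializer itself and the metadata it contains (i.e. the state schema) are serialized, and this information is stored in the snapshot. TypeSerializerSnapshot therefore makes it possible to judge data compatibility when state is restored, and is the key to Flink's state schema evolution feature.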
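To make the binary layout used by MapSerializer.serialize() and deserialize() easier to see, here is a minimal sketch that reproduces the same sequence (entry count, then for each entry the key, a null flag, and the value if present) with plain java.io streams. It is not Flink code: the class MapWireFormatSketch is hypothetical, DataOutputStream and DataInputStream stand in for DataOutputView and DataInputView, and writeUTF/readUTF stand in for the nested key and value serializers, whose real encoding differs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the MapSerializer layout using java.io only. */
final class MapWireFormatSketch {
    private MapWireFormatSketch() {}

    static byte[] serialize(Map<String, String> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());                 // entry count first
            for (Map.Entry<String, String> e : map.entrySet()) {
                out.writeUTF(e.getKey());             // stand-in for keySerializer
                if (e.getValue() == null) {
                    out.writeBoolean(true);           // null flag, no value bytes follow
                } else {
                    out.writeBoolean(false);
                    out.writeUTF(e.getValue());       // stand-in for valueSerializer
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    static Map<String, String> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            Map<String, String> map = new HashMap<>(size);
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                boolean isNull = in.readBoolean();
                String value = isNull ? null : in.readUTF();
                map.put(key, value);
            }
            return map;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Note that the whole map is written and read in one pass, which is why a large Map stored as a single state value is costly to touch even with a native serializer.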

TypeSerializerSnapshot

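The TypeSerializerSnapshot interface has the following important methods. The comments are clearly written, so I won't elaborate (really because I'm lazy and tired = =).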

    /**
     * Returns the version of the current snapshot's written binary format.
     *
     * @return the version of the current snapshot's written binary format.
     */
    int getCurrentVersion();

    /**
     * Writes the serializer snapshot to the provided {@link DataOutputView}. The current version of
     * the written serializer snapshot's binary format is specified by the {@link
     * #getCurrentVersion()} method.
     *
     * @param out the {@link DataOutputView} to write the snapshot to.
     * @throws IOException Thrown if the snapshot data could not be written.
     * @see #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)
     */
    void writeSnapshot(DataOutputView out) throws IOException;

    /**
     * Reads the serializer snapshot from the provided {@link DataInputView}. The version of the
     * binary format that the serializer snapshot was written with is provided. This version can be
     * used to determine how the serializer snapshot should be read.
     *
     * @param readVersion version of the serializer snapshot's written binary format
     * @param in the {@link DataInputView} to read the snapshot from.
     * @param userCodeClassLoader the user code classloader
     * @throws IOException Thrown if the snapshot data could be read or parsed.
     * @see #readVersionedSnapshot(DataInputView, ClassLoader)
     */
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
            throws IOException;

    /**
     * Recreates a serializer instance from this snapshot. The returned serializer can be safely
     * used to read data written by the prior serializer (i.e., the serializer that created this
     * snapshot).
     *
     * @return a serializer instance restored from this serializer snapshot.
     */
    TypeSerializer<T> restoreSerializer();

    /**
     * Checks a new serializer's compatibility to read data written by the prior serializer.
     *
     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
     * serialization format is compatible, that the program's serializer needs to reconfigure itself
     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
     * that the format is outright incompatible, or that a migration needed. In the latter case, the
     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
     * program's serializer re-serializes the data, thus converting the format during the restore
     * operation.
     *
     * @param newSerializer the new serializer to check.
     * @return the serializer compatibility result.
     */
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer);
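The interplay between getCurrentVersion(), writeSnapshot() and readSnapshot() may be easier to follow with a small example. The sketch below is plain java.io, not Flink's API: the class VersionedSnapshotSketch, its two fields, and the helper legacyV1Bytes are all hypothetical. It assumes a serializer configuration whose format gained a field in version 2, and shows the reader branching on the version found in the data. Unlike Flink, where the framework reads the version and passes it in as readVersion, this simplified reader reads the version itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical serializer config with a versioned binary format; not a Flink class. */
final class VersionedSnapshotSketch {
    /** Plays the role of getCurrentVersion(). */
    static final int CURRENT_VERSION = 2;

    final String typeName;  // present since version 1
    final boolean nullable; // added in version 2

    VersionedSnapshotSketch(String typeName, boolean nullable) {
        this.typeName = typeName;
        this.nullable = nullable;
    }

    /** Plays the role of writeSnapshot(): always writes the current format. */
    byte[] write() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(typeName);
            out.writeBoolean(nullable);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Bytes as a version 1 writer would have produced them; for demonstration only. */
    static byte[] legacyV1Bytes(String typeName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(1);
            out.writeUTF(typeName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Plays the role of readSnapshot(): decides how to read based on the written version. */
    static VersionedSnapshotSketch read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int readVersion = in.readInt();
            if (readVersion < 1 || readVersion > CURRENT_VERSION) {
                throw new IllegalStateException("Unknown snapshot version: " + readVersion);
            }
            String typeName = in.readUTF();
            // Version 1 data has no flag, so fall back to a default of false.
            boolean nullable = readVersion >= 2 && in.readBoolean();
            return new VersionedSnapshotSketch(typeName, nullable);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing the version ahead of the payload is what lets a newer program keep reading snapshots produced by an older one.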

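Note in particular that on state restore, the state schema compatibility check result, TypeSerializerSchemaCompatibility, comes in 4 kinds:

  • COMPATIBLE_AS_IS: compatible; the new Serializer can be used directly;
  • COMPATIBLE_AFTER_MIGRATION: compatible, but the data has to be deserialized once with the old Serializer from the snapshot and then re-serialized with the new Serializer. The most common scenario is adding or removing fields in a state POJO; see the code of the PojoSerializerSnapshot class for details;
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER: compatible, but the new Serializer has to be reconfigured before use. This scenario is less common; one example is a change in the class inheritance hierarchy of a state POJO;
  • INCOMPATIBLE: incompatible; the state cannot be restored. For example, when the type of a simple-typed field in a POJO is changed (e.g. String → Integer), SimpleTypeSerializerSnapshot, which handles simple data types, does not support such a change, so the check below returns incompatible and the restore fails with an exception: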
    @Override
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {

        return newSerializer.getClass() == serializerSupplier.get().getClass()
                ? TypeSerializerSchemaCompatibility.compatibleAsIs()
                : TypeSerializerSchemaCompatibility.incompatible();
    }

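Clearly, for composite types (such as List and Map), the compatibility of the outer container Serializer has to be checked first, followed by the compatibility of the nested Serializers. For details, see the CompositeTypeSerializerSnapshot abstract class that Flink defines internally for this purpose. That class is fairly complex, so I won't go into it here.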
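The "outer first, then nested" idea can be sketched without touching Flink at all. The code below is a simplified illustration under my own assumptions, not how CompositeTypeSerializerSnapshot is actually implemented: the class CompatibilitySketch, its enum, and the rule that the most severe result wins are all hypothetical stand-ins. resolveSimple mirrors the class-equality check quoted above, with plain Class objects standing in for serializer classes.

```java
import java.util.List;

/** Hypothetical, simplified model of schema compatibility resolution; not a Flink class. */
final class CompatibilitySketch {
    private CompatibilitySketch() {}

    enum Compatibility {
        COMPATIBLE_AS_IS,
        COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
        COMPATIBLE_AFTER_MIGRATION,
        INCOMPATIBLE
    }

    /** Same serializer class means compatible as is, anything else is incompatible. */
    static Compatibility resolveSimple(Class<?> oldSerializerClass, Class<?> newSerializerClass) {
        return oldSerializerClass == newSerializerClass
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }

    /** Assumed ordering: a higher number is a more severe result. */
    private static int severity(Compatibility c) {
        switch (c) {
            case COMPATIBLE_AS_IS:
                return 0;
            case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                return 1;
            case COMPATIBLE_AFTER_MIGRATION:
                return 2;
            default:
                return 3; // INCOMPATIBLE
        }
    }

    /** Checks the outer container first, then lets the most severe nested result win. */
    static Compatibility resolveComposite(Compatibility outer, List<Compatibility> nested) {
        if (outer == Compatibility.INCOMPATIBLE) {
            return Compatibility.INCOMPATIBLE; // no point in looking at nested serializers
        }
        Compatibility result = outer;
        for (Compatibility c : nested) {
            if (severity(c) > severity(result)) {
                result = c;
            }
        }
        return result;
    }
}
```

For a Map state, for instance, nested would hold the results for the key serializer and the value serializer; one incompatible nested serializer is enough to make the whole state unrestorable.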

The End

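In some special scenarios we need to define custom Serializers to achieve better state serialization (for example, using RoaringBitmap instead of Set for efficient deduplication in state). It is already very late today, so I won't give a concrete implementation for now. For more details on custom state serializers, please refer to the chapter "Custom Serialization for Managed State" in the official documentation.

Good night.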
