Partitioners in Flink (1.13)

Preface

Flink ships seven built-in partitioners plus one wrapper for user-defined partitioners (eight in total).

org.apache.flink.streaming.runtime.partitioner.StreamPartitioner is the abstract base class of all partitioners:

@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    // ...
}
  • ChannelSelector
public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the channel selector with the number of output channels.
     *
     * @param numberOfChannels the total number of output channels which are attached to respective
     *     output gate.
     */
    void setup(int numberOfChannels);

    /**
     * Returns the logical channel index, to which the given record should be written. It is illegal
     * to call this method for broadcast channel selectors and this method can remain not
     * implemented in that case (for example by throwing {@link UnsupportedOperationException}).
     *
     * @param record the record to determine the output channels for.
     * @return an integer number which indicates the index of the output channel through which the
     *     record shall be forwarded.
     */
    int selectChannel(T record);

    /**
     * Returns whether the channel selector always selects all the output channels.
     *
     * @return true if the selector is for broadcast mode.
     */
    boolean isBroadcast();
}
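To see how the three ChannelSelector methods work together, here is a minimal Flink-free sketch of a sender loop: setup() is called once with the channel count, then each record goes either to every channel (when isBroadcast() is true) or to the single index returned by selectChannel(). SimpleChannelSelector, RoundRobinSelector and dispatch are hypothetical simplifications; Flink's real record writer is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;

final class ChannelSelectorDemo {

    /** Hypothetical, simplified mirror of Flink's ChannelSelector (records are plain objects). */
    interface SimpleChannelSelector<T> {
        void setup(int numberOfChannels);

        int selectChannel(T record);

        boolean isBroadcast();
    }

    /** A round-robin selector that starts at channel 0. */
    static final class RoundRobinSelector<T> implements SimpleChannelSelector<T> {
        private int numberOfChannels;
        private int next = -1;

        @Override
        public void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        @Override
        public int selectChannel(T record) {
            next = (next + 1) % numberOfChannels;
            return next;
        }

        @Override
        public boolean isBroadcast() {
            return false;
        }
    }

    /** Hypothetical sender loop: returns, per record, the channel indexes it is written to. */
    static <T> List<List<Integer>> dispatch(
            SimpleChannelSelector<T> selector, int numberOfChannels, List<T> records) {
        selector.setup(numberOfChannels);
        List<List<Integer>> targets = new ArrayList<>();
        for (T record : records) {
            List<Integer> channels = new ArrayList<>();
            if (selector.isBroadcast()) {
                // selectChannel() must not be called for broadcast selectors: use every channel
                for (int c = 0; c < numberOfChannels; c++) {
                    channels.add(c);
                }
            } else {
                channels.add(selector.selectChannel(record));
            }
            targets.add(channels);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> records = List.of("zhangsan", "lisi", "wangwu", "zhaoliu");
        System.out.println(dispatch(new RoundRobinSelector<>(), 3, records)); // [[0], [1], [2], [0]]
    }
}
```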
  • Concrete implementations:
  1. RebalancePartitioner (org.apache.flink.streaming.runtime.partitioner)
  2. RescalePartitioner (org.apache.flink.streaming.runtime.partitioner)
  3. KeyGroupStreamPartitioner (org.apache.flink.streaming.runtime.partitioner)
  4. GlobalPartitioner (org.apache.flink.streaming.runtime.partitioner)
  5. ShufflePartitioner (org.apache.flink.streaming.runtime.partitioner)
  6. ForwardPartitioner (org.apache.flink.streaming.runtime.partitioner)
  7. CustomPartitionerWrapper (org.apache.flink.streaming.runtime.partitioner)
  8. BroadcastPartitioner (org.apache.flink.streaming.runtime.partitioner)

Operators that repartition a stream

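  • KeyBy
    Groups records by key and picks the downstream partition from a two-step hash of the key.
    Partitioner: KeyGroupStreamPartitioner

  • shuffle
    Sends each element to a randomly chosen partition.
    Partitioner: ShufflePartitioner

  • rebalance
    Distributes elements evenly across all partitions in round-robin order. Useful as a performance optimization when processing skewed data.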

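If no partitioner is specified and the upstream and downstream parallelism are equal, records flow 1:1 (Flink uses forward). Only when the parallelisms differ, for example upstream < downstream, does Flink fall back to round-robin (rebalance).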

rebalance

Program

    @Test
    public void rebalance() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
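        // set the parallelism globally
        env.setParallelism(3);

        // socketTextStream is a non-parallel source, so its parallelism is always 1
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        source.print("print>>>");

        // explicitly distribute the records round-robin across the 3 print subtasks
        source.rebalance().print("rebalance>>>");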

        System.out.println("-----------------------");

        env.execute();

    }

Output

print>>>:3> zhangsan
rebalance>>>:2> zhangsan

print>>>:1> lisi
rebalance>>>:3> lisi

print>>>:2> wangwu
rebalance>>>:1> wangwu

print>>>:3> zhaoliu
rebalance>>>:2> zhaoliu

print>>>:1> sunqi
rebalance>>>:3> sunqi

print>>>:2> tianba
rebalance>>>:1> tianba

print>>>:3> zhaosanfeng
rebalance>>>:2> zhaosanfeng

print>>>:1> abcd
rebalance>>>:3> abcd

print>>>:2> wangfefe
rebalance>>>:1> wangfefe

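Both outputs cycle through the three print subtasks. The print>>> lines cycle as well because socketTextStream is a non-parallel source (parallelism 1): with a parallelism-3 print sink and no explicit partitioner, Flink connects the two with a rebalance edge by default.

Partitioner: RebalancePartitioner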

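  • rescale
    Like rebalance, it distributes data evenly in round-robin order, but it is usually more efficient than rebalance: each upstream subtask only sends to a subset of the downstream subtasks, so the data can often flow through local "pipes" instead of over the network (depending on how the subtasks are placed across TaskManager slots).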

Why does rescale reduce network transfer?

How rebalance round-robins

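Suppose the upstream parallelism is 2 and the downstream parallelism is 4. Every upstream subtask connects to every downstream subtask, giving 2 * 4 = 8 connections in total.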


How rescale round-robins


Upstream 2, downstream 4
Upstream 4, downstream 2

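The subtasks are split into groups: each upstream subtask serves only its own part of the downstream subtasks (or, when upstream is larger, each downstream subtask reads from only part of the upstream subtasks).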


Source code explanation (RescalePartitioner Javadoc)

Partitioner that distributes the data equally by cycling through the output channels.

This distributes only to a subset of downstream nodes because org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator instantiates a DistributionPattern#POINTWISE distribution pattern when encountering RescalePartitioner.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation.

For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations.

If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations will distribute to one downstream operation while the other two upstream operations will distribute to the other downstream operations.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.
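To make the difference in connection counts concrete, here is a small Flink-free model. It treats rebalance as an all-to-all edge and rescale as a pointwise edge built from contiguous index ranges. The class PartitionTopology and its range formula are illustrative assumptions; Flink's own rounding at range boundaries for non-multiple parallelisms may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionTopology {

    /** rebalance: every upstream subtask connects to every downstream subtask. */
    static int rebalanceConnections(int upstream, int downstream) {
        return upstream * downstream;
    }

    /** Simplified pointwise model for rescale: downstream indexes reached by each upstream subtask. */
    static List<List<Integer>> rescaleTargets(int upstream, int downstream) {
        List<List<Integer>> targets = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            targets.add(new ArrayList<>());
        }
        if (upstream <= downstream) {
            // fan-out: upstream u owns downstream range [u*down/up, (u+1)*down/up)
            for (int u = 0; u < upstream; u++) {
                for (int d = u * downstream / upstream; d < (u + 1) * downstream / upstream; d++) {
                    targets.get(u).add(d);
                }
            }
        } else {
            // fan-in: downstream d reads from upstream range [d*up/down, (d+1)*up/down)
            for (int d = 0; d < downstream; d++) {
                for (int u = d * upstream / downstream; u < (d + 1) * upstream / downstream; u++) {
                    targets.get(u).add(d);
                }
            }
        }
        return targets;
    }

    /** Total number of upstream-to-downstream connections in the rescale model. */
    static int rescaleConnections(int upstream, int downstream) {
        int total = 0;
        for (List<Integer> t : rescaleTargets(upstream, downstream)) {
            total += t.size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("2 -> 4 rebalance: " + rebalanceConnections(2, 4)); // 8
        System.out.println("2 -> 4 rescale:   " + rescaleTargets(2, 4));       // [[0, 1], [2, 3]]
        System.out.println("4 -> 2 rescale:   " + rescaleTargets(4, 2));       // [[0], [0], [1], [1]]
        System.out.println("3 -> 2 rescale:   " + rescaleTargets(3, 2));       // [[0], [1], [1]]
    }
}
```

In the 3 -> 2 case, downstream subtask 0 receives one input and downstream subtask 1 receives two, which is the uneven situation the last Javadoc paragraph describes.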

Summary: rescale usually performs better than rebalance, because it opens fewer connections and can keep more of the traffic local.

Use case: mitigating data skew in some scenarios, for example when the source data itself is skewed.

Partitioner: RescalePartitioner

RebalancePartitioner

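    // the Java default is 0, but setup() replaces it with a random start channel
    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // start at a random channel so that not every sender begins with the same channel
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // advance by one and wrap around: round-robin over all downstream channels
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }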

numberOfChannels: the downstream parallelism, assumed here to be 3. For readability, the walkthrough starts from nextChannelToSendTo = 0 (in practice setup() chooses a random start).

1st call: nextChannelToSendTo = (0 + 1) % 3 = 1
2nd call: nextChannelToSendTo = (1 + 1) % 3 = 2
3rd call: nextChannelToSendTo = (2 + 1) % 3 = 0
4th call: nextChannelToSendTo = (0 + 1) % 3 = 1
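The walkthrough can be replayed with a small Flink-free simulation. RebalanceSketch is a hypothetical class that copies the selectChannel arithmetic. Its random-start factory reflects my understanding of RebalancePartitioner#setup in 1.13 and should be treated as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class RebalanceSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    /** Starts from an explicit value, e.g. 0 to reproduce the walkthrough. */
    RebalanceSketch(int numberOfChannels, int start) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannelToSendTo = start;
    }

    /** Starts from a random channel (assumed to match RebalancePartitioner#setup). */
    static RebalanceSketch withRandomStart(int numberOfChannels) {
        return new RebalanceSketch(
                numberOfChannels, ThreadLocalRandom.current().nextInt(numberOfChannels));
    }

    /** Same arithmetic as RebalancePartitioner#selectChannel. */
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    /** Returns the channels chosen for the next {@code count} records. */
    List<Integer> next(int count) {
        List<Integer> channels = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            channels.add(selectChannel());
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(new RebalanceSketch(3, 0).next(4)); // [1, 2, 0, 1]
        System.out.println(withRandomStart(3).next(6));        // same cycle, random starting point
    }
}
```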

RescalePartitioner

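    // starts at -1 so that the first call returns channel 0
    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // move to the next channel and wrap around to 0 after the last one
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }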

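numberOfChannels: the number of output channels of this upstream subtask, assumed here to be 3. Because rescale uses the POINTWISE pattern, this is only the subset of downstream subtasks connected to the sender (for example 2 when the upstream parallelism is 2 and the downstream parallelism is 4), not the full downstream parallelism.

Distribution order:
0, 1, 2, 0, ...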
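A Flink-free replay of the rescale counter. RescaleSketch is hypothetical; the point it illustrates is that numberOfChannels is the sender's local channel count, so the same counter yields 0, 1, 0, 1 for a sender that owns only two downstream subtasks.

```java
import java.util.Arrays;

final class RescaleSketch {
    /** Channels owned by THIS upstream subtask (a subset of the downstream subtasks). */
    private final int numberOfChannels;
    private int nextChannelToSendTo = -1;

    RescaleSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /** Same logic as RescalePartitioner#selectChannel: increment, wrap to 0 past the end. */
    int selectChannel() {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    /** Channel indexes picked for the first {@code count} records of a fresh selector. */
    static int[] sequence(int numberOfChannels, int count) {
        RescaleSketch selector = new RescaleSketch(numberOfChannels);
        int[] out = new int[count];
        for (int i = 0; i < count; i++) {
            out[i] = selector.selectChannel();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sequence(3, 4))); // [0, 1, 2, 0]
        // upstream 2 -> downstream 4: each sender owns 2 channels
        System.out.println(Arrays.toString(sequence(2, 4))); // [0, 1, 0, 1]
    }
}
```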

KeyGroupStreamPartitioner

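Core logic

KeyedStream creates the partitioner in its constructor (excerpt):

    new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)

KeyGroupStreamPartitioner#selectChannel: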
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
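        /*
         * key: the value the stream is grouped by
         * maxParallelism: the maximum parallelism, 128 (1 << 7) by default; it can be changed
         *     with setMaxParallelism() and grows automatically for very large parallelism
         * numberOfChannels: the downstream parallelism, chosen by the user
         */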
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }

First hash

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // 'key.hashCode()' is the first hash of the key
        // maxParallelism: the maximum parallelism (128 = 1 << 7 by default)
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

Second hash

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        // keyHash: the result of key.hashCode()
        // murmurHash: re-hash with murmur so that the values are spread more evenly
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }
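The two hashes can be reproduced without Flink on the classpath. The murmurHash below is a port written to mirror MathUtils.murmurHash (murmur3-style mixing of one int, forced non-negative); treat it as an assumption and compare with the Flink source before relying on exact values. With it, key "a" lands in key group 81 and, with parallelism 16, on subtask 10, which agrees with the demo output further down.

```java
final class KeyGroupSketch {

    /** Assumed port of Flink's MathUtils.murmurHash(int); result is always non-negative. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    /** Murmur3 finalization mix. */
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** First hash: key.hashCode(); second hash: murmurHash; then modulo maxParallelism. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Key group to downstream subtask index (same formula as computeOperatorIndexForKeyGroup). */
    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return assignToKeyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        System.out.println(assignToKeyGroup("a", 128));                // 81
        System.out.println(assignKeyToParallelOperator("a", 128, 16)); // 10
    }
}
```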

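Summary: the key group is murmurHash(key.hashCode()) % maxParallelism, i.e. two hashes followed by % 128 by default. The key group is then mapped to a downstream subtask index: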

    /**
     * maxParallelism: the maximum parallelism, 1 << 7 = 128 by default
     * parallelism: the downstream parallelism
     * keyGroupId: murmurHash(key.hashCode()) % maxParallelism
     */
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
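Because the formula is plain integer arithmetic, it splits the maxParallelism key groups into contiguous, nearly equal ranges. The hypothetical helper below counts how many key groups each downstream subtask receives.

```java
import java.util.Arrays;

final class KeyGroupRanges {

    /** Same formula as computeOperatorIndexForKeyGroup. */
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Number of key groups that end up on each downstream subtask. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroupId = 0; keyGroupId < maxParallelism; keyGroupId++) {
            counts[operatorIndex(maxParallelism, parallelism, keyGroupId)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keyGroupsPerSubtask(128, 16))); // 8 key groups each
        System.out.println(Arrays.toString(keyGroupsPerSubtask(128, 3)));  // [43, 43, 42]
    }
}
```

It also suggests why a keyed operator's parallelism cannot exceed maxParallelism: with more subtasks than key groups, some subtasks would receive no key group at all.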

Example

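    @Test
    public void a() {
        String key = "a";
        int maxParallelism = 1 << 7; // 128
        int parallelism = 16;        // downstream parallelism

        // two hashes: String.hashCode(), then MathUtils.murmurHash, modulo maxParallelism
        int keyGroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism;

        // map the key group to a downstream subtask index
        int r = keyGroupId * parallelism / maxParallelism;

        System.out.println(r);
    }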

Output

10

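This computation assigns every key to a specific downstream subtask (here index 10 of 16), and therefore to the slot that subtask runs in. The downstream parallelism can be set with .setParallelism().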

GlobalPartitioner
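GlobalPartitioner is what DataStream#global() uses. Its selectChannel ignores the record and returns 0, so all data goes to the first downstream subtask. A Flink-free sketch with a hypothetical GlobalSketch class:

```java
final class GlobalSketch {

    /** Same logic as GlobalPartitioner#selectChannel: ignore the record, always channel 0. */
    static int selectChannel(Object record, int numberOfChannels) {
        return 0;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"zhangsan", "lisi", "wangwu"}) {
            // every record goes to the first downstream subtask, whatever the parallelism
            System.out.println(name + " -> channel " + selectChannel(name, 3));
        }
    }
}
```

Because one subtask receives everything, the downstream operator effectively runs with parallelism 1 for this stream and can become a bottleneck.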

ShufflePartitioner

The shuffle partitioning logic is simple:

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }

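numberOfChannels: the downstream parallelism. With parallelism 12, random.nextInt(12) returns a channel index from 0 to 11 (subtasks 1 to 12 in the numbering used by the print output).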
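A quick Flink-free check that this logic spreads records roughly evenly over channel indexes 0 to 11. ShuffleSketch is hypothetical, and it uses a seeded Random only to make the demo repeatable (ShufflePartitioner itself uses an unseeded Random).

```java
import java.util.Arrays;
import java.util.Random;

final class ShuffleSketch {
    private final Random random;
    private final int numberOfChannels;

    ShuffleSketch(int numberOfChannels, long seed) {
        this.numberOfChannels = numberOfChannels;
        this.random = new Random(seed); // seeded only for repeatability
    }

    /** Same logic as ShufflePartitioner#selectChannel: uniform in [0, numberOfChannels). */
    int selectChannel() {
        return random.nextInt(numberOfChannels);
    }

    /** Counts how many of {@code records} land on each channel. */
    static int[] histogram(int numberOfChannels, int records, long seed) {
        ShuffleSketch shuffle = new ShuffleSketch(numberOfChannels, seed);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[shuffle.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 12 channels, indexes 0..11, roughly 1000 records each
        System.out.println(Arrays.toString(histogram(12, 12_000, 42L)));
    }
}
```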

ForwardPartitioner
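ForwardPartitioner (used by DataStream#forward(), and the default when parallelisms match) connects upstream subtask i to downstream subtask i. Since each upstream subtask then has exactly one output channel, its selectChannel returns 0, and Flink refuses to build a forward edge between operators with different parallelism. The sketch below mirrors both points; ForwardSketch and its exception message are paraphrased assumptions, not Flink's API.

```java
final class ForwardSketch {

    /** Mirrors Flink rejecting a forward edge whose two sides have different parallelism. */
    static void checkParallelism(int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism");
        }
    }

    /** Same logic as ForwardPartitioner#selectChannel: the single local channel, index 0. */
    static int selectChannel(Object record) {
        return 0;
    }

    public static void main(String[] args) {
        checkParallelism(3, 3); // accepted: subtask i feeds subtask i
        System.out.println(selectChannel("zhangsan")); // 0
        try {
            checkParallelism(3, 4);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```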

CustomPartitionerWrapper
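CustomPartitionerWrapper backs DataStream#partitionCustom(partitioner, keySelector): it extracts the key with the KeySelector and returns partitioner.partition(key, numberOfChannels). In this Flink-free sketch, SimplePartitioner only mirrors the shape of Flink's Partitioner interface, and the "name,age" records and first-letter routing rule are purely illustrative.

```java
import java.util.function.Function;

final class CustomPartitionerSketch {

    /** Same shape as Flink's Partitioner<K>#partition(K key, int numPartitions). */
    interface SimplePartitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Mirrors CustomPartitionerWrapper#selectChannel: extract the key, then delegate. */
    static <T, K> int selectChannel(
            T record, Function<T, K> keySelector, SimplePartitioner<K> partitioner, int numberOfChannels) {
        K key = keySelector.apply(record);
        return partitioner.partition(key, numberOfChannels);
    }

    public static void main(String[] args) {
        // hypothetical records "name,age"; the key is the name
        Function<String, String> nameOf = record -> record.split(",")[0];
        // hypothetical rule: route by the first character of the name
        SimplePartitioner<String> byFirstChar = (name, n) -> name.charAt(0) % n;

        for (String record : new String[] {"zhangsan,18", "lisi,20", "sunqi,22"}) {
            System.out.println(record + " -> channel " + selectChannel(record, nameOf, byFirstChar, 3));
        }
    }
}
```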

BroadcastPartitioner
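BroadcastPartitioner is the broadcast case of the ChannelSelector contract: isBroadcast() returns true and selectChannel is not supported (it throws UnsupportedOperationException), so every record is copied to every downstream channel. A Flink-free sketch with a hypothetical BroadcastSketch class; the exception message is paraphrased.

```java
import java.util.ArrayList;
import java.util.List;

final class BroadcastSketch {

    /** Same as BroadcastPartitioner#isBroadcast. */
    static boolean isBroadcast() {
        return true;
    }

    /** Broadcast selectors never pick a single channel. */
    static int selectChannel(Object record) {
        throw new UnsupportedOperationException("broadcast partitioner does not select a single channel");
    }

    /** Each record is copied to every channel, so deliveries = records * channels. */
    static List<String> deliveries(List<String> records, int numberOfChannels) {
        List<String> out = new ArrayList<>();
        for (String record : records) {
            for (int channel = 0; channel < numberOfChannels; channel++) {
                out.add(record + "@" + channel);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("broadcast: " + isBroadcast());
        // [zhangsan@0, zhangsan@1, zhangsan@2, lisi@0, lisi@1, lisi@2]
        System.out.println(deliveries(List.of("zhangsan", "lisi"), 3));
    }
}
```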

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
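Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform that only provides information storage services.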
