Flink's Eight Partitioning Strategies: A Source Code Walkthrough

Flink provides eight partitioning strategies (partitioners), listed below. This article walks through the implementation of each partitioner at the source level.

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

Inheritance diagram

Interface

Name

ChannelSelector

Implementation

public interface ChannelSelector<T extends IOReadableWritable> {

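    /**
     * Sets the number of channels. A channel can be thought of as one instance of the
     * downstream operator (one subtask of a parallel operator).
     */
    void setup(int numberOfChannels);

    /**
     * Decides, based on the current record and the total number of channels,
     * which downstream channel the record should be sent to.
     * Each partitioning strategy implements this method differently.
     */
    int selectChannel(T record);

    /**
     * Whether records are broadcast to all downstream operator instances.
     */
    boolean isBroadcast();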
}

Abstract class

Name

StreamPartitioner

Implementation

public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }

    public abstract StreamPartitioner<T> copy();
}

Inheritance diagram

GlobalPartitioner

Overview

This partitioner sends every record to a single downstream operator instance (subtask id = 0).

Source walkthrough

/**
 * Sends all records to the first task (ID = 0) of the downstream operator.
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Always returns 0, so records only go to the first task of the downstream operator
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}

Diagram

ShufflePartitioner

Overview

Picks a downstream operator instance at random for each record.

Source walkthrough

/**
 * Picks a channel at random for each record.
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Draw a pseudo-random number in [0, numberOfChannels) and send the record to that downstream task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}
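The random selection above can be tried out without a Flink dependency. The following is a minimal sketch, not Flink code: the class name ShuffleSketch, the histogram helper, and the fixed seed are all assumptions introduced here so that runs are repeatable.

```java
import java.util.Random;

// Standalone sketch that mimics ShufflePartitioner.selectChannel.
// ShuffleSketch and histogram are hypothetical names, not part of Flink.
final class ShuffleSketch {
    private final Random random;
    private final int numberOfChannels;

    ShuffleSketch(int numberOfChannels, long seed) {
        this.numberOfChannels = numberOfChannels;
        // A fixed seed is an assumption made only so the demo is repeatable.
        this.random = new Random(seed);
    }

    // Same idea as the partitioner: a pseudo-random index in [0, numberOfChannels).
    int selectChannel() {
        return random.nextInt(numberOfChannels);
    }

    // Counts how many of `records` selections land on each channel.
    static int[] histogram(int numberOfChannels, int records, long seed) {
        ShuffleSketch sketch = new ShuffleSketch(numberOfChannels, seed);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[sketch.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = histogram(4, 10_000, 42L);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c]);
        }
    }
}
```

With enough records the counts come out roughly even, but unlike round-robin there is no guarantee for any short run of records.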

Diagram

BroadcastPartitioner

Overview

Sends each record to every downstream operator instance.

Source walkthrough

/**
 * Sends each record to all channels.
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    /**
     * Broadcast mode sends each record directly to all downstream tasks, so there is
     * no channel to choose and the method below is not supported.
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "BROADCAST";
    }
}
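Why selectChannel can safely throw here is easier to see from the caller's side. The sketch below is a simplified model, assuming the writer checks isBroadcast() before it ever asks for a channel. SimpleSelector and BroadcastDispatchSketch are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ChannelSelector; not a Flink interface.
interface SimpleSelector<T> {
    int selectChannel(T record);
    boolean isBroadcast();
}

final class BroadcastDispatchSketch {
    // Returns the channel indexes a record would be written to.
    static <T> List<Integer> targetChannels(SimpleSelector<T> selector, T record, int numberOfChannels) {
        List<Integer> targets = new ArrayList<>();
        if (selector.isBroadcast()) {
            // Broadcast: every channel receives the record; selectChannel is never called.
            for (int c = 0; c < numberOfChannels; c++) {
                targets.add(c);
            }
        } else {
            targets.add(selector.selectChannel(record));
        }
        return targets;
    }

    // Mirrors BroadcastPartitioner: broadcast flag set, channel selection unsupported.
    static <T> SimpleSelector<T> broadcast() {
        return new SimpleSelector<T>() {
            @Override
            public int selectChannel(T record) {
                throw new UnsupportedOperationException("broadcast does not select channels");
            }

            @Override
            public boolean isBroadcast() {
                return true;
            }
        };
    }

    // Mirrors GlobalPartitioner: always channel 0, no broadcast.
    static <T> SimpleSelector<T> global() {
        return new SimpleSelector<T>() {
            @Override
            public int selectChannel(T record) {
                return 0;
            }

            @Override
            public boolean isBroadcast() {
                return false;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(targetChannels(BroadcastDispatchSketch.<String>broadcast(), "r", 3)); // [0, 1, 2]
        System.out.println(targetChannels(BroadcastDispatchSketch.<String>global(), "r", 3));    // [0]
    }
}
```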

Diagram

RebalancePartitioner

Overview

Sends records to the downstream tasks in round-robin order.

Source walkthrough

/**
 * Sends records to the downstream tasks in round-robin order.
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // Pick the starting channel id: a pseudo-random number in [0, numberOfChannels)
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
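        // Cycle through the downstream tasks. For example, if nextChannelToSendTo starts at 0 and
        // numberOfChannels (the number of downstream operator instances, i.e. the parallelism) is 2,
        // the first record goes to task ID = 1, the second to task ID = 0, the third to task ID = 1, and so on.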
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}
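The round-robin arithmetic can be checked in isolation. This is a minimal sketch with no Flink dependency; RoundRobinSketch and firstChannels are hypothetical names, and the starting channel is passed in explicitly (an assumption for reproducibility) instead of being drawn at random as in setup().

```java
import java.util.Arrays;

// Standalone sketch of the round-robin step used by RebalancePartitioner.
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    // startChannel is supplied by the caller here; the partitioner picks it randomly.
    RoundRobinSketch(int numberOfChannels, int startChannel) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannelToSendTo = startChannel;
    }

    // Advance first, then return, exactly as in the listing above.
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // Channels chosen for the first `records` records.
    static int[] firstChannels(int numberOfChannels, int startChannel, int records) {
        RoundRobinSketch sketch = new RoundRobinSketch(numberOfChannels, startChannel);
        int[] chosen = new int[records];
        for (int i = 0; i < records; i++) {
            chosen[i] = sketch.selectChannel();
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(firstChannels(2, 0, 3))); // [1, 0, 1]
        System.out.println(Arrays.toString(firstChannels(3, 2, 5))); // [0, 1, 2, 0, 1]
    }
}
```

Because the index is incremented before it is returned, the first record goes to the channel after the starting one, which matches the example in the code comment.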

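Diagram

RescalePartitioner

Overview

Based on the parallelism of the upstream and downstream operators, each upstream subtask emits records round-robin to the downstream instances it is connected to.
Example: with an upstream parallelism of 2 and a downstream parallelism of 4, one upstream subtask emits round-robin to two of the downstream subtasks, and the other upstream subtask emits round-robin to the remaining two.
With an upstream parallelism of 4 and a downstream parallelism of 2, two upstream subtasks emit to one downstream subtask, and the other two upstream subtasks emit to the other downstream subtask.

Source walkthrough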
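The two examples can be reproduced with a small mapping function. This is a rough sketch under two assumptions that are not taken from the source: one parallelism evenly divides the other, and connected subtasks are contiguous. Flink's real wiring also handles uneven ratios. RescaleMappingSketch and downstreamTargets are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of which downstream subtasks one upstream subtask reaches under rescale.
final class RescaleMappingSketch {
    // Assumes one parallelism evenly divides the other and that targets are contiguous.
    static List<Integer> downstreamTargets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan-out: each upstream subtask owns a block of downstream subtasks.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < fanOut; i++) {
                targets.add(upstreamIndex * fanOut + i);
            }
        } else {
            // Fan-in: several upstream subtasks share one downstream subtask.
            int fanIn = upstreamParallelism / downstreamParallelism;
            targets.add(upstreamIndex / fanIn);
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int u = 0; u < 2; u++) {
            System.out.println("2 -> 4, upstream " + u + ": " + downstreamTargets(u, 2, 4));
        }
        for (int u = 0; u < 4; u++) {
            System.out.println("4 -> 2, upstream " + u + ": " + downstreamTargets(u, 4, 2));
        }
    }
}
```

Within its own block of targets, each upstream subtask then cycles round-robin, so records never cross to the other block.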

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}

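Diagram

Important note

Flink's execution graphs can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.

StreamGraph: the initial graph generated from the code the user writes with the Stream API. It represents the topology of the program.

JobGraph: produced by optimizing the StreamGraph; it is the data structure submitted to the JobManager. The main optimization is chaining multiple eligible nodes together into a single node, which reduces the serialization, deserialization, and transfer cost of moving data between nodes.

ExecutionGraph: the JobManager generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and is the core data structure of the scheduling layer.

Physical execution graph: the "graph" formed after the JobManager schedules the job based on the ExecutionGraph and deploys tasks on the TaskManagers. It is not a concrete data structure.

StreamingJobGraphGenerator is the class that converts the StreamGraph into the JobGraph. In this class, ForwardPartitioner and RescalePartitioner are assigned the POINTWISE distribution pattern, and all other partitioners are assigned ALL_TO_ALL. The code is as follows: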

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // Each upstream (producer) subtask connects to one or more downstream (consumer) subtasks
        DistributionPattern.POINTWISE,
        resultPartitionType);
} else {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // Each upstream (producer) subtask connects to all downstream (consumer) subtasks
        DistributionPattern.ALL_TO_ALL,
        resultPartitionType);
}

ForwardPartitioner

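Overview

Sends each record to the first task of the corresponding downstream connection. The upstream and downstream operators must have the same parallelism, that is, upstream and downstream subtasks are in a 1:1 relationship.

Source walkthrough

/**
 * Sends records to the first task of the corresponding downstream connection.
 * @param <T>
 */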
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}
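ForwardPartitioner and GlobalPartitioner both return 0, yet they route differently because the channel index is interpreted relative to how the subtasks are wired. The sketch below is a simplified model, assuming that under ALL_TO_ALL channel c leads to downstream subtask c, and that under POINTWISE with equal parallelism each upstream subtask has a single channel leading to the subtask with the same index. ChannelZeroSketch and its methods are hypothetical names.

```java
// Simplified model of what "channel 0" means under the two distribution patterns.
final class ChannelZeroSketch {
    // ALL_TO_ALL (assumed model): every upstream subtask sees all downstream subtasks,
    // so the channel index is the downstream subtask index.
    static int allToAllTarget(int upstreamIndex, int channel) {
        return channel;
    }

    // POINTWISE with equal parallelism (assumed model): upstream subtask i has exactly
    // one channel, index 0, which leads to downstream subtask i.
    static int pointwiseOneToOneTarget(int upstreamIndex, int channel) {
        if (channel != 0) {
            throw new IllegalArgumentException("only channel 0 exists in a 1:1 connection");
        }
        return upstreamIndex;
    }

    public static void main(String[] args) {
        for (int upstream = 0; upstream < 3; upstream++) {
            int globalTarget = allToAllTarget(upstream, 0);
            int forwardTarget = pointwiseOneToOneTarget(upstream, 0);
            System.out.println("upstream " + upstream
                + ": global -> subtask " + globalTarget
                + ", forward -> subtask " + forwardTarget);
        }
    }
}
```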

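Diagram

Important note

When no partitioner is specified between an upstream and a downstream operator, ForwardPartitioner is used if the two operators have the same parallelism; otherwise RebalancePartitioner is used. ForwardPartitioner requires the upstream and downstream parallelism to be equal; if they differ, an exception is thrown.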

// When no partitioner is specified, use ForwardPartitioner if the upstream and downstream
// parallelism match; otherwise use RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

if (partitioner instanceof ForwardPartitioner) {
    // Throw if the upstream and downstream parallelism differ
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}
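The decision logic above can be exercised on its own. This is a minimal sketch that replaces the partitioner classes with a hypothetical Strategy enum; DefaultPartitionerSketch and resolve are names introduced here, not Flink APIs.

```java
// Standalone sketch of the default-partitioner decision, using an enum instead of Flink classes.
final class DefaultPartitionerSketch {
    enum Strategy { FORWARD, REBALANCE, SHUFFLE }

    // `explicit` is null when the user did not choose a partitioning strategy.
    static Strategy resolve(Strategy explicit, int upstreamParallelism, int downstreamParallelism) {
        Strategy chosen = explicit;
        if (chosen == null) {
            // Same parallelism: forward. Different parallelism: rebalance.
            chosen = (upstreamParallelism == downstreamParallelism) ? Strategy.FORWARD : Strategy.REBALANCE;
        }
        if (chosen == Strategy.FORWARD && upstreamParallelism != downstreamParallelism) {
            // An explicitly requested forward connection cannot change parallelism.
            throw new UnsupportedOperationException(
                "Forward partitioning does not allow change of parallelism: "
                    + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 4, 4));             // FORWARD
        System.out.println(resolve(null, 2, 4));             // REBALANCE
        System.out.println(resolve(Strategy.SHUFFLE, 2, 4)); // SHUFFLE
    }
}
```

Note that the exception can only be reached when forward was requested explicitly, since the default path picks forward only when the parallelism already matches.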

KeyGroupStreamPartitioner

Overview

Selects the downstream subtask based on the key group index of the record's key.

Source walkthrough

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/**
 * Selects the downstream subtask based on the key group index of the record's key.
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        // Delegate to KeyGroupRangeAssignment.assignKeyToParallelOperator, shown below
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
...
}

  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
...

    /**
     * Assigns a parallel operator instance index to the key. This index is the routing
     * information for the key, i.e. which downstream task the key is sent to.
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * Assigns a key group id (keyGroupId) to the key.
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // Use the key's hashCode
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * Computes the key group id (keyGroupId) from the key's hash.
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {

        // Take the murmur hash modulo maxParallelism to obtain the keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    // Computes the partition index, i.e. which downstream operator instance the key group is sent to
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
...
}
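The integer arithmetic in computeOperatorIndexForKeyGroup is easier to follow with concrete numbers. The sketch below copies that one formula and, as a loudly labeled assumption, swaps in a simple stand-in hash instead of MathUtils.murmurHash, so the key groups it produces will not match Flink's. KeyGroupSketch and its method names are hypothetical.

```java
// Standalone sketch of key -> key group -> operator index.
final class KeyGroupSketch {
    // Stand-in for MathUtils.murmurHash. This is NOT Flink's hash function; it is only
    // a non-negative scramble so the example runs without Flink on the classpath.
    static int standInHash(int code) {
        int h = code * 0x9E3779B1;
        h ^= (h >>> 16);
        return h & Integer.MAX_VALUE;
    }

    // Key group id in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return standInHash(key.hashCode()) % maxParallelism;
    }

    // Same formula as computeOperatorIndexForKeyGroup in the listing above.
    static int operatorIndexFor(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // With maxParallelism = 128 and parallelism = 4, each subtask owns 32 consecutive key groups.
        System.out.println(operatorIndexFor(128, 4, 31));  // 0
        System.out.println(operatorIndexFor(128, 4, 32));  // 1
        System.out.println(operatorIndexFor(128, 4, 127)); // 3
        // With parallelism = 3 the ranges are uneven: 0..42, 43..85, 86..127.
        System.out.println(operatorIndexFor(128, 3, 42));  // 0
        System.out.println(operatorIndexFor(128, 3, 43));  // 1
        System.out.println(operatorIndexFor(128, 3, 86));  // 2
    }
}
```

Because the division is integer division, consecutive key groups map to the same subtask, so each subtask ends up owning a contiguous range of key groups.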


Diagram


CustomPartitionerWrapper

Overview

Emits records to downstream channels by calling the partition method of a user-defined Partitioner instance.

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        // The user implements the Partitioner interface and overrides partition(), which picks the channel
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "CUSTOM";
    }
}

For example:

public class CustomPartitioner implements Partitioner<String> {
    // key: the value used to decide the partition
    // numPartitions: parallelism of the downstream operator
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions; // define the partitioning strategy here
    }
}
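To see what this strategy does to a few sample keys without pulling in Flink, the sketch below uses a hypothetical KeyPartitioner interface that only mirrors the shape of Flink's Partitioner. In an actual job, the partitioner would typically be handed to the stream through DataStream#partitionCustom together with a KeySelector.

```java
// Hypothetical stand-in that mirrors the shape of Flink's Partitioner<K>; not a Flink interface.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Same strategy as the example above: route by key length.
final class LengthPartitionerSketch implements KeyPartitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        LengthPartitionerSketch partitioner = new LengthPartitionerSketch();
        int numPartitions = 4; // assumed downstream parallelism
        String[] keys = {"a", "hash", "flink", "partitioner"};
        for (String key : keys) {
            System.out.println(key + " -> channel " + partitioner.partition(key, numPartitions));
        }
    }
}
```

Keys of equal length always share a channel, so a strategy like this spreads load evenly only when key lengths are themselves evenly distributed.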

Summary

This article analyzed Flink's eight partitioning strategies one by one at the source level and provided a diagram for each strategy to make the source easier to follow.
