Flink-to-Kafka partitioning strategies

The default partitioning strategy Flink uses when writing to Kafka is implemented in the FlinkFixedPartitioner class. It is not the round-robin scheme you might expect; instead, each sink subtask writes to one fixed Kafka partition. For example: if a Flink job has 3 sink subtasks writing to a Kafka topic with 10 partitions, data only ever lands in partitions 0-2, and the other partitions receive nothing. The implementation logic is as follows:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");

        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");

        return partitions[parallelInstanceId % partitions.length];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

As we can see, the line partitions[parallelInstanceId % partitions.length] is what decides which Kafka partition each record is written to: parallelInstanceId is the index of the Flink sink subtask, and partitions.length is the number of Kafka partitions.
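To see that modulo mapping concretely, here is a small plain-Java sketch (independent of Flink; the class and method names are illustrative) that replays the scenario above, 3 sink subtasks against a 10-partition topic:

```java
public class FixedPartitionerDemo {
    // Mirrors FlinkFixedPartitioner's core logic: subtask i always writes to
    // partitions[i % partitions.length], for every record it processes.
    static int assignedPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // topic with 10 partitions

        // 3 sink subtasks: only partitions 0-2 ever receive data.
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + assignedPartition(subtask, partitions));
        }

        // A job with parallelism 1 has a single subtask with id 0,
        // so all of its data goes to partition 0.
        System.out.println("single-parallelism job -> partition "
                + assignedPartition(0, partitions));
    }
}
```

Note that the mapping depends only on the subtask id, not on the record, which is exactly why every record from the same subtask lands in the same partition.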
In typical scenarios this strategy causes few problems, but it breaks down when multiple Flink jobs write to the same Kafka topic. For example: suppose jobs a, b, ..., f all write to topic1, each with a parallelism of 1, while topic1 has 10 partitions. Then every job writes all of its data to partition 0, partitions 1-9 stay empty, and the data in Kafka is badly skewed. In that case we have to define our own partitioning strategy. A simple round-robin partitioner looks like this:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkRebalancePartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;
    private int nextPartitionToSendTo;


    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");

        this.parallelInstanceId = parallelInstanceId;
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkRebalancePartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkRebalancePartitioner.class.hashCode();
    }
}

This approach is simple and clear. It follows the implementation of RebalancePartitioner, one of Flink's internal stream partitioners, and spreads records evenly across the downstream Kafka partitions.
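The round-robin behaviour can be checked outside Flink with a quick simulation of the same counter logic (a sketch; the class name and record count are arbitrary). Since 1000 records spread round-robin over 10 partitions divide evenly, every partition ends up with exactly 100 records regardless of the random starting offset:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RebalanceDemo {
    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        int[] recordsPerPartition = new int[partitions.length];

        // Same counter logic as FlinkRebalancePartitioner: start at a random
        // offset, then advance by one partition per record.
        int next = ThreadLocalRandom.current().nextInt(partitions.length);
        for (int record = 0; record < 1000; record++) {
            next = (next + 1) % partitions.length;
            recordsPerPartition[partitions[next]]++;
        }

        for (int p = 0; p < partitions.length; p++) {
            System.out.println("partition " + p + ": "
                    + recordsPerPartition[p] + " records");
        }
    }
}
```

To plug the custom partitioner into a job, it is passed to the Kafka producer sink; with the FlinkKafkaProducer connector this is typically done through a constructor overload that accepts Optional.of(new FlinkRebalancePartitioner<>()), though the exact signature varies across Flink versions.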

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容