The default partitioning strategy Flink uses when writing to Kafka is implemented in the FlinkFixedPartitioner class. It is not the round-robin scheme one might expect: instead, each sink subtask writes to one fixed Kafka partition. For example, if a Flink job writes to Kafka with a sink parallelism of 3 and the topic has 10 partitions, data will only land in partitions 0-2, and the other partitions receive nothing. The implementation logic is as follows:
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    /** Index of this sink subtask, assigned in open(). */
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        // Every record from this subtask goes to the same fixed partition.
        return partitions[parallelInstanceId % partitions.length];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}
The line partitions[parallelInstanceId % partitions.length] is what decides which Kafka partition each record is written to: parallelInstanceId is the index of the Flink sink subtask, and partitions.length is the number of partitions of the target Kafka topic.
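To make the fixed mapping concrete, here is a small standalone simulation (plain Java, not Flink code; the class and method names are invented for illustration) of the rule partitions[parallelInstanceId % partitions.length] with 3 sink subtasks and a 10-partition topic:

```java
import java.util.Arrays;

public class FixedAssignmentDemo {

    // Mirrors the partition() logic of FlinkFixedPartitioner:
    // each subtask is pinned to exactly one partition.
    static int assign(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // 10 Kafka partitions
        for (int subtask = 0; subtask < 3; subtask++) {     // sink parallelism 3
            System.out.println("subtask " + subtask
                    + " -> partition " + assign(subtask, partitions));
        }
        // subtask 0 -> partition 0, subtask 1 -> partition 1,
        // subtask 2 -> partition 2; partitions 3-9 never receive data.
    }
}
```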
In most scenarios this strategy causes no real trouble, but it breaks down when several Flink jobs write to the same Kafka topic. For example, suppose jobs a, b, ..., f all write to topic1, each with a parallelism of 1, while topic1 has 10 partitions. Every job's single subtask has parallelInstanceId = 0, so all of them write to partition 0, while partitions 1-9 receive no data at all, leaving the Kafka topic heavily skewed. In that case we have to define our own partitioning strategy, for example a simple round-robin partitioner:
public class FlinkRebalancePartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    private int parallelInstanceId;
    private int nextPartitionToSendTo;

    public FlinkRebalancePartitioner() {
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
        // Start from a random offset so that different subtasks (and jobs)
        // do not all begin writing at the same partition.
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        // Advance one partition per record: a simple round-robin.
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkRebalancePartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkRebalancePartitioner.class.hashCode();
    }
}
This approach is simple and direct. It follows the implementation of RebalancePartitioner, one of Flink's internal data-distribution strategies, and spreads records evenly across the downstream Kafka partitions.
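As a sanity check of the round-robin logic, the following standalone simulation (plain Java with hypothetical names, not Flink code) replays the same state transition, nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length, for a single-parallelism sink and counts where 1000 records land across 10 partitions:

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class RebalanceDemo {

    private int nextPartitionToSendTo;

    RebalanceDemo(int parallelInstances) {
        // Same random starting offset as in the partitioner above.
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    // Same round-robin step as FlinkRebalancePartitioner.partition().
    int partition(int[] partitions) {
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    // Sends `records` records through one partitioner instance and counts
    // how many land in each of `numPartitions` partitions.
    static int[] distribute(int records, int numPartitions) {
        int[] partitions = new int[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitions[i] = i;
        }
        RebalanceDemo demo = new RebalanceDemo(1); // sink parallelism 1
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[demo.partition(partitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 1000 records over 10 partitions: every partition gets exactly 100.
        System.out.println(Arrays.toString(distribute(1000, 10)));
    }
}
```

To actually use such a partitioner, it is typically passed to the Kafka sink when the producer is constructed; in the flink-connector-kafka API this is the FlinkKafkaProducer constructor variant that accepts an Optional&lt;FlinkKafkaPartitioner&gt; argument (check the constructor signatures of the connector version you are on, as they differ across releases).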