10. kafka消費(fèi)者如何分配分區(qū)

消費(fèi)者如何分配分區(qū)就是指某個topic,其N個分區(qū)和消費(fèi)該topic的若干消費(fèi)者群組下M個消費(fèi)者的關(guān)系。如下圖所示,C0和C1兩個消費(fèi)者如何分配N個分區(qū):


消費(fèi)者&分區(qū).png
  • 核心接口:org.apache.kafka.clients.consumer.internals.PartitionAssignor
  • 內(nèi)置策略:org.apache.kafka.clients.consumer.RangeAssignor和org.apache.kafka.clients.consumer.RoundRobinAssignor。
  • 默認(rèn)策略:org.apache.kafka.clients.consumer.RangeAssignor
  • 配置方式:在構(gòu)造KafkaConsumer時增加參數(shù)partition.assignment.strategy,值為內(nèi)置的兩種策略中的一種,或者是一個實(shí)現(xiàn)了PartitionAssignor接口的全類名。例如:
... ...
// 指定分區(qū)分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

range策略

  • 實(shí)現(xiàn)

org.apache.kafka.clients.consumer.RangeAssignor

  • 說明

range策略針對于每個topic,各個topic之間分配時沒有任何關(guān)聯(lián),分配步驟如下:

  1. topic下的所有有效分區(qū)平鋪,例如P0, P1, P2, P3... ...
  2. 消費(fèi)者按照字典排序,例如C0, C1, C2
  3. 分區(qū)數(shù)除以消費(fèi)者數(shù),得到n
  4. 分區(qū)數(shù)對消費(fèi)者數(shù)取余,得到m
  5. 消費(fèi)者集合中,前m個消費(fèi)者能夠分配到n+1個分區(qū),而剩余的消費(fèi)者只能分配到n個分區(qū)。

所以對于某個topic來說:
如果有5個分區(qū)(P0, P1, P2, P3, P4),且訂閱這個topic的消費(fèi)者組有2個消費(fèi)者(C0, C1)。那么P0, P1, P2將被C0消費(fèi),P3, P4將被C1消費(fèi)。
如果有4個分區(qū)(P0, P1, P2, P3),且訂閱這個topic的消費(fèi)者組有2個消費(fèi)者(C0, C1)。那么P0, P1將被C0消費(fèi),P2, P3將被C1消費(fèi)。

range策略如下圖所示:


range策略
  • 源碼注釋
The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
說明:兩個topic分區(qū)數(shù)無法整除消費(fèi)者數(shù),所以,第一個消費(fèi)者C0會多分配一個分區(qū)。所以C0消費(fèi)p0和p1兩個分區(qū),C1消費(fèi)p2分區(qū)。
  • 源碼

核心源碼如下:

// partitionsPerTopic表示topic和分區(qū)關(guān)系,key是topic,value是分區(qū)數(shù)量
// subscriptions表示訂閱關(guān)系,key是消費(fèi)者,value是訂閱的topic
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    // 得到topic和訂閱的消費(fèi)者集合信息,例如{t0:[c0, c1], t1:[C0, C1]}
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    // 保存topic分區(qū)和訂閱該topic的消費(fèi)者關(guān)系結(jié)果map
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        // memberId就是消費(fèi)者client.id+uuid(kafka在client.id上追加的)
        assignment.put(memberId, new ArrayList<TopicPartition>());

    // 遍歷每個topic和消費(fèi)者集合信息組成的map(由這個遍歷可知,range策略分配結(jié)果在各個topic之間互不影響)
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        // topic名稱
        String topic = topicEntry.getKey();
        // topic的消費(fèi)者集合信息
        List<String> consumersForTopic = topicEntry.getValue();

        // 當(dāng)前topic的分區(qū)數(shù)量
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        // 如果當(dāng)天topic沒有分區(qū),那么繼續(xù)遍歷下一個topic
        if (numPartitionsForTopic == null)
            continue;

        // 消費(fèi)者集合根據(jù)字典排序
        Collections.sort(consumersForTopic);
        // 每個topic分區(qū)數(shù)量除以消費(fèi)者數(shù)量,得出每個消費(fèi)者分配到的分區(qū)數(shù)量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // 無法整除的剩余分區(qū)數(shù)量
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
        // 根據(jù)topic名稱和分區(qū)數(shù)量,得到分區(qū)集合信息
        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        // 遍歷訂閱當(dāng)前topic的消費(fèi)者集合
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            // 分配到的分區(qū)的開始位置
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            // 分配到的分區(qū)數(shù)量(整除分配到的分區(qū)數(shù)量,加上1個無法整除分配到的分區(qū)--如果有資格分配到這個分區(qū)的話。判斷是否有資格分配到這個分區(qū):如果整除后余數(shù)為m,那么排序后的消費(fèi)者集合中前m個消費(fèi)者都能分配到一個額外的分區(qū))
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            // 給消費(fèi)者分配分區(qū)
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}
  • 總結(jié)

由上面的分析可知,range策略會把無法整除的剩余分區(qū),分配給前面幾個消費(fèi)者,而且每個topic都會如此。這樣的話,topic越多,前面幾個消費(fèi)者可能承受的壓力就越大。range的弊端還是非常明顯的。

roundrobin策略

  • 實(shí)現(xiàn)

org.apache.kafka.clients.consumer.RoundRobinAssignor

  • 說明

roundrobin策略針對于全局所有的topic和消費(fèi)者,分配步驟如下:

  1. 消費(fèi)者按照字典排序,例如C0, C1, C2... ...,并構(gòu)造環(huán)形迭代器。
  2. topic名稱按照字典排序,并得到每個topic的所有分區(qū),從而得到所有分區(qū)集合。
  3. 遍歷第2步所有分區(qū)集合,同時輪詢消費(fèi)者。
  4. 如果輪詢到的消費(fèi)者訂閱的topic不包括當(dāng)前遍歷的分區(qū)所屬topic,則跳過;否則分配給當(dāng)前消費(fèi)者,并繼續(xù)第3步。

所以對于某個topic來說:
如果有5個分區(qū)(P0, P1, P2, P3, P4),且訂閱這個topic的消費(fèi)者組有2個消費(fèi)者(C0, C1)。那么P0, P2, P4將被C0消費(fèi),P1, P3將被C1消費(fèi)。

roundrobin策略如下圖所示:


roundrobin策略

如圖所示:
3個Topic:T0(3個分區(qū)0, 1, 2), T1(兩個分區(qū)0, 1), T2(4個分區(qū)0, 1, 2, 3);
3個consumer: C0訂閱了[T0, T1], C1訂閱了[T1, T2], C2訂閱了[T2, T0];

roundrobin結(jié)果分配結(jié)果如下:
T0-P0分配給C0,T0-P1分配給C2,T0-P2分配給C0,
T1-P0分配給C1,T1-P1分配給C0,
T2-P0分配給C1,T2-P1分配給C2,T2-P2分配給C1,T0-P3分配給C2;

推算過程:
分區(qū)T0-P0,消費(fèi)者C0,C0訂閱了這個分區(qū)所在Topic即T0,所以T0-P0分配給C0;
輪詢到下一個分區(qū)T0-P1和下一個消費(fèi)者C1;
分區(qū)T0-P1,消費(fèi)者C1,C1沒有訂閱T0,取下一個消費(fèi)者C2,C2訂閱了T0,所以T0-P1分配給C2;
輪詢到下一個分區(qū)T0-P2和下一個消費(fèi)者C0;
分區(qū)T0-P2,消費(fèi)者C0,C0訂閱了T0,所以T0-P2分配給C0;
輪詢到下一個分區(qū)T1-P0和下一個消費(fèi)者C1;
分區(qū)T1-P0,消費(fèi)者C1,C1訂閱T1,所以T1-P0分配給C1;
以此類推即可。

  • 源碼注釋
The round robin assignor lays out all the available partitions and all the available consumers. It then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumers.) For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
When subscriptions differ across consumer instances, the assignment process still considers each consumer instance in round robin fashion but skips over an instance if it is not subscribed to the topic. Unlike the case when subscriptions are identical, this can result in imbalanced assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. Tha assignment will be:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]

這段源碼注釋中,第一種情況比較好理解,第二種情況套用上面的分配步驟進(jìn)行推算,過程如下:

  1. 消費(fèi)者字典排序且構(gòu)造成環(huán)形隊(duì)列[C0, C1, C2];C0訂閱了[t0],C1訂閱了[t0, t1],C2訂閱了[t0, t1, t2];
  2. topic字段排序即[t0, t1, t2],t0只有一個分區(qū)p0,t1有兩個分區(qū)p0和p1,t2有三個分區(qū)p0,p1和p2。得到這三個topic下所有分區(qū)集合[t0p0, t1p0, t1p1, t2p0, t2p1, t2p2];
  3. 開始遍歷所有分區(qū)。
  4. 遍歷分區(qū)t0p0,同時消費(fèi)者為C0,C0訂閱了t0這個topic,所以分區(qū)t0p0分配給C0這個消費(fèi)者;
  5. 遍歷分區(qū)t1p0,同時消費(fèi)者為C1(每次消費(fèi)者都需要輪詢),C1訂閱了t1,所以分區(qū)t1p0分配給C1這個消費(fèi)者;
  6. 遍歷分區(qū)t1p1,同時消費(fèi)者為C2,C2訂閱了t1這個topic,所以分區(qū)t1p1分配給C1這個消費(fèi)者;
  7. 遍歷分區(qū)t2p0,同時消費(fèi)者為C0,C0沒有訂閱t1,輪詢到消費(fèi)者C1,C1也沒有訂閱t2,輪詢到C2,C2訂閱了t2這個topic,所以分區(qū)t2p0分配給C2這個消費(fèi)者;
  8. 遍歷分區(qū)t2p1,同時消費(fèi)者為C0,C0沒有訂閱t1,輪詢到消費(fèi)者C1,C1也沒有訂閱t2,輪詢到C2,C2訂閱了t2這個topic,所以分區(qū)t2p0分配給C2這個消費(fèi)者;
  9. 遍歷分區(qū)t2p2,同時消費(fèi)者為C0,C0沒有訂閱t1,輪詢到消費(fèi)者C1,C1也沒有訂閱t2,輪詢到C2,C2訂閱了t2這個topic,所以分區(qū)t2p0分配給C2這個消費(fèi)者;
  10. 遍歷完所有分區(qū),over。

over。

  • 源碼

核心源碼如下:

// partitionsPerTopic表示topic和分區(qū)關(guān)系,key是topic,value是分區(qū)數(shù)量
// subscriptions表示訂閱關(guān)系,key是消費(fèi)者,value是訂閱的topic信息
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    // 將消費(fèi)者集合先按照字典排序,再構(gòu)造成一個環(huán)形迭代器
    CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
    // 以topic名稱排序(SortedSet<String> topics = new TreeSet<>();TreeSet保存topic名稱從而實(shí)現(xiàn)排序),遍歷topic下的分區(qū),得到全部分區(qū)(分區(qū)主要信息包括topic名稱和分區(qū)編號)
    for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
        final String topic = partition.topic();
        // assigner.peek()得到最后一次遍歷的消費(fèi)者。如果遍歷的當(dāng)前分區(qū)所屬topic不在最后一次遍歷的消費(fèi)者訂閱的topic范圍內(nèi),那么從環(huán)形迭代器中輪詢選擇下一個消費(fèi)者,直到選擇的消費(fèi)者訂閱的topic集合包含當(dāng)前topic。
        while (!subscriptions.get(assigner.peek()).topics().contains(topic))
            assigner.next();
        // 給消費(fèi)者分配分區(qū),并輪詢到下一個消費(fèi)者
        assignment.get(assigner.next()).add(partition);
    }
    return assignment;
}
  • CircularIterator

CircularIterator環(huán)形迭代器的實(shí)現(xiàn)比較簡單,內(nèi)部用一個List<T>存儲數(shù)據(jù),next()迭代時稍作改造即可,這個環(huán)形迭代器的作用就是輪詢?nèi)≈?,上面的源碼是輪詢?nèi)∠M(fèi)者:

@Override
public T next() {
    // i初始值為0
    T next = list.get(i);
    // 每次取值后,i的值+1,由于是環(huán)形迭代器,為了讓i不超過List的最大下標(biāo),所以i對list.size()取模。
    i = (i + 1) % list.size();
    return next;
}

自定義(隨機(jī))策略

自定義實(shí)現(xiàn)非常簡單,自定義類AfeiAssignor實(shí)現(xiàn)抽象類AbstractPartitionAssignor即可,核心源碼如下:

/**
 * 自定義實(shí)現(xiàn)的隨機(jī)選擇消費(fèi)者分配器
 * @author wangzhenfei9
 * @version 1.0.0
 * @since 2018年07月10日
 */
public class AfeiAssignor extends AbstractPartitionAssignor {

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(res, topic, consumerId);
            }
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // 得到topic和訂閱該topic的消費(fèi)者集合(參考RangeAssignor中的consumersPerTopic()方法)
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }

        // 遍歷每個topic
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // 訂閱當(dāng)前topic的所有消費(fèi)者集合
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null) {
                continue;
            }

            // 當(dāng)前topic下所有分區(qū)
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (TopicPartition partition:partitions){
                // 隨機(jī)選擇一個消費(fèi)者
                int rand = new Random().nextInt(consumerSize);
                // 得到隨機(jī)選擇的消費(fèi)者
                String selectedConsumer = consumersForTopic.get(rand);
                // 給選擇的消費(fèi)者分配當(dāng)前分區(qū)
                assignment.get(selectedConsumer).add(partition);
            }
        }
        System.out.println("分配結(jié)果: "+new Gson().toJson(assignment));
        return assignment;
    }


    @Override
    public String name() {
        return "afei";
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容