Kafka 3.x Monitoring

Kafka joined Apache in November 2012; in the blink of an eye a decade has passed.
Version 0.8: introduced the replication mechanism, deprecated the old Scala client, and gave birth to message format v0
Version 0.10: message format upgraded to v1
Version 0.11: added transaction support; message format upgraded to v2
Version 1.0: added support for Java 9
Version 2.0: dropped support for Java 7 and removed the previously deprecated Scala producer and consumer
Version 2.1: added support for Java 11
Version 2.8: ZooKeeper can be replaced by Kafka's own KRaft
Version 3.0: dropped support for Java 8 and Scala 2.12; deprecated message formats v0 and v1
Version 3.1: Apache Kafka supports Java 17

Although most Kafka deployments today still depend heavily on ZooKeeper, once a cluster runs on the Raft protocol (KRaft), kafka-manager (later renamed CMAK) is no longer compatible. So if you want to monitor and operate Kafka and hook it into your own alerting system, you have to build the tooling yourself.
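The snippets below call `KafkaUtils.getAdminClient()` and `KafkaUtils.getConsumer()`, helpers the post does not show. A minimal sketch of what such a helper might look like (the bootstrap address and group id are placeholders, not from the original):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaUtils {
    // placeholder address: point this at your own cluster
    private static final String BOOTSTRAP = "192.168.1.201:9092";
    private static AdminClient adminClient;
    private static KafkaConsumer<String, String> consumer;

    public static synchronized AdminClient getAdminClient() {
        if (adminClient == null) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
            props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
            adminClient = AdminClient.create(props);
        }
        return adminClient;
    }

    // KafkaConsumer is not thread-safe; this sketch assumes single-threaded use
    public static synchronized KafkaConsumer<String, String> getConsumer() {
        if (consumer == null) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-monitor");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
        }
        return consumer;
    }
}
```

Neither client opens a connection at construction time, so the helper can be created before the cluster is reachable; failures surface later when futures are resolved.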

Fetching Topic and Consumer Metadata

    /**
     * Fetch all topics and their partition details.
     * @param adminClient kafkaAdminClient
     */
    private static void getAllTopics(AdminClient adminClient) throws ExecutionException, InterruptedException {
        Set<String> listTopicsName = adminClient.listTopics().names().get();
        // older client versions used values() instead of topicNameValues()
        Map<String, KafkaFuture<TopicDescription>> map = adminClient.describeTopics(listTopicsName).topicNameValues();
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : map.entrySet()) {
            TopicDescription topicDescription = entry.getValue().get();
            for (TopicPartitionInfo partition : topicDescription.partitions()) {
                log.info("topicName:{}, partition:{}", entry.getKey(), partition);
            }
        }
    }

    /**
     * Fetch the set of consumer group ids.
     * @param adminClient kafkaAdminClient
     */
    private static Set<String> getAllGroups(AdminClient adminClient) throws ExecutionException, InterruptedException {
        ListConsumerGroupsResult groupList = adminClient.listConsumerGroups();
        // all() returns a Collection; do not cast it to a concrete ArrayList
        Collection<ConsumerGroupListing> consumerGroupListings = groupList.all().get();
        return consumerGroupListings.stream().map(x -> {
            log.info(x.groupId());
            return x.groupId();
        }).collect(Collectors.toSet());
    }

Fetching Consumer Lag Metadata
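The collector below stores each output row in a `PartitionAssignmentState` holder that the post never shows. A minimal sketch with a hand-rolled builder (in the original it is likely a Lombok `@Builder` class; `coordinator` is typed as `Object` here to keep the sketch dependency-free, though it is a Kafka `Node` in practice):

```java
public class PartitionAssignmentState {
    private final String group;
    private final Object coordinator; // org.apache.kafka.common.Node in practice
    private final String topic;
    private final int partition;
    private final long offset;
    private final long lag;
    private final long logSize;
    private final String consumerId;
    private final String host;
    private final String clientId;

    private PartitionAssignmentState(Builder b) {
        this.group = b.group;
        this.coordinator = b.coordinator;
        this.topic = b.topic;
        this.partition = b.partition;
        this.offset = b.offset;
        this.lag = b.lag;
        this.logSize = b.logSize;
        this.consumerId = b.consumerId;
        this.host = b.host;
        this.clientId = b.clientId;
    }

    public static Builder builder() { return new Builder(); }

    public int getPartition() { return partition; }

    @Override
    public String toString() {
        return String.format(
            "group=%s coordinator=%s topic=%s partition=%d offset=%d logSize=%d lag=%d consumerId=%s host=%s clientId=%s",
            group, coordinator, topic, partition, offset, logSize, lag, consumerId, host, clientId);
    }

    public static class Builder {
        private String group;
        private Object coordinator;
        private String topic;
        private int partition;
        private long offset;
        private long lag;
        private long logSize;
        private String consumerId;
        private String host;
        private String clientId;

        public Builder group(String v) { this.group = v; return this; }
        public Builder coordinator(Object v) { this.coordinator = v; return this; }
        public Builder topic(String v) { this.topic = v; return this; }
        public Builder partition(int v) { this.partition = v; return this; }
        public Builder offset(long v) { this.offset = v; return this; }
        public Builder lag(long v) { this.lag = v; return this; }
        public Builder logSize(long v) { this.logSize = v; return this; }
        public Builder consumerId(String v) { this.consumerId = v; return this; }
        public Builder host(String v) { this.host = v; return this; }
        public Builder clientId(String v) { this.clientId = v; return this; }
        public PartitionAssignmentState build() { return new PartitionAssignmentState(this); }
    }
}
```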

    private static void collectByGroup(String group) throws ExecutionException, InterruptedException {
        // fetch the consumer group metadata
        DescribeConsumerGroupsResult groupsResult = KafkaUtils.getAdminClient()
                .describeConsumerGroups(Collections.singleton(group));
        // fetch the consumer group description
        ConsumerGroupDescription groupDescription = groupsResult.all().get().get(group);
        // ConsumerGroupState: log the group state
        log.info("group name:{} state:{}", group, groupDescription.state().toString());
        List<TopicPartition> assignedTps = new ArrayList<>();
        List<PartitionAssignmentState> rowWithConsumer = new ArrayList<>();
        // fetch descriptions of the group members
        Collection<MemberDescription> members = groupDescription.members();
        if (members != null) {
            // fetch the committed consumer offsets (an OffsetFetchRequest under the hood)
            ListConsumerGroupOffsetsResult offsetsResult = KafkaUtils.getAdminClient().listConsumerGroupOffsets(group);
            Map<TopicPartition, OffsetAndMetadata> offsets = offsetsResult.partitionsToOffsetAndMetadata().get();
            if (offsets != null && !offsets.isEmpty()) {
                // only collect per-member lag while the group state is stable
                if (groupDescription.state() == ConsumerGroupState.STABLE) {
                    rowWithConsumer = getRowWithConsumer(groupDescription, offsets, members, assignedTps, group);
                }
            }
            // also collect lag for partitions that currently have no consumer
            if (offsets == null) {
                offsets = new HashMap<>();
            }
            List<PartitionAssignmentState> rowWithoutConsumer = getRowWithoutConsumer(groupDescription, offsets, assignedTps, group);
            if (!CollectionUtils.isEmpty(rowWithoutConsumer)) {
                rowWithConsumer.addAll(rowWithoutConsumer);
            }
        }
        rowWithConsumer.forEach(x -> log.info(x.toString()));
    }

    private static List<PartitionAssignmentState> getRowWithConsumer(ConsumerGroupDescription description,
                                                              Map<TopicPartition, OffsetAndMetadata> offsets,
                                                              Collection<MemberDescription> members,
                                                              List<TopicPartition> assignedTps, String group) {
        List<PartitionAssignmentState> rowWithConsumer = new ArrayList<>();
        // iterate over the members of the consumer group
        for (MemberDescription member : members) {
            MemberAssignment assignment = member.assignment();
            if (assignment == null) {
                continue;
            }
            // the topic partitions assigned to this member
            Set<TopicPartition> tpSet = assignment.topicPartitions();
            if (tpSet.isEmpty()) {
                // the member has no partitions assigned
                PartitionAssignmentState p = PartitionAssignmentState.builder()
                        .group(group)
                        .coordinator(description.coordinator())
                        .consumerId(member.consumerId())
                        .host(member.host())
                        .clientId(member.clientId())
                        .build();
                rowWithConsumer.add(p);
            } else {
                // the member has assigned partitions: fetch the log end offsets (LEO)
                // without transactions: lag = logEndOffset (LEO) - committed offset
                // with transactions:    lag = logStableOffset (LSO) - committed offset,
                //                       where the LSO is fetched via a ListOffsetsRequest
                Map<TopicPartition, Long> logSizes = KafkaUtils.getConsumer().endOffsets(tpSet);
                assignedTps.addAll(tpSet);
                List<PartitionAssignmentState> tempList = tpSet.stream()
                        .sorted(Comparator.comparing(TopicPartition::partition))
                        .map(tp -> getPasWithConsumer(description, offsets, member, group, logSizes, tp))
                        .collect(Collectors.toList());
                rowWithConsumer.addAll(tempList);
            }
        }
        return rowWithConsumer;
    }

    private static PartitionAssignmentState getPasWithConsumer(ConsumerGroupDescription description,
                                                        Map<TopicPartition, OffsetAndMetadata> offsets,
                                                        MemberDescription member,
                                                        String group,
                                                        Map<TopicPartition, Long> logSizes,
                                                        TopicPartition tp) {
        long logSize = logSizes.get(tp);
        if (offsets.containsKey(tp)) {
            // the current committed consumer offset for this partition
            long offset = offsets.get(tp).offset();
            // lag = total messages - consumed messages, floored at zero
            long lag = logSize - offset < 0 ? 0 : logSize - offset;
            return PartitionAssignmentState.builder()
                    .group(group).coordinator(description.coordinator())
                    .lag(lag).topic(tp.topic()).partition(tp.partition()).offset(offset)
                    .consumerId(member.consumerId()).host(member.host())
                    .clientId(member.clientId()).logSize(logSize).build();
        } else {
            return PartitionAssignmentState.builder()
                    .group(group).coordinator(description.coordinator())
                    .topic(tp.topic()).partition(tp.partition())
                    .consumerId(member.consumerId()).host(member.host())
                    .clientId(member.clientId()).logSize(logSize).build();
        }
    }

    private static List<PartitionAssignmentState> getRowWithoutConsumer(ConsumerGroupDescription description,
                                                                 Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                 List<TopicPartition> assignedTps,
                                                                 String group) {
        Set<TopicPartition> tpSet = offsets.keySet();
        // filter out partitions already assigned to a consumer,
        // fetch the log size (LEO) and the committed offset,
        // then sort the rows by partition
        return tpSet.stream()
                .filter(tp->!assignedTps.contains(tp))
                .map(tp-> {
                    long logSize = 0;
                    Long endOffset = KafkaUtils.getConsumer().endOffsets(Collections.singleton(tp)).get(tp);
                    if (endOffset != null) {
                        logSize = endOffset;
                    }
                    long offset = 0;
                    if (offsets.get(tp) != null) {
                        offset = offsets.get(tp).offset();
                    }
                    return PartitionAssignmentState.builder()
                            .group(group)
                            .coordinator(description.coordinator())
                            .topic(tp.topic())
                            .partition(tp.partition())
                            .logSize(logSize)
                            .lag(logSize - offset < 0 ? 0 : logSize - offset)
                            .offset(offset).build();
                })
                .sorted(Comparator.comparing(PartitionAssignmentState::getPartition))
                .collect(Collectors.toList());
    }
}
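The comments in getRowWithConsumer note that with transactional producers lag should be measured against the LSO rather than the LEO. `Consumer.endOffsets` follows the consumer's `isolation.level` setting; with `AdminClient` (2.5+) the isolation level can instead be passed explicitly to `listOffsets`. A hedged sketch of that variant (class and method names are my own, not from the post):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class LsoLookup {
    /**
     * Fetches end offsets for the given partitions. With READ_COMMITTED the
     * broker returns the last stable offset (LSO); with READ_UNCOMMITTED it
     * returns the log end offset (LEO).
     */
    static Map<TopicPartition, Long> endOffsets(AdminClient admin,
                                                Set<TopicPartition> tps,
                                                IsolationLevel isolation)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> request = tps.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        ListOffsetsResult result = admin.listOffsets(request, new ListOffsetsOptions(isolation));
        return result.all().get().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }

    /** Lag never goes negative, even if the committed offset briefly leads. */
    static long lag(long endOffset, long committedOffset) {
        return Math.max(0, endOffset - committedOffset);
    }
}
```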

Reading Kafka's Built-in Metrics (requires JMX to be enabled)
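The broker does not expose remote JMX by default. With the standard startup scripts this is commonly enabled through environment variables before starting the broker; a sketch (the port matches the example below, the hostname and security settings are illustrative and disable auth, so do not use them as-is in production):

```shell
# JMX_PORT is picked up by kafka-run-class.sh; exposes JMX on port 9999
export JMX_PORT=9999
# For access from another host, extra JVM flags are usually needed
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=192.168.1.201"
bin/kafka-server-start.sh -daemon config/server.properties
```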

    public static void main(String[] args) {
        MBeanServerConnection conn = init("192.168.1.201:9999");
        String objName = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
        String objAttr = "OneMinuteRate";
        try {
            ObjectName objectName = new ObjectName(objName);
            Object attribute = conn.getAttribute(objectName, objAttr);
            System.out.println(attribute.toString());
        } catch (MalformedObjectNameException | MBeanException | AttributeNotFoundException |
                 InstanceNotFoundException | ReflectionException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static MBeanServerConnection init(String ipAndPort) {
        String jmxUrl = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        try {
            JMXServiceURL jmxServiceURL = new JMXServiceURL(jmxUrl);
            return JMXConnectorFactory.connect(jmxServiceURL, null).getMBeanServerConnection();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
Screenshot: kafka-Metrics-MessagesInPerSec.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容