kafka從2012年11月加入Apache,轉眼已經過去了十個年頭。
0.8版本,引入了副本機制,棄用了舊版Scala客戶端,誕生了消息格式v0
0.10版本,消息格式升至v1
0.11版本,新增了事務能力,消息格式升至v2
1.0版本,新增對 Java 9 的支持
2.0版本,放棄了對 Java 7 的支持,并刪除了之前已棄用的 Scala 生產者和消費者。
2.1版本,Java 11 支持
2.8版本,使用自有的 KRaft 替換 zk
3.0版本,不再支持 Java 8 和 Scala 2.12,棄用消息格式 v0 和 v1
3.1版本,Apache Kafka 支持 Java 17
雖然現在大家對kafka的使用依然會選擇對zookeeper強依賴,但是如果要使用Raft協議,那么kafka-manager(后改名為CMAK)已經無法兼容。如果想要對kafka運維監控,并且集成自有告警系統,需要自研。
獲取主題和消費者元數據
/**
 * Logs one line per partition for every topic returned by the admin client.
 *
 * @param adminClient Kafka admin client used to list and describe the topics
 * @throws ExecutionException   if listing the topic names or resolving a topic description fails
 * @throws InterruptedException if the calling thread is interrupted while waiting for a result
 */
private static void getAllTopics(AdminClient adminClient) throws ExecutionException, InterruptedException {
    Set<String> topicNames = adminClient.listTopics().names().get();
    // Original author's note: older client versions expose this accessor as values().
    Map<String, KafkaFuture<TopicDescription>> descriptionFutures =
            adminClient.describeTopics(topicNames).topicNameValues();
    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : descriptionFutures.entrySet()) {
        String topicName = entry.getKey();
        // Block until this topic's description is available, then walk its partitions.
        for (TopicPartitionInfo partition : entry.getValue().get().partitions()) {
            log.info("topicName:{}, partition:{}", topicName, partition.toString());
        }
    }
}
/**
 * Collects the ids of all consumer groups reported by the admin client, logging each id once.
 *
 * @param adminClient Kafka admin client used to list the consumer groups
 * @return the set of consumer group ids
 * @throws ExecutionException   if the list-consumer-groups request fails
 * @throws InterruptedException if the calling thread is interrupted while waiting for the result
 */
private static Set<String> getAllGroups(AdminClient adminClient) throws ExecutionException, InterruptedException {
    // all() is declared to yield a Collection; casting it to ArrayList would depend on an
    // implementation detail of the client and could fail with ClassCastException.
    Collection<ConsumerGroupListing> consumerGroupListings = adminClient.listConsumerGroups().all().get();
    consumerGroupListings.forEach(listing -> log.info(listing.groupId()));
    return consumerGroupListings.stream()
            .map(ConsumerGroupListing::groupId)
            .collect(Collectors.toSet());
}
獲取消費延遲Lag元數據
/**
 * Logs the state of a consumer group, then logs one row per partition the group has offsets
 * for or members assigned to.
 *
 * <p>Rows for partitions assigned to members are only built when the group state is
 * {@code STABLE} and the committed-offset map is non-empty; rows for partitions that have
 * offsets but no assigned member are always appended afterwards.
 *
 * @param group id of the consumer group to inspect
 * @throws ExecutionException   if describing the group or listing its offsets fails
 * @throws InterruptedException if the calling thread is interrupted while waiting for a result
 */
private static void collectByGroup(String group) throws ExecutionException, InterruptedException {
    // Describe the group and pick its description out of the result map.
    ConsumerGroupDescription groupDescription = KafkaUtils.getAdminClient()
            .describeConsumerGroups(Collections.singleton(group))
            .all()
            .get()
            .get(group);
    log.info("group name:{} state:{}", group, groupDescription.state().toString());

    // Filled by getRowWithConsumer so the "without consumer" pass can skip those partitions.
    List<TopicPartition> assignedTps = new ArrayList<>();
    List<PartitionAssignmentState> rows = new ArrayList<>();

    Collection<MemberDescription> members = groupDescription.members();
    if (members != null) {
        // Committed offsets of the group, keyed by partition.
        Map<TopicPartition, OffsetAndMetadata> offsets = KafkaUtils.getAdminClient()
                .listConsumerGroupOffsets(group)
                .partitionsToOffsetAndMetadata()
                .get();
        if (offsets == null) {
            offsets = new HashMap<>();
        } else if (!offsets.isEmpty()
                && ConsumerGroupState.STABLE.toString().equals(groupDescription.state().toString())) {
            // Only a stable group gets per-member rows.
            rows = getRowWithConsumer(groupDescription, offsets, members, assignedTps, group);
        }
        // Partitions that have offsets but were not assigned to any member above.
        List<PartitionAssignmentState> unassignedRows =
                getRowWithoutConsumer(groupDescription, offsets, assignedTps, group);
        if (!CollectionUtils.isEmpty(unassignedRows)) {
            rows.addAll(unassignedRows);
        }
    }
    for (PartitionAssignmentState row : rows) {
        log.info(row.toString());
    }
}
/**
 * Builds one row per partition assigned to a group member, plus one partition-less row for
 * every member whose assignment contains no partitions.
 *
 * <p>Every partition seen in a member assignment is appended to {@code assignedTps}.
 * Members whose assignment is {@code null} are skipped.
 *
 * @param description description of the consumer group (its coordinator is copied into each row)
 * @param offsets     committed offsets of the group, keyed by partition
 * @param members     members of the group to walk
 * @param assignedTps output list that receives every partition assigned to a member
 * @param group       id of the consumer group
 * @return the rows built for all members, in member iteration order
 */
private static List<PartitionAssignmentState> getRowWithConsumer(ConsumerGroupDescription description,
                                                                 Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                 Collection<MemberDescription> members,
                                                                 List<TopicPartition> assignedTps, String group) {
    List<PartitionAssignmentState> rows = new ArrayList<>();
    for (MemberDescription member : members) {
        MemberAssignment assignment = member.assignment();
        if (assignment == null) {
            continue;
        }
        Set<TopicPartition> partitions = assignment.topicPartitions();
        if (partitions.isEmpty()) {
            // Member currently owns no partitions: emit a row carrying only the member identity.
            rows.add(PartitionAssignmentState.builder()
                    .group(group)
                    .coordinator(description.coordinator())
                    .consumerId(member.consumerId())
                    .host(member.host())
                    .clientId(member.clientId())
                    .build());
            continue;
        }
        // Original author's note: without transactions lag = log end offset (LEO) - offset;
        // with transactions lag = log stable offset (LSO) - offset.
        Map<TopicPartition, Long> logSizes = KafkaUtils.getConsumer().endOffsets(partitions);
        assignedTps.addAll(partitions);
        // Order this member's partitions by partition number before building the rows.
        List<TopicPartition> ordered = new ArrayList<>(partitions);
        ordered.sort(Comparator.comparing(TopicPartition::partition));
        for (TopicPartition tp : ordered) {
            rows.add(getPasWithConsumer(description, offsets, member, group, logSizes, tp));
        }
    }
    return rows;
}
/**
 * Builds the row for one partition that is assigned to a group member.
 *
 * <p>When a committed offset is available for the partition, the row carries the offset and
 * the lag (end offset minus committed offset, floored at zero). Otherwise the row carries
 * only the partition, member identity and log size.
 *
 * @param description description of the consumer group (its coordinator is copied into the row)
 * @param offsets     committed offsets of the group, keyed by partition; a partition may be
 *                    absent or mapped to {@code null} when nothing has been committed for it
 * @param member      the member the partition is assigned to
 * @param group       id of the consumer group
 * @param logSizes    end offsets keyed by partition
 * @param tp          the partition to build the row for
 * @return the row describing {@code tp}
 */
private static PartitionAssignmentState getPasWithConsumer(ConsumerGroupDescription description,
                                                           Map<TopicPartition, OffsetAndMetadata> offsets,
                                                           MemberDescription member,
                                                           String group,
                                                           Map<TopicPartition, Long> logSizes,
                                                           TopicPartition tp) {
    // Avoid unboxing a missing entry; fall back to 0 like getRowWithoutConsumer does.
    Long endOffset = logSizes.get(tp);
    long logSize = endOffset == null ? 0L : endOffset;

    // A single lookup covers both "key absent" and "key mapped to null"; the latter is what
    // the admin client reports for a partition without a committed offset, and the previous
    // containsKey()/get().offset() sequence threw a NullPointerException on it.
    OffsetAndMetadata committed = offsets.get(tp);
    if (committed == null) {
        return PartitionAssignmentState.builder()
                .group(group).coordinator(description.coordinator())
                .topic(tp.topic()).partition(tp.partition())
                .consumerId(member.consumerId()).host(member.host())
                .clientId(member.clientId()).logSize(logSize).build();
    }

    // Current committed position of the group on this partition.
    long offset = committed.offset();
    // Backlog = end offset - committed offset, never negative.
    long lag = logSize - offset < 0 ? 0 : logSize - offset;
    return PartitionAssignmentState.builder()
            .group(group).coordinator(description.coordinator())
            .lag(lag).topic(tp.topic()).partition(tp.partition()).offset(offset)
            .consumerId(member.consumerId()).host(member.host())
            .clientId(member.clientId()).logSize(logSize).build();
}
/**
 * Builds one row per partition that has an entry in {@code offsets} but is not contained in
 * {@code assignedTps}, sorted by partition number.
 *
 * <p>End offsets for all such partitions are fetched with a single {@code endOffsets} call
 * instead of one call per partition. A missing end offset or a missing committed offset is
 * treated as 0, and the lag is floored at zero.
 *
 * @param description description of the consumer group (its coordinator is copied into each row)
 * @param offsets     committed offsets of the group, keyed by partition
 * @param assignedTps partitions already reported as assigned to a member; these are skipped
 * @param group       id of the consumer group
 * @return the rows for the unassigned partitions; empty when there are none
 */
private static List<PartitionAssignmentState> getRowWithoutConsumer(ConsumerGroupDescription description,
                                                                    Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                    List<TopicPartition> assignedTps,
                                                                    String group) {
    // Keep only the partitions that no member has been assigned.
    List<TopicPartition> unassigned = offsets.keySet().stream()
            .filter(tp -> !assignedTps.contains(tp))
            .collect(Collectors.toList());
    if (unassigned.isEmpty()) {
        // Nothing to report; skip the end-offset lookup entirely.
        return new ArrayList<>();
    }

    // One batched lookup for all partitions rather than one request per partition.
    Map<TopicPartition, Long> endOffsets = KafkaUtils.getConsumer().endOffsets(unassigned);

    return unassigned.stream()
            .map(tp -> {
                Long endOffset = endOffsets.get(tp);
                long logSize = endOffset == null ? 0L : endOffset;
                OffsetAndMetadata committed = offsets.get(tp);
                long offset = committed == null ? 0L : committed.offset();
                return PartitionAssignmentState.builder()
                        .group(group)
                        .coordinator(description.coordinator())
                        .topic(tp.topic())
                        .partition(tp.partition())
                        .logSize(logSize)
                        .lag(logSize - offset < 0 ? 0 : logSize - offset)
                        .offset(offset).build();
            })
            .sorted(Comparator.comparing(PartitionAssignmentState::getPartition))
            .collect(Collectors.toList());
}
}
Kafka自身提供的指標,需要開啟JMX
/**
 * Connects to a hard-coded JMX endpoint, reads the {@code OneMinuteRate} attribute of the
 * {@code kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec} MBean and prints it.
 *
 * @param args unused
 * @throws RuntimeException wrapping any JMX or I/O failure while reading the attribute
 */
public static void main(String[] args) {
    final String metricName = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    final String rateAttribute = "OneMinuteRate";
    MBeanServerConnection connection = init("192.168.1.201:9999");
    try {
        Object rate = connection.getAttribute(new ObjectName(metricName), rateAttribute);
        System.out.println(rate.toString());
    } catch (MalformedObjectNameException | MBeanException | AttributeNotFoundException
             | InstanceNotFoundException | ReflectionException | IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Opens a JMX-over-RMI connection to the given endpoint and returns its MBean server connection.
 *
 * @param ipAndPort endpoint in {@code host:port} form, inserted into the JMX service URL
 * @return the MBean server connection obtained from the new connector
 * @throws RuntimeException wrapping the {@link IOException} raised when the URL is rejected
 *                          or the connection cannot be established
 */
public static MBeanServerConnection init(String ipAndPort) {
    try {
        JMXServiceURL serviceUrl =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi");
        // No environment map is supplied to the connector.
        return JMXConnectorFactory.connect(serviceUrl, null).getMBeanServerConnection();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

kafka-Metrics-MessagesInPerSec.png