著名的KafkaManager工具應(yīng)該都聽(tīng)說(shuō)過(guò),可是在實(shí)際的使用過(guò)程中發(fā)現(xiàn)一些問(wèn)題,最重要的是當(dāng)topic過(guò)多以后數(shù)據(jù)的刷新很慢,而且無(wú)法查看到當(dāng)前topic的消費(fèi)者組。
于是決定自己做一個(gè),做一個(gè)工具最好的選中無(wú)非是python了,速度快嘛,同時(shí)需要一個(gè)節(jié)目,這里選擇了PyQt5,當(dāng)然也可以選擇TK和wxpython,
OK,先上最終效果圖

首先需要安裝模塊,python的強(qiáng)大也是各種強(qiáng)大的模塊
這里我們需要用到的模塊是pykafka、PyQt5、pyqt5-tools、pyinstaller
安裝方式用pip
pip install pykafka
pip install PyQt5
pip install pyqt5-tools
pip install pyinstaller
1、先畫(huà)好界面

2、實(shí)現(xiàn)一個(gè)KafkaManager獲取數(shù)據(jù),這個(gè)類我們用一個(gè)定時(shí)器,參考timer的實(shí)現(xiàn)
class KafkaManger(Thread):
def __init__(self,host,timeInterval):
Thread.__init__(self)
self._host = host
self._interval = timeInterval
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
while not self.finished.is_set():
self._update()
time.sleep(self._interval)
接著,我們需要用到的是pykafka里面的Cluster
from pykafka import Cluster,handlers
在init里面添加鏈接kafka集群
self._handler = handlers.ThreadingHandler()
self._cluster = Cluster(hosts=self._host,handler=self._handler)
實(shí)現(xiàn)update函數(shù)來(lái)更新Cluster的數(shù)據(jù)
def _update(self):
self._cluster.update()
生產(chǎn)者相關(guān)信息的實(shí)現(xiàn)
獲取所有的topic
def gettopic(self):
return self._cluster.topics.keys()
獲取topic的offset情況
def getoffsets_topic(self,topic):
with self._lock_topic:
if topic in self._cluster.topics:
descrip = self._cluster.topics[topic]
earlistoffset = descrip.earliest_available_offsets()
latestoffset = descrip.latest_available_offsets()
topicdescrip = {}
for id, partition in descrip.partitions.items():
leader = partition.leader.id
Replicas = []
isr = []
for i in partition.replicas:
Replicas.append(i.id)
for i in partition.isr:
isr.append(i.id)
topicdescrip[id] = TopicDescrip(id, leader, Replicas, isr,
earlistoffset[id].offset[0],
latestoffset[id].offset[0])
return topicdescrip
else:
topicdescrip = {}
for i in range(8):
topicdescrip[i] = TopicDescrip(i,
0,
0,
0,
0,
0,
)
return topicdescrip
這樣獲取topic相關(guān)的信息就完成了,可以獲取到所有的topic和某個(gè)topic的詳細(xì)信息
獲取consumer的信息
獲取所有的consumer
def getconsumers(self):
return self._cluster.get_managed_group_descriptions().keys()
獲取consumer的消費(fèi)信息
def getoffsets(self,group_id):
with self._lock_consumer:
group_descrips = self._cluster.get_managed_group_descriptions()
if group_id in group_descrips:
if group_id == b'KafkaManagerOffsetCache':
return None
descrips = group_descrips[group_id]
if descrips[5] != {}:
descrip4topic = {}
for member_id, groupMember in descrips[5].items():
partitions = groupMember[4].partition_assignment[0][1]
topics = groupMember[3].topic_names
for topic in topics:
reqs = [PartitionOffsetFetchRequest(topic, i) for i in
self._cluster.topics[topic].partitions]
offset = self._cluster.get_group_coordinator(group_id).fetch_consumer_group_offsets(
group_id, reqs)
descrip = {}
for partition in partitions:
descrip[partition] = ConsumerDescrip(partition,
offset.topics[topic][partition].offset,
group_id,
member_id,
groupMember[1],
groupMember[2],
groupMember[3].topic_names,
)
descrip4topic[topic] = descrip
return descrip4topic
else:
descrip = {}
descrip4topic = {}
for i in range(8):
descrip[i] = ConsumerDescrip(i,
0,
b'-',
b'-',
b'-',
b'-',
[b'-'],
)
descrip4topic[b''] = descrip
return descrip4topic
return None
接下來(lái)我們還要更新topic與consumer的對(duì)應(yīng)關(guān)系
def updatetopic2group(self):
group_descrips = self._cluster.get_managed_group_descriptions()
for group_id,descrips in group_descrips.items():
if group_id == b'KafkaManagerOffsetCache' or descrips[5] == {}:
continue
for member_id,groupMember in descrips[5].items():
topics = groupMember[3].topic_names
for topic in topics:
self._topic2group[topic].add(group_id)
獲取對(duì)應(yīng)關(guān)系接口
def gettopic2group(self, topic):
if topic in self._topic2group:
return self._topic2group[topic]
return {}
刪除topic的接口,之前記得可以得到當(dāng)前topic在哪個(gè)broker的,后來(lái)沒(méi)找到,干脆直接遍歷所有的broker來(lái)刪除
def deletetopic(self,topic):
for id,broker in self._cluster.brokers.items():
try:
broker.delete_topics([topic,])
except Exception as e:
pass
接下來(lái)是QT的部分
之前畫(huà)好的界面用pyuic轉(zhuǎn)化為python代碼
然后定義KafkaTool
class KafkaTool(QtWidgets.QWidget,Ui_KafkaTool):
def __init__(self,parent = None):
super(KafkaTool,self).__init__(parent=parent)
self.setupUi(self)
self._started = False
self._initUi()
self._createconnections()
初始化界面,建立對(duì)應(yīng)的信號(hào)槽
def _initUi(self):
self.unconnect.setEnabled(False)
self.topic.setSortingEnabled(True)
self.consumer.setSortingEnabled(True)
self.stackedWidget.setEnabled(False)
self.tabWidget.clear()
self.topic_menu = QtWidgets.QMenu(self.topic)
self.topic_menu.addAction(self.action_delete)
self.topic_menu.addAction(self.action_fresh)
self.topic.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
def _createconnections(self):
self.connect.pressed.connect(self._connect)
self.unconnect.pressed.connect(self._unconnect)
self.fresh.pressed.connect(self._fresh_topic)
self.topic.doubleClicked.connect(self.showtopic)
self.freshconsumer.pressed.connect(self._fresh_consumer)
self.consumer.doubleClicked.connect(self.showconsumer)
self.lineEdit_search_topic.textChanged.connect(self.search_topic)
self.lineEdit_search_consume.textChanged.connect(self.search_consume)
self.topic.customContextMenuRequested.connect(self.show_topic_menu)
self.action_delete.triggered.connect(self.delete_topic)
self.action_fresh.triggered.connect(self._fresh_topic)
鏈接與斷開(kāi)集群
def _connect(self):
self._host = self.host.text()
try:
self._cluster = ClusterManager(host=self._host,timeInterval=5)
self._cluster.start()
self._started = True
self.stackedWidget.setEnabled(True)
self.connect.setEnabled(False)
self.unconnect.setEnabled(True)
except Exception as e:
print(e)
def _unconnect(self):
self._cluster.cancel()
self._cluster.join(10)
self._started = False
self.stackedWidget.setEnabled(False)
self.unconnect.setEnabled(False)
self.connect.setEnabled(True)
展示topic與consumer
def _fresh_topic(self):
try:
self.topic.clear()
topics = self._cluster.gettopic()
for topic in topics:
self.topic.addItem(topic.decode())
except Exception as e:
print(e)
def _fresh_consumer(self):
consumers = self._cluster.getconsumers()
self.updategrouplist(consumers)
def updategrouplist(self,group_ids):
self.consumer.clear()
for group_id in group_ids:
self.consumer.addItem(group_id.decode('utf-8'))
展示topic的詳細(xì)信息
def showtopic(self,index):
try:
topic = index.data()
group_ids = self._cluster.gettopic2group(topic.encode('utf-8'))
self.updategrouplist(group_ids)
offsets = self._cluster.getoffsets_topic(topic.encode('utf-8'))
total_offset = 0
self.topic_decrips.clear()
self.topic_decrips.setHorizontalHeaderLabels(['分區(qū)','Leader','Replicas','ISR','最早偏移量','最晚偏移量','總偏移量'])
for id,descrip in offsets.items():
self.topic_decrips.setItem(id, 0, QTableWidgetItem(str(id)))
self.topic_decrips.setItem(id, 1, QTableWidgetItem(str(descrip.Leader)))
self.topic_decrips.setItem(id, 2, QTableWidgetItem(str(descrip.Replicas)))
self.topic_decrips.setItem(id, 3, QTableWidgetItem(str(descrip.ISR)))
self.topic_decrips.setItem(id, 4, QTableWidgetItem(str(descrip.earlist)))
self.topic_decrips.setItem(id, 5, QTableWidgetItem(str(descrip.latist)))
total_offset += descrip.latist
#self.topic_decrips.setItem(id, 6, QTableWidgetItem(str(id)))
self.topic_decrips.setItem(0,6,QTableWidgetItem(str(total_offset)))
except Exception as e:
print(e)
展示consumer的詳細(xì)信息
def showconsumer(self,index):
try:
consumer = index.data()
offsets4topic = self._cluster.getoffsets(consumer.encode('utf-8'))
self.tabWidget.clear()
if offsets4topic is not None:
for topic,offsets in offsets4topic.items():
topicoffsets = self._cluster.getoffsets_topic(topic)
self.tabWidget.addTab(Table_Consumer(topicoffsets,offsets,self.tabWidget),topic.decode('utf-8','replace'))
except Exception as e:
print(e)
class Table_Consumer(Ui_Descrip,QtWidgets.QTableWidget):
def __init__(self,topicoffsets,descrips:dict,parent = None):
super(Table_Consumer,self).__init__(parent=parent)
self.setupUi(self)
self.setdata(topicoffsets,descrips)
def setdata(self,topicoffsets,descrips:dict):
for id, descrip in descrips.items():
self.consumer_descrips.setItem(id, 0, QTableWidgetItem(str(id)))
self.consumer_descrips.setItem(id, 1, QTableWidgetItem(str(topicoffsets[id].latist)))
self.consumer_descrips.setItem(id, 2, QTableWidgetItem(str(descrip.LogSize)))
self.consumer_descrips.setItem(id, 3, QTableWidgetItem(str(topicoffsets[id].latist - descrip.LogSize)))
self.consumer_descrips.setItem(id, 4, QTableWidgetItem(str(descrip.group_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 5, QTableWidgetItem(str(descrip.member_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 6, QTableWidgetItem(str(descrip.client_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 7, QTableWidgetItem(str(descrip.client_host.decode('utf-8'))))
實(shí)現(xiàn)topic與consumer的搜索框
def search_topic(self,a0):
num = self.topic.count()
for i in range(num):
self.topic.item(i).setHidden(True)
items = self.topic.findItems(a0,QtCore.Qt.MatchContains)
for item in items:
item.setHidden(False)
def search_consume(self,a0):
num = self.consumer.count()
for i in range(num):
self.consumer.item(i).setHidden(True)
items = self.consumer.findItems(a0,QtCore.Qt.MatchContains)
for item in items:
item.setHidden(False)
ok,大功告成,代碼簡(jiǎn)陋,輕看
計(jì)劃后續(xù)實(shí)現(xiàn)讀取指定offset的數(shù)據(jù),最后再搞個(gè)網(wǎng)頁(yè)版的,待更新...