利用python做一個(gè)kafka的查看工具

著名的KafkaManager工具應(yīng)該都聽(tīng)說(shuō)過(guò),可是在實(shí)際的使用過(guò)程中發(fā)現(xiàn)一些問(wèn)題,最重要的是當(dāng)topic過(guò)多以后數(shù)據(jù)的刷新很慢,而且無(wú)法查看到當(dāng)前topic的消費(fèi)者組。
于是決定自己做一個(gè),做一個(gè)工具最好的選中無(wú)非是python了,速度快嘛,同時(shí)需要一個(gè)節(jié)目,這里選擇了PyQt5,當(dāng)然也可以選擇TK和wxpython,

OK,先上最終效果圖


image.png

首先需要安裝模塊,python的強(qiáng)大也是各種強(qiáng)大的模塊
這里我們需要用到的模塊是pykafka、PyQt5、pyqt5-tools、pyinstaller
安裝方式用pip
pip install pykafka
pip install PyQt5
pip install pyqt5-tools
pip install pyinstaller


1、先畫(huà)好界面


image.png

2、實(shí)現(xiàn)一個(gè)KafkaManager獲取數(shù)據(jù),這個(gè)類我們用一個(gè)定時(shí)器,參考timer的實(shí)現(xiàn)

class KafkaManger(Thread):
    def __init__(self,host,timeInterval):
        Thread.__init__(self)
        self._host = host
        self._interval = timeInterval
        self.finished = Event()
    def cancel(self):
        self.finished.set()
    def run(self):
        while not self.finished.is_set():
            self._update()
            time.sleep(self._interval)

接著,我們需要用到的是pykafka里面的Cluster
from pykafka import Cluster,handlers
在init里面添加鏈接kafka集群

self._handler = handlers.ThreadingHandler()
self._cluster = Cluster(hosts=self._host,handler=self._handler)

實(shí)現(xiàn)update函數(shù)來(lái)更新Cluster的數(shù)據(jù)

    def _update(self):
        self._cluster.update()

生產(chǎn)者相關(guān)信息的實(shí)現(xiàn)
獲取所有的topic

    def gettopic(self):
        return self._cluster.topics.keys()

獲取topic的offset情況

    def getoffsets_topic(self,topic):
        with self._lock_topic:
            if topic in self._cluster.topics:
                descrip = self._cluster.topics[topic]
                earlistoffset = descrip.earliest_available_offsets()
                latestoffset = descrip.latest_available_offsets()
                topicdescrip = {}
                for id, partition in descrip.partitions.items():
                    leader = partition.leader.id
                    Replicas = []
                    isr = []
                    for i in partition.replicas:
                        Replicas.append(i.id)
                    for i in partition.isr:
                        isr.append(i.id)
                    topicdescrip[id] = TopicDescrip(id, leader, Replicas, isr,
                                                    earlistoffset[id].offset[0],
                                                    latestoffset[id].offset[0])
                return topicdescrip
            else:
                topicdescrip = {}
                for i in range(8):
                    topicdescrip[i] = TopicDescrip(i,
                                                   0,
                                                   0,
                                                   0,
                                                   0,
                                                   0,
                                                   )
                return topicdescrip

這樣獲取topic相關(guān)的信息就完成了,可以獲取到所有的topic和某個(gè)topic的詳細(xì)信息


獲取consumer的信息
獲取所有的consumer

    def getconsumers(self):
        return self._cluster.get_managed_group_descriptions().keys()

獲取consumer的消費(fèi)信息

    def getoffsets(self,group_id):
        with self._lock_consumer:
            group_descrips = self._cluster.get_managed_group_descriptions()
            if group_id in group_descrips:
                if group_id == b'KafkaManagerOffsetCache':
                    return None
                descrips = group_descrips[group_id]
                if descrips[5] != {}:
                    descrip4topic = {}
                    for member_id, groupMember in descrips[5].items():
                        partitions = groupMember[4].partition_assignment[0][1]
                        topics = groupMember[3].topic_names
                        for topic in topics:
                            reqs = [PartitionOffsetFetchRequest(topic, i) for i in
                                    self._cluster.topics[topic].partitions]
                            offset = self._cluster.get_group_coordinator(group_id).fetch_consumer_group_offsets(
                                group_id, reqs)
                            descrip = {}
                            for partition in partitions:
                                descrip[partition] = ConsumerDescrip(partition,
                                                                      offset.topics[topic][partition].offset,
                                                                      group_id,
                                                                      member_id,
                                                                      groupMember[1],
                                                                      groupMember[2],
                                                                      groupMember[3].topic_names,
                                                                        )
                            descrip4topic[topic] = descrip
                    return descrip4topic
                else:
                    descrip = {}
                    descrip4topic = {}
                    for i in range(8):
                        descrip[i] = ConsumerDescrip(i,
                                                 0,
                                                 b'-',
                                                 b'-',
                                                 b'-',
                                                 b'-',
                                                 [b'-'],
                                                 )
                    descrip4topic[b''] = descrip
                    return descrip4topic
            return None

接下來(lái)我們還要更新topic與consumer的對(duì)應(yīng)關(guān)系

    def updatetopic2group(self):
        group_descrips = self._cluster.get_managed_group_descriptions()
        for group_id,descrips in group_descrips.items():
            if group_id == b'KafkaManagerOffsetCache' or descrips[5] == {}:
                continue
            for member_id,groupMember in descrips[5].items():
                topics = groupMember[3].topic_names
                for topic in topics:
                    self._topic2group[topic].add(group_id)

獲取對(duì)應(yīng)關(guān)系接口

    def gettopic2group(self, topic):
        if topic in self._topic2group:
            return self._topic2group[topic]
        return {}

刪除topic的接口,之前記得可以得到當(dāng)前topic在哪個(gè)broker的,后來(lái)沒(méi)找到,干脆直接遍歷所有的broker來(lái)刪除

    def deletetopic(self,topic):
        for id,broker in self._cluster.brokers.items():
            try:
                broker.delete_topics([topic,])
            except Exception as e:
                pass

接下來(lái)是QT的部分
之前畫(huà)好的界面用pyuic轉(zhuǎn)化為python代碼
然后定義KafkaTool

class KafkaTool(QtWidgets.QWidget,Ui_KafkaTool):
    def __init__(self,parent = None):
        super(KafkaTool,self).__init__(parent=parent)
        self.setupUi(self)
        self._started = False
        self._initUi()
        self._createconnections()

初始化界面,建立對(duì)應(yīng)的信號(hào)槽

    def _initUi(self):
        self.unconnect.setEnabled(False)
        self.topic.setSortingEnabled(True)
        self.consumer.setSortingEnabled(True)
        self.stackedWidget.setEnabled(False)
        self.tabWidget.clear()
        self.topic_menu = QtWidgets.QMenu(self.topic)
        self.topic_menu.addAction(self.action_delete)
        self.topic_menu.addAction(self.action_fresh)
        self.topic.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)

    def _createconnections(self):
        self.connect.pressed.connect(self._connect)
        self.unconnect.pressed.connect(self._unconnect)
        self.fresh.pressed.connect(self._fresh_topic)
        self.topic.doubleClicked.connect(self.showtopic)
        self.freshconsumer.pressed.connect(self._fresh_consumer)
        self.consumer.doubleClicked.connect(self.showconsumer)
        self.lineEdit_search_topic.textChanged.connect(self.search_topic)
        self.lineEdit_search_consume.textChanged.connect(self.search_consume)
        self.topic.customContextMenuRequested.connect(self.show_topic_menu)
        self.action_delete.triggered.connect(self.delete_topic)
        self.action_fresh.triggered.connect(self._fresh_topic)

鏈接與斷開(kāi)集群

    def _connect(self):
        self._host = self.host.text()
        try:
            self._cluster = ClusterManager(host=self._host,timeInterval=5)
            self._cluster.start()
            self._started = True
            self.stackedWidget.setEnabled(True)
            self.connect.setEnabled(False)
            self.unconnect.setEnabled(True)
        except Exception as e:
            print(e)

    def _unconnect(self):
        self._cluster.cancel()
        self._cluster.join(10)
        self._started = False
        self.stackedWidget.setEnabled(False)
        self.unconnect.setEnabled(False)
        self.connect.setEnabled(True)

展示topic與consumer

    def _fresh_topic(self):
        try:
            self.topic.clear()
            topics = self._cluster.gettopic()
            for topic in topics:
                self.topic.addItem(topic.decode())
        except Exception as e:
            print(e)
    def _fresh_consumer(self):
            consumers = self._cluster.getconsumers()
            self.updategrouplist(consumers)
    def updategrouplist(self,group_ids):
        self.consumer.clear()
        for group_id in group_ids:
            self.consumer.addItem(group_id.decode('utf-8'))

展示topic的詳細(xì)信息

    def showtopic(self,index):
        try:
            topic = index.data()
            group_ids = self._cluster.gettopic2group(topic.encode('utf-8'))
            self.updategrouplist(group_ids)
            offsets = self._cluster.getoffsets_topic(topic.encode('utf-8'))
            total_offset = 0
            self.topic_decrips.clear()
            self.topic_decrips.setHorizontalHeaderLabels(['分區(qū)','Leader','Replicas','ISR','最早偏移量','最晚偏移量','總偏移量'])
            for id,descrip in offsets.items():
                self.topic_decrips.setItem(id, 0, QTableWidgetItem(str(id)))
                self.topic_decrips.setItem(id, 1, QTableWidgetItem(str(descrip.Leader)))
                self.topic_decrips.setItem(id, 2, QTableWidgetItem(str(descrip.Replicas)))
                self.topic_decrips.setItem(id, 3, QTableWidgetItem(str(descrip.ISR)))
                self.topic_decrips.setItem(id, 4, QTableWidgetItem(str(descrip.earlist)))
                self.topic_decrips.setItem(id, 5, QTableWidgetItem(str(descrip.latist)))
                total_offset += descrip.latist
                #self.topic_decrips.setItem(id, 6, QTableWidgetItem(str(id)))
            self.topic_decrips.setItem(0,6,QTableWidgetItem(str(total_offset)))
        except Exception as e:
            print(e)

展示consumer的詳細(xì)信息

    def showconsumer(self,index):
        try:
            consumer = index.data()
            offsets4topic = self._cluster.getoffsets(consumer.encode('utf-8'))
            self.tabWidget.clear()
            if offsets4topic is not None:
                for topic,offsets in offsets4topic.items():
                    topicoffsets = self._cluster.getoffsets_topic(topic)
                    self.tabWidget.addTab(Table_Consumer(topicoffsets,offsets,self.tabWidget),topic.decode('utf-8','replace'))
        except Exception as e:
            print(e)
class Table_Consumer(Ui_Descrip,QtWidgets.QTableWidget):
    def __init__(self,topicoffsets,descrips:dict,parent = None):
        super(Table_Consumer,self).__init__(parent=parent)
        self.setupUi(self)
        self.setdata(topicoffsets,descrips)

    def setdata(self,topicoffsets,descrips:dict):
        for id, descrip in descrips.items():
            self.consumer_descrips.setItem(id, 0, QTableWidgetItem(str(id)))
            self.consumer_descrips.setItem(id, 1, QTableWidgetItem(str(topicoffsets[id].latist)))
            self.consumer_descrips.setItem(id, 2, QTableWidgetItem(str(descrip.LogSize)))
            self.consumer_descrips.setItem(id, 3, QTableWidgetItem(str(topicoffsets[id].latist - descrip.LogSize)))
            self.consumer_descrips.setItem(id, 4, QTableWidgetItem(str(descrip.group_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 5, QTableWidgetItem(str(descrip.member_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 6, QTableWidgetItem(str(descrip.client_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 7, QTableWidgetItem(str(descrip.client_host.decode('utf-8'))))

實(shí)現(xiàn)topic與consumer的搜索框

    def search_topic(self,a0):
        num = self.topic.count()
        for i in range(num):
            self.topic.item(i).setHidden(True)
        items = self.topic.findItems(a0,QtCore.Qt.MatchContains)
        for item in items:
            item.setHidden(False)

    def search_consume(self,a0):
        num = self.consumer.count()
        for i in range(num):
            self.consumer.item(i).setHidden(True)
        items = self.consumer.findItems(a0,QtCore.Qt.MatchContains)
        for item in items:
            item.setHidden(False)

ok,大功告成,代碼簡(jiǎn)陋,輕看
計(jì)劃后續(xù)實(shí)現(xiàn)讀取指定offset的數(shù)據(jù),最后再搞個(gè)網(wǎng)頁(yè)版的,待更新...

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,899評(píng)論 13 425
  • 一、為什么需要消息系統(tǒng) 1.解耦: 允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。 2.冗...
    java成功之路閱讀 1,526評(píng)論 0 3
  • 分布式系統(tǒng)中,系統(tǒng)由多個(gè)子系統(tǒng)組成,數(shù)據(jù)需要在子系統(tǒng)中高性能、低延遲的流轉(zhuǎn)。Kafka是"發(fā)布-訂閱"消息系統(tǒng),是...
    Goooooooooooal閱讀 1,915評(píng)論 0 0
  • 微博上看到雪庵老師的新作品,一下子很受觸動(dòng)。畫(huà)面中,冰冷的籠子外,一只小鳥(niǎo)獨(dú)自佇立,瞪圓了眼睛瞅著籠門(mén),腦子...
    shine8181閱讀 511評(píng)論 0 0
  • 在圖書(shū)館的人都好可愛(ài), 晚上,圖書(shū)館閉館擁擠的電梯里,貌似是兩個(gè)熟人相遇 -喲,上自習(xí)呀~你考研? -不考 -那你...
    HH牛奶想做的很多閱讀 213評(píng)論 0 0

友情鏈接更多精彩內(nèi)容