Kafka學(xué)習(xí)筆記一

1.簡(jiǎn)介

kafka是一個(gè)分布式的流平臺(tái),一個(gè)流平臺(tái)有以下三個(gè)能力:

  • 發(fā)布和訂閱記錄流,就像一個(gè)消息隊(duì)列或者企業(yè)級(jí)消息系統(tǒng)
  • 以容錯(cuò)的持久方式存儲(chǔ)記錄流。
  • 在記錄發(fā)生時(shí)處理記錄流。

Kafka通常用于兩大類(lèi)應(yīng)用程序:

  • 構(gòu)建實(shí)時(shí)流數(shù)據(jù)管道,在系統(tǒng)或應(yīng)用程序之間可靠地獲取數(shù)據(jù)
  • 構(gòu)建實(shí)時(shí)流應(yīng)用程序,轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流

為了理解Kafka是如何做這些事情的,讓我們深入研究一下Kafka的能力。
首先是幾個(gè)概念:

  • Kafka作為集群運(yùn)行在一個(gè)或多個(gè)服務(wù)器上,這些服務(wù)器可以跨多個(gè)數(shù)據(jù)中心
  • Kafka集群將記錄流存儲(chǔ)在稱(chēng)為主題的類(lèi)別中。
  • 每條記錄由一個(gè)鍵、一個(gè)值和一個(gè)時(shí)間戳組成。

卡夫卡有四個(gè)核心api:

  • 生產(chǎn)者接口,允許一個(gè)應(yīng)用發(fā)布記錄流到一個(gè)或者多個(gè)kafka主題中
  • 消費(fèi)者接口,允許一個(gè)用用訂閱一個(gè)或者多個(gè)主題并且處理產(chǎn)生給它們的記錄流。
  • 流接口,允許一個(gè)應(yīng)用作為一個(gè)流處理器,從一個(gè)或者多個(gè)主題中消費(fèi)一個(gè)輸入流,產(chǎn)生一個(gè)輸出流到一個(gè)或者多個(gè)主題中,有效的轉(zhuǎn)換輸入流到輸出流。
  • 連接接口,允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者或消費(fèi)者,將Kafka主題連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如,關(guān)系數(shù)據(jù)庫(kù)的連接器可能捕獲對(duì)表的每個(gè)更改。

在Kafka中,客戶端和服務(wù)器之間的通信使用簡(jiǎn)單、高性能、語(yǔ)言無(wú)關(guān)的TCP協(xié)議完成。該協(xié)議是版本化的,并與舊版本保持向后兼容性。我們?yōu)榭ǚ蚩ㄌ峁┝艘粋€(gè)Java客戶端,但是客戶端有多種語(yǔ)言版本。

Topics and Logs

讓我們首先深入卡夫卡為記錄流提供的核心抽象——主題。
主題是記錄發(fā)布到的類(lèi)別或提要名稱(chēng),主題在kafka中常是多訂閱用戶的,也就是說(shuō),一個(gè)主題可以有零個(gè)、一個(gè)或多個(gè)訂閱寫(xiě)入其中的數(shù)據(jù)的消費(fèi)者。

對(duì)于每個(gè)主題,Kafka集群維護(hù)一個(gè)分區(qū)日志,每個(gè)分區(qū)都是一個(gè)有序的,不變的記錄序列,這些記錄連續(xù)地附加到一個(gè)結(jié)構(gòu)化的提交日志中。分區(qū)中的每條記錄都被分配了一個(gè)名為偏移量的連續(xù)id號(hào),該偏移量惟一地標(biāo)識(shí)分區(qū)中的每條記錄。

卡夫卡群集使用可配置的保留期持久保存所有已發(fā)布的記錄,無(wú)論它們是否已被使用。例如,如果保留策略設(shè)置為兩天,則在記錄發(fā)布后的兩天內(nèi),該記錄可供使用,之后將被丟棄以釋放空間。Kafka的性能在數(shù)據(jù)大小方面是穩(wěn)定的,所以長(zhǎng)時(shí)間存儲(chǔ)數(shù)據(jù)不是問(wèn)題。

事實(shí)上,基于每個(gè)消費(fèi)者保留的唯一元數(shù)據(jù)是該消費(fèi)者在日志中的偏移量或位置。該偏移量由消費(fèi)者控制:通常消費(fèi)者在讀取記錄時(shí)會(huì)線性地推進(jìn)偏移量,但是,事實(shí)上,因?yàn)槲恢檬怯上M(fèi)者控制的,所以它可以按照自己喜歡的任何順序消費(fèi)記錄。例如,消費(fèi)者可以重置為較舊的偏移量來(lái)重新處理過(guò)去的數(shù)據(jù),或者跳到最近的記錄并從“現(xiàn)在”開(kāi)始消費(fèi)。

這些功能的結(jié)合意味著卡夫卡的消費(fèi)者非常便宜——他們可以來(lái)去自由,而不會(huì)對(duì)集群或其他消費(fèi)者產(chǎn)生太大影響。例如,您可以使用我們的命令行工具來(lái)“跟蹤”任何主題的內(nèi)容,而不改變?nèi)魏维F(xiàn)有消費(fèi)者所消費(fèi)的內(nèi)容。

日志中的分區(qū)有幾個(gè)目的。首先,它們?cè)试S日志擴(kuò)展到適合單個(gè)服務(wù)器的大小之外。每個(gè)單獨(dú)的分區(qū)必須適合托管它的服務(wù)器,但是一個(gè)主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。其次,它們充當(dāng)并行性的單位——稍后會(huì)詳細(xì)介紹。

Distribution

日志的分區(qū)分布在卡夫卡集群中的服務(wù)器上,每個(gè)服務(wù)器處理數(shù)據(jù)和分區(qū)共享請(qǐng)求。為了容錯(cuò),每個(gè)分區(qū)在可配置數(shù)量的服務(wù)器上復(fù)制。

每個(gè)分區(qū)有一個(gè)充當(dāng)“領(lǐng)導(dǎo)者”的服務(wù)器和零個(gè)或多個(gè)充當(dāng)“追隨者”的服務(wù)器。領(lǐng)導(dǎo)者處理分區(qū)的所有讀寫(xiě)請(qǐng)求,而追隨者被動(dòng)地復(fù)制領(lǐng)導(dǎo)者。如果領(lǐng)導(dǎo)者失敗,其中一個(gè)追隨者將自動(dòng)成為新的領(lǐng)導(dǎo)者。每個(gè)服務(wù)器充當(dāng)它的一些分區(qū)的領(lǐng)導(dǎo)者和其他分區(qū)的追隨者,因此集群內(nèi)的負(fù)載非常平衡。

Geo-Replication

Kafka MirrorMaker為集群提供地理復(fù)制支持。使用MirrorMaker,消息可以跨多個(gè)數(shù)據(jù)中心或云區(qū)域復(fù)制。您可以在主動(dòng)/被動(dòng)情況下使用它進(jìn)行備份和恢復(fù);或者在主動(dòng)/主動(dòng)場(chǎng)景中,將數(shù)據(jù)放置得更靠近用戶,或者支持?jǐn)?shù)據(jù)局部性要求。

Producers

生產(chǎn)者發(fā)布數(shù)據(jù)到選擇的主題上,生產(chǎn)者負(fù)責(zé)選擇將哪個(gè)記錄分配給主題中的哪個(gè)分區(qū)。這可以簡(jiǎn)單地通過(guò)循環(huán)方式來(lái)平衡負(fù)載,也可以根據(jù)一些語(yǔ)義分區(qū)函數(shù)(比如基于記錄中的某個(gè)鍵)來(lái)完成。稍后將詳細(xì)介紹分區(qū)的使用!

Consumers

消費(fèi)者用一個(gè)消費(fèi)者組名稱(chēng)來(lái)標(biāo)記自己,發(fā)布到一個(gè)主題的每個(gè)記錄被傳遞到每個(gè)訂閱消費(fèi)者組中的一個(gè)消費(fèi)者實(shí)例。消費(fèi)者實(shí)例可以在不同的過(guò)程中,也可以在不同的機(jī)器上。

如果所有的消費(fèi)者實(shí)例具有相同的消費(fèi)者組,那么記錄將在消費(fèi)者實(shí)例上有效地負(fù)載平衡。

如果所有的消費(fèi)者實(shí)例都有不同的消費(fèi)者組,那么每個(gè)記錄將被廣播給所有的消費(fèi)者進(jìn)程。

一個(gè)雙服務(wù)器卡夫卡集群托管四個(gè)分區(qū)(P0-P3),有兩個(gè)消費(fèi)群。消費(fèi)者組甲有兩個(gè)消費(fèi)者實(shí)例,組乙有四個(gè)。

然而,更常見(jiàn)的是,我們發(fā)現(xiàn)主題有少量的消費(fèi)者群體,每個(gè)“邏輯訂戶”有一個(gè)。每個(gè)組由許多可伸縮性和容錯(cuò)的消費(fèi)者實(shí)例組成。這只不過(guò)是發(fā)布-訂閱語(yǔ)義,其中訂閱者是一群消費(fèi)者,而不是一個(gè)流程。

卡夫卡實(shí)現(xiàn)消費(fèi)的方式是在消費(fèi)實(shí)例上劃分日志中的分區(qū),這樣每個(gè)實(shí)例在任何時(shí)間點(diǎn)都是分區(qū)“公平份額”的唯一消費(fèi)方??ǚ蚩▍f(xié)議動(dòng)態(tài)地處理保持團(tuán)體成員身份的過(guò)程。如果新實(shí)例加入該組,它們將從該組的其他成員那里接管一些分區(qū);如果一個(gè)實(shí)例死亡,它的分區(qū)將被分配給其余的實(shí)例。

卡夫卡只提供了一個(gè)分區(qū)內(nèi)記錄的總順序,而不是一個(gè)主題中不同分區(qū)之間的順序。對(duì)于大多數(shù)應(yīng)用程序來(lái)說(shuō),按分區(qū)排序以及按鍵劃分?jǐn)?shù)據(jù)的能力就足夠了。但是,如果您需要對(duì)記錄進(jìn)行總排序,這可以通過(guò)只有一個(gè)分區(qū)的主題來(lái)實(shí)現(xiàn),盡管這意味著每個(gè)消費(fèi)者組只有一個(gè)消費(fèi)者進(jìn)程。

Multi-tenancy

您可以將kafka部署為多租戶解決方案。多租戶是通過(guò)配置哪些主題可以生產(chǎn)或消費(fèi)數(shù)據(jù)來(lái)實(shí)現(xiàn)的。配額也有運(yùn)營(yíng)支持。管理員可以對(duì)請(qǐng)求定義和實(shí)施配額,以控制客戶端使用的代理資源。有關(guān)更多信息,請(qǐng)參見(jiàn)安全文檔。

Guarantees

kafka給出以下保障:

  • 生產(chǎn)者發(fā)送到特定主題分區(qū)的消息將按照發(fā)送順序進(jìn)行附加。也就是說(shuō),如果記錄M1和M2是由同一個(gè)生產(chǎn)者發(fā)送的,而M1是第一個(gè)發(fā)送的,那么M1的偏移量將低于M2,并且會(huì)出現(xiàn)在日志中的更早位置
  • 消費(fèi)者實(shí)例按照記錄在日志中的存儲(chǔ)順序查看記錄。
  • 對(duì)于復(fù)制因子為N的主題,我們將容忍多達(dá)N-1個(gè)服務(wù)器故障,而不會(huì)丟失提交給日志的任何記錄。

文檔的Design部分給出了關(guān)于這些保證的更多細(xì)節(jié)。

Kafka as a Messaging System

Kafka的流概念與傳統(tǒng)的企業(yè)消息系統(tǒng)相比如何?
消息傳遞傳統(tǒng)上有兩種模式:隊(duì)列和發(fā)布訂閱。在一個(gè)隊(duì)列中,一個(gè)消費(fèi)者池可以從服務(wù)器中讀取,并且每個(gè)記錄都到達(dá)其中一個(gè);在發(fā)布-訂閱中,記錄被廣播給所有消費(fèi)者。這兩種模式各有優(yōu)缺點(diǎn)。隊(duì)列的優(yōu)勢(shì)在于,它允許您將數(shù)據(jù)處理劃分到多個(gè)消費(fèi)者實(shí)例上,這使您可以擴(kuò)展您的處理。不幸的是,隊(duì)列不是多用戶的——一旦一個(gè)進(jìn)程讀取了數(shù)據(jù),它就消失了。發(fā)布-訂閱允許您向多個(gè)進(jìn)程廣播數(shù)據(jù),但是沒(méi)有擴(kuò)展處理的方法,因?yàn)槊總€(gè)消息都發(fā)送給每個(gè)訂閱者。

卡夫卡的消費(fèi)組概念概括了這兩個(gè)概念。與隊(duì)列一樣,消費(fèi)者組允許您將處理劃分為多個(gè)進(jìn)程(消費(fèi)者組的成員)。如同發(fā)布訂閱一樣,卡夫卡允許你向多個(gè)消費(fèi)者組廣播信息。

卡夫卡模型的優(yōu)點(diǎn)是每個(gè)主題都有這兩個(gè)屬性——它可以擴(kuò)展處理,也是多用戶的——沒(méi)有必要選擇其中一個(gè)。

卡夫卡也比傳統(tǒng)的信息系統(tǒng)有更強(qiáng)的順序保證。

傳統(tǒng)的隊(duì)列在服務(wù)器上按順序保留記錄,如果多個(gè)消費(fèi)者從隊(duì)列中消費(fèi),那么服務(wù)器將按照記錄的存儲(chǔ)順序分發(fā)記錄。然而,盡管服務(wù)器按順序分發(fā)記錄,但記錄是異步傳遞給消費(fèi)者的,因此它們可能會(huì)在不同的消費(fèi)者中無(wú)序到達(dá)。這實(shí)際上意味著在并行消費(fèi)的情況下,記錄的排序會(huì)丟失。消息傳遞系統(tǒng)通常通過(guò)“獨(dú)占消費(fèi)者”的概念來(lái)解決這個(gè)問(wèn)題,該概念只允許一個(gè)進(jìn)程從隊(duì)列中消費(fèi),但這當(dāng)然意味著在處理中沒(méi)有并行性。

卡夫卡做得更好。通過(guò)在主題中引入并行性(分區(qū))的概念,卡夫卡能夠在消費(fèi)者進(jìn)程池中提供排序保證和負(fù)載平衡。這是通過(guò)將主題中的分區(qū)分配給消費(fèi)者組中的消費(fèi)者來(lái)實(shí)現(xiàn)的,這樣每個(gè)分區(qū)正好被組中的一個(gè)消費(fèi)者使用。通過(guò)這樣做,我們確保消費(fèi)者是該分區(qū)的唯一讀者,并且按順序消費(fèi)數(shù)據(jù)。由于有許多分區(qū),這仍然平衡了許多消費(fèi)者實(shí)例的負(fù)載。但是,請(qǐng)注意,消費(fèi)者組中的消費(fèi)者實(shí)例不能多于分區(qū)。

Kafka as a Storage System

任何允許發(fā)布消息和消費(fèi)消息分離的消息隊(duì)列都是同樣作為一個(gè)存儲(chǔ)系統(tǒng)。Kafka不同的地方在于,他是一個(gè)很好的存儲(chǔ)系統(tǒng)。

寫(xiě)入卡夫卡的數(shù)據(jù)被寫(xiě)入磁盤(pán)并復(fù)制以實(shí)現(xiàn)容錯(cuò)??ǚ蚩ㄔ试S生產(chǎn)者等待確認(rèn),這樣寫(xiě)操作就不會(huì)被認(rèn)為是完整的,直到它被完全復(fù)制,并保證存留,即使寫(xiě)入的服務(wù)器發(fā)生故障。

卡夫卡的磁盤(pán)結(jié)構(gòu)很好地利用了可伸縮性——無(wú)論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),卡夫卡都會(huì)執(zhí)行相同的操作。

由于重視存儲(chǔ)并允許客戶端控制其讀取位置,您可以將卡夫卡視為一種專(zhuān)用分布式文件系統(tǒng),專(zhuān)門(mén)用于高性能、低延遲的提交日志存儲(chǔ)、復(fù)制和傳播。

有關(guān)卡夫卡提交日志存儲(chǔ)和復(fù)制設(shè)計(jì)的詳細(xì)信息,請(qǐng)閱讀本頁(yè)

Kafka for Stream Processing

僅僅讀取、寫(xiě)入和存儲(chǔ)數(shù)據(jù)流是不夠的,目的是實(shí)現(xiàn)數(shù)據(jù)流的實(shí)時(shí)處理。

在Kafka中,流處理器是指從輸入主題獲取連續(xù)的數(shù)據(jù)流,對(duì)這個(gè)輸入執(zhí)行一些處理,并產(chǎn)生連續(xù)的數(shù)據(jù)流到輸出主題。

例如,一個(gè)零售應(yīng)用程序可能接受銷(xiāo)售和發(fā)貨的輸入流,并根據(jù)這些數(shù)據(jù)計(jì)算出重新訂購(gòu)和價(jià)格調(diào)整的輸出流。

可以直接使用生產(chǎn)者和消費(fèi)者APIs進(jìn)行簡(jiǎn)單的處理。然而,對(duì)于更復(fù)雜的轉(zhuǎn)換,卡夫卡提供了一個(gè)完全集成的流應(yīng)用編程接口。這允許構(gòu)建執(zhí)行非平凡處理的應(yīng)用程序,這些應(yīng)用程序通過(guò)流計(jì)算聚合或?qū)⒘鬟B接在一起。

此功能有助于解決此類(lèi)應(yīng)用程序所面臨的難題:處理無(wú)序數(shù)據(jù)、在代碼更改時(shí)重新處理輸入、執(zhí)行有狀態(tài)計(jì)算等等。

流應(yīng)用編程接口建立在卡夫卡提供的核心原語(yǔ)之上:它使用生產(chǎn)者和消費(fèi)者應(yīng)用編程接口作為輸入,使用卡夫卡作為狀態(tài)存儲(chǔ),并在流處理器實(shí)例之間使用相同的組機(jī)制作為容錯(cuò)。

Putting the Pieces Together

這種消息傳遞、存儲(chǔ)和流處理的結(jié)合可能看起來(lái)不尋常,但對(duì)于卡夫卡作為流平臺(tái)的角色來(lái)說(shuō)卻是至關(guān)重要的。

像HDFS這樣的分布式文件系統(tǒng)允許為批處理存儲(chǔ)靜態(tài)文件。像這樣的系統(tǒng)實(shí)際上允許存儲(chǔ)和處理過(guò)去的歷史數(shù)據(jù)。

傳統(tǒng)的企業(yè)消息系統(tǒng)允許處理您訂閱后將到達(dá)的未來(lái)消息。以這種方式構(gòu)建的應(yīng)用程序在未來(lái)數(shù)據(jù)到達(dá)時(shí)對(duì)其進(jìn)行處理。

卡夫卡結(jié)合了這兩種能力,這種結(jié)合對(duì)于卡夫卡作為流媒體應(yīng)用平臺(tái)的使用以及流媒體數(shù)據(jù)管道都至關(guān)重要。

通過(guò)結(jié)合存儲(chǔ)和低延遲訂閱,流式應(yīng)用程序可以以相同的方式處理過(guò)去和未來(lái)的數(shù)據(jù)。也就是說(shuō),單個(gè)應(yīng)用程序可以處理歷史的、存儲(chǔ)的數(shù)據(jù),但不會(huì)在到達(dá)最后一條記錄時(shí)結(jié)束,而是可以在未來(lái)數(shù)據(jù)到達(dá)時(shí)繼續(xù)處理。這是流處理的廣義概念,包括批處理和消息驅(qū)動(dòng)的應(yīng)用程序。

同樣,對(duì)于流式數(shù)據(jù)管道,對(duì)實(shí)時(shí)事件的訂閱的組合使得將卡夫卡用于非常低延遲的管道成為可能;但是可靠地存儲(chǔ)數(shù)據(jù)的能力使得它可以用于必須保證數(shù)據(jù)交付的關(guān)鍵數(shù)據(jù),或者用于與離線系統(tǒng)集成,離線系統(tǒng)僅周期性地加載數(shù)據(jù),或者可能長(zhǎng)時(shí)間停機(jī)進(jìn)行維護(hù)。流處理設(shè)施使數(shù)據(jù)到達(dá)時(shí)進(jìn)行轉(zhuǎn)換成為可能。

有關(guān)卡夫卡提供的保證、APIs和功能的更多信息,請(qǐng)參見(jiàn)其他文檔。

2.使用案例

下面是kafka的一些常見(jiàn)的使用案例

Messaging

Kafka可以很好地替代傳統(tǒng)的消息代理。消息代理用于各種各樣的原因(將處理與數(shù)據(jù)生成器解耦,緩沖未處理的消息,等等)。與大多數(shù)消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量、內(nèi)置分區(qū)、復(fù)制和容錯(cuò)能力,這使它成為大型消息處理應(yīng)用程序的一個(gè)很好的解決方案。

根據(jù)我們的經(jīng)驗(yàn),消息傳遞的使用通常是相對(duì)較低的吞吐量,但是可能需要較低的端到端延遲,并且通常取決于卡夫卡提供的強(qiáng)大的持久性保證。

在這一領(lǐng)域是卡夫卡比得上傳統(tǒng)的消息系統(tǒng),例如ActiveMQ的或RabbitMQ的。

Website Activity Tracking

卡夫卡最初的使用案例是能夠?qū)⒂脩艋顒?dòng)跟蹤管道重建為一組實(shí)時(shí)發(fā)布訂閱源。這意味著網(wǎng)站活動(dòng)(頁(yè)面視圖、搜索或用戶可能采取的其他操作)發(fā)布到中心主題,每個(gè)活動(dòng)類(lèi)型有一個(gè)主題。這些源可用于訂閱一系列用例,包括實(shí)時(shí)處理、實(shí)時(shí)監(jiān)控,以及加載到Hadoop或離線數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)中進(jìn)行離線處理和報(bào)告。

活動(dòng)跟蹤通常非常大,因?yàn)槊總€(gè)用戶頁(yè)面視圖都會(huì)生成許多活動(dòng)消息。

Metrics

Kafka通常用于操作監(jiān)控?cái)?shù)據(jù)。這包括聚合來(lái)自分布式應(yīng)用程序的統(tǒng)計(jì)信息,以生成操作數(shù)據(jù)的集中提要。

Log Aggregation

許多人用卡夫卡來(lái)代替日志聚合解決方案。日志聚合通常從服務(wù)器收集物理日志文件,并將它們放在中心位置(可能是文件服務(wù)器或HDFS)進(jìn)行處理。卡夫卡將文件的細(xì)節(jié)抽象化,并將日志或事件數(shù)據(jù)更清晰地抽象化為消息流。這允許更低延遲的處理,更容易支持多個(gè)數(shù)據(jù)源和分布式數(shù)據(jù)消耗。與以日志為中心的系統(tǒng)(如抄寫(xiě)員或水槽)相比,卡夫卡提供了同樣好的性能、更強(qiáng)的復(fù)制耐久性保證以及低得多的端到端延遲。

Stream Processing

卡夫卡的許多用戶在由多個(gè)階段組成的處理管道中處理數(shù)據(jù),在這些階段,原始輸入數(shù)據(jù)從卡夫卡主題中被消費(fèi),然后被聚集、豐富或以其他方式轉(zhuǎn)換成新的主題以供進(jìn)一步消費(fèi)或后續(xù)處理。例如,用于推薦新聞文章的處理管道可能從RSS源抓取文章內(nèi)容,并將其發(fā)布到“文章”主題;進(jìn)一步的處理可能會(huì)對(duì)該內(nèi)容進(jìn)行規(guī)范化或重復(fù)數(shù)據(jù)消除,并將清理后的文章內(nèi)容發(fā)布到新主題;最后的處理階段可能會(huì)嘗試向用戶推薦該內(nèi)容。這種處理流水線基于各個(gè)主題創(chuàng)建實(shí)時(shí)數(shù)據(jù)流的圖形。從0.10.0.0開(kāi)始,Apache Kafka提供了一個(gè)稱(chēng)為Kafka Streams的輕量級(jí)但功能強(qiáng)大的流處理庫(kù)來(lái)執(zhí)行上述數(shù)據(jù)處理。除了卡夫卡流,其他開(kāi)源流處理工具包括阿帕奇風(fēng)暴(Apache Storm)和阿帕奇薩姆扎(Apache Samza)。

Event Sourcing

事件源是應(yīng)用程序設(shè)計(jì)的一種風(fēng)格,其中狀態(tài)變化被記錄為按時(shí)間順序排列的記錄序列。卡夫卡對(duì)非常大的存儲(chǔ)日志數(shù)據(jù)的支持使得它成為以這種風(fēng)格構(gòu)建的應(yīng)用程序的優(yōu)秀后端。

Commit Log

Kafka可以作為分布式系統(tǒng)的一種外部提交日志。日志幫助在節(jié)點(diǎn)之間復(fù)制數(shù)據(jù),并充當(dāng)失敗節(jié)點(diǎn)恢復(fù)數(shù)據(jù)的重新同步機(jī)制。Kafka中的日志壓縮特性有助于支持這種用法。在這種用法中,Kafka類(lèi)似于Apache BookKeeper項(xiàng)目。

快速開(kāi)始

本教程假設(shè)你是新手,并且沒(méi)有Kafka和Zookeeper數(shù)據(jù),因?yàn)閗afka控制臺(tái)腳本在unix和windows平臺(tái)是不同的,在windows平臺(tái)上,使用bin\windows\ 取代bin/,并且腳本擴(kuò)展名為.bat。

1.下載并解壓

下載地址2.3.0版本

tar -xzf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0

2.啟動(dòng)服務(wù)

kafka使用Zookeeper,所以你需要首先啟動(dòng)一個(gè)Zookeeper服務(wù)器,你可以使用包中的腳本,開(kāi)啟一個(gè)單節(jié)點(diǎn)的Zookeeper實(shí)例:

?  kafka_2.12-2.3.0 bin/zookeeper-server-start.sh config/zookeeper.properties
[2019-09-27 13:13:12,607] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-09-27 13:13:12,612] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-09-27 13:13:12,613] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2019-09-27 13:13:12,641] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-09-27 13:13:12,641] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2019-09-27 13:13:12,674] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:host.name=192.168.44.193 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.version=1.8.0_211 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2019-09-27 13:13:12,674] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer)
......

現(xiàn)在就可以開(kāi)啟kafka服務(wù)了

?  kafka_2.12-2.3.0 bin/kafka-server-start.sh config/server.properties
[2019-09-27 13:14:35,753] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-27 13:14:36,630] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-27 13:14:36,631] INFO starting (kafka.server.KafkaServer)
[2019-09-27 13:14:36,633] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-09-27 13:14:36,674] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-09-27 13:14:36,687] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:host.name=192.168.44.193 (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.version=1.8.0_211 (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-09-27 13:14:36,687] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
......

現(xiàn)在,讓我們創(chuàng)建一個(gè)主題,名為test,只有一個(gè)分區(qū),一個(gè)副本

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

現(xiàn)在查看主題,運(yùn)行以下命令:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092



test

此外,您還可以配置代理,使其在發(fā)布不存在的主題時(shí)自動(dòng)創(chuàng)建主題,而不是手動(dòng)創(chuàng)建主題。

卡夫卡有一個(gè)命令行客戶端,它將從文件或標(biāo)準(zhǔn)輸入中獲取輸入,并將其作為消息發(fā)送給卡夫卡集群。默認(rèn)情況下,每一行都將作為單獨(dú)的消息發(fā)送。
運(yùn)行生成器,然后在控制臺(tái)中鍵入一些消息發(fā)送到服務(wù)器:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

卡夫卡還有一個(gè)命令行消費(fèi)者,可以將消息轉(zhuǎn)儲(chǔ)到標(biāo)準(zhǔn)輸出中。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果您在不同的終端上運(yùn)行上述每個(gè)命令,那么您現(xiàn)在應(yīng)該能夠在生產(chǎn)者終端中鍵入消息,并看到它們出現(xiàn)在消費(fèi)者終端中。

所有的命令行工具都有額外的選項(xiàng);在沒(méi)有參數(shù)的情況下運(yùn)行該命令將顯示詳細(xì)記錄它們的使用信息。

最后,設(shè)置多代理集群

到目前為止,我們一直在運(yùn)行一個(gè)單一代理,但這一點(diǎn)都不好玩。對(duì)kafka來(lái)說(shuō),一個(gè)單一的代理只是一個(gè)大小為1的集群,所以除了再啟動(dòng)幾個(gè)代理實(shí)例之外,沒(méi)有什么變化。但是為了感受一下,讓我們將集群擴(kuò)展到三個(gè)節(jié)點(diǎn)(仍然都在本地機(jī)器上)。

首先,我們?yōu)槊總€(gè)代理創(chuàng)建一個(gè)配置文件(在windows平臺(tái)上使用copy命令):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現(xiàn)在編輯新的文件,設(shè)置以下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id屬性是群集中每個(gè)節(jié)點(diǎn)的唯一和永久名稱(chēng)。我們必須覆蓋端口和日志目錄,只是因?yàn)槲覀冊(cè)谕慌_(tái)機(jī)器上運(yùn)行這些目錄,并且我們希望阻止代理在同一端口上注冊(cè)或覆蓋彼此的數(shù)據(jù)

我們已經(jīng)有了Zookeeper,我們的單個(gè)節(jié)點(diǎn)已經(jīng)啟動(dòng),所以我們只需要啟動(dòng)兩個(gè)新節(jié)點(diǎn):

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

現(xiàn)在創(chuàng)建一個(gè)復(fù)制因子為3的新主題:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好吧,但是現(xiàn)在我們有了一個(gè)集群,我們?cè)趺粗滥膫€(gè)代理在做什么?要查看,運(yùn)行“describe topics”命令:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    
    

以下是對(duì)輸出的解釋。第一行給出了所有分區(qū)的概要,每一行給出了關(guān)于一個(gè)分區(qū)的信息。因?yàn)檫@個(gè)主題只有一個(gè)分區(qū),所以只有一行。

  • “l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀寫(xiě)的節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)將是隨機(jī)選擇的分區(qū)部分的領(lǐng)導(dǎo)者。
  • “replicas”是復(fù)制這個(gè)分區(qū)日志的節(jié)點(diǎn)列表,無(wú)論它們是主節(jié)點(diǎn)還是當(dāng)前活動(dòng)節(jié)點(diǎn)。
  • “isr”是一組“同步”副本。這是副本列表的子集,它當(dāng)前是活動(dòng)的,并趕上了領(lǐng)導(dǎo)者。

請(qǐng)注意,在我的示例中,節(jié)點(diǎn)1是主題唯一分區(qū)的領(lǐng)導(dǎo)者。

我們可以對(duì)我們創(chuàng)建的原始主題運(yùn)行相同的命令來(lái)查看它在哪里:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

因此,一點(diǎn)也不奇怪--原來(lái)的主題沒(méi)有副本且位于0服務(wù)節(jié)點(diǎn)上,也是我們創(chuàng)建的集群中唯一的服務(wù)節(jié)點(diǎn)。

讓我們?cè)谛碌闹黝}上發(fā)布一些消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在,來(lái)消費(fèi)這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在讓我們測(cè)試一下容錯(cuò)能力。節(jié)點(diǎn)1是領(lǐng)導(dǎo)者,所以讓我們殺了它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在windows平臺(tái)上,可使用:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

領(lǐng)導(dǎo)層已切換到其中一個(gè)追隨者,節(jié)點(diǎn)1不再在同步副本集中:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是這些信息仍然可以被消費(fèi),即使最初記錄這些信息的領(lǐng)導(dǎo)者已經(jīng)離開(kāi)了:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

從控制臺(tái)寫(xiě)入數(shù)據(jù)并將其寫(xiě)入控制臺(tái)是一個(gè)方便的起點(diǎn),但是您可能希望使用其他來(lái)源的數(shù)據(jù)或?qū)afka的數(shù)據(jù)導(dǎo)出到其他系統(tǒng)。對(duì)于許多系統(tǒng),您可以使用Kafka Connect導(dǎo)入或?qū)С鰯?shù)據(jù),而不是編寫(xiě)定制的集成代碼。

Kafka Connect是Kafka附帶的一個(gè)工具,可以導(dǎo)入和導(dǎo)出數(shù)據(jù)到Kafka。它是一個(gè)運(yùn)行連接器的可擴(kuò)展工具,連接器實(shí)現(xiàn)了與外部系統(tǒng)交互的自定義邏輯。在這個(gè)快速入門(mén)中,我們將看到如何使用簡(jiǎn)單的連接器運(yùn)行Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入Kafka主題,并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。

首先,我們將創(chuàng)建一些用于測(cè)試的種子數(shù)據(jù):

> echo -e "foo\nbar" > test.txt

在windows平臺(tái):

> echo foo> test.txt
> echo bar>> test.txt

接下來(lái),我們將啟動(dòng)兩個(gè)以獨(dú)立模式運(yùn)行的連接器,這意味著它們運(yùn)行在單個(gè)本地專(zhuān)用進(jìn)程中。我們提供了三個(gè)配置文件作為參數(shù)。第一種始終是Kafka連接進(jìn)程的配置,包括要連接的常見(jiàn)配置,如Kafka代理和數(shù)據(jù)的序列化格式。其余的配置文件每個(gè)都指定要?jiǎng)?chuàng)建的連接器。這些文件包括一個(gè)惟一的連接器名稱(chēng)、要實(shí)例化的連接器類(lèi)以及連接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

kafka附帶的這些示例配置文件使用了您之前啟動(dòng)的默認(rèn)本地集群配置,并創(chuàng)建了兩個(gè)連接器:第一個(gè)是源連接器,它從輸入文件中讀取行,并將每一行發(fā)送到一個(gè)kafka主題;第二個(gè)是接收連接器,它從kafka主題中讀取消息,并將每一行生成輸出文件中的行。

在啟動(dòng)過(guò)程中,您會(huì)看到許多日志消息,包括一些指示連接器正在實(shí)例化的消息。一旦kafka連接進(jìn)程開(kāi)啟,源連接器應(yīng)該開(kāi)始從test.txt中讀取行,并將其生成到主題connect-test,而接收連接器應(yīng)該開(kāi)始從主題connect-test中讀取消息,并將其寫(xiě)入文件test.sink.txt.我們可以通過(guò)檢查輸出文件的內(nèi)容來(lái)驗(yàn)證數(shù)據(jù)是否已經(jīng)通過(guò)整個(gè)管道傳送:

> more test.sink.txt
foo
bar

數(shù)據(jù)被存貯在kafka名為connect-test的主題中,所以,我們可以運(yùn)行一個(gè)控制臺(tái)消費(fèi)者來(lái)查看主題中的數(shù)據(jù),如下:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

連接器繼續(xù)處理數(shù)據(jù),因此我們可以將數(shù)據(jù)添加到文件中,并看到它在管道中移動(dòng):

> echo Another line>> test.txt

您應(yīng)該看到這一行出現(xiàn)在控制臺(tái)消費(fèi)者輸出和接收器文件中。

kafka Streams是一個(gè)客戶庫(kù),用于構(gòu)建關(guān)鍵任務(wù)實(shí)時(shí)應(yīng)用和微服務(wù),其中輸入和/或輸出數(shù)據(jù)存儲(chǔ)在卡夫卡集群中。kafka Streams將在客戶端編寫(xiě)和部署標(biāo)準(zhǔn)的Java和Scala應(yīng)用程序的簡(jiǎn)單性與卡夫卡的服務(wù)器端集群技術(shù)的優(yōu)勢(shì)結(jié)合起來(lái),使這些應(yīng)用程序具有高度的可伸縮性、彈性、容錯(cuò)性、分布式等等。這個(gè)快速入門(mén)示例將演示如何運(yùn)行在這個(gè)庫(kù)中編碼的流式應(yīng)用程序。

Ecosystem

在主要發(fā)行版之外,有太多的工具與卡夫卡相結(jié)合。生態(tài)系統(tǒng)頁(yè)面列出了其中的許多,包括流處理系統(tǒng)、Hadoop集成、監(jiān)控和部署工具。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容