Kafka官網(wǎng):http://kafka.apache.org/
入門
1.1 介紹
Kafka? 是一個分布式流處理系統(tǒng),這是什么意思呢?
我們認(rèn)為一個流數(shù)據(jù)平臺具有三個主要功能
- 1.它允許您發(fā)布和訂閱流記錄。在這方面,它類似于一個消息隊列或企業(yè)消息傳遞系統(tǒng)。
- 2.它能讓你以容錯方式進(jìn)行流數(shù)據(jù)的存儲。
- 3.數(shù)據(jù)產(chǎn)生時你就可以進(jìn)行流數(shù)據(jù)處理。
1.1.1 Kafka擅長哪些地方?
它被用于兩大類別的應(yīng)用程序
1.建立實時流數(shù)據(jù)通道,這個通道可以可靠的獲取在系統(tǒng)或應(yīng)用間的數(shù)據(jù)。
2.建立實時流媒體應(yīng)用來轉(zhuǎn)換流數(shù)據(jù)或?qū)α鲾?shù)據(jù)做出反應(yīng)。為了解Kafka怎么做這些事情,讓我們從下面開始深入探索Kafka的功能。
首先是幾個概念:
kafka作為集群運(yùn)行在一臺或多臺服務(wù)器。
Kafka群集存儲流記錄的類別稱為主題(topics)
Kafka的每條記錄包含一個鍵,一個值和一個時間戳。
1.1.2 Kafka 有個核心API:
Producer API 允許應(yīng)用推送流記錄到一個或多個Kafka主題上。
Consumer API 允許應(yīng)用程序訂閱一個或多個主題并且并處理產(chǎn)生的流記錄
Streams API 允許應(yīng)用程序作為一個流處理器,從一個或多個主題獲取流數(shù)據(jù),然后輸出流數(shù)據(jù)到一個或多個主題,有效地將輸入流轉(zhuǎn)換為輸出流。
Connector API 允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者(Producer)
或消費(fèi)者(Consumer)連接Kafka與現(xiàn)有應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如,一個連接器(connector)在關(guān)系數(shù)據(jù)庫中可能獲取每個變化的表。

Kafka客戶端和服務(wù)器之間的通信是用一個簡單的、高性能、語言無關(guān)的TCP協(xié)議。這個協(xié)議是版本可以向下兼容。我們不僅提供java客戶端,同時提供其它多種語言版本的客戶端
1.1.3 主題和日志
主題和日志(Topics-and-Logs)
- 我們首先深入kafka核心概念,kafka提供了一連串的記錄稱為主題。
一個主題是一連串記錄的一個類別或訂閱名稱。一個主題在Kafka總歸有多個訂閱者。所以,一個主題可以有零個、一個或多個消費(fèi)者去訂閱寫到這個主題里面的數(shù)據(jù)。
對于每一個主題,Kafka群集維護(hù)了一個分區(qū)日志,看起來是下面這樣的:

每個分區(qū)是一個有序的,可以不斷追加消息的消息序列。分區(qū)中的每個消息都會分配一個在分區(qū)內(nèi)是唯一的序列號,這個序列號叫做偏移量(offset)。
kafka集群可以配置一個周期來保留所有已經(jīng)發(fā)布的消息(無論這些消息是否已經(jīng)被消費(fèi))。比如:如果消息被設(shè)置保存兩天,那么兩天內(nèi),消息都是可以被消費(fèi)的。但是兩天后為了節(jié)省磁盤空間就會刪除消息。
Kafka的性能與數(shù)據(jù)大小無關(guān),因此數(shù)據(jù)長時間的保存沒有任何問題。

事實上,每個消費(fèi)者僅僅需要保存的元數(shù)據(jù)是消費(fèi)者在日志中的消費(fèi)位置(偏移量),這個偏移量是由消費(fèi)者控制:通常,消費(fèi)者讀取消息后會線性遞增偏移量,但是,消費(fèi)者可以按任意順序消費(fèi)消息。比如:消費(fèi)者可以重置偏移量到老版本。例如,一個消費(fèi)者可以重新設(shè)置偏移量到老的偏移量,重新處理以前或的數(shù)據(jù),或者跳到最近的數(shù)據(jù)開始處理。
kafka的組合特性意味著kafka消費(fèi)者成本非常低,consumer數(shù)量可以增加或減少而對整個集群影響很小。例如,你能夠使用我們的命令行工具“tail”顯示任何主題的內(nèi)容,但是不會改變?nèi)魏未嬖诘腸onsumer。
日志分區(qū)的目的。首先允許日志規(guī)模超出一臺服務(wù)器的文件大小限止。每個單獨(dú)的分區(qū)都必須受限于主機(jī)的文件限止,但一個主題可有多個分區(qū),因此可以處理無限數(shù)量的數(shù)據(jù)。其次可以作為并行的單元,關(guān)于這一點(diǎn)更多細(xì)節(jié)如下。
1.1.4 分布式(Distribution)
日志分區(qū)分布于群集的所有服務(wù)器上,每個服務(wù)器處理全部分區(qū)中的部分分區(qū)數(shù)據(jù)和請求。為了容錯,每個分區(qū)都被復(fù)制到一定數(shù)量(可配置)的不同服務(wù)器上。每個分區(qū)有一臺服務(wù)器作為“領(lǐng)導(dǎo)者(leader)”,
零或多個服務(wù)器作為“追隨者(followers)”。領(lǐng)導(dǎo)者讀取與寫入分區(qū)的同時,追隨者被動的進(jìn)行復(fù)制。如果leader宕機(jī),其中之一的follower會自動成為新的leader。每臺服務(wù)器都做為一些分區(qū)的leader,又做為其它分區(qū)的follower,這樣群集的負(fù)載平衡會很好。
1.1.5 生產(chǎn)者(Producers)
生產(chǎn)者向所選的主題發(fā)布數(shù)據(jù)。生產(chǎn)者負(fù)責(zé)選擇哪些消息應(yīng)該分配到主題內(nèi)的哪個分區(qū)。這種選擇分區(qū)方式,可以使用簡單的循環(huán)方式負(fù)載均衡;也可以通過一些語義分區(qū)函數(shù)實現(xiàn)(如:基于消息的key進(jìn)行劃分)。馬上你會看到更多分區(qū)劃分的使用。
1.1.6 消費(fèi)者(Consumers)
每個消費(fèi)者都屬于一個消費(fèi)組,每一條被推送到主題的記錄被傳遞給訂閱該主題的消費(fèi)組的其中一個消費(fèi)者。消費(fèi)者可以在不同進(jìn)程或者不同的機(jī)器上。如果所有的消費(fèi)者實例有相同的消費(fèi)組,消息將會有效地負(fù)載平衡給這些消費(fèi)者實例。
如果所有的消費(fèi)者實例在不同的消費(fèi)組中,那么每一條消息將會被廣播給所有的消費(fèi)者處理。

兩個服務(wù)器的kafka包含4個分區(qū)(P0-P3),有兩個consumer組。Consumer組A有兩個consumer實例,組B有4個。通常情況下,每個主題一些consumer組,每個 “l(fā)ogical subscriber”一個。每個組由許多consumer實例組成(擴(kuò)展及容錯)。也就是發(fā)布-訂閱,只是訂閱的是一堆consumer而不是單個線程。
Kafka的消費(fèi)實現(xiàn)是把分區(qū)日志平分給每個consumer實例。這個過程由Kafka協(xié)議動態(tài)處理。如果有新的實例加入組,kafka會從組中的其他成員中拿一些分區(qū)給它。如果某個實例掛了,它的分區(qū)分給剩余的實例。
Kafka只保證單個分區(qū)(partition)中記錄的順序,但不保證一個主題(topic)中不同分區(qū)記錄的順序。每個分區(qū)記錄的順序加上key的組合在大多數(shù)場景下都是沒有問題的。如果你希望所有記錄都排序,那只能有一個分區(qū)了,這意味著每個消費(fèi)組只有一個consumer進(jìn)程。
1.1.7 保證(Guarantees)
Kafka給予以下保證:
消息被生產(chǎn)者發(fā)送到一個特定的主題分區(qū),消息將以發(fā)送的順序追加到這個分區(qū)上面。比如,如果消息M1和消息M2被同一個生產(chǎn)者發(fā)送,M1先發(fā)送,M1的偏移量將比M2的小且更早添加到日志里面。
一個消費(fèi)者實例按照記錄存儲在日志上的順序讀取。
一個主題的副本數(shù)是N,我們可以容忍N(yùn)-1個服務(wù)器發(fā)生故障沒而不會丟失任何提交到日志中的記錄。
1.1.8 Kafka作為消息系統(tǒng)(Kafka as a Messaging System)
Kafka的流概念如何與傳統(tǒng)企業(yè)消息系統(tǒng)對比?
-
傳統(tǒng)消息系統(tǒng)有兩個模塊隊列(queuing)和發(fā)布-訂閱(publish-subscribe)。
在隊列中,消費(fèi)者們從服務(wù)器讀取記錄,每條記錄會發(fā)送到其中一個消費(fèi)者;
在發(fā)布-訂閱中,每條記錄會被廣播給所有消費(fèi)者。
這兩個模型尤其缺點(diǎn)和長處。隊列的長處是它允許你對數(shù)據(jù)分割到消費(fèi)者實例進(jìn)行處理,可以擴(kuò)展你的處理規(guī)模。但是隊列不支持多訂閱,當(dāng)某個數(shù)據(jù)被讀取后就在隊列中‘消失’了。發(fā)布-訂閱允許你廣播數(shù)據(jù)到多個處理進(jìn)程中,但是沒法擴(kuò)展處理能力,因為每條記錄被發(fā)送到了所有的訂閱者。
Kafka的消費(fèi)組囊括了這兩個概念。作為隊列,消費(fèi)組允許你把數(shù)據(jù)才分給一堆進(jìn)程處理(也就是消費(fèi)組里面的成員)。作為發(fā)布-訂閱,Kafka允許你把消息廣播到多個消費(fèi)組。
Kafka模型的有點(diǎn)是每個主題都有這兩個特性,它可以擴(kuò)展處理也可以進(jìn)行多訂閱。
Kafka消息的排序也比傳統(tǒng)消息系統(tǒng)好。傳統(tǒng)的隊列在服務(wù)器上保存消息的順序,如果多個消費(fèi)者從隊列中消費(fèi)消息,服務(wù)器就存儲順序發(fā)送消息。雖然服務(wù)器按照順序發(fā)送消息,但是消息抵達(dá)消費(fèi)者卻是異步的。也就是消息到達(dá)不同消費(fèi)者的次序會不一樣。
這個意味著并發(fā)消費(fèi)的時候記錄的順序會打亂。
因此消息系統(tǒng)有一個概念”exclusive consumer” 一次只允許一個進(jìn)程從隊列中進(jìn)行消費(fèi),這也意味著沒法并行處理。
Kafka不存在這樣的問題,它的主題有一個概念‘分區(qū)’,可以保證消息順和負(fù)載平衡。
Kafka將主題中的分區(qū)交給消費(fèi)組中的消費(fèi)者處理,每個分區(qū)被一個組中的消費(fèi)者處理。 這樣就保證一個消費(fèi)者只讀一個分區(qū)并且順序消費(fèi)數(shù)據(jù)。因為一個主題有很多分區(qū),可以平分給消費(fèi)實例進(jìn)行負(fù)載平衡。不過要注意,消費(fèi)者實例數(shù)量不要大于分區(qū),否則沒意義。
1.1.9 Kafka作為存儲系統(tǒng)(Kafka as a Storage System)
任何消息隊列系統(tǒng)都允許存儲動態(tài)信息。不同的是Kafka是一個非常好的存儲系統(tǒng)。
寫到Kafka的數(shù)據(jù)存儲到磁盤并且有副本用于容錯。Kafka允許發(fā)布者等待一個應(yīng)答信號,也就是說直到建立副本確保其存儲,或者寫入失敗一個寫入動作才算完成。
Kafka使用的磁盤結(jié)構(gòu)也易于擴(kuò)展–不管是50KB或者50TB都可以搞定。
Kafka重視存儲也允許客戶端控制讀取位置,你能把Kafka視為一種特殊用途的、致力于高性能、低延遲提交日志存儲、復(fù)制和傳播的分布式文件系統(tǒng)。
1.1.10 Kafka流處理(Kafka for Stream Processing)
僅僅對流數(shù)據(jù)進(jìn)行讀、寫和存儲是不夠的,其目的是要做流數(shù)據(jù)實時處理。在Kafka中一個流處理所做的就是不斷讀取主題的流數(shù)據(jù),對這些數(shù)據(jù)進(jìn)行處理計算,然后發(fā)布計算好的流數(shù)據(jù)到另外一個主題。
例如,一個零售程序可能把銷售和運(yùn)輸信息作為輸入流,然后通過計算把調(diào)整后的價格作為輸出流。簡單的處理可以直接用producer和consumer API。復(fù)雜的就需要用到Kafka提供的Stream API了。可以做聚合運(yùn)算或者與其他流做Join等操作。此工具幫助解決這種類型的應(yīng)用程序所面臨的困難問題:處理無序的數(shù)據(jù),代碼更新后重新處理數(shù)據(jù),執(zhí)行狀態(tài)計算等。
Stream API基于Kafka的核心屬性:它使用producer或consumer API進(jìn)行輸入,使用Kafka進(jìn)行狀態(tài)存儲,并采用同樣的容錯機(jī)制。
1.1.11 總結(jié) (Putting the Pieces Together)
這種消息傳遞、存儲和流處理的組合將Kafka角色變?yōu)榱魈幚砥脚_。
分布式文件系統(tǒng),如HDFS允許存儲靜態(tài)文件進(jìn)行批量處理。像這樣的系統(tǒng)可以存儲和處理歷史數(shù)據(jù)。
傳統(tǒng)的企業(yè)消息系統(tǒng)可以處理未來的信息,當(dāng)信息到達(dá)后應(yīng)用程序就進(jìn)行處理。
Kafka集兩者所長,作為流應(yīng)用平臺及流數(shù)據(jù)管道。通過結(jié)合存儲和低延遲訂閱,流應(yīng)用程序可以以同樣的方式對待過去和未來數(shù)據(jù)。即一個應(yīng)用程序可以處理存儲歷史數(shù)據(jù),他還能繼續(xù)處理后續(xù)到達(dá)的數(shù)據(jù)。這是一個廣義的概念流處理,貫穿了批處理以及消息驅(qū)動的應(yīng)用程序。同樣的流數(shù)據(jù)管道整合訂閱以及實時事件,使得Kafka管道延遲非常低??煽康卮鎯?shù)據(jù)的能力使它可以用于關(guān)鍵數(shù)據(jù)或與離線系統(tǒng)集成(定期加載或長時間進(jìn)行維護(hù))。數(shù)據(jù)一來流處理程序就可以進(jìn)行處理轉(zhuǎn)換。
1.2 使用案例
這里列舉了一些
Apache Kafka?流行的使用案例。 有關(guān)這些領(lǐng)域的概述,請參閱此
博文(
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying)。
1.2.1 消息處理(Messaging)
kafka是一個很好的傳統(tǒng)消息代理替代產(chǎn)品。
消息代理有幾種原因:解耦生產(chǎn)者與消息處理、緩存消息等。與大多數(shù)消息系統(tǒng)相比,kafka有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和容錯性,這使它成為大規(guī)模消息處理應(yīng)用很好的解決方案。在我們的經(jīng)驗中,消息傳遞的吞吐量通常情況下是比較低的。但需要端到端延遲低并且取決于Kafka持久性保證。
1.2.2 網(wǎng)站活動跟蹤(Website Activity Tracking)
kafka的原始用例(為此而生)是能重建一套可以實時發(fā)布,實時訂閱消息,用于處理用戶活動軌跡跟蹤的管道。也就是說網(wǎng)站的活動(頁面瀏覽、搜索、用戶其它行為)可以按活動類型分別發(fā)布到各自的主題;這些訂閱可以被用于后續(xù)各種用途:包括實時處理、實時監(jiān)控、加載到hadoop、離線數(shù)據(jù)倉庫處理或報告。
因為每個用戶瀏覽頁面都會產(chǎn)生活動消息,因此,活動跟蹤數(shù)據(jù)量非常大。
1.2.3 度量(Metrics)
kafka經(jīng)常被用于處理監(jiān)控數(shù)據(jù)。這涉及到從分布式應(yīng)用收集統(tǒng)計數(shù)據(jù),并且做為后續(xù)分析的一個統(tǒng)一的數(shù)據(jù)源。(即分布式統(tǒng)計數(shù)據(jù)查詢?nèi)肟诨虼恚?/p>
1.2.4 日志收集(Log Aggregation)
很多人把kafka做為日志收集解決方案。日志收集是從服務(wù)器上采集日志文件并把它們放入一個集中位置(如:文件服務(wù)器或HDFS)統(tǒng)一處理。kafka抽象了文件細(xì)節(jié),并給出一個日志或事件消息流。這允許更低的延時處理,更容易支持多數(shù)據(jù)源以及分布式消息處理。與Scribe和Flume相比,kafka提供同樣的良好性能,并提供更好的可用性(因為多個副本),和更低的延時。
1.2.5 流處理(Stream Processing)
很多kafka用戶,通過把數(shù)據(jù)處理分成多個步驟,每個步驟處理數(shù)據(jù)的不同功能并放入此步驟的topic中,并通過kafka topics串聯(lián)起所有步驟,形成一個數(shù)據(jù)處理通道。
如:一個處理新聞的流程:首先通過RSS收集新聞,并發(fā)布到”articles”主題中;第二步,從“articles”主題中取新聞并清洗重復(fù)內(nèi)容,然后發(fā)布一個新的主題中;最后,從上步的主題中取數(shù)據(jù)并推薦給用戶。這樣的處理管道是基于單個主題的實時數(shù)據(jù)流程圖。
從0.10.0.0版本開始,一個輕量但強(qiáng)大的,被稱為Kafka Streams的功能用于處理這樣的數(shù)據(jù)。除了Kafka Stream還有另外相似的開源工具:Apache Storm 和Apache Samza。
1.2.6 事件追溯(Event Sourcing)*
事件追溯是一種應(yīng)用程序設(shè)計風(fēng)格,按狀態(tài)更改時間順序保存記錄序列。Kafka強(qiáng)大的存儲能力很適合做這種程序的數(shù)據(jù)后端。
1.2.7 提交日志(Commit Log)*
Kafka可以做為分布式系統(tǒng)的外部提交日志服務(wù)器??梢詭椭植际焦?jié)點(diǎn)存儲數(shù)據(jù)失敗時,做為重新同步機(jī)制,在節(jié)點(diǎn)與操作之間復(fù)制日志,以恢復(fù)數(shù)據(jù)。log compaction的特性使得Kafka支持這種使用方法。這種使用方式與Apache BookKeeper非常相似。
1.3 快速開始
本教程,假設(shè)你沒有任何kafka知識。并且沒有現(xiàn)成的Kafka? 和ZooKeeper數(shù)據(jù)。Kafka的命令行腳本在Windows平臺和Unix平臺不一樣,在Windows平臺請用bin\windows\代替bin/ ,腳本的擴(kuò)展名請改為.bat.
1.3.1 Step 1: 下載代碼
下載
0.10.0.0版本代碼,并且解壓
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
1.3.2 Step 2:啟動服務(wù)
kafka依賴zookeeper,因此首先要啟動zookeeper;如果沒有安裝獨(dú)立的zookeeper,可以使用kafka內(nèi)嵌的zookepper。雖然簡單暴力,但并不建議。
bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
啟動Kafka服務(wù):
bin/kafka-server-start.sh config/server.properties
[2013-04-22聽15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22聽15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to聽1048576聽(kafka.utils.VerifiableProperties)
...
1.3.3 Step 3: 建立主題
我們建立一個名為“test”的單分區(qū)單副本主題:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看剛創(chuàng)建的主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以通過配置“自動創(chuàng)建主題”,這樣如果沒有提前創(chuàng)建主題,那么在發(fā)布消息時,如果此消息對應(yīng)的主題不存在,會自動創(chuàng)建。
1.3.4 Step 4: 發(fā)送信息
Kafka有個命令行客戶端可以通過文件或標(biāo)準(zhǔn)輸入向kafka集群發(fā)送消息。默認(rèn)每行都是一條消息。
啟動生產(chǎn)者(啟動成功進(jìn)入命令行阻塞狀態(tài),可以輸入數(shù)據(jù),回車發(fā)送)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
1.3.5 Step 5:啟動consumer
同樣的Kafka有個命令行可以獲取消息并標(biāo)準(zhǔn)輸出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你在另外一個終端運(yùn)行上面的命令,此時你在producer中發(fā)送消息,那么consumer終端會就顯示該消息。所有命令行工具都有很多選項;不帶參數(shù)運(yùn)行命令會顯示使用文檔。
1.3.6 Step 6:設(shè)置broker群集
到目前為止,我們都在單機(jī)上運(yùn)行Kafka,挺沒勁的。雖然多加機(jī)器操作上并沒有太大改變,不過讓我們感受下,讓我們將我們的集群擴(kuò)展到三個節(jié)點(diǎn)(仍然在我們的本地機(jī)器上)。
首先創(chuàng)建一個配置文件(在Windows中請使用copy命令代替):
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
編輯兩個配置文件如下:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id屬性是集群中每個節(jié)點(diǎn)的唯一且永久的名稱。因為我們正在同一臺機(jī)器上運(yùn)行這些,所以端口和日志目錄也要修改,否則數(shù)據(jù)會相互覆蓋。
因為Zookeeper已經(jīng)在單節(jié)點(diǎn)啟動,所以我們啟動兩個新的broker節(jié)點(diǎn)即可。
bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties &
...
現(xiàn)在創(chuàng)建一個新的主題,并設(shè)置3個副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
現(xiàn)在我們已經(jīng)創(chuàng)建一個集群,但是我們怎么知道每個broker都做了什么?可以執(zhí)行”describe toics”查看
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
這里解釋一下上面輸出的信息。所有分區(qū)信息的概覽,后面每一行都是其中一個分區(qū)的信息,因為我們只有一個分區(qū)因此只有一行
“l(fā)eader” 負(fù)責(zé)指定分區(qū)的所有讀寫操作,每個分區(qū)的Leader都是隨機(jī)選定的。
“replicas” 是復(fù)制此分區(qū)的日志的節(jié)點(diǎn)的列表,無論它們是否為Leader,或者它們當(dāng)前處于活動狀態(tài)。
“isr” 是“同步中”服務(wù)器列表,這個列表中的機(jī)器表示其處于活動狀態(tài),并且與Leader數(shù)據(jù)一致。
注意我們單一節(jié)點(diǎn)的例子,主題只有一個分區(qū),一個節(jié)點(diǎn),當(dāng)我們運(yùn)行
”describe toics”查看狀態(tài)的時候顯示如下信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
跟預(yù)料的一樣,這個主題沒有副本,且只有一個服務(wù)器
讓我們在新的主題中發(fā)送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
消費(fèi)他們
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
讓我們測一下容錯,斷掉Leader的進(jìn)程
ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
kill -9 7564
在windows中使用:
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar" kafka.Kafka config\server-1.properties 644
taskkill /pid 644 /f
Leader已經(jīng)被從屬者替代,而且也不在in-sync列表里面了:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是信息仍然可以讀取,即使原來的Leader宕機(jī)了
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
1.1.7 Step 7: 使用kafka connect 導(dǎo)入/導(dǎo)出數(shù)據(jù)
從控制臺輸入數(shù)據(jù)雖然比較方便,但是你可能希望從其他數(shù)據(jù)源導(dǎo)入數(shù)據(jù),或者從Kafka導(dǎo)出到其他系統(tǒng)。對于多數(shù)系統(tǒng),您可以使用Kafka Connect來導(dǎo)入或?qū)С鰯?shù)據(jù),而不是自己寫代碼處理。
Kafka Connect是Kafka自帶的一個工具用來導(dǎo)入導(dǎo)出數(shù)據(jù)。該工具可以通過connectors擴(kuò)展,實現(xiàn)與其他系統(tǒng)交互。在快速入門中,我們將看到如何使用Kafka Connect運(yùn)行簡單連接器(connectors)將數(shù)據(jù)從文件導(dǎo)入Kafka主題,并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。
首先建一些測試數(shù)據(jù):
echo -e "foo\nbar" > test.txt
接下來,我們將啟動在standalone模式下運(yùn)行的兩個連接器,這意味著它們在單個本地專用進(jìn)程中運(yùn)行。 我們提供三個配置文件作為參數(shù)。 第一個是Kafka Connect進(jìn)程的配置,包含常見的配置,比如要連接的Kafka服務(wù)器,數(shù)據(jù)序列化格式。其余的配置文件均指定要創(chuàng)建的連接器。 這些文件包括唯一的連接器名稱,實例化的連接器類以及連接器所需的任何其他配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置,并創(chuàng)建兩個連接器:第一個是source connector,從輸入文件讀取行,然后發(fā)送到Kafka主題,第二個是宿連接器(sink connector)它從Kafka主題讀取消息,并將每一行輸出到文件。
在啟動期間,您將看到一些日志消息,包括一些正在實例化的連接器。一旦Kafka Connect進(jìn)程啟動,源連接器開始從test.txt讀取行并將其生成到主題connect-test,然后sink連接器應(yīng)該開始從主題connect-test讀取消息,并將它們寫入文件test.sink.txt。
我們可以通過檢查輸出文件的內(nèi)容來驗證數(shù)據(jù)是否已通過整個管道傳送:
cat test.sink.txt
foo
bar
注意,數(shù)據(jù)存儲在Kafka主題connect-test中,因此我們還可以運(yùn)行控制臺consumer查看主題中的數(shù)據(jù)(或使用自定義consumer程序代碼來處理):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
我們可以繼續(xù)向文件中添加數(shù)據(jù),并查看它通過管道中的傳送:
echo "Another line" >> test.txt
1.1.8 Step 8: 使用kafka Stream處理數(shù)據(jù)
Kafka Streams是Kafka用于實時流處理和分析存儲在Kafka服務(wù)器中數(shù)據(jù)的庫。 這個快速入門示例演示用詞庫編寫的WordCountDemo程序(代碼轉(zhuǎn)為Java 8 lambda表達(dá)式方便閱讀)。
KTable wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> **new** KeyValue<>(value, value))
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
它實現(xiàn)WordCount算法,從輸入文本計算單詞出現(xiàn)的數(shù)量。 但是,與你看到的其他WordCount示例不同,該WordCount演示應(yīng)用程序設(shè)計為對無限的無界數(shù)據(jù)流進(jìn)行操作。 與有界變量類似,它有一種有狀態(tài)算法,用于跟蹤和更新單詞的計數(shù)。 然而,由于它必須假定潛在的無界輸入數(shù)據(jù),它將周期性地輸出其當(dāng)前狀態(tài)和結(jié)果,同時繼續(xù)處理更多的數(shù)據(jù),因為它不知道它何時處理了“全部”輸入數(shù)據(jù)。
我們現(xiàn)在將準(zhǔn)備輸入數(shù)據(jù)到Kafka主題,隨后將由Kafka Streams應(yīng)用程序處理。
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Windows:
echo all streams lead to kafka> file-input.txt
echo hello kafka streams>> file-input.txt
echo|set /p=join kafka summit>> file-input.txt
接下來,我們使用控制臺生成器將輸入數(shù)據(jù)發(fā)送到名為streams-file-input的主題(實際上,流數(shù)據(jù)可能會連續(xù)流入Kafka,應(yīng)用程序?qū)⒉⑿羞\(yùn)行):
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
我們現(xiàn)在可以運(yùn)行WordCount演示應(yīng)用程序來處理輸入的數(shù)據(jù):
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
除了日志條目,將不會有任何STDOUT輸出,因為結(jié)果被連續(xù)寫回Kafka中名為streams-wordcount-output的另一個主題。 演示將運(yùn)行幾秒鐘,然后不像典型的流處理應(yīng)用程序會自動終止。
我們現(xiàn)在可以通過從其輸出主題中讀取數(shù)據(jù)來檢查WordCount演示應(yīng)用程序的輸出:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=**true** \
--property print.value=**true** \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
輸出如下:
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
這里,第一列是Kafka消息鍵,第二列是消息值,是java.lang.String格式。 注意,輸出實際上是連續(xù)的更新流,其中每個數(shù)據(jù)記錄(如上面輸出中的每一行)是每個單詞的更新計數(shù)。 對于具有相同鍵的多個記錄,后的每條統(tǒng)計記錄都是前一次的更新。
現(xiàn)在,您可以向streams-file-input主題寫入更多輸入消息,并觀察添加到
streams-wordcount-output主題的消息,查看跟新記錄。你可以通過Ctrl-C中斷consumer。