Kafka入門

Kafka官網(wǎng):http://kafka.apache.org/
入門
1.1 介紹
Kafka? 是一個分布式流處理系統(tǒng),這是什么意思呢?
我們認(rèn)為一個流數(shù)據(jù)平臺具有三個主要功能

  • 1.它允許您發(fā)布和訂閱流記錄。在這方面,它類似于一個消息隊列或企業(yè)消息傳遞系統(tǒng)。
  • 2.它能讓你以容錯方式進(jìn)行流數(shù)據(jù)的存儲。
  • 3.數(shù)據(jù)產(chǎn)生時你就可以進(jìn)行流數(shù)據(jù)處理。

1.1.1 Kafka擅長哪些地方?
它被用于兩大類別的應(yīng)用程序

  • 1.建立實時流數(shù)據(jù)通道,這個通道可以可靠的獲取在系統(tǒng)或應(yīng)用間的數(shù)據(jù)。

  • 2.建立實時流媒體應(yīng)用來轉(zhuǎn)換流數(shù)據(jù)或?qū)α鲾?shù)據(jù)做出反應(yīng)。為了解Kafka怎么做這些事情,讓我們從下面開始深入探索Kafka的功能。

首先是幾個概念:

  • kafka作為集群運(yùn)行在一臺或多臺服務(wù)器。

  • Kafka群集存儲流記錄的類別稱為主題(topics)

  • Kafka的每條記錄包含一個鍵,一個值和一個時間戳。

1.1.2 Kafka 有個核心API:

  • Producer API 允許應(yīng)用推送流記錄到一個或多個Kafka主題上。

  • Consumer API 允許應(yīng)用程序訂閱一個或多個主題并且并處理產(chǎn)生的流記錄

  • Streams API 允許應(yīng)用程序作為一個流處理器,從一個或多個主題獲取流數(shù)據(jù),然后輸出流數(shù)據(jù)到一個或多個主題,有效地將輸入流轉(zhuǎn)換為輸出流。

  • Connector API 允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者(Producer)
    或消費(fèi)者(Consumer)連接Kafka與現(xiàn)有應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如,一個連接器(connector)在關(guān)系數(shù)據(jù)庫中可能獲取每個變化的表。

Paste_Image.png

Kafka客戶端和服務(wù)器之間的通信是用一個簡單的、高性能、語言無關(guān)的TCP協(xié)議。這個協(xié)議是版本可以向下兼容。我們不僅提供java客戶端,同時提供其它多種語言版本的客戶端

1.1.3 主題和日志
主題和日志(Topics-and-Logs)

  • 我們首先深入kafka核心概念,kafka提供了一連串的記錄稱為主題。
    一個主題是一連串記錄的一個類別或訂閱名稱。一個主題在Kafka總歸有多個訂閱者。所以,一個主題可以有零個、一個或多個消費(fèi)者去訂閱寫到這個主題里面的數(shù)據(jù)。
    對于每一個主題,Kafka群集維護(hù)了一個分區(qū)日志,看起來是下面這樣的:
Paste_Image.png
  • 每個分區(qū)是一個有序的,可以不斷追加消息的消息序列。分區(qū)中的每個消息都會分配一個在分區(qū)內(nèi)是唯一的序列號,這個序列號叫做偏移量(offset)。

  • kafka集群可以配置一個周期來保留所有已經(jīng)發(fā)布的消息(無論這些消息是否已經(jīng)被消費(fèi))。比如:如果消息被設(shè)置保存兩天,那么兩天內(nèi),消息都是可以被消費(fèi)的。但是兩天后為了節(jié)省磁盤空間就會刪除消息。

  • Kafka的性能與數(shù)據(jù)大小無關(guān),因此數(shù)據(jù)長時間的保存沒有任何問題。

Paste_Image.png
  • 事實上,每個消費(fèi)者僅僅需要保存的元數(shù)據(jù)是消費(fèi)者在日志中的消費(fèi)位置(偏移量),這個偏移量是由消費(fèi)者控制:通常,消費(fèi)者讀取消息后會線性遞增偏移量,但是,消費(fèi)者可以按任意順序消費(fèi)消息。比如:消費(fèi)者可以重置偏移量到老版本。例如,一個消費(fèi)者可以重新設(shè)置偏移量到老的偏移量,重新處理以前或的數(shù)據(jù),或者跳到最近的數(shù)據(jù)開始處理。

  • kafka的組合特性意味著kafka消費(fèi)者成本非常低,consumer數(shù)量可以增加或減少而對整個集群影響很小。例如,你能夠使用我們的命令行工具“tail”顯示任何主題的內(nèi)容,但是不會改變?nèi)魏未嬖诘腸onsumer。

  • 日志分區(qū)的目的。首先允許日志規(guī)模超出一臺服務(wù)器的文件大小限止。每個單獨(dú)的分區(qū)都必須受限于主機(jī)的文件限止,但一個主題可有多個分區(qū),因此可以處理無限數(shù)量的數(shù)據(jù)。其次可以作為并行的單元,關(guān)于這一點(diǎn)更多細(xì)節(jié)如下。

1.1.4 分布式(Distribution)
日志分區(qū)分布于群集的所有服務(wù)器上,每個服務(wù)器處理全部分區(qū)中的部分分區(qū)數(shù)據(jù)和請求。為了容錯,每個分區(qū)都被復(fù)制到一定數(shù)量(可配置)的不同服務(wù)器上。每個分區(qū)有一臺服務(wù)器作為“領(lǐng)導(dǎo)者(leader)”,
零或多個服務(wù)器作為“追隨者(followers)”。領(lǐng)導(dǎo)者讀取與寫入分區(qū)的同時,追隨者被動的進(jìn)行復(fù)制。如果leader宕機(jī),其中之一的follower會自動成為新的leader。每臺服務(wù)器都做為一些分區(qū)的leader,又做為其它分區(qū)的follower,這樣群集的負(fù)載平衡會很好。

1.1.5 生產(chǎn)者(Producers)
生產(chǎn)者向所選的主題發(fā)布數(shù)據(jù)。生產(chǎn)者負(fù)責(zé)選擇哪些消息應(yīng)該分配到主題內(nèi)的哪個分區(qū)。這種選擇分區(qū)方式,可以使用簡單的循環(huán)方式負(fù)載均衡;也可以通過一些語義分區(qū)函數(shù)實現(xiàn)(如:基于消息的key進(jìn)行劃分)。馬上你會看到更多分區(qū)劃分的使用。

1.1.6 消費(fèi)者(Consumers)

  • 每個消費(fèi)者都屬于一個消費(fèi)組,每一條被推送到主題的記錄被傳遞給訂閱該主題的消費(fèi)組的其中一個消費(fèi)者。消費(fèi)者可以在不同進(jìn)程或者不同的機(jī)器上。如果所有的消費(fèi)者實例有相同的消費(fèi)組,消息將會有效地負(fù)載平衡給這些消費(fèi)者實例。

  • 如果所有的消費(fèi)者實例在不同的消費(fèi)組中,那么每一條消息將會被廣播給所有的消費(fèi)者處理。

Paste_Image.png
  • 兩個服務(wù)器的kafka包含4個分區(qū)(P0-P3),有兩個consumer組。Consumer組A有兩個consumer實例,組B有4個。通常情況下,每個主題一些consumer組,每個 “l(fā)ogical subscriber”一個。每個組由許多consumer實例組成(擴(kuò)展及容錯)。也就是發(fā)布-訂閱,只是訂閱的是一堆consumer而不是單個線程。

  • Kafka的消費(fèi)實現(xiàn)是把分區(qū)日志平分給每個consumer實例。這個過程由Kafka協(xié)議動態(tài)處理。如果有新的實例加入組,kafka會從組中的其他成員中拿一些分區(qū)給它。如果某個實例掛了,它的分區(qū)分給剩余的實例。

  • Kafka只保證單個分區(qū)(partition)中記錄的順序,但不保證一個主題(topic)中不同分區(qū)記錄的順序。每個分區(qū)記錄的順序加上key的組合在大多數(shù)場景下都是沒有問題的。如果你希望所有記錄都排序,那只能有一個分區(qū)了,這意味著每個消費(fèi)組只有一個consumer進(jìn)程。

1.1.7 保證(Guarantees)
Kafka給予以下保證:

  • 消息被生產(chǎn)者發(fā)送到一個特定的主題分區(qū),消息將以發(fā)送的順序追加到這個分區(qū)上面。比如,如果消息M1和消息M2被同一個生產(chǎn)者發(fā)送,M1先發(fā)送,M1的偏移量將比M2的小且更早添加到日志里面。

  • 一個消費(fèi)者實例按照記錄存儲在日志上的順序讀取。

  • 一個主題的副本數(shù)是N,我們可以容忍N(yùn)-1個服務(wù)器發(fā)生故障沒而不會丟失任何提交到日志中的記錄。

1.1.8 Kafka作為消息系統(tǒng)(Kafka as a Messaging System)
Kafka的流概念如何與傳統(tǒng)企業(yè)消息系統(tǒng)對比?

  • 傳統(tǒng)消息系統(tǒng)有兩個模塊隊列(queuing)發(fā)布-訂閱(publish-subscribe)。

    • 在隊列中,消費(fèi)者們從服務(wù)器讀取記錄,每條記錄會發(fā)送到其中一個消費(fèi)者;

    • 在發(fā)布-訂閱中,每條記錄會被廣播給所有消費(fèi)者。

    • 這兩個模型尤其缺點(diǎn)和長處。隊列的長處是它允許你對數(shù)據(jù)分割到消費(fèi)者實例進(jìn)行處理,可以擴(kuò)展你的處理規(guī)模。但是隊列不支持多訂閱,當(dāng)某個數(shù)據(jù)被讀取后就在隊列中‘消失’了。發(fā)布-訂閱允許你廣播數(shù)據(jù)到多個處理進(jìn)程中,但是沒法擴(kuò)展處理能力,因為每條記錄被發(fā)送到了所有的訂閱者。

  • Kafka的消費(fèi)組囊括了這兩個概念。作為隊列,消費(fèi)組允許你把數(shù)據(jù)才分給一堆進(jìn)程處理(也就是消費(fèi)組里面的成員)。作為發(fā)布-訂閱,Kafka允許你把消息廣播到多個消費(fèi)組。

  • Kafka模型的有點(diǎn)是每個主題都有這兩個特性,它可以擴(kuò)展處理也可以進(jìn)行多訂閱。

  • Kafka消息的排序也比傳統(tǒng)消息系統(tǒng)好。傳統(tǒng)的隊列在服務(wù)器上保存消息的順序,如果多個消費(fèi)者從隊列中消費(fèi)消息,服務(wù)器就存儲順序發(fā)送消息。雖然服務(wù)器按照順序發(fā)送消息,但是消息抵達(dá)消費(fèi)者卻是異步的。也就是消息到達(dá)不同消費(fèi)者的次序會不一樣。
    這個意味著并發(fā)消費(fèi)的時候記錄的順序會打亂。

因此消息系統(tǒng)有一個概念”exclusive consumer” 一次只允許一個進(jìn)程從隊列中進(jìn)行消費(fèi),這也意味著沒法并行處理。

  • Kafka不存在這樣的問題,它的主題有一個概念‘分區(qū)’,可以保證消息順和負(fù)載平衡。

  • Kafka將主題中的分區(qū)交給消費(fèi)組中的消費(fèi)者處理,每個分區(qū)被一個組中的消費(fèi)者處理。 這樣就保證一個消費(fèi)者只讀一個分區(qū)并且順序消費(fèi)數(shù)據(jù)。因為一個主題有很多分區(qū),可以平分給消費(fèi)實例進(jìn)行負(fù)載平衡。不過要注意,消費(fèi)者實例數(shù)量不要大于分區(qū),否則沒意義。

1.1.9 Kafka作為存儲系統(tǒng)(Kafka as a Storage System)

  • 任何消息隊列系統(tǒng)都允許存儲動態(tài)信息。不同的是Kafka是一個非常好的存儲系統(tǒng)。
    寫到Kafka的數(shù)據(jù)存儲到磁盤并且有副本用于容錯。

  • Kafka允許發(fā)布者等待一個應(yīng)答信號,也就是說直到建立副本確保其存儲,或者寫入失敗一個寫入動作才算完成。

  • Kafka使用的磁盤結(jié)構(gòu)也易于擴(kuò)展–不管是50KB或者50TB都可以搞定。

  • Kafka重視存儲也允許客戶端控制讀取位置,你能把Kafka視為一種特殊用途的、致力于高性能、低延遲提交日志存儲、復(fù)制和傳播的分布式文件系統(tǒng)。

1.1.10 Kafka流處理(Kafka for Stream Processing)

  • 僅僅對流數(shù)據(jù)進(jìn)行讀、寫和存儲是不夠的,其目的是要做流數(shù)據(jù)實時處理。在Kafka中一個流處理所做的就是不斷讀取主題的流數(shù)據(jù),對這些數(shù)據(jù)進(jìn)行處理計算,然后發(fā)布計算好的流數(shù)據(jù)到另外一個主題。
    例如,一個零售程序可能把銷售和運(yùn)輸信息作為輸入流,然后通過計算把調(diào)整后的價格作為輸出流。

  • 簡單的處理可以直接用producer和consumer API。復(fù)雜的就需要用到Kafka提供的Stream API了。可以做聚合運(yùn)算或者與其他流做Join等操作。此工具幫助解決這種類型的應(yīng)用程序所面臨的困難問題:處理無序的數(shù)據(jù),代碼更新后重新處理數(shù)據(jù),執(zhí)行狀態(tài)計算等。

  • Stream API基于Kafka的核心屬性:它使用producer或consumer API進(jìn)行輸入,使用Kafka進(jìn)行狀態(tài)存儲,并采用同樣的容錯機(jī)制。

1.1.11 總結(jié) (Putting the Pieces Together)

  • 這種消息傳遞、存儲和流處理的組合將Kafka角色變?yōu)榱魈幚砥脚_。

  • 分布式文件系統(tǒng),如HDFS允許存儲靜態(tài)文件進(jìn)行批量處理。像這樣的系統(tǒng)可以存儲和處理歷史數(shù)據(jù)。

  • 傳統(tǒng)的企業(yè)消息系統(tǒng)可以處理未來的信息,當(dāng)信息到達(dá)后應(yīng)用程序就進(jìn)行處理。

  • Kafka集兩者所長,作為流應(yīng)用平臺及流數(shù)據(jù)管道。通過結(jié)合存儲和低延遲訂閱,流應(yīng)用程序可以以同樣的方式對待過去和未來數(shù)據(jù)。即一個應(yīng)用程序可以處理存儲歷史數(shù)據(jù),他還能繼續(xù)處理后續(xù)到達(dá)的數(shù)據(jù)。這是一個廣義的概念流處理,貫穿了批處理以及消息驅(qū)動的應(yīng)用程序。同樣的流數(shù)據(jù)管道整合訂閱以及實時事件,使得Kafka管道延遲非常低??煽康卮鎯?shù)據(jù)的能力使它可以用于關(guān)鍵數(shù)據(jù)或與離線系統(tǒng)集成(定期加載或長時間進(jìn)行維護(hù))。數(shù)據(jù)一來流處理程序就可以進(jìn)行處理轉(zhuǎn)換。

1.2 使用案例
這里列舉了一些
Apache Kafka?流行的使用案例。 有關(guān)這些領(lǐng)域的概述,請參閱此
博文
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying)。

1.2.1 消息處理(Messaging)

  • kafka是一個很好的傳統(tǒng)消息代理替代產(chǎn)品。

  • 消息代理有幾種原因:解耦生產(chǎn)者與消息處理、緩存消息等。與大多數(shù)消息系統(tǒng)相比,kafka有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和容錯性,這使它成為大規(guī)模消息處理應(yīng)用很好的解決方案。在我們的經(jīng)驗中,消息傳遞的吞吐量通常情況下是比較低的。但需要端到端延遲低并且取決于Kafka持久性保證。

  • 在此域中,Kafka可與傳統(tǒng)的消息傳遞系統(tǒng)(如ActiveMQRabbitMQ)相媲美。

1.2.2 網(wǎng)站活動跟蹤(Website Activity Tracking)
kafka的原始用例(為此而生)是能重建一套可以實時發(fā)布,實時訂閱消息,用于處理用戶活動軌跡跟蹤的管道。也就是說網(wǎng)站的活動(頁面瀏覽、搜索、用戶其它行為)可以按活動類型分別發(fā)布到各自的主題;這些訂閱可以被用于后續(xù)各種用途:包括實時處理、實時監(jiān)控、加載到hadoop、離線數(shù)據(jù)倉庫處理或報告。
因為每個用戶瀏覽頁面都會產(chǎn)生活動消息,因此,活動跟蹤數(shù)據(jù)量非常大。

1.2.3 度量(Metrics)
kafka經(jīng)常被用于處理監(jiān)控數(shù)據(jù)。這涉及到從分布式應(yīng)用收集統(tǒng)計數(shù)據(jù),并且做為后續(xù)分析的一個統(tǒng)一的數(shù)據(jù)源。(即分布式統(tǒng)計數(shù)據(jù)查詢?nèi)肟诨虼恚?/p>

1.2.4 日志收集(Log Aggregation)
很多人把kafka做為日志收集解決方案。日志收集是從服務(wù)器上采集日志文件并把它們放入一個集中位置(如:文件服務(wù)器或HDFS)統(tǒng)一處理。kafka抽象了文件細(xì)節(jié),并給出一個日志或事件消息流。這允許更低的延時處理,更容易支持多數(shù)據(jù)源以及分布式消息處理。與Scribe和Flume相比,kafka提供同樣的良好性能,并提供更好的可用性(因為多個副本),和更低的延時。

1.2.5 流處理(Stream Processing)

  • 很多kafka用戶,通過把數(shù)據(jù)處理分成多個步驟,每個步驟處理數(shù)據(jù)的不同功能并放入此步驟的topic中,并通過kafka topics串聯(lián)起所有步驟,形成一個數(shù)據(jù)處理通道。

  • 如:一個處理新聞的流程:首先通過RSS收集新聞,并發(fā)布到”articles”主題中;第二步,從“articles”主題中取新聞并清洗重復(fù)內(nèi)容,然后發(fā)布一個新的主題中;最后,從上步的主題中取數(shù)據(jù)并推薦給用戶。這樣的處理管道是基于單個主題的實時數(shù)據(jù)流程圖。

  • 從0.10.0.0版本開始,一個輕量但強(qiáng)大的,被稱為Kafka Streams的功能用于處理這樣的數(shù)據(jù)。除了Kafka Stream還有另外相似的開源工具:Apache StormApache Samza。

1.2.6 事件追溯(Event Sourcing)*
事件追溯是一種應(yīng)用程序設(shè)計風(fēng)格,按狀態(tài)更改時間順序保存記錄序列。Kafka強(qiáng)大的存儲能力很適合做這種程序的數(shù)據(jù)后端。

1.2.7 提交日志(Commit Log)*
Kafka可以做為分布式系統(tǒng)的外部提交日志服務(wù)器??梢詭椭植际焦?jié)點(diǎn)存儲數(shù)據(jù)失敗時,做為重新同步機(jī)制,在節(jié)點(diǎn)與操作之間復(fù)制日志,以恢復(fù)數(shù)據(jù)。log compaction的特性使得Kafka支持這種使用方法。這種使用方式與Apache BookKeeper非常相似。

1.3 快速開始
本教程,假設(shè)你沒有任何kafka知識。并且沒有現(xiàn)成的Kafka? 和ZooKeeper數(shù)據(jù)。Kafka的命令行腳本在Windows平臺和Unix平臺不一樣,在Windows平臺請用bin\windows\代替bin/ ,腳本的擴(kuò)展名請改為.bat.

1.3.1 Step 1: 下載代碼
下載
0.10.0.0版本代碼,并且解壓

tar -xzf kafka_2.11-0.10.1.0.tgz

cd kafka_2.11-0.10.1.0

1.3.2 Step 2:啟動服務(wù)
kafka依賴zookeeper,因此首先要啟動zookeeper;如果沒有安裝獨(dú)立的zookeeper,可以使用kafka內(nèi)嵌的zookepper。雖然簡單暴力,但并不建議。

bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

...

啟動Kafka服務(wù):

bin/kafka-server-start.sh config/server.properties

[2013-04-22聽15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22聽15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to聽1048576聽(kafka.utils.VerifiableProperties)

...

1.3.3 Step 3: 建立主題
我們建立一個名為“test”的單分區(qū)單副本主題:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看剛創(chuàng)建的主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

可以通過配置“自動創(chuàng)建主題”,這樣如果沒有提前創(chuàng)建主題,那么在發(fā)布消息時,如果此消息對應(yīng)的主題不存在,會自動創(chuàng)建。

1.3.4 Step 4: 發(fā)送信息
Kafka有個命令行客戶端可以通過文件或標(biāo)準(zhǔn)輸入向kafka集群發(fā)送消息。默認(rèn)每行都是一條消息。
啟動生產(chǎn)者(啟動成功進(jìn)入命令行阻塞狀態(tài),可以輸入數(shù)據(jù),回車發(fā)送)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

1.3.5 Step 5:啟動consumer
同樣的Kafka有個命令行可以獲取消息并標(biāo)準(zhǔn)輸出

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

如果你在另外一個終端運(yùn)行上面的命令,此時你在producer中發(fā)送消息,那么consumer終端會就顯示該消息。所有命令行工具都有很多選項;不帶參數(shù)運(yùn)行命令會顯示使用文檔。

1.3.6 Step 6:設(shè)置broker群集
到目前為止,我們都在單機(jī)上運(yùn)行Kafka,挺沒勁的。雖然多加機(jī)器操作上并沒有太大改變,不過讓我們感受下,讓我們將我們的集群擴(kuò)展到三個節(jié)點(diǎn)(仍然在我們的本地機(jī)器上)。
首先創(chuàng)建一個配置文件(在Windows中請使用copy命令代替):

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

編輯兩個配置文件如下:

config/server-1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

listeners=PLAINTEXT://:9094

log.dir=/tmp/kafka-logs-2

broker.id屬性是集群中每個節(jié)點(diǎn)的唯一且永久的名稱。因為我們正在同一臺機(jī)器上運(yùn)行這些,所以端口和日志目錄也要修改,否則數(shù)據(jù)會相互覆蓋。

因為Zookeeper已經(jīng)在單節(jié)點(diǎn)啟動,所以我們啟動兩個新的broker節(jié)點(diǎn)即可。

bin/kafka-server-start.sh config/server-1.properties &

...

bin/kafka-server-start.sh config/server-2.properties &

...

現(xiàn)在創(chuàng)建一個新的主題,并設(shè)置3個副本。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

現(xiàn)在我們已經(jīng)創(chuàng)建一個集群,但是我們怎么知道每個broker都做了什么?可以執(zhí)行”describe toics”查看

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

這里解釋一下上面輸出的信息。所有分區(qū)信息的概覽,后面每一行都是其中一個分區(qū)的信息,因為我們只有一個分區(qū)因此只有一行

“l(fā)eader” 負(fù)責(zé)指定分區(qū)的所有讀寫操作,每個分區(qū)的Leader都是隨機(jī)選定的。
“replicas” 是復(fù)制此分區(qū)的日志的節(jié)點(diǎn)的列表,無論它們是否為Leader,或者它們當(dāng)前處于活動狀態(tài)。
“isr” 是“同步中”服務(wù)器列表,這個列表中的機(jī)器表示其處于活動狀態(tài),并且與Leader數(shù)據(jù)一致。
注意我們單一節(jié)點(diǎn)的例子,主題只有一個分區(qū),一個節(jié)點(diǎn),當(dāng)我們運(yùn)行
”describe toics”查看狀態(tài)的時候顯示如下信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

跟預(yù)料的一樣,這個主題沒有副本,且只有一個服務(wù)器
讓我們在新的主題中發(fā)送一些消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

消費(fèi)他們

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

讓我們測一下容錯,斷掉Leader的進(jìn)程

ps aux | grep server-1.properties

7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

kill -9 7564

在windows中使用:

wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"

java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar" kafka.Kafka config\server-1.properties 644

taskkill /pid 644 /f

Leader已經(jīng)被從屬者替代,而且也不在in-sync列表里面了:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

但是信息仍然可以讀取,即使原來的Leader宕機(jī)了

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

1.1.7 Step 7: 使用kafka connect 導(dǎo)入/導(dǎo)出數(shù)據(jù)
從控制臺輸入數(shù)據(jù)雖然比較方便,但是你可能希望從其他數(shù)據(jù)源導(dǎo)入數(shù)據(jù),或者從Kafka導(dǎo)出到其他系統(tǒng)。對于多數(shù)系統(tǒng),您可以使用Kafka Connect來導(dǎo)入或?qū)С鰯?shù)據(jù),而不是自己寫代碼處理。
Kafka Connect是Kafka自帶的一個工具用來導(dǎo)入導(dǎo)出數(shù)據(jù)。該工具可以通過connectors擴(kuò)展,實現(xiàn)與其他系統(tǒng)交互。在快速入門中,我們將看到如何使用Kafka Connect運(yùn)行簡單連接器(connectors)將數(shù)據(jù)從文件導(dǎo)入Kafka主題,并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。

首先建一些測試數(shù)據(jù):

echo -e "foo\nbar" > test.txt

接下來,我們將啟動在standalone模式下運(yùn)行的兩個連接器,這意味著它們在單個本地專用進(jìn)程中運(yùn)行。 我們提供三個配置文件作為參數(shù)。 第一個是Kafka Connect進(jìn)程的配置,包含常見的配置,比如要連接的Kafka服務(wù)器,數(shù)據(jù)序列化格式。其余的配置文件均指定要創(chuàng)建的連接器。 這些文件包括唯一的連接器名稱,實例化的連接器類以及連接器所需的任何其他配置。

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置,并創(chuàng)建兩個連接器:第一個是source connector,從輸入文件讀取行,然后發(fā)送到Kafka主題,第二個是宿連接器(sink connector)它從Kafka主題讀取消息,并將每一行輸出到文件。
在啟動期間,您將看到一些日志消息,包括一些正在實例化的連接器。一旦Kafka Connect進(jìn)程啟動,源連接器開始從test.txt讀取行并將其生成到主題connect-test,然后sink連接器應(yīng)該開始從主題connect-test讀取消息,并將它們寫入文件test.sink.txt。
我們可以通過檢查輸出文件的內(nèi)容來驗證數(shù)據(jù)是否已通過整個管道傳送:

cat test.sink.txt

foo

bar

注意,數(shù)據(jù)存儲在Kafka主題connect-test中,因此我們還可以運(yùn)行控制臺consumer查看主題中的數(shù)據(jù)(或使用自定義consumer程序代碼來處理):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

...

我們可以繼續(xù)向文件中添加數(shù)據(jù),并查看它通過管道中的傳送:

echo "Another line" >> test.txt

1.1.8 Step 8: 使用kafka Stream處理數(shù)據(jù)
Kafka Streams是Kafka用于實時流處理和分析存儲在Kafka服務(wù)器中數(shù)據(jù)的庫。 這個快速入門示例演示用詞庫編寫的WordCountDemo程序(代碼轉(zhuǎn)為Java 8 lambda表達(dá)式方便閱讀)。

KTable wordCounts = textLines 
    // Split each text line, by whitespace, into words.

    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

 

    // Ensure the words are available as record keys for the next aggregate operation.

    .map((key, value) -> **new** KeyValue<>(value, value))

 

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".

    .countByKey("Counts")

它實現(xiàn)WordCount算法,從輸入文本計算單詞出現(xiàn)的數(shù)量。 但是,與你看到的其他WordCount示例不同,該WordCount演示應(yīng)用程序設(shè)計為對無限的無界數(shù)據(jù)流進(jìn)行操作。 與有界變量類似,它有一種有狀態(tài)算法,用于跟蹤和更新單詞的計數(shù)。 然而,由于它必須假定潛在的無界輸入數(shù)據(jù),它將周期性地輸出其當(dāng)前狀態(tài)和結(jié)果,同時繼續(xù)處理更多的數(shù)據(jù),因為它不知道它何時處理了“全部”輸入數(shù)據(jù)。
我們現(xiàn)在將準(zhǔn)備輸入數(shù)據(jù)到Kafka主題,隨后將由Kafka Streams應(yīng)用程序處理。

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

Windows:

echo all streams lead to kafka> file-input.txt

echo hello kafka streams>> file-input.txt

echo|set /p=join kafka summit>> file-input.txt

接下來,我們使用控制臺生成器將輸入數(shù)據(jù)發(fā)送到名為streams-file-input的主題(實際上,流數(shù)據(jù)可能會連續(xù)流入Kafka,應(yīng)用程序?qū)⒉⑿羞\(yùn)行):

bin/kafka-topics.sh --create \

        --zookeeper localhost:2181 \

        --replication-factor 1 \

        --partitions 1 \

        --topic streams-file-input

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

我們現(xiàn)在可以運(yùn)行WordCount演示應(yīng)用程序來處理輸入的數(shù)據(jù):

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

除了日志條目,將不會有任何STDOUT輸出,因為結(jié)果被連續(xù)寫回Kafka中名為streams-wordcount-output的另一個主題。 演示將運(yùn)行幾秒鐘,然后不像典型的流處理應(yīng)用程序會自動終止。

我們現(xiàn)在可以通過從其輸出主題中讀取數(shù)據(jù)來檢查WordCount演示應(yīng)用程序的輸出:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

        --topic streams-wordcount-output \

        --from-beginning \

        --formatter kafka.tools.DefaultMessageFormatter \

        --property print.key=**true** \

        --property print.value=**true** \

        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

輸出如下:

all 1

lead 1

to 1

hello 1

streams 2

join 1

kafka 3

summit 1

這里,第一列是Kafka消息鍵,第二列是消息值,是java.lang.String格式。 注意,輸出實際上是連續(xù)的更新流,其中每個數(shù)據(jù)記錄(如上面輸出中的每一行)是每個單詞的更新計數(shù)。 對于具有相同鍵的多個記錄,后的每條統(tǒng)計記錄都是前一次的更新。
現(xiàn)在,您可以向streams-file-input主題寫入更多輸入消息,并觀察添加到
streams-wordcount-output主題的消息,查看跟新記錄。你可以通過Ctrl-C中斷consumer。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,985評論 4 54
  • 一、基本概念 介紹 Kafka是一個分布式的、可分區(qū)的、可復(fù)制的消息系統(tǒng)。它提供了普通消息系統(tǒng)的功能,但具有自己獨(dú)...
    ITsupuerlady閱讀 1,716評論 0 9
  • Kafka作為一個分布式的流平臺,這到底意味著什么? 我們認(rèn)為,一個流處理平臺具有三個關(guān)鍵能力: 發(fā)布和訂閱消息(...
    晴天哥_王志閱讀 531評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,641評論 19 139
  • 挺好,這樣!
    vspigcom閱讀 159評論 0 1

友情鏈接更多精彩內(nèi)容