kafka快速開始教程

此教程假設(shè)你剛剛開始沒有任何 Kafka 或 ZooKeeper 數(shù)據(jù)。Kafka的控制臺腳本在類Unix和Windows平臺不同,Windows平臺使用bin\windows\\代替bin/,腳本的擴(kuò)展名改為.bat。

第一步:下載代碼

下載0.10.1.0發(fā)行版并解壓。

> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0

第二步:啟動服務(wù)

Kafka使用Zookeeper,所以如果你沒有的話需要首先啟動Zookeeper服務(wù)。你可以使用kafka自帶的腳本啟動一個簡單的單一節(jié)點(diǎn)Zookeeper實(shí)例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

現(xiàn)在啟動Kafka服務(wù):

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

第三步:創(chuàng)建一個主題

讓我們來創(chuàng)建一個名為test的topic,只使用單個分區(qū)和一個復(fù)本。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

我們現(xiàn)在可以運(yùn)行l(wèi)ist topic命令看到我們的主題。

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

另外,當(dāng)沒有主題存在的時候,你也可以通過配置代理自動創(chuàng)建主題而不是手動創(chuàng)建。

第四步:發(fā)送消息

Kafka有自帶的命令行客戶端會從文件或者標(biāo)準(zhǔn)輸入接受數(shù)據(jù)當(dāng)作消息發(fā)送到Kafka集群。默認(rèn)情況下,每行作為一個獨(dú)立的消息發(fā)送。

運(yùn)行生產(chǎn)者控制臺并且打幾行消息到控制臺發(fā)送到服務(wù)器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

第5步:啟動一個消費(fèi)者

Kafka還有個消費(fèi)者控制臺,會把消息輸出到標(biāo)準(zhǔn)輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你上面的命令是在不同的終端運(yùn)行,那么你可以在生產(chǎn)者終端輸入消息然后在消費(fèi)者終端看到。

所有的命令行工具都有一些額外的參數(shù):如果沒有使用參數(shù)運(yùn)行命令,將會顯示它們的詳細(xì)用法。

第六步:設(shè)置多個代理集群

目前為止,我們已經(jīng)在單個代理上運(yùn)行了,但這不好玩。對于Kafka,單個代理只是大小為1的集群。所以沒什么改變除了多啟動幾個代理實(shí)例。只是為了感受一下,我們把集群擴(kuò)展到3個節(jié)點(diǎn)(仍然在我們的本地機(jī)器上)。

首先,我們?yōu)槊總€代理新建一個配置文件(在windows上使用copy命令):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現(xiàn)在編輯新文件設(shè)置一下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id屬性是唯一的,在集群的每個節(jié)點(diǎn)永久不變。因?yàn)槲覀冊趩闻_機(jī)器上運(yùn)行代理,必須重寫端口和日志目錄。

我們已經(jīng)有了Zookeeper并運(yùn)行了一個節(jié)點(diǎn),所以只需要啟動下面的兩個新節(jié)點(diǎn):

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
現(xiàn)在創(chuàng)建一個含有三個副本的主題:
```sh
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好,現(xiàn)在我們有了一個集群但是怎么知道哪個代理正在做什么?使用describe topice命令查看:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

這是輸出解釋。第一行給出了各個分區(qū)的概況,額外的每行都給出了一個分區(qū)的信息。由于我們只有一個主題的分區(qū),所以只有一行。

  • leader是負(fù)責(zé)當(dāng)前分區(qū)的所有讀寫請求。每個節(jié)點(diǎn)都將領(lǐng)導(dǎo)一個隨機(jī)選擇的分區(qū)。
  • replicas 是節(jié)點(diǎn)列表,復(fù)制分區(qū)日志,不管他們是不是leader或者即使它們還活著。
  • isrin-sync的集合。這是replicas列表當(dāng)前還活著的子集。

注意在我們的示例中節(jié)點(diǎn)1是唯一的主題分區(qū)領(lǐng)導(dǎo)者。

我們運(yùn)行同樣的命令查看我們已經(jīng)創(chuàng)建的原始主題在哪里:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

所以沒什么驚奇的,原始的主題沒有副本在節(jié)點(diǎn)0上,當(dāng)我們創(chuàng)建它時唯一存在的節(jié)點(diǎn)服務(wù)器。

讓我們發(fā)布一些消息到新的主題:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在消費(fèi)這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現(xiàn)在測試容錯性。節(jié)點(diǎn)1是領(lǐng)導(dǎo)者,我們kill它。

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

領(lǐng)導(dǎo)關(guān)系已經(jīng)改為了從節(jié)點(diǎn)中的一個,節(jié)點(diǎn)1也不再in-sync復(fù)本集中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但是我們依然可以消費(fèi)消息即使之前接受的領(lǐng)導(dǎo)者已經(jīng)掛掉:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

第七部:使用Kafka Connect導(dǎo)入導(dǎo)出數(shù)據(jù)

在控制臺輸入輸出數(shù)據(jù)是很方便,但是你可能使用來自其他數(shù)據(jù)源的數(shù)據(jù)或者把Kafka的數(shù)據(jù)導(dǎo)出到其他的系統(tǒng)中。對于很多系統(tǒng),你可以直接使用Kafka Connect導(dǎo)入導(dǎo)出數(shù)據(jù)而不需要手寫自定義的集成代碼。

Kafka Connect是Kafka自帶的導(dǎo)入導(dǎo)出工具。它是運(yùn)行連接器的可擴(kuò)展工具,實(shí)現(xiàn)了集成外部系統(tǒng)的自定義邏輯。在快速教程里,我們會看到如何使用Kafka Connect的簡單連接器從文件導(dǎo)入數(shù)據(jù)到Kafka主題,再從kafka主題導(dǎo)出數(shù)據(jù)到文件。
首先,我們先創(chuàng)建一些測試數(shù)據(jù):

echo -e "foo\nbar" >test.txt

然后我們在Standalone模式啟動兩個連接器,Standalone模式表示他們運(yùn)行在一個本地進(jìn)程中。我們提供了三個配置文件作為參數(shù),第一個配置Kafka Connect進(jìn)程,包含通用的配置如Kafka 代理連接和數(shù)據(jù)序列話工具。剩余的文件每個都指定一個連接器。這些文件包含一個唯一連接器名,實(shí)例化的連接器類,和一些其他的連接器配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

這些示例配置文件,包含在Kafka中,使用默認(rèn)的本地集群配置并創(chuàng)建了兩個連接器:第一個是源連接器從文件讀取數(shù)據(jù)行每行都生成消息發(fā)送到kafka主題,第二個是目標(biāo)連接器從Kafka主題讀取消息生成行輸出到文件中。

在啟動的時候你會看到大量的日志信息,包含一些表示連接器初始化的。一旦Kafka Connect進(jìn)程啟動,源連接器開始從test.txt讀取行并發(fā)送到connect-test主題,sink連接器開始從connect-test主題讀取消息把他們寫到test.sink.txt文件。我們可以檢查輸出文件看到數(shù)據(jù)已經(jīng)通過管道傳遞完畢:

> cat test.sink.txt
foo
bar

注意數(shù)據(jù)存儲在Kafka的主題connect-test中,我們可以運(yùn)行消費(fèi)者控制臺查看主題數(shù)據(jù)(或者消費(fèi)者代碼處理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

連接器持續(xù)生成數(shù)據(jù),所以我們可以給測試文件添加數(shù)據(jù),看它通過管道。

> echo "Another line" >> test.txt

你應(yīng)該看到這行出現(xiàn)在消費(fèi)者控制臺和目標(biāo)文件中。

第八步:用Kafka Streams處理數(shù)據(jù)

Kafka Streams 是一個客戶端庫,為了實(shí)時流計(jì)算和分析Kafka集群中的存儲數(shù)據(jù)。此快速教程示例將會描述如何運(yùn)行一個用這個庫編寫的流程序。這是WordCountDemo的主要示例代碼(改成Java8 lambda表達(dá)式為了容易閱讀)。

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

它實(shí)現(xiàn)了WordCount算法,計(jì)算輸入文本單詞直方圖。然而,不同你之前可能見過的WordCount例子(數(shù)據(jù)是有限),這個Demo程序有些不同,因?yàn)樗O(shè)計(jì)為操作無限且沒有邊界的流數(shù)據(jù)。與有界變量類似,它是一個有狀態(tài)的算法跟蹤和修改單詞的總數(shù)。然而,由于它必須假設(shè)潛在的輸入數(shù)據(jù)無限多,當(dāng)處理更多數(shù)據(jù)時它會定期輸出它的當(dāng)前狀態(tài)和結(jié)果,因?yàn)樗恢朗裁磿r候所有輸入數(shù)據(jù)會處理完成。

我們將給Kafka主題添加一些數(shù)據(jù),隨后會被Kafka流程序處理。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

下一步,我們使用生產(chǎn)者控制臺發(fā)送一些輸入數(shù)據(jù)給主題streams-file-input(在實(shí)踐中,流數(shù)據(jù)會在系統(tǒng)啟動時持續(xù)不斷的流向Kafka系統(tǒng))。

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

現(xiàn)在可以運(yùn)行WordCount示例程序處理輸入數(shù)據(jù):

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

不會有任何標(biāo)準(zhǔn)輸出,日志項(xiàng)作為結(jié)果持續(xù)寫到另一個名為streams-wordcount-outputKafka主題中。示例會運(yùn)行幾秒鐘后自動停止而不像以便的流處理程序。

我們現(xiàn)在通過讀取它的輸出主題檢查WordCount 示例輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

以下輸出數(shù)據(jù)將會打印在控制臺上:

all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1

第一列是Kafka消息鍵,第二列是消息的值,都是java.lang.String格式。注意輸出實(shí)際上是一個持續(xù)更新的流,每個數(shù)據(jù)記錄(例如上面的每行)是一個單詞的更新總數(shù),又或者是如kafka這樣的鍵。對一個鍵有多條記錄,每個后面的記錄都會更新前面的。

現(xiàn)在你可以寫更多的消息發(fā)送到streams-file-input主題中,并在streams-wordcount-output主題中觀察添加的消息,查看更新的單詞總數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容