Kafka學(xué)習(xí)筆記(二) :初探Kafka

看完上一篇,相信大家對消息系統(tǒng)以及Kafka的整體構(gòu)成都有了初步了解,學(xué)習(xí)一個(gè)東西最好的辦法,就是去使用它,今天就讓我們一起窺探一下Kafka,并完成自己的處女作。

消息在Kafka中的歷程

雖然我們掌握東西要一步一步來,但是我們在大致了解了一個(gè)東西后,會有利于我們對它的理解和學(xué)習(xí),所以我們可以先來看一下一條消息從發(fā)出到最后被消息者接收到底經(jīng)歷了什么?

message-flow.png

上圖簡要的說明了消息在Kafka中的整個(gè)流轉(zhuǎn)過程(假設(shè)已經(jīng)部署好了整個(gè)Kafka系統(tǒng),并創(chuàng)建了相應(yīng)的Topic,分區(qū)等細(xì)節(jié)后續(xù)再單獨(dú)講):

  • 1.消息生產(chǎn)者將消息發(fā)布到具體的Topic,根據(jù)一定算法或者隨機(jī)被分發(fā)到具體的分區(qū)中;
  • 2.根據(jù)實(shí)際需求,是否需要實(shí)現(xiàn)處理消息邏輯;
  • 3.若需要,則實(shí)現(xiàn)具體邏輯后將結(jié)果發(fā)布到輸出Topic;
  • 4.消費(fèi)者根據(jù)需求訂閱相關(guān)Topic,并消費(fèi)消息;

總的來說,怎么流程還是比較清晰和簡單的,下面就跟我一起來練習(xí)Kafka的基本操作,最后實(shí)現(xiàn)一個(gè)單詞計(jì)數(shù)的小demo。

基礎(chǔ)操作

以下代碼及相應(yīng)測試在以下環(huán)境測試通過:Mac OS + JDK1.8,Linux系統(tǒng)應(yīng)該也能跑通,Windows有興趣的同學(xué)可以去官網(wǎng)下載相應(yīng)版本進(jìn)行相應(yīng)的測試練習(xí)。

下載Kafka

Mac系統(tǒng)同學(xué)可以使用brew安裝:

brew install kafka

Linux系統(tǒng)同學(xué)可以從官網(wǎng)下載源碼解壓,也可以直接執(zhí)行以下命令:

cd 
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1

啟動

Kafka使用Zookeeper來維護(hù)集群信息,所以這里我們先要啟動Zookeeper,Kafka與Zookeeper的相關(guān)聯(lián)系跟結(jié)合后續(xù)再深入了解,畢竟不能一口吃成一個(gè)胖子。

bin/zookeeper-server-start.sh config/zookeeper.properties

接著我們啟動一個(gè)Kafka Server節(jié)點(diǎn):

bin/kafka-server-start.sh config/server.properties

這時(shí)候Kafka系統(tǒng)已經(jīng)算是啟動起來了。

創(chuàng)建Topic

在一切就緒之后,我們要開始做極其重要的一步,那就是創(chuàng)建Topic,Topic是整個(gè)系統(tǒng)流轉(zhuǎn)的核心,另外Topic本身也包含著很多復(fù)雜的參數(shù),比如復(fù)制因子個(gè)數(shù),分區(qū)個(gè)數(shù)等,這里為了從簡,我們將對應(yīng)的參數(shù)都設(shè)為1,方便大家測試:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test

其中參數(shù)的具體含義:

屬性 功能
--create 代表創(chuàng)建Topic
--zookeeper zookeeper集群信息
--replication-factor 復(fù)制因子
--partitions 分區(qū)信息
--topic Topic名稱

這時(shí)候我們已經(jīng)創(chuàng)建好了一個(gè)叫kakfa-test的Topic了。

向Topic發(fā)送消息

在有了Topic后我們就可以向其發(fā)送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test

然后我們向控制臺輸入一些消息:

this is my first test kafka
so good

這時(shí)候消息已經(jīng)被發(fā)布在kakfa-test這個(gè)主題上了。

從Topic獲取消息

現(xiàn)在Topic上已經(jīng)有消息了,現(xiàn)在可以從中獲取消息被消費(fèi):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning

這時(shí)候我們可以在控制臺看到:

this is my first test kafka
so good

至此我們就測試了最簡單的Kafka Demo,希望大家能自己動手去試試,雖然很簡單,但是這能讓你對整個(gè)Kafka流程能更熟悉。

WordCount

下面我們來利用上面的一些基本操作來實(shí)現(xiàn)一個(gè)簡單WordCount程序,它具備以下功能:

  • 1.支持詞組持續(xù)輸入,即生產(chǎn)者不斷生成消息;
  • 2.程序自動從輸入Topic中獲取原始數(shù)據(jù),然后經(jīng)過處理,將處理結(jié)果發(fā)布在計(jì)數(shù)Topic中;
  • 3.消費(fèi)者可以從計(jì)數(shù)Topic獲取相應(yīng)的WordCount的結(jié)果;

1.啟動kafka

與上文的啟動一樣,按照其操作即可。

2.創(chuàng)建輸入Topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1

3.向Topic輸入消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input

4.流處理邏輯

這部分內(nèi)容是整個(gè)例子的核心,這部分代碼有Java 8+和Scala版本,個(gè)人認(rèn)為流處理用函數(shù)式語法表達(dá)的更加簡潔清晰,推薦大家用函數(shù)式的思維去嘗試寫以下,發(fā)現(xiàn)自己再也不想寫Java匿名內(nèi)部類這種語法了。

我們先來看一個(gè)Java 8的版本:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
        Pattern pattern = Pattern.compile("\\W+");
        source
           .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
           .toStream()
           .to("kafka-word-count-output");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

是不是很驚訝,用java也能寫出如此簡潔的代碼,所以說如果有適用場景,推薦大家嘗試的用函數(shù)式的思維去寫寫java代碼。

我們再來看看Scala版本的:


object WordCount {
  def main(args: Array[String]) {
    val props: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val source: KStream[String, String] = builder.stream("kafka-word-count-input")
    source
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
    val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
    streams.start()
  }
}

可以發(fā)現(xiàn)使用Java 8函數(shù)式風(fēng)格編寫的代碼已經(jīng)跟Scala很相似了。

5.啟動處理邏輯

很多同學(xué)電腦上并沒有裝sbt,所以這里演示的利用Maven構(gòu)建的Java版本,具體執(zhí)行步驟請參考戳這里kafka-word-count上的說明。

6.啟動消費(fèi)者進(jìn)程

最后我們啟動消費(fèi)者進(jìn)程,并在生產(chǎn)者中輸入一些單詞,比如:

kafka-word-count-input.png

最后我們可以在消費(fèi)者進(jìn)程中看到以下輸出:

bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092  --property print.key=true
kafka-word-count-output.png

總結(jié)

本篇文章主要是講解了Kafka的基本運(yùn)行過程和一些基礎(chǔ)操作,但這是我們學(xué)習(xí)一個(gè)東西必不可少的一步,只有把基礎(chǔ)扎實(shí)好,才能更深入的去了解它,理解它為什么這么設(shè)計(jì),我在這個(gè)過程中也遇到很多麻煩,所以還是希望大家能夠自己動手去實(shí)踐一下,最終能收獲更多。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容