分布式消息通信Kafka(一) - 基本操作

0 相關(guān)源碼

1 Kafka的簡介

1.1 什么是 Kafka

Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng),具有高性能、高吞吐量的特點(diǎn)而被 廣泛應(yīng)用與大數(shù)據(jù)傳輸場景。
它是由 LinkedIn 公司開發(fā),使用 Scala 語言編 寫,之后成為 Apache 基金會(huì)的一個(gè)頂級(jí)項(xiàng)目。
Kafka 提供了類似 JMS 的特 性,但是在設(shè)計(jì)和實(shí)現(xiàn)上是完全不同的,而且他也不是 JMS 規(guī)范的實(shí)現(xiàn)。

1.2 Kafka 產(chǎn)生的背景

Kafka 作為一個(gè)消息系統(tǒng),早起設(shè)計(jì)的目的是用作 LinkedIn 的活動(dòng)流(Activity Stream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)。

  • 活動(dòng)流數(shù)據(jù)是所有的網(wǎng)站對(duì)用戶的使用情況做分析的時(shí)候要用到的最常規(guī)的部分,活動(dòng)數(shù)據(jù)包括頁面的訪問量(PV)、被查看內(nèi)容方面的信息以及搜索內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動(dòng)以日志的形式寫入某種文件,然后周期性的對(duì)這些文件進(jìn)行統(tǒng)計(jì)分 析。
  • 運(yùn)營數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU、IO 使用率、請(qǐng)求時(shí)間、服務(wù)日志等)。

1.3 Kafka 的應(yīng)用場景

由于具有更好的吞吐量、內(nèi)置分區(qū)、冗余及容錯(cuò)性的優(yōu)點(diǎn)(每秒可以處理幾十萬消息),讓 Kafka 成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案。所以在企業(yè)級(jí)應(yīng)用長,主要會(huì)應(yīng)用于如下幾個(gè)方面

1.3.1 行為跟蹤

Kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為。通過發(fā)布-訂閱模式實(shí)時(shí)記錄到對(duì)應(yīng)的 topic 中,通過后端大數(shù)據(jù)平臺(tái)接入處理分析,并做更進(jìn)一步的實(shí)時(shí)處理和監(jiān)控

1.3.2 日志收集

日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如ApacheFlume,很多公司使用 Kafka 代理日志聚合。

日志聚合表示從服務(wù)器上收集日志文件,然后放到一個(gè)集中的平臺(tái)(文件服務(wù)器)進(jìn)行處理。

在實(shí)際應(yīng)用開發(fā)中,我們應(yīng)用程序的 log 都會(huì)輸出到本地磁盤上, 排查問題的話通過 Linux 命令搞定,如果應(yīng)用程序組成了負(fù)載均衡集群,并且集群的機(jī)器有幾十臺(tái)以上,那么想通過日志快速定位到問題,就是很麻煩的事情了。
所以一般都會(huì)做一個(gè)日志統(tǒng)一收集平臺(tái)管理 log 日志,來快速查詢重要應(yīng)用的問題。所以很多公司的套路都是 把應(yīng)用日志集中到 Kafka 上,然后分別導(dǎo)入到 es 和 hdfs 上,用來做實(shí)時(shí)檢索分析和離線統(tǒng)計(jì)數(shù)據(jù)備份等。而另一方面,Kafka 本身又提供了很好的 API 來集成日志并且做日志收集

主要設(shè)計(jì)目標(biāo)

  • 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問性能
  • 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條以上消息的傳輸
  • 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)Partition內(nèi)的消息順序傳輸
  • 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理
  • Scale out:支持在線水平擴(kuò)展

2 為何使用消息系統(tǒng)

  • 解耦
      在項(xiàng)目啟動(dòng)之初來預(yù)測將來項(xiàng)目會(huì)碰到什么需求,是極其困難的。消息系統(tǒng)在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

  • 冗余
      有些情況下,處理數(shù)據(jù)的過程會(huì)失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的”插入-獲取-刪除”范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

  • 擴(kuò)展性
      因?yàn)橄㈥?duì)列解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡單。

  • 靈活性 & 峰值處理能力
      在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。

  • 可恢復(fù)性
      系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

  • 順序保證
      在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。Kafka保證一個(gè)Partition內(nèi)的消息的有序性。

  • 緩沖
      在任何重要的系統(tǒng)中,都會(huì)有需要不同的處理時(shí)間的元素。例如,加載一張圖片比應(yīng)用過濾器花費(fèi)更少的時(shí)間。消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行———寫入隊(duì)列的處理會(huì)盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。

  • 異步通信
      很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

3 常用Message Queue對(duì)比

  • RabbitMQ
      RabbitMQ是使用Erlang編寫的一個(gè)開源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級(jí),更適合于企業(yè)級(jí)的開發(fā)。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)。對(duì)路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。

  • Redis
      Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫,開發(fā)維護(hù)很活躍。雖然它是一個(gè)Key-Value數(shù)據(jù)庫存儲(chǔ)系統(tǒng),但它本身支持MQ功能,所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來使用。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬次,每10萬次記錄一次執(zhí)行時(shí)間。測試數(shù)據(jù)分為128Bytes、512Bytes、1K和10K四個(gè)不同大小的數(shù)據(jù)。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ,而如果數(shù)據(jù)大小超過了10K,Redis則慢的無法忍受;出隊(duì)時(shí),無論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis。

  • ZeroMQ
      ZeroMQ號(hào)稱最快的消息隊(duì)列系統(tǒng),尤其針對(duì)大吞吐量的需求場景。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長的高級(jí)/復(fù)雜的隊(duì)列,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪葸@個(gè)服務(wù)器角色。你只需要簡單的引用ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但是ZeroMQ僅提供非持久性的隊(duì)列,也就是說如果宕機(jī),數(shù)據(jù)將會(huì)丟失。其中,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)。

  • ActiveMQ
      ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類似于ZeroMQ,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。同時(shí)類似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場景。

  • Kafka/Jafka
      Kafka是Apache下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵化而來的,即Kafka的一個(gè)升級(jí)版。具有以下特性:快速持久化,可以在O(1)的系統(tǒng)開銷下進(jìn)行消息持久化;高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng),Broker、Producer、Consumer都原生自動(dòng)支持分布式,自動(dòng)實(shí)現(xiàn)負(fù)載均衡;支持Hadoop數(shù)據(jù)并行加載,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka通過Hadoop的并行加載機(jī)制統(tǒng)一了在線和離線的消息處理。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。

4 Kafka 本身的架構(gòu)

4.1 典型的 Kafka 集群包含

4.1.1 若干 Producer

可以是應(yīng)用節(jié)點(diǎn)產(chǎn)生的消息,也可以是通過 Flume 收集日志產(chǎn)生的事件

Producer使用 push 模式將消息發(fā)布到 broker,

4.1.2 若干 Broker

kafka 支持水平擴(kuò)展

多個(gè) broker 協(xié)同工作

4.1.3 若干Consumer Group

consumer 通過監(jiān)聽使用 pull 模式從 broker 訂閱并消費(fèi)消息

4.1.4 一個(gè) zookeeper 集群

Kafka 通過 zookeeper 管理集群配置及服務(wù)協(xié)同.

producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中。三者通過 zookeeper 管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)。

下圖有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn),producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費(fèi)消息的過程是 pull,主動(dòng)去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer


Terminology

  • Broker
      Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker
  • Topic
      每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。(物理上不同Topic的消息分開存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
  • Partition
      Parition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.
  • Producer
      負(fù)責(zé)發(fā)布消息到Kafka broker
  • Consumer
      消息消費(fèi)者,向Kafka broker讀取消息的客戶端。
  • Consumer Group
      每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)。

拓?fù)浣Y(jié)構(gòu)


  如上圖所示,一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費(fèi)消息。

Topic & Partition

Topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消費(fèi)都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里。
為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè)Partition的所有消息和索引文件。
若創(chuàng)建topic1和topic2兩個(gè)topic,且分別有13個(gè)和19個(gè)分區(qū),則整個(gè)集群上會(huì)相應(yīng)會(huì)生成共32個(gè)文件夾(本文所用集群共8個(gè)節(jié)點(diǎn),此處topic1和topic2 replication-factor均為1),如下圖所示。
  


每個(gè)日志文件都是一個(gè)log entry序列,每個(gè)log entry包含一個(gè)4字節(jié)整型數(shù)值(值為N+5),1個(gè)字節(jié)的”magic value”,4個(gè)字節(jié)的CRC校驗(yàn)碼,其后跟N個(gè)字節(jié)的消息體。每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲(chǔ)的消息格式如下:

  message length : 4 bytes (value: 1+4+n)
  “magic” value : 1 byte
  crc : 4 bytes
  payload : n bytes

這個(gè)log entry并非由一個(gè)文件構(gòu)成,而是分成多個(gè)segment,每個(gè)segment以該segment第一條消息的offset命名并以“.kafka”為后綴。
另外會(huì)有一個(gè)索引文件,它標(biāo)明了每個(gè)segment下包含的log entry的offset范圍,如下圖所示。

因?yàn)槊織l消息都被append到該P(yáng)artition中,屬于順序?qū)懘疟P,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是Kafka高吞吐率的一個(gè)很重要的保證)。
  


對(duì)于傳統(tǒng)的message queue而言,一般會(huì)刪除已經(jīng)被消費(fèi)的消息,而Kafka集群會(huì)保留所有的消息,無論其被消費(fèi)與否。
當(dāng)然,因?yàn)榇疟P限制,不可能永久保留所有數(shù)據(jù)(也沒必要),因此Kafka提供兩種策略刪除舊數(shù)據(jù)。

  • 一是基于時(shí)間
  • 二是基于Partition文件大小。

例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數(shù)據(jù),也可在Partition文件超過1GB時(shí)刪除舊數(shù)據(jù),配置如下所示。

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

注意,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka性能無關(guān)。選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。
另外,Kafka會(huì)為每一個(gè)Consumer Group保留一些metadata信息——當(dāng)前消費(fèi)的消息的position,也即offset。這個(gè)offset由Consumer控制。正常情況下Consumer會(huì)在消費(fèi)完一條消息后遞增該offset。當(dāng)然,Consumer也可將offset設(shè)成一個(gè)較小的值,重新消費(fèi)一些消息。因?yàn)閛ffet由Consumer控制,所以Kafka broker是無狀態(tài)的,它不需要標(biāo)記哪些消息被哪些消費(fèi)過,也不需要通過broker去保證同一個(gè)Consumer Group只有一個(gè)Consumer能消費(fèi)某一條消息,因此也就不需要鎖機(jī)制,這也為Kafka的高吞吐率提供了有力保障。

Producer消息路由

Producer發(fā)送消息到broker時(shí),會(huì)根據(jù)Paritition機(jī)制選擇將其存儲(chǔ)到哪一個(gè)Partition。
如果Partition機(jī)制設(shè)置合理,所有消息可以均勻分布到不同的Partition里,這樣就實(shí)現(xiàn)了負(fù)載均衡。如果一個(gè)Topic對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器I/O將會(huì)成為這個(gè)Topic的性能瓶頸,而有了Partition后,不同的消息可以并行寫入不同broker的不同Partition里,極大的提高了吞吐率??梢栽?code>$KAFKA_HOME/config/server.properties中通過配置項(xiàng)num.partitions來指定新建Topic的默認(rèn)Partition數(shù)量,也可在創(chuàng)建Topic時(shí)通過參數(shù)指定,同時(shí)也可以在Topic創(chuàng)建之后通過Kafka提供的工具修改。

在發(fā)送一條消息時(shí),可以指定這條消息的key,Producer根據(jù)這個(gè)key和Partition機(jī)制來判斷應(yīng)該將這條消息發(fā)送到哪個(gè)Parition。Paritition機(jī)制可以通過指定Producer的paritition. class這一參數(shù)來指定,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口。本例中如果key可以被解析為整數(shù)則將對(duì)應(yīng)的整數(shù)與Partition總數(shù)取余,該消息會(huì)被發(fā)送到該數(shù)對(duì)應(yīng)的Partition。(每個(gè)Parition都會(huì)有個(gè)序號(hào),序號(hào)從0開始)

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
    
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

如果將上例中的類作為partition.class,并通過如下代碼發(fā)送20條消息(key分別為0,1,2,3)至topic3(包含4個(gè)Partition)。

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", String.valueOf(j), String.format("The %d message for key %d", i,  j));
        }
        producer.send(messageList);
    }
  producer.close();
}

則key相同的消息會(huì)被發(fā)送并存儲(chǔ)到同一個(gè)partition里,而且key的序號(hào)正好和Partition序號(hào)相同。(Partition序號(hào)從0開始,本例中的key也從0開始)。下圖所示是通過Java程序調(diào)用Consumer后打印出的消息列表。
  

Consumer Group

(本節(jié)所有描述都是基于Consumer hight level API而非low level API)。
  使用Consumer high level API時(shí),同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi),但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息。
  


  這是Kafka用來實(shí)現(xiàn)一個(gè)Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個(gè)Consumer)的手段。一個(gè)Topic可以對(duì)應(yīng)多個(gè)Consumer Group。如果需要實(shí)現(xiàn)廣播,只要每個(gè)Consumer有一個(gè)獨(dú)立的Group就可以了。要實(shí)現(xiàn)單播只要所有的Consumer在同一個(gè)Group里。用Consumer Group還可以將Consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的Topic。
  實(shí)際上,Kafka的設(shè)計(jì)理念之一就是同時(shí)提供離線處理和實(shí)時(shí)處理。根據(jù)這一特性,可以使用Storm這種實(shí)時(shí)流處理系統(tǒng)對(duì)消息進(jìn)行實(shí)時(shí)在線處理,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)實(shí)時(shí)備份到另一個(gè)數(shù)據(jù)中心,只需要保證這三個(gè)操作所使用的Consumer屬于不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
  
kafka sample deployment in linkedin

下面這個(gè)例子更清晰地展示了Kafka Consumer Group的特性。首先創(chuàng)建一個(gè)Topic (名為topic1,包含3個(gè)Partition),然后創(chuàng)建一個(gè)屬于group1的Consumer實(shí)例,并創(chuàng)建三個(gè)屬于group2的Consumer實(shí)例,最后通過Producer向topic1發(fā)送key分別為1,2,3的消息。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息,同時(shí)group2中的3個(gè)Consumer分別收到了key為1,2,3的消息。如下圖所示。
  

Push vs. Pull

作為一個(gè)消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實(shí)上,push模式和pull模式各有優(yōu)劣。
  push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的。push模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
  對(duì)于Kafka而言,pull模式更合適。pull模式可簡化broker的設(shè)計(jì),Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義。

Kafka delivery guarantee

有這么幾種可能的delivery guarantee:

  • At most once 消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸

  • At least one 消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸

  • Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次,很多時(shí)候這是用戶所想要的。

    當(dāng)Producer向broker發(fā)送消息時(shí),一旦這條消息被commit,因數(shù)replication的存在,它就不會(huì)丟。但是如果Producer發(fā)送數(shù)據(jù)給broker后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那Producer就無法判斷該條消息是否已經(jīng)commit。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是Producer可以生成一種類似于主鍵的東西,發(fā)生故障時(shí)冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還并未實(shí)現(xiàn),有希望在Kafka未來的版本中實(shí)現(xiàn)。(所以目前默認(rèn)情況下一條消息從Producer到broker是確保了At least once,可通過設(shè)置Producer異步發(fā)送實(shí)現(xiàn)At most once)。
      接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對(duì)Kafka consumer high level API)。Consumer在從broker讀取消息后,可以選擇commit,該操作會(huì)在Zookeeper中保存該Consumer在該P(yáng)artition中讀取的消息的offset。該Consumer下一次再讀該P(yáng)artition時(shí)會(huì)從下一條開始讀取。如未commit,下一次讀取的開始位置會(huì)跟上一次commit之后的開始位置相同。當(dāng)然可以將Consumer設(shè)置為autocommit,即Consumer一旦讀到數(shù)據(jù)立即自動(dòng)commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實(shí)際使用中應(yīng)用程序并非在Consumer讀取完數(shù)據(jù)就結(jié)束了,而是要進(jìn)行進(jìn)一步處理,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。

  • 讀完消息先commit再處理消息。這種模式下,如果Consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對(duì)應(yīng)于At most once

  • 讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer crash了,下次重新開始工作時(shí)還會(huì)處理剛剛未commit的消息,實(shí)際上該消息已經(jīng)被處理過了。這就對(duì)應(yīng)于At least once。在很多使用場景下,消息都有一個(gè)主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認(rèn)為是Exactly once。(筆者認(rèn)為這種說法比較牽強(qiáng),畢竟它不是Kafka本身提供的機(jī)制,主鍵本身也并不能完全保證操作的冪等性。而且實(shí)際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結(jié)果怎樣,因?yàn)樘幚矸绞蕉喾N多樣,我們不應(yīng)該把處理過程的特性——如是否冪等性,當(dāng)成Kafka本身的Feature)

  • 如果一定要做到Exactly once,就需要協(xié)調(diào)offset和實(shí)際操作的輸出。經(jīng)典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個(gè)地方,會(huì)更簡潔和通用。這種方式可能更好,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交。比如,Consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS,如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,要么都不完成,間接實(shí)現(xiàn)Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護(hù)的,可以將之存于HDFS中)
      總之,Kafka默認(rèn)保證At least once,并且允許通過設(shè)置Producer異步提交來實(shí)現(xiàn)At most once。而Exactly once要求與外部存儲(chǔ)系統(tǒng)協(xié)作,幸運(yùn)的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

5 Kafka 的安裝部署

下載安裝包

  • tar -zxvf 解壓安裝包

5.1 Kafka 目錄介紹

  • /bin 操作 : kafka 的可執(zhí)行腳本


  • /config 配置文件


  • /libs 依賴庫目錄


  • /logs 日志數(shù)據(jù)目錄

5.2 啟動(dòng)/停止 Kafka

  1. 需要先啟動(dòng) zookeeper,如果沒有搭建 zookeeper 環(huán)境,可以直接運(yùn)行Kafka 內(nèi)嵌的 zookeeper
    啟動(dòng)命令
bin/zookeeper-server-start.sh config/zookeeper.properties &
  1. 進(jìn)入 kafka 目錄,運(yùn)行
bin/kafka-server-start.sh -daemon config/server.properties &
  1. 進(jìn)入 kafka 目錄,運(yùn)行
bin/kafka-server-stop.sh config/server.properties

6 Kafka 的基本操作

6.1 創(chuàng)建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 - -partitions 1 --topic test
  • replication-factor
    表該 topic 需要在不同的 broker 中保存幾份,這里設(shè)置 成 1,表示在兩個(gè) broker 中保存兩份
  • partitions
    分區(qū)數(shù)

6.2 查看 topic

./kafka-topics.sh --list --zookeeper localhost:2181

6.3 查看 topic 屬性

bin/kafka-topics.sh --list --zookeeper localhost:2181

6.4 消費(fèi)消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommend1 --from-beginning

6.5 發(fā)送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommend1

to have launchd start kafka now and restart at login:

brew services start kafka

Or, if you don't want/need a background service you can just run:

  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

To have launchd start zookeeper now and restart at login:

brew services start zookeeper

Or, if you don't want/need a background service you can just run:

zkServer start

關(guān)注公眾號(hào):JavaEdge,掌握更多Java流行核心技術(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容