簡介
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。使用Scala編寫,它以可水平擴(kuò)展和高吞吐率而被廣泛使用。
Kafka架構(gòu)
Terminoliogy
Broker:Kafka集群包含一個或多個服務(wù)器,這種服務(wù)器被稱為broker
代理、中介者-
Topic:每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為 Topic
物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處
主題 Partition:Parition是物理上的概念,每個Topic包含一個或多個Partition
分割、分區(qū)
Producer:負(fù)責(zé)發(fā)布消息到Kafka broker
Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
-
Consumer Group:每個Consumer屬于一個特定的Consumer Group
可為每個Consumer指定group name,若不指定group name則屬于默認(rèn)的group
Kafka拓?fù)浣Y(jié)構(gòu)

如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費消息。
Topic & Partition
Topic =Queue

Topic在邏輯上可以被認(rèn)為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個 Partition,每個Partition在物理上對應(yīng)一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個topic,且分別有13個和19個分區(qū),則整個集群上會相應(yīng)會生成共32個文件夾(本文所用集群共8個節(jié)點,此處topic1和topic2 replication-factor均為1),如下圖所示。

每個日志文件都是一個log entrie序列,每個log entrie包含一個4字節(jié)整型數(shù)值(值為N+5),1個字節(jié)的"magic value",4個字節(jié)的CRC校驗碼,其后跟N個字節(jié)的消息體。每條消息都有一個當(dāng)前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes
這個log entries并非由一個文件構(gòu)成,而是分成多個segment,每個segment以該segment第一條消息的offset命名并以“.kafka”為后綴。另外會有一個索引文件,它標(biāo)明了每個segment下包含的log entry的offset范圍,如下圖所示。

因為每條消息都被append到該Partition中,屬于順序?qū)懘疟P,因此效率非常高(經(jīng)驗證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是Kafka高吞吐率的一個很重要的保證)。

對于傳統(tǒng)的message queue而言,一般會刪除已經(jīng)被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當(dāng)然,因為磁盤限制,不可能永久保留所有數(shù)據(jù)(實際上也沒必要),因此Kafka提供兩種策略刪除舊數(shù)據(jù)。一是基于時間,二是基于Partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數(shù)據(jù),也可在Partition文件超過1GB時刪除舊數(shù)據(jù),配置如下所示。
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
這里要注意,因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka性能無關(guān)。選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。另外,Kafka會為每一個Consumer Group保留一些metadata信息——當(dāng)前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息后遞增該offset。當(dāng)然,Consumer也可將offset設(shè)成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態(tài)的,它不需要標(biāo)記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機(jī)制,這也為Kafka的高吞吐率提供了有力保障。
備注:
Kafka讀取特定消息的時間復(fù)雜度為O(1),按順序讀取,即不受文件大小約束
Kafka broker是無狀態(tài)的,broker沒有鎖機(jī)制,提高了高吞吐率
Producer消息路由
Producer發(fā)送消息到broker時,會根據(jù)Paritition機(jī)制選擇將其存儲到哪一個Partition。如果Partition機(jī)制設(shè)置合理,所有消息可以均勻分布到不同的Partition里,這樣就實現(xiàn)了負(fù)載均衡。如果一個Topic對應(yīng)一個文件,那這個文件所在的機(jī)器I/O將會成為這個Topic的性能瓶頸,而有了Partition后,不同的消息可以并行寫入不同broker的不同Partition里,極大的提高了吞吐率??梢栽?KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認(rèn)Partition數(shù)量,也可在創(chuàng)建Topic時通過參數(shù)指定,同時也可以在Topic創(chuàng)建之后通過Kafka提供的工具修改。
備注:不同的消息可以并行寫入不同broker的不同Partition里
在發(fā)送一條消息時,可以指定這條消息的key,Producer根據(jù)這個key和Partition機(jī)制來判斷應(yīng)該將這條消息發(fā)送到哪個Parition。Paritition機(jī)制可以通過指定Producer的paritition. class這一參數(shù)來指定,該class必須實現(xiàn)kafka.producer.Partitioner接口。本例中如果key可以被解析為整數(shù)則將對應(yīng)的整數(shù)與Partition總數(shù)取余,該消息會被發(fā)送到該數(shù)對應(yīng)的Partition。(每個Parition都會有個序號,序號從0開始)
//發(fā)送消息時,按照此機(jī)制將消息發(fā)送到對應(yīng)的Parition上 (key---partition)
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner
{
public JasonPartitioner(VerifiableProperties verifiableProperties)
{
}
@Override
public int partition(Object key, int numPartitions)
{
try
{
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
}
catch (Exception e)
{
return Math.abs(key.hashCode() % numPartitions);
}
}
}
如果將上例中的類作為partition.class,并通過如下代碼發(fā)送20條消息(key分別為0,1,2,3)至topic3(包含4個Partition)。
//將20條信息發(fā)送到topic3對應(yīng)的4個Partition中
public void sendMessage() throws InterruptedException
{
for(int i = 1; i <= 5; i++)
{
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++)//4個partition
{
//第i個消息給第j個paitition
messageList.add(new KeyedMessage<String, String>
("topic2", j+"", "The " + i + message for key " + j));
}
producer.send(messageList);
}
producer.close();
}
則key相同的消息會被發(fā)送并存儲到同一個partition里,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程序調(diào)用Consumer后打印出的消息列表。

Consumer Group
(本節(jié)所有描述都是基于Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內(nèi)的一個Consumer消費,但多個Consumer Group可同時消費這一消息。

廣播:不同的Consumer Group
單播:在同一個Consumer Group
離線處理:Hadoop
實時處理:Storm
這是Kafka用來實現(xiàn)一個Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個Consumer)的手段。一個Topic可以對應(yīng)多個Consumer Group。如果需要實現(xiàn)廣播,只要每個Consumer有一個獨立的Group就可以了。要實現(xiàn)單播只要所有的Consumer在同一個Group里。用Consumer Group還可以將Consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的Topic。
實際上,Kafka的設(shè)計理念之一就是同時提供離線處理和實時處理。根據(jù)這一特性,可以使用Storm這種實時流處理系統(tǒng)對消息進(jìn)行實時在線處理,同時使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時將數(shù)據(jù)實時備份到另一個數(shù)據(jù)中心,只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。

下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先創(chuàng)建一個Topic (名為topic1,包含3個Partition),然后創(chuàng)建一個屬于group1的Consumer實例,并創(chuàng)建三個屬于group2的Consumer實例,最后通過Producer向topic1發(fā)送key分別為1,2,3的消息。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了key為1,2,3的消息。如下圖所示。

Push vs. Pull
作為一個消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實上,push模式和pull模式各有優(yōu)劣。
push模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。push模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)Consumer的消費能力以適當(dāng)?shù)乃俾氏M消息。
對于Kafka而言,pull模式更合適。pull模式可簡化broker的設(shè)計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
Kafka delivery guarantee
有這么幾種可能的delivery guarantee:
At most once 消息可能會丟,但絕不會重復(fù)傳輸
At least one 消息絕不會丟,但可能會重復(fù)傳輸
-
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
當(dāng)Producer向broker發(fā)送消息時,一旦這條消息被commit,因數(shù)replication的存在,它就不會丟。但是如果Producer發(fā)送數(shù)據(jù)給broker后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那Producer就無法判斷該條消息是否已經(jīng)commit。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是Producer可以生成一種類似于主鍵的東西,發(fā)生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還并未實現(xiàn),有希望在Kafka未來的版本中實現(xiàn)。(所以目前默認(rèn)情況下一條消息從Producer到broker是確保了At least once,可通過設(shè)置Producer異步發(fā)送實現(xiàn)At most once)。
接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息后,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當(dāng)然可以將Consumer設(shè)置為autocommit,即Consumer一旦讀到數(shù)據(jù)立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應(yīng)用程序并非在Consumer讀取完數(shù)據(jù)就結(jié)束了,而是要進(jìn)行進(jìn)一步處理,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
讀完消息先commit再處理消息。這種模式下,如果Consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應(yīng)于At most once
讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認(rèn)為是Exactly once。(筆者認(rèn)為這種說法比較牽強(qiáng),畢竟它不是Kafka本身提供的機(jī)制,主鍵本身也并不能完全保證操作的冪等性。而且實際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結(jié)果怎樣,因為處理方式多種多樣,我們不應(yīng)該把處理過程的特性——如是否冪等性,當(dāng)成Kafka本身的Feature)
如果一定要做到Exactly once,就需要協(xié)調(diào)offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統(tǒng)可能不支持兩階段提交。比如,Consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS,如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,要么都不完成,間接實現(xiàn)Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護(hù)的,可以將之存于HDFS中)
總之,Kafka默認(rèn)保證At least once,并且允許通過設(shè)置Producer異步提交來實現(xiàn)At most once。而Exactly once要求與外部存儲系統(tǒng)協(xié)作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。