消息中間件一般用于各個(gè)模塊、系統(tǒng)之間的異步通信,降低各個(gè)模塊之間的耦合性。
Kafka作為一個(gè)分布式的流平臺,這到底意味著什么?
我們認(rèn)為,一個(gè)流處理平臺具有三個(gè)關(guān)鍵能力:
? ? 發(fā)布和訂閱消息(流),在這方面,它類似于一個(gè)消息隊(duì)列或企業(yè)消息系統(tǒng)。
? ? 以容錯(cuò)的方式存儲消息(流)。
? ? 在消息流發(fā)生時(shí)處理它們。
什么是kakfa的優(yōu)勢?
它應(yīng)用于2大類應(yīng)用:
? ? 構(gòu)建實(shí)時(shí)的流數(shù)據(jù)管道,可靠地獲取系統(tǒng)和應(yīng)用程序之間的數(shù)據(jù)。
? ? 構(gòu)建實(shí)時(shí)流的應(yīng)用程序,對數(shù)據(jù)流進(jìn)行轉(zhuǎn)換或反應(yīng)。
要了解kafka是如何做這些事情的,讓我們從下到上深入探討kafka的能力。
首先幾個(gè)概念:
? ? kafka作為一個(gè)集群運(yùn)行在一個(gè)或多個(gè)服務(wù)器上。
? ? kafka集群存儲的消息是以topic為類別記錄的。
? ? 每個(gè)消息(也叫記錄record,我習(xí)慣叫消息)是由一個(gè)key,一個(gè)value和時(shí)間戳構(gòu)成。
kafka有四個(gè)核心API:
? ? 應(yīng)用程序使用Producer API發(fā)布消息到1個(gè)或多個(gè)topic(主題)。
? ? 應(yīng)用程序使用Consumer API來訂閱一個(gè)或多個(gè)topic,并處理產(chǎn)生的消息。
? ? 應(yīng)用程序使用Streams API充當(dāng)一個(gè)流處理器,從1個(gè)或多個(gè)topic消費(fèi)輸入流,并生產(chǎn)一個(gè)輸出流到1個(gè)或多個(gè)輸出topic,有效地將輸入流轉(zhuǎn)換到輸出流。
? ?Connector API允許構(gòu)建或運(yùn)行可重復(fù)使用的生產(chǎn)者或消費(fèi)者,將topic連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如,一個(gè)關(guān)系數(shù)據(jù)庫的連接器可捕獲每一個(gè)變化。
Kafka所使用的基本術(shù)語:
Topic
Kafka將消息種子(Feed)分門別類,每一類的消息稱之為一個(gè)主題(Topic).
Producer
發(fā)布消息的對象稱之為主題生產(chǎn)者(Kafka topic producer)
Consumer
訂閱消息并處理發(fā)布的消息的種子的對象稱之為主題消費(fèi)者(consumers)
Broker
已發(fā)布的消息保存在一組服務(wù)器中,稱之為Kafka集群。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(Broker). 消費(fèi)者可以訂閱一個(gè)或多個(gè)主題(topic),并從Broker拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。
Kafka的保證(Guarantees)
? ? 生產(chǎn)者發(fā)送到一個(gè)特定的Topic的分區(qū)上,消息將會按照它們發(fā)送的順序依次加入,也就是說,如果一個(gè)消息M1和M2使用相同的producer發(fā)送,M1先發(fā)送,那么M1將比M2的offset低,并且優(yōu)先的出現(xiàn)在日志中。
????? ??? ??? ??消費(fèi)者收到的消息也是此順序。
? ? 如果一個(gè)Topic配置了復(fù)制因子(replication factor)為N, 那么可以允許N-1服務(wù)器宕機(jī)而不丟失任何已經(jīng)提交(committed)的消息。
kafka作為一個(gè)消息系統(tǒng)
Kafka的流與傳統(tǒng)企業(yè)消息系統(tǒng)相比的概念如何?
傳統(tǒng)的消息有兩種模式:隊(duì)列和發(fā)布訂閱。 在隊(duì)列模式中,消費(fèi)者池從服務(wù)器讀取消息(每個(gè)消息只被其中一個(gè)讀?。? 發(fā)布訂閱模式:消息廣播給所有的消費(fèi)者。這兩種模式都有優(yōu)缺點(diǎn),隊(duì)列的優(yōu)點(diǎn)是允許多個(gè)消費(fèi)者瓜分處理數(shù)據(jù),這樣可以擴(kuò)展處理。但是,隊(duì)列不像多個(gè)訂閱者,一旦消息者進(jìn)程讀取后故障了,那么消息就丟了。而發(fā)布訂閱允許你廣播數(shù)據(jù)到多個(gè)消費(fèi)者,由于每個(gè)訂閱者都訂閱了消息,所以沒辦法縮放處理。
kafka中消費(fèi)者組有兩個(gè)概念:隊(duì)列:消費(fèi)者組(consumer group)允許同名的消費(fèi)者組成員瓜分處理。發(fā)布訂閱:允許你廣播消息給多個(gè)消費(fèi)者組(不同名)。
kafka的每個(gè)topic都具有這兩種模式。
kafka有比傳統(tǒng)的消息系統(tǒng)更強(qiáng)的順序保證。
傳統(tǒng)的消息系統(tǒng)按順序保存數(shù)據(jù),如果多個(gè)消費(fèi)者從隊(duì)列消費(fèi),則服務(wù)器按存儲的順序發(fā)送消息,但是,盡管服務(wù)器按順序發(fā)送,消息異步傳遞到消費(fèi)者,因此消息可能亂序到達(dá)消費(fèi)者。這意味著消息存在并行消費(fèi)的情況,順序就無法保證。消息系統(tǒng)常常通過僅設(shè)1個(gè)消費(fèi)者來解決這個(gè)問題,但是這意味著沒用到并行處理。
kafka做的更好。通過并行topic的parition ——kafka提供了順序保證和負(fù)載均衡。每個(gè)partition僅由同一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)到。并確保消費(fèi)者是該partition的唯一消費(fèi)者,并按順序消費(fèi)數(shù)據(jù)。每個(gè)topic有多個(gè)分區(qū),則需要對多個(gè)消費(fèi)者做負(fù)載均衡,但請注意,相同的消費(fèi)者組中不能有比分區(qū)更多的消費(fèi)者,否則多出的消費(fèi)者一直處于空等待,不會收到消息。
kafka的安裝和啟動
Zookeeper安裝
①、進(jìn)入Zookeeper解壓目錄,C:\softdownload\zookeeper\conf
②、將“zoo_sample.cfg”重命名為“zoo.cfg”。
③、配置啟動日記目錄,用#注解調(diào) dataDir=/tmp/zookeeper
dataDir=C:\\softdownload\\zookeeper\\data
④、系統(tǒng)環(huán)境變量中添加:ZOOKEEPER_HOME =?C:\softdownload\zookeeper
⑤、編輯系統(tǒng)變量path,加上: ZOOKEEPER_HOME%\bin;
⑥、確認(rèn)zoo.cfg文件中默認(rèn)的Zookeeper端口(默認(rèn)端口2181)。
打開新的cmd,進(jìn)入安裝目錄:cd C:\softdownload\zookeeper\bin,輸入zkserver,運(yùn)行Zookeeper。
安裝Kafka
①、進(jìn)入Kafka配置目錄,C:\softdownload\kafka\config
編輯文件“server.properties”
找到并用#注解“l(fā)og.dirs=/tmp/kafka-logs”
添加自己的日記目錄:log.dirs=C:\softdownload\kafka\kafka-logs
②、如果Zookeeper在某些其他的機(jī)器或集群上運(yùn)行,可以將“zookeeper.connect:2181”修改為自定義IP與端口。在這里使用了同一個(gè)機(jī)器,所以沒其他做修改。文件中的Kafka端口和broker.id也是可以配置的。默認(rèn)設(shè)置不變。機(jī)器的localhost也為127.0.0.1,這里我也修改為ipv4的,防止localhost為ipv6時(shí)受影響。
③、Kafka會按照默認(rèn),在9092端口上運(yùn)行,并連接zookeeper的默認(rèn)端口:2181。
在zookeeper的基礎(chǔ)上,運(yùn)行Kafka服務(wù)
進(jìn)入Kafka安裝目錄,C:\softdownload\kafka,切換到命令行窗口,運(yùn)行kafka。
輸入命令:.\bin\windows\kafka-server-start.bat .\config\server.properties
創(chuàng)建主題Topic
1. 現(xiàn)在創(chuàng)建主題,命名為“test”,replication factor=1(因?yàn)橹挥?個(gè)Kafka服務(wù)器在運(yùn)行)。如果集群中所運(yùn)行的Kafka服務(wù)器不止1個(gè),可以相應(yīng)增加replication-factor,從而提高數(shù)據(jù)可用性和系統(tǒng)容錯(cuò)性。
2. 在C:\softdownload\kafka\bin\windows打開新的命令行。
3. 輸入下面的命令,回車:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
創(chuàng)建Producer及Consumer來測試服務(wù)器。
1.在C:\softdownload\kafka\bin\windows打開新的命令行。
2.輸入以下命令,啟動producer,可以輸入消息:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
0.9版本以上新命令:.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test --producer.config .\config\producer.properties
3.在同樣的位置C:\softdownload\kafka\bin\windows再次打開新的命令行。
4.現(xiàn)在輸入下列命令啟動consumer,可以獲取消息:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
0.9版本以上新命令:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test?--new-consumer --from-beginning --consumer.config .\config\consumer.properties
5.兩個(gè)命令行窗口,producer可以輸入任何消息,consumer可以獲取消息。
使用JavaAPI連接遠(yuǎn)程服務(wù)器,要在service.properties配置host.name=192.168.111.111(遠(yuǎn)程IP地址),這主要是因?yàn)椋琸afka默認(rèn)是監(jiān)聽localhost的端口,如果不配置新端口名的話,就解析監(jiān)聽不到消息。
PS:上面總結(jié)的,有什么不足之處,歡迎更正補(bǔ)充。