Kafka簡介和應用

kafka簡介

參考網(wǎng)址

http://www.cnblogs.com/likehua/p/3999538.html
http://www.infoq.com/cn/articles/apache-kafka/
http://www.infoq.com/cn/articles/kafka-analysis-part-1
http://www.infoq.com/cn/articles/kafka-analysis-part-2
http://www.infoq.com/cn/articles/kafka-analysis-part-3
http://www.infoq.com/cn/articles/kafka-analysis-part-4
http://www.infoq.com/cn/articles/kafka-analysis-part-5
http://www.aboutyun.com/thread-12882-1-1.html

question

  • partitions是如何分配的,是每個server上都有所有的partitions么,還是在每個server上只有某一份partitions? 如果是前者,如何節(jié)省磁盤空間的?

  • 每個分區(qū),多個server, 一個leader,分區(qū)可以有多個備份。

  • 本質上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費.也就是說可以通過group在內部實現(xiàn)consumer的負載均衡,在外部實現(xiàn)不同topic消息的隔離。

  • 在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的.

使用場景

1.messaging

對于一些常規(guī)的消息系統(tǒng),kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優(yōu)勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業(yè)級特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)

2.Websit activity tracking

kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實時監(jiān)控,或者離線統(tǒng)計分析等

3.Log Aggregation

kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng).

設計思路

1、持久性

kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統(tǒng)的本身特性.且無論任何OS下,對文件系統(tǒng)本身的優(yōu)化幾乎沒有可能.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數(shù),broker會將消息暫時buffer起來,當消息的個數(shù)(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數(shù).

2、性能

需要考慮的影響性能點很多,除磁盤IO之外,我們還需要考慮網(wǎng)絡IO,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以將消息buffer起來,當消息的條數(shù)達到一定閥值時,批量發(fā)送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對于kafka broker端,似乎有個sendfile系統(tǒng)調用可以潛在的提升網(wǎng)絡IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內存中,socket直接讀取相應的內存區(qū)域即可,而無需進程再次copy和交換. 其實對于producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡IO更應該需要考慮.可以將任何在網(wǎng)絡上傳輸?shù)南⒍冀涍^壓縮.kafka支持gzip/snappy等多種壓縮方式.

3、生產者

負載均衡: producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發(fā)送到broker,中間不會經過任何"路由層".事實上,消息被路由到哪個partition上,有producer客戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現(xiàn)"消息均衡分發(fā)"是必要的.

其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經注冊了watch用來監(jiān)聽partition leader的變更事件.
異步發(fā)送:將多條消息暫且在客戶端buffer起來,并將他們批量的發(fā)送到broker,小數(shù)據(jù)IO太多,會拖慢整體的網(wǎng)絡延遲,批量延遲發(fā)送事實上提升了網(wǎng)絡效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未發(fā)送的消息將會丟失。

4、消費者

consumer端向broker發(fā)送"fetch"請求,并告知其獲取消息的offset;此后consumer將會獲得一定條數(shù)的消息;consumer端也可以重置offset來重新消費消息.

在JMS實現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.不過在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull(或者說fetch)消息;這中模式有些優(yōu)點,首先consumer端可以根據(jù)自己的消費能力適時的去fetch消息并處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數(shù)量,batch fetch.

其他JMS實現(xiàn),消息消費的位置是有prodiver保留,以便避免重復發(fā)送消息或者將沒有消費成功的消息重發(fā)等,同時還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態(tài)的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級.

集成

單機集成環(huán)境

參考資料

http://colobu.com/2014/11/19/kafka-spring-integration-in-practice/
https://github.com/smallnest/spring-kafka-demo

通過spring boot來集成kafka

http://www.itdecent.cn/p/048e954dab40

Github上一個用spring-boot來集成kafka,mongodb,myibatis等等的例子:

https://github.com/xho22/spring-boot-dubbo-mongo-mybatis-kafka-liquibase

kafka使用

在kafka啟動的時候要同時啟動zookeeper和kafka server

命令如下:

bin/kafka-server-start.sh config/server.properties

bin/zookeeper-server-start.sh config/zookeeper.properties

具體配置可以修改server.properties和zookeeper.properties.

如果出現(xiàn)下面的錯誤,則是因為沒有啟動kafka server.

Caused by: kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at org.springframework.integration.kafka.listener.KafkaTopicOffsetManager.createCompactedTopicIfNotFound(KafkaTopicOffsetManager.java:268)
    at org.springframework.integration.kafka.listener.KafkaTopicOffsetManager.afterPropertiesSet(KafkaTopicOffsetManager.java:210)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
    ... 22 more

創(chuàng)建topic

bin/kafka-topics.sh  --create --zookeeper 10.160.5.56:2181  --replication-factor 1 --partitions 1 --topic test

bin/kafka-topics.sh --zookeeper 10.160.5.56:2181 -list

生產者

bin/kafka-console-producer.sh  --broker-list  10.160.5.56:9092 --topic test

消費者

bin/kafka-console-consumer.sh --zookeeper 10.160.5.56:2181  --topic test --from-beginning

注意

  • 新版本出了spring-kafka,一部分功能從spring-integration-kafka中移出來了,但是除了官方使用之外,網(wǎng)上資料很少

https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容