Kafka

Kafka簡介

Kafka由scala編寫,提供java.scalaAPI,它是一個分布式消息隊列,一般用來緩存數(shù)據(jù),還可以實現(xiàn)攔截器的功能,過濾某些自定義數(shù)據(jù), 而Storm / Streaming通過消費Kafka內(nèi)的數(shù)據(jù)進行計算. 內(nèi)部有序,外部分區(qū)無序,消費者也有可以有組.

1.Kafka允許您發(fā)布和訂閱流記錄。在這方面,它類似于一個消息隊列或企業(yè)消息傳遞系統(tǒng)。

2.Kafka能讓你以容錯方式進行流記錄的存儲。

3.數(shù)據(jù)產(chǎn)生時你就可以進行流數(shù)據(jù)處理。


Kafka架構(gòu)

Kafka有自己的一套API: Consumer;

Kafka對消息保存時根據(jù) Topic(可以理解為一個隊列) 進行歸類,發(fā)送(消息)者稱為 Producer,消費(消息)者稱為 Consumer,此外kafka集群有多個kafka實例組成,每個實例 (server)成為 broker。

Kafka內(nèi)部機制流程

1. Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。 B配置的是1個分區(qū)

2. Consumer :消息消費者,向kafka broker取消息的客戶端

3. Topic:可以理解為一個隊列

4. Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個partion只會把消息發(fā)給該CG中的一個consumer。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic

5. Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic

6. Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。

7. Offset:(一個角標(biāo),找文件的角標(biāo))kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka


Kafka環(huán)境搭建

Kafka是集群,所以節(jié)點上都要安裝.

Kafka依賴zookeeper來保存一些meta信息,依次來保證系統(tǒng)的可用性.

注: 面對JDK.Hadoop.zookeeper都默認(rèn)好的

1. 官網(wǎng)下載Kafka?? kafka.apache.org/downloads.html

2. 解壓包到modules路徑下

3. cd進入kafka目錄下,且在此目錄下創(chuàng)建logs文件夾

4. 進入config目錄下,編輯server.properties,編輯內(nèi)容如下:

briker.id=0 ?? //broker的全局唯一編號,不能重復(fù),每個節(jié)點要一個唯一標(biāo)識id

delete.topic.enable=true ? ? ? //刪除topic的功能使能

log.dirs=/opt/modules/kafka_2.11-0.11.0.0/logs ?? //Kafka運行日志存放的路徑

//下面是配置連接zookeeper的集群地址,端口號2181

zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

5. 修改vi /etc/profile

export KAFKA_HOME=/opt/module/kafka???????? //配置KAFKA_HOME

export PATH=$PATH:$KAFKA_HOME/bin

6. 分發(fā)安裝包 xsync profile????? //xsync是我寫的分發(fā)腳本,沒有的就用命令一個個分發(fā)..

7. 在分發(fā)的節(jié)點上修改brokerid=1? /? brokerid=2????? //不得重復(fù)

server.properties下的參數(shù)解讀

num.network.threads=3? ? ? ?? //處理網(wǎng)絡(luò)請求的線程數(shù)量

num.io.threads=8? ? ? ? //用來處理磁盤IO的現(xiàn)成數(shù)量

socket.send.buffer.bytes=102400? ? ? ?? //接收套接字的緩沖區(qū)大小

socket.receive.buffer.bytes=102400? ? ? ? ? //接收套接字的緩沖區(qū)大小

socket.request.max.bytes=104857600? ? ? ?? //請求套接字的緩沖區(qū)大小

num.partitions=1? ? ? ?? //topic在當(dāng)前broker上的分區(qū)個數(shù)

num.recovery.threads.per.data.dir=1? ? ? ? ? //用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量

log.retention.hours=168? ? ? ? ? //segment文件保留的最長時間,超時將被刪除


Kafka命令行操作

啟動kafka: bin/kafka-server-start.sh config/server.properties ?? //所有節(jié)點都要啟動

查看服務(wù)器中所有topic: bin/kafka-topics.sh --list --zookeeper hadoop101:2181

創(chuàng)建topic: bin/kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 3 --partitions 1--topic first ? ? ? ? //有多少個節(jié)點就最多創(chuàng)建多少個副本

分組: bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties

注: --topic定義topic名? --replication-factor定義副本數(shù)? --partitions定義分區(qū)數(shù)? //針對以上兩個

刪除topic: bin/kafka-topics.sh --delete --zookeeperhadoop101:2181 --topic first


Kafka生產(chǎn)過程分析

寫入流程

producer寫入流程

1. producer先從zookeeper的"/brokers/.../state"節(jié)點找到該partition的leader

2. producer將消息發(fā)送給該leader

3. leader將消息寫入本地log

4. followers從leader pull消息,寫入本地log后向leader發(fā)送ACK

5. leader收到所有ISR中的replication的ACK后,增加HW(highwatermark,最后commit的offset)并向producer發(fā)送ACK

Kafka的存儲策略

無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

1. 基于時間:log.retention.hours=168

2. 基于大小:log.retention.bytes=1073741824


Kafka Stream

Kafka Streams是什么: Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。它建立在流處理的一系列重要功能基礎(chǔ)之上,比如正確區(qū)分事件和處理時間,處理遲到數(shù)據(jù)以及高效的應(yīng)用程序狀態(tài)管理。

特點1: 功能強大

高拓展性 / 彈性 / 容錯 / 有狀態(tài)和無狀態(tài)處理 / 基于事件時間的Window,Join,Aggergations

特點2: 輕量級

無需專門的集群 / 一個庫,而不是框架

特點3: 完全集成

100%的Kafka 0.10.0版本兼容 / 易于集成到現(xiàn)有的應(yīng)用程序 / 程序部署無需手工處理(這個指的應(yīng)該是Kafka多分區(qū)機制對Kafka Streams多實例的自動匹配)

特點4: 實時性

毫秒級延遲 / 并非微批處理 / 窗口允許亂序數(shù)據(jù) / 允許遲到數(shù)據(jù)

它的缺點: Kafka Stream能做的東西Storm和Spark Streamin已經(jīng)完美解決了,況且Streamin有一個Saprk這么好的生態(tài)圈,要Kafka Stream是錦上添花,增加學(xué)習(xí)成本(其實使我不會Kafka Stream)...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容