Kafka簡介
Kafka由scala編寫,提供java.scalaAPI,它是一個分布式消息隊列,一般用來緩存數(shù)據(jù),還可以實現(xiàn)攔截器的功能,過濾某些自定義數(shù)據(jù), 而Storm / Streaming通過消費Kafka內(nèi)的數(shù)據(jù)進行計算. 內(nèi)部有序,外部分區(qū)無序,消費者也有可以有組.
1.Kafka允許您發(fā)布和訂閱流記錄。在這方面,它類似于一個消息隊列或企業(yè)消息傳遞系統(tǒng)。
2.Kafka能讓你以容錯方式進行流記錄的存儲。
3.數(shù)據(jù)產(chǎn)生時你就可以進行流數(shù)據(jù)處理。
Kafka架構(gòu)
Kafka有自己的一套API: Consumer;
Kafka對消息保存時根據(jù) Topic(可以理解為一個隊列) 進行歸類,發(fā)送(消息)者稱為 Producer,消費(消息)者稱為 Consumer,此外kafka集群有多個kafka實例組成,每個實例 (server)成為 broker。

1. Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。 B配置的是1個分區(qū)
2. Consumer :消息消費者,向kafka broker取消息的客戶端
3. Topic:可以理解為一個隊列
4. Consumer Group (CG):這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個partion只會把消息發(fā)給該CG中的一個consumer。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic
5. Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic
6. Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
7. Offset:(一個角標(biāo),找文件的角標(biāo))kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka
Kafka環(huán)境搭建
Kafka是集群,所以節(jié)點上都要安裝.
Kafka依賴zookeeper來保存一些meta信息,依次來保證系統(tǒng)的可用性.
注: 面對JDK.Hadoop.zookeeper都默認(rèn)好的
1. 官網(wǎng)下載Kafka?? kafka.apache.org/downloads.html
2. 解壓包到modules路徑下
3. cd進入kafka目錄下,且在此目錄下創(chuàng)建logs文件夾
4. 進入config目錄下,編輯server.properties,編輯內(nèi)容如下:
briker.id=0 ?? //broker的全局唯一編號,不能重復(fù),每個節(jié)點要一個唯一標(biāo)識id
delete.topic.enable=true ? ? ? //刪除topic的功能使能
log.dirs=/opt/modules/kafka_2.11-0.11.0.0/logs ?? //Kafka運行日志存放的路徑
//下面是配置連接zookeeper的集群地址,端口號2181
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
5. 修改vi /etc/profile
export KAFKA_HOME=/opt/module/kafka???????? //配置KAFKA_HOME
export PATH=$PATH:$KAFKA_HOME/bin
6. 分發(fā)安裝包 xsync profile????? //xsync是我寫的分發(fā)腳本,沒有的就用命令一個個分發(fā)..
7. 在分發(fā)的節(jié)點上修改brokerid=1? /? brokerid=2????? //不得重復(fù)
server.properties下的參數(shù)解讀
num.network.threads=3? ? ? ?? //處理網(wǎng)絡(luò)請求的線程數(shù)量
num.io.threads=8? ? ? ? //用來處理磁盤IO的現(xiàn)成數(shù)量
socket.send.buffer.bytes=102400? ? ? ?? //接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400? ? ? ? ? //接收套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600? ? ? ?? //請求套接字的緩沖區(qū)大小
num.partitions=1? ? ? ?? //topic在當(dāng)前broker上的分區(qū)個數(shù)
num.recovery.threads.per.data.dir=1? ? ? ? ? //用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
log.retention.hours=168? ? ? ? ? //segment文件保留的最長時間,超時將被刪除
Kafka命令行操作
啟動kafka: bin/kafka-server-start.sh config/server.properties ?? //所有節(jié)點都要啟動
查看服務(wù)器中所有topic: bin/kafka-topics.sh --list --zookeeper hadoop101:2181
創(chuàng)建topic: bin/kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 3 --partitions 1--topic first ? ? ? ? //有多少個節(jié)點就最多創(chuàng)建多少個副本
分組: bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
注: --topic定義topic名? --replication-factor定義副本數(shù)? --partitions定義分區(qū)數(shù)? //針對以上兩個
刪除topic: bin/kafka-topics.sh --delete --zookeeperhadoop101:2181 --topic first
Kafka生產(chǎn)過程分析
寫入流程

1. producer先從zookeeper的"/brokers/.../state"節(jié)點找到該partition的leader
2. producer將消息發(fā)送給該leader
3. leader將消息寫入本地log
4. followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
5. leader收到所有ISR中的replication的ACK后,增加HW(highwatermark,最后commit的offset)并向producer發(fā)送ACK
Kafka的存儲策略
無論消息是否被消費,kafka都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
1. 基于時間:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824
Kafka Stream
Kafka Streams是什么: Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。它建立在流處理的一系列重要功能基礎(chǔ)之上,比如正確區(qū)分事件和處理時間,處理遲到數(shù)據(jù)以及高效的應(yīng)用程序狀態(tài)管理。
特點1: 功能強大
高拓展性 / 彈性 / 容錯 / 有狀態(tài)和無狀態(tài)處理 / 基于事件時間的Window,Join,Aggergations
特點2: 輕量級
無需專門的集群 / 一個庫,而不是框架
特點3: 完全集成
100%的Kafka 0.10.0版本兼容 / 易于集成到現(xiàn)有的應(yīng)用程序 / 程序部署無需手工處理(這個指的應(yīng)該是Kafka多分區(qū)機制對Kafka Streams多實例的自動匹配)
特點4: 實時性
毫秒級延遲 / 并非微批處理 / 窗口允許亂序數(shù)據(jù) / 允許遲到數(shù)據(jù)
它的缺點: Kafka Stream能做的東西Storm和Spark Streamin已經(jīng)完美解決了,況且Streamin有一個Saprk這么好的生態(tài)圈,要Kafka Stream是錦上添花,增加學(xué)習(xí)成本(其實使我不會Kafka Stream)...