

kafka三大特性
第一、發(fā)布和訂閱
第二、實時的流處理
第三、安全地存儲流數(shù)據(jù)在集群節(jié)點(diǎn)上
kafka的架構(gòu)

First a few concepts:
Kafka is run as a cluster on one or more servers that can span multiple datacenters.
解析:kafka是一個以集群形式運(yùn)行在一個或多個機(jī)器上,跨多數(shù)據(jù)中心的服務(wù)。
The Kafka cluster stores streams of?records?in categories called?topics.
解析:kafa集群使用topics把流數(shù)據(jù)記錄存儲時做分類。
Each record consists of a key, a value, and a timestamp.
解析:每一條記錄包含一個key,一個value,和一個時間戳

查看官方文檔的QUICKSTART模塊

Zookeeper的下載地址,統(tǒng)一從cdh5那個地址(http://archive.cloudera.com/cdh5/cdh/5/)下載。
解壓zk
配置環(huán)境變量~/.bash_profile
配置zk的數(shù)據(jù)存儲目錄
下載kafka

1、單節(jié)點(diǎn)單broker的部署及使用
解壓kafka
tar -zxvf kafka_2.11-2.2.0.tgz
設(shè)置kafka的根目錄和啟動bin目錄配置在環(huán)境變量里面
PATH=$PATH:$HOME/bin
export KAFKA_HOME=/usr/local/kafka_2.11-2.1.1
export PATH=$KAFKA_HOME/bin:$PATH
配置環(huán)境變量當(dāng)前登錄用戶的私有環(huán)境變量~/.bash_profile
配置kafka的配置文件server.properties
$KAFKA_HOME/config/server.properties
broker.id=0? //這個配置是broker的編號,且該編號唯一對應(yīng)一個broker不能重復(fù)。
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured
listeners=PLAINTEXT://10.101.3.3:9092,如果不設(shè)置他會默認(rèn)本機(jī)hostname
log.dirs //存儲kafka的日志
zookeeper.connect //zk的地址
啟動zookeeper
如果你已經(jīng)有了zookeeper,那就跳過該步驟,如果沒有那就直接使用kafka自己安裝包里的zookeeper,啟動命令
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動kafka
后臺方式: ./kafka-server-start.sh -daemon?${KAFKA_HOME}/config/server.properties &
會話方式:./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
查看是否啟動 jps 命令

查看kafka、zookeeper使用的配置文件? ?jps -m

創(chuàng)建一個topic:指定ZK
./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic mytopic
備注:1、創(chuàng)建topic的時候,需要使用--create命令 2、需要指定zk的地址--zookeeper 10.101.3.3:2181 3、指定副本系數(shù)--replication-factor 1 4、指定分區(qū)?--partitions 1? 5、指定topic的名字--topic mytopic

查看所有已經(jīng)創(chuàng)建的topic
bin/kafka-topics.sh --list --zookeeper 10.101.3.3:2181

生產(chǎn)消息:指定Broker(localhost最好是ip或者域名,不要使用localhost)
./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic mytopic
備注:啟動消息生產(chǎn)者,也就是啟動一個broker,需要使用kafka-console-producer.sh命令指定broker的服務(wù)暴漏端口和對應(yīng)的topic。如果你出現(xiàn)以下錯誤,就是因為ip或域名沒有設(shè)置正確導(dǎo)致的

消費(fèi)消息:指定生產(chǎn)者關(guān)聯(lián)的Broker
./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic mytopic --from-beginning

--from-beginning參數(shù)的解析
這個參數(shù)就是告訴消費(fèi)者服務(wù),要全部監(jiān)聽所有生產(chǎn)者的消息。如果不帶有此參數(shù),那么該消費(fèi)服務(wù)只會接受啟動時間之后監(jiān)聽的消息,之前的消息是不會監(jiān)聽的。
查看所有topic的設(shè)置詳情
bin/kafka-topics.sh --describe --zookeeper localhost:2181?

查看某一個topic的 設(shè)置詳情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic這里只查看名叫test的topic的設(shè)置信息

關(guān)閉kafka的服務(wù)
bin/kafka-server-stop.sh
關(guān)閉前

關(guān)閉后

2、單節(jié)點(diǎn)多Broker部署及應(yīng)用
首先看下圖,是官網(wǎng)原圖:意思就是把kafka的server.properties配置文件復(fù)制多份。

我們就按照官網(wǎng)說的來,我部署三個節(jié)點(diǎn)

需要把server-1.properties、server-1.properties、server-1.properties的broker.id和日志目錄和listeners配置分別改一下
config/server-1.properties:
????broker.id=1
????listeners=PLAINTEXT://:9093
????log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
????broker.id=2
????listeners=PLAINTEXT://:9094
????log.dirs=/tmp/kafka-logs-2
config/server-3.properties:
????broker.id=3
listeners=PLAINTEXT://:9095
????log.dirs=/tmp/kafka-logs-3


啟動多broker(后臺方式啟動)
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

啟動后,可以看到3各kafka的服務(wù)。

kafka服務(wù)對應(yīng)的配置文件:jps -m可以查看到

創(chuàng)建一個topic
./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 3 --partitions 1 --topic my_m_topic
備注:這時候創(chuàng)建topic時,需要根據(jù)broker的節(jié)點(diǎn)數(shù)量,指定副本系數(shù)--replication-factor,這時候就應(yīng)該為3了。
查看所有已創(chuàng)建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看我們多副本topic的詳情
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_m_topic

其中Leader:代表主副本。Replicas:代表副本所在的broker機(jī)器編號1、2、3? Isr:代表存活的副本。
生產(chǎn)消息:指定Broker(localhost最好是ip或者域名,不要使用localhost),這里的--broker-list后面需要多個broker節(jié)點(diǎn)ip
./kafka-console-producer.sh --broker-list 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095?--topic my_m_topic

消費(fèi)消息:指定生產(chǎn)者關(guān)聯(lián)的zk
./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9093,10.101.3.3:9094,10.101.3.3:9095?--from-beginning --topic my_m_topic?
刪除topic
server.properties?設(shè)置 delete.topic.enable=true
如果沒有設(shè)置 delete.topic.enable=true,則調(diào)用kafka 的delete命令無法真正將topic刪除,而是顯示(marked for deletion)
調(diào)用命令刪除topic:
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_m3_topic
刪除kafka存儲目錄(server.properties文件log.dirs配置,默認(rèn)為"/data/kafka-logs")相關(guān)topic的數(shù)據(jù)目錄。
recovery-point-offset-checkpoint和replication-offset-checkpoint中的1變?yōu)?,刪除topic對應(yīng)的名字
kafka的容錯性
如果kafka的多個節(jié)點(diǎn)中,我們殺掉任何一個節(jié)點(diǎn),都不會影響消息的傳輸。如果我們干掉leader節(jié)點(diǎn),他內(nèi)部會把其他副節(jié)點(diǎn)轉(zhuǎn)正,成為leader,也不會影響消息傳輸。
