一、服務器環(huán)境
LINUX:centOS 6.5
JAVA:1.8.0_171
二、安裝配置
1、安裝配置ZooKeeper集群
(1)創(chuàng)建zookeeper目錄,快照日志存放目錄dataDir、事務日志存放目錄dataLogDir
[root@wzq ~]# cd /opt
[root@wzq opt]# mkdir zookeeper
[root@wzq zookeeper]# mkdir -p dataDir
[root@wzq zookeeper]#?mkdir dataLogDir
(2)下載解壓zookeeper
[root@wzq opt]#?wget?http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
[root@wzq opt]#?tar zxf??zookeeper-3.4.12.tar.gz
(3)修改配置文件
[root@wzq opt]# cd /zookeeper-3.4.12/conf/
[root@wzq conf]#?mv zoo_sample.cfg zoo.cfg
[root@console conf]# vim zoo.cfg
# 存放數(shù)據(jù)文件
dataDir=/opt/zookeeper/dataDir
# 存放日志文件
dataLogDir=/opt/zookeeper/dataLogDir
clientPort=2181
initLimit=5
syncLimit=2
# zookeeper cluster,2888為選舉端口,3888為心跳端口
server.1=192.168.2.128:2888:3888
server.2=192.168.2.129:2888:3888
server.3=192.168.2.133:2888:3888
在dataDir指定的目錄下面,創(chuàng)建一個myid文件,里面內容為一個數(shù)字,用來標識當前主機,conf/zoo.cfg文件中配置的server.X中X為什么數(shù)字,則myid文件中就輸入這個數(shù)字。
(4)啟動和關閉zk
[root@console bin]# ./zkServer.sh start
[root@console bin]# ./zkServer.sh stop
(5)查看zookeeper啟動狀態(tài),包括集群中各個結點的角色(leader、follower)
[root@wzq bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader
其它主機同理設置。
2、安裝配置kafka集群
(1)創(chuàng)建消息持久化目錄
[root@wzq ~]# cd /opt
[root@wzq opt]# mkdir kafka
[root@wzq opt]#?mkdir /kafkaLogs
(2)下載解壓kafka
[root@wzq kafka]# wget http://mirrors.cnnic.cn/apache/kafka/1.1.0/ kafka_2.12-1.1.0.tgz
[root@wzq kafka]# tar zxf kafka_2.12-1.1.0
(3)修改配置
[root@wzq kafka]# cd kafka_2.12-1.1.0/config/
[root@wzq config]# vim server.properties




auto.create.topics.enable=false ? ?#關閉自動創(chuàng)建topic
delete.topic.enable=true ?#添加啟用刪除topic配置
其它主機同理設置。
(4)啟動集群
[root@wzq bin]# ./kafka-server-start.sh -daemon ../config/server.properties
(5)測試集群
創(chuàng)建topic
為Topic創(chuàng)建分區(qū)時,--partitions(分區(qū)數(shù))最好是broker數(shù)量的整數(shù)倍,這樣才能使一個Topic的分區(qū)均勻的分布在整個Kafka集群中。
[root@wzq kafka_2.12-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.2.128:2181,192.168.2.129:2181,192.168.2.133 --replication-factor 1 --partitions 1 --topic TEST1
創(chuàng)建一個producer程序
[root@wzq bin]# ./kafka-console-producer.sh --broker-list 192.168.2.129:9092 --topic TEST1
創(chuàng)建consumer程序
[root@wzq kafka_2.12-1.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.129:9092 --topic TEST1 --from-beginning
producer發(fā)送消息,consumer接收,至此,KAFKA搭建完成。


PS:
(1)單純的kafka broker集群沒有意義,一臺宕機照樣出錯,必須帶上zookeeper集群一起。
(2)broker和zookeeper都至少要三臺服務器,奇數(shù)臺。
(3)每次打開consumer都會收到歷史消息,消息保存在kafkaLogs/TEST1-0/00000000000000000000.log
(4)broker的server.properties參數(shù)解釋
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=19092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.7.100 #這個參數(shù)默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數(shù)
num.io.threads=8 #這個是borker進行I/O處理的線程數(shù)
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數(shù)這個目錄,如果配置多個目錄,新創(chuàng)建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區(qū)數(shù)最少就放那一個
socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當數(shù)據(jù)到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區(qū)數(shù),一個topic默認1個分區(qū)數(shù)
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880? #消息保存的最大值5M
default.replication.factor=2? #kafka保存消息的副本數(shù),如果一個副本失效了,另一個還可以繼續(xù)提供服務
replica.fetch.max.bytes=5242880? #取消息的最大直接數(shù)
log.segment.bytes=1073741824 #這個參數(shù)是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設置zookeeper的連接端口