在學(xué)習(xí)kafka集群之前,先來(lái)學(xué)習(xí)下單節(jié)點(diǎn)kafka的一些基本操作,包括安裝及一些基本命令,以便后續(xù)集群環(huán)境的學(xué)習(xí)。
1.單節(jié)點(diǎn)安裝
kafka必須依賴于zookeeper,假定當(dāng)前zookeeper集群已搭建完成(如不熟悉zookeeper集群如何搭建,請(qǐng)參考http://www.itdecent.cn/p/0e813f6a6049)。
環(huán)境依賴:
1.已安裝完畢的zookeeper集群:
192.168.162.52:2181
192.168.162.235:2181
192.168.162.239:2181
2.軟件環(huán)境
centos6.8
jdk "1.8.0_111"
3.kafka軟件
cd /opt
mkdir kafka
cd /opt/kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz
tar -zxvf kafka_2.12-0.11.0.0.tgz
kafka配置 server.properties:
1.log.dirs 修改到指定目錄
log.dirs=/opt/kafka/kafka_2.12-0.11.0.0/kafka-logs
2.zookeeper配置
zookeeper.connect=192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
server.properties 參數(shù)配置詳解:
| 屬性 | 默認(rèn)值 | 說(shuō)明 |
|---|---|---|
| broker.id | /tmp/kafka-logs | Kafka數(shù)據(jù)存放的目錄??梢灾付ǘ鄠€(gè)目錄,中間用逗號(hào)分隔,當(dāng)新partition被創(chuàng)建的時(shí)會(huì)被存放到當(dāng)前存放partition最少的目錄 |
| port | 9092 | BrokerServer接受客戶端連接的端口號(hào) |
| zookeeper.connect | null | Zookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3 需要注意的是,消費(fèi)者的參數(shù)要和此參數(shù)一致 |
| message.max.bytes | 1000000 | 務(wù)器可以接收到的最大的消息大小。注意此參數(shù)要和consumer的maximum.message.size大小一致,否則會(huì)因?yàn)樯a(chǎn)者生產(chǎn)的消息太大導(dǎo)致消費(fèi)者無(wú)法消費(fèi)。 |
| num.io.threads | 8 | 服務(wù)器用來(lái)執(zhí)行讀寫請(qǐng)求的IO線程數(shù),此參數(shù)的數(shù)量至少要等于服務(wù)器上磁盤的數(shù)量。 |
| queued.max.requests | 500 | /O線程可以處理請(qǐng)求的隊(duì)列大小,若實(shí)際請(qǐng)求數(shù)超過(guò)此大小,網(wǎng)絡(luò)線程將停止接收新的請(qǐng)求。 |
| socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
| socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
| socket.request.max.bytes | 100 * 1024 * 1024 | 服務(wù)器允許請(qǐng)求的最大值, 用來(lái)防止內(nèi)存溢出,其值應(yīng)該小于 Java heap size. |
| num.partitions | 1 | 默認(rèn)partition數(shù)量,如果topic在創(chuàng)建時(shí)沒(méi)有指定partition數(shù)量,默認(rèn)使用此值,建議改為5 |
| log.segment.bytes | 1024 * 1024 * 1024 | Segment文件的大小,超過(guò)此值將會(huì)自動(dòng)新建一個(gè)segment,此值可以被topic級(jí)別的參數(shù)覆蓋。 |
| log.roll.{ms,hours} | 24 * 7 hours | 新建segment文件的時(shí)間,此值可以被topic級(jí)別的參數(shù)覆蓋。 |
| log.retention.{ms,minutes,hours} | 7 days | Kafka segment log的保存周期,保存周期超過(guò)此時(shí)間日志就會(huì)被刪除。此參數(shù)可以被topic級(jí)別參數(shù)覆蓋。數(shù)據(jù)量大時(shí),建議減小此值。 |
| log.retention.bytes | -1 | 每個(gè)partition的最大容量,若數(shù)據(jù)量超過(guò)此值,partition數(shù)據(jù)將會(huì)被刪除。注意這個(gè)參數(shù)控制的是每個(gè)partition而不是topic。此參數(shù)可以被log級(jí)別參數(shù)覆蓋。 |
| log.retention.check.interval.ms | 5 minutes | 刪除策略的檢查周期 |
| auto.create.topics.enable | true | 自動(dòng)創(chuàng)建topic參數(shù),建議此值設(shè)置為false,嚴(yán)格控制topic管理,防止生產(chǎn)者錯(cuò)寫topic。 |
| default.replication.factor | 1 | 默認(rèn)副本數(shù)量,建議改為2。 |
| replica.lag.time.max.ms | 10000 | 在此窗口時(shí)間內(nèi)沒(méi)有收到follower的fetch請(qǐng)求,leader會(huì)將其從ISR(in-sync replicas)中移除。 |
| replica.lag.max.messages | 4000 | 如果replica節(jié)點(diǎn)落后leader節(jié)點(diǎn)此值大小的消息數(shù)量,leader節(jié)點(diǎn)就會(huì)將其從ISR中移除。 |
| replica.socket.timeout.ms | 30 * 1000 | replica向leader發(fā)送請(qǐng)求的超時(shí)時(shí)間。 |
| zookeeper.session.timeout.ms | 6000 | ZooKeeper session 超時(shí)時(shí)間。如果在此時(shí)間內(nèi)server沒(méi)有向zookeeper發(fā)送心跳,zookeeper就會(huì)認(rèn)為此節(jié)點(diǎn)已掛掉。 此值太低導(dǎo)致節(jié)點(diǎn)容易被標(biāo)記死亡;若太高,.會(huì)導(dǎo)致太遲發(fā)現(xiàn)節(jié)點(diǎn)死亡。 |
| zookeeper.connection.timeout.ms | 6000 | 客戶端連接zookeeper的超時(shí)時(shí)間。 |
| zookeeper.sync.time.ms | 2000 | H ZK follower落后 ZK leader的時(shí)間。 |
| controlled.shutdown.enable | true | 允許broker shutdown。如果啟用,broker在關(guān)閉自己之前會(huì)把它上面的所有l(wèi)eaders轉(zhuǎn)移到其它brokers上,建議啟用,增加集群穩(wěn)定性。 |
kafka 啟動(dòng):
cd /opt/kafka/kafka_2.12-0.11.0.0
nohup ./bin/kafka-server-start.sh ./config/server.properties &
2. Topic
2.1創(chuàng)建topic
cd /opt/kafka/kafka_2.12-0.11.0.0
bin/kafka-topics.sh --create --topic test0 --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --partitions 1 --replication-factor 1
--create: 指定創(chuàng)建topic動(dòng)作
--topic:指定新建topic的名稱
--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項(xiàng){zookeeper.connect}一樣
--config:指定當(dāng)前topic上有效的參數(shù)值,參數(shù)列表參考文檔為: [Topic-level configuration](http://kafka.apache.org/documentation/)
--partitions:指定當(dāng)前創(chuàng)建的kafka分區(qū)數(shù)量,默認(rèn)為1個(gè)
--replication-factor:指定每個(gè)分區(qū)的復(fù)制因子個(gè)數(shù),默認(rèn)1個(gè)

create topic
2.2 查看Topic
./bin/kafka-topics.sh --list --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181

查看topic
2.3 查看topic描述
./bin/kafka-topics.sh --describe --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
--describe: 指定是展示詳細(xì)信息命令
--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項(xiàng){zookeeper.connect}一樣
--topic:指定需要展示數(shù)據(jù)的topic名稱

describe
2.4 修改topic
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 10
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 3 ## Kafka分區(qū)數(shù)量只允許增加,不允許減少
2.5 刪除topic
默認(rèn)情況下Kafka的Topic不能直接刪除的,需要進(jìn)行相關(guān)參數(shù)配置
bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181
默認(rèn)情況下,刪除是標(biāo)記刪除,沒(méi)有實(shí)際刪除這個(gè)Topic;如果運(yùn)行刪除Topic,可以通過(guò)如下兩種方式:
方式一:通過(guò)delete命令刪除后,手動(dòng)將本地磁盤以及zk上的相關(guān)topic的信息刪除即可
方式二:配置server.properties文件,給定參數(shù)delete.topic.enable=true,重啟kafka服務(wù),此時(shí)執(zhí)行delete命令表示允許進(jìn)行Topic的刪除
3.啟動(dòng)生產(chǎn)者發(fā)送消息
./bin/kafka-console-producer.sh --broker-list 192.168.162.239:9092 --topic test0

生產(chǎn)者
生產(chǎn)者部分參數(shù)
| 屬性 | 默認(rèn)值 | 說(shuō)明 |
|---|---|---|
| metadata.broker.list | 啟動(dòng)時(shí)producer查詢brokers的列表,可以是集群中所有brokers的一個(gè)子集。注意,這個(gè)參數(shù)只是用來(lái)獲取topic的元信息用,producer會(huì)從元信息中挑選合適的broker并與之建立socket連接。格式是:host1:port1,host2:port2。 | |
| request.timeout.ms | 10000 | Broker等待ack的超時(shí)時(shí)間,若等待時(shí)間超過(guò)此值,會(huì)返回客戶端錯(cuò)誤信息。 |
| producer.type | sync | 同步異步模式。async表示異步,sync表示同步。如果設(shè)置成異步模式,可以允許生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會(huì)極大的提高broker性能,推薦設(shè)置為異步。 |
| serializer.class | kafka.serializer.DefaultEncoder | 序列號(hào)類,.默認(rèn)序列化成 byte[] 。 |
| key.serializer.class | Key的序列化類,默認(rèn)同上。 | |
| partitioner.class | kafka.producer.DefaultPartitioner | Partition類,默認(rèn)對(duì)key進(jìn)行hash。 |
| compression.codec | none | 指定producer消息的壓縮格式,可選參數(shù)為: “none”, “gzip” and “snappy”。 |
| compressed.topics | null | 啟用壓縮的topic名稱。若上面參數(shù)選擇了一個(gè)壓縮格式,那么壓縮僅對(duì)本參數(shù)指定的topic有效,若本參數(shù)為空,則對(duì)所有topic有效。 |
| message.send.max.retries | 3 | Producer發(fā)送失敗時(shí)重試次數(shù)。若網(wǎng)絡(luò)出現(xiàn)問(wèn)題,可能會(huì)導(dǎo)致不斷重試。 |
| queue.buffering.max.ms | 5000 | 啟用異步模式時(shí),producer緩存消息的時(shí)間。比如我們?cè)O(shè)置成1000時(shí),它會(huì)緩存1秒的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會(huì)造成時(shí)效性的降低。 |
| queue.buffering.max.messages | 10000 | 采用異步模式時(shí)producer buffer 隊(duì)列里最大緩存的消息數(shù)量,如果超過(guò)這個(gè)數(shù)值,producer就會(huì)阻塞或者丟掉消息。 |
| queue.enqueue.timeout.ms | -1 | 當(dāng)達(dá)到上面參數(shù)值時(shí)producer阻塞等待的時(shí)間。如果值設(shè)置為0,buffer隊(duì)列滿時(shí)producer不會(huì)阻塞,消息直接被丟掉。若值設(shè)置為-1,producer會(huì)被阻塞,不會(huì)丟消息。 |
| batch.num.messages | 200 | 采用異步模式時(shí),一個(gè)batch緩存的消息數(shù)量。達(dá)到這個(gè)數(shù)量值時(shí)producer才會(huì)發(fā)送消息。 |
4.啟動(dòng)消費(fèi)者接收消息
./bin/kafka-console-consumer.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --topic test0 --from-beginning

消費(fèi)者
消費(fèi)者部分參數(shù)
| 屬性 | 默認(rèn)值 | 說(shuō)明 |
|---|---|---|
| group.id | Consumer的組ID,相同goup.id的consumer屬于同一個(gè)組。 | |
| zookeeper.connect | Consumer的zookeeper連接串,要和broker的配置一致。 | |
| consumer.id | null | 如果不設(shè)置會(huì)自動(dòng)生成。 |
| socket.timeout.ms | 30 * 1000 | 網(wǎng)絡(luò)請(qǐng)求的socket超時(shí)時(shí)間。實(shí)際超時(shí)時(shí)間由max.fetch.wait + socket.timeout.ms 確定。 |
| socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests. |
| fetch.message.max.bytes | 1024 * 1024 | 查詢topic-partition時(shí)允許的最大消息大小。consumer會(huì)為每個(gè)partition緩存此大小的消息到內(nèi)存,因此,這個(gè)參數(shù)可以控制consumer的內(nèi)存使用量。這個(gè)值應(yīng)該至少比server允許的最大消息大小大,以免producer發(fā)送的消息大于consumer允許的消息。 |
| num.consumer.fetchers | 1 | The number fetcher threads used to fetch data. |
| auto.commit.enable | true | 如果此值設(shè)置為true,consumer會(huì)周期性的把當(dāng)前消費(fèi)的offset值保存到zookeeper。當(dāng)consumer失敗重啟之后將會(huì)使用此值作為新開始消費(fèi)的值。 |
| auto.commit.interval.ms | 60 * 1000 | Consumer提交offset值到zookeeper的周期。 |
| queued.max.message.chunks | 2 | 用來(lái)被consumer消費(fèi)的message chunks 數(shù)量, 每個(gè)chunk可以緩存fetch.message.max.bytes大小的數(shù)據(jù)量。 |