kafka基本操作

在學(xué)習(xí)kafka集群之前,先來(lái)學(xué)習(xí)下單節(jié)點(diǎn)kafka的一些基本操作,包括安裝及一些基本命令,以便后續(xù)集群環(huán)境的學(xué)習(xí)。

1.單節(jié)點(diǎn)安裝

kafka必須依賴于zookeeper,假定當(dāng)前zookeeper集群已搭建完成(如不熟悉zookeeper集群如何搭建,請(qǐng)參考http://www.itdecent.cn/p/0e813f6a6049)。

環(huán)境依賴:
1.已安裝完畢的zookeeper集群:
192.168.162.52:2181
192.168.162.235:2181
192.168.162.239:2181
2.軟件環(huán)境
centos6.8   
jdk "1.8.0_111"
3.kafka軟件
cd /opt
mkdir kafka
cd /opt/kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz
tar -zxvf  kafka_2.12-0.11.0.0.tgz

kafka配置 server.properties:

1.log.dirs 修改到指定目錄
log.dirs=/opt/kafka/kafka_2.12-0.11.0.0/kafka-logs
2.zookeeper配置 
zookeeper.connect=192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181

server.properties 參數(shù)配置詳解:

屬性 默認(rèn)值 說(shuō)明
broker.id /tmp/kafka-logs Kafka數(shù)據(jù)存放的目錄??梢灾付ǘ鄠€(gè)目錄,中間用逗號(hào)分隔,當(dāng)新partition被創(chuàng)建的時(shí)會(huì)被存放到當(dāng)前存放partition最少的目錄
port 9092 BrokerServer接受客戶端連接的端口號(hào)
zookeeper.connect null Zookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3 需要注意的是,消費(fèi)者的參數(shù)要和此參數(shù)一致
message.max.bytes 1000000 務(wù)器可以接收到的最大的消息大小。注意此參數(shù)要和consumer的maximum.message.size大小一致,否則會(huì)因?yàn)樯a(chǎn)者生產(chǎn)的消息太大導(dǎo)致消費(fèi)者無(wú)法消費(fèi)。
num.io.threads 8 服務(wù)器用來(lái)執(zhí)行讀寫請(qǐng)求的IO線程數(shù),此參數(shù)的數(shù)量至少要等于服務(wù)器上磁盤的數(shù)量。
queued.max.requests 500 /O線程可以處理請(qǐng)求的隊(duì)列大小,若實(shí)際請(qǐng)求數(shù)超過(guò)此大小,網(wǎng)絡(luò)線程將停止接收新的請(qǐng)求。
socket.send.buffer.bytes 100 * 1024 The SO_SNDBUFF buffer the server prefers for socket connections.
socket.receive.buffer.bytes 100 * 1024 The SO_RCVBUFF buffer the server prefers for socket connections.
socket.request.max.bytes 100 * 1024 * 1024 服務(wù)器允許請(qǐng)求的最大值, 用來(lái)防止內(nèi)存溢出,其值應(yīng)該小于 Java heap size.
num.partitions 1 默認(rèn)partition數(shù)量,如果topic在創(chuàng)建時(shí)沒(méi)有指定partition數(shù)量,默認(rèn)使用此值,建議改為5
log.segment.bytes 1024 * 1024 * 1024 Segment文件的大小,超過(guò)此值將會(huì)自動(dòng)新建一個(gè)segment,此值可以被topic級(jí)別的參數(shù)覆蓋。
log.roll.{ms,hours} 24 * 7 hours 新建segment文件的時(shí)間,此值可以被topic級(jí)別的參數(shù)覆蓋。
log.retention.{ms,minutes,hours} 7 days Kafka segment log的保存周期,保存周期超過(guò)此時(shí)間日志就會(huì)被刪除。此參數(shù)可以被topic級(jí)別參數(shù)覆蓋。數(shù)據(jù)量大時(shí),建議減小此值。
log.retention.bytes -1 每個(gè)partition的最大容量,若數(shù)據(jù)量超過(guò)此值,partition數(shù)據(jù)將會(huì)被刪除。注意這個(gè)參數(shù)控制的是每個(gè)partition而不是topic。此參數(shù)可以被log級(jí)別參數(shù)覆蓋。
log.retention.check.interval.ms 5 minutes 刪除策略的檢查周期
auto.create.topics.enable true 自動(dòng)創(chuàng)建topic參數(shù),建議此值設(shè)置為false,嚴(yán)格控制topic管理,防止生產(chǎn)者錯(cuò)寫topic。
default.replication.factor 1 默認(rèn)副本數(shù)量,建議改為2。
replica.lag.time.max.ms 10000 在此窗口時(shí)間內(nèi)沒(méi)有收到follower的fetch請(qǐng)求,leader會(huì)將其從ISR(in-sync replicas)中移除。
replica.lag.max.messages 4000 如果replica節(jié)點(diǎn)落后leader節(jié)點(diǎn)此值大小的消息數(shù)量,leader節(jié)點(diǎn)就會(huì)將其從ISR中移除。
replica.socket.timeout.ms 30 * 1000 replica向leader發(fā)送請(qǐng)求的超時(shí)時(shí)間。
zookeeper.session.timeout.ms 6000 ZooKeeper session 超時(shí)時(shí)間。如果在此時(shí)間內(nèi)server沒(méi)有向zookeeper發(fā)送心跳,zookeeper就會(huì)認(rèn)為此節(jié)點(diǎn)已掛掉。 此值太低導(dǎo)致節(jié)點(diǎn)容易被標(biāo)記死亡;若太高,.會(huì)導(dǎo)致太遲發(fā)現(xiàn)節(jié)點(diǎn)死亡。
zookeeper.connection.timeout.ms 6000 客戶端連接zookeeper的超時(shí)時(shí)間。
zookeeper.sync.time.ms 2000 H ZK follower落后 ZK leader的時(shí)間。
controlled.shutdown.enable true 允許broker shutdown。如果啟用,broker在關(guān)閉自己之前會(huì)把它上面的所有l(wèi)eaders轉(zhuǎn)移到其它brokers上,建議啟用,增加集群穩(wěn)定性。

kafka 啟動(dòng):

cd /opt/kafka/kafka_2.12-0.11.0.0
nohup ./bin/kafka-server-start.sh  ./config/server.properties  &

2. Topic

2.1創(chuàng)建topic

cd /opt/kafka/kafka_2.12-0.11.0.0

bin/kafka-topics.sh --create --topic test0 --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --partitions 1 --replication-factor 1
--create: 指定創(chuàng)建topic動(dòng)作
--topic:指定新建topic的名稱
--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項(xiàng){zookeeper.connect}一樣
--config:指定當(dāng)前topic上有效的參數(shù)值,參數(shù)列表參考文檔為: [Topic-level configuration](http://kafka.apache.org/documentation/)
--partitions:指定當(dāng)前創(chuàng)建的kafka分區(qū)數(shù)量,默認(rèn)為1個(gè)
--replication-factor:指定每個(gè)分區(qū)的復(fù)制因子個(gè)數(shù),默認(rèn)1個(gè)
create topic

2.2 查看Topic

 ./bin/kafka-topics.sh --list --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181
查看topic

2.3 查看topic描述

./bin/kafka-topics.sh --describe --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 

--describe: 指定是展示詳細(xì)信息命令
--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項(xiàng){zookeeper.connect}一樣
--topic:指定需要展示數(shù)據(jù)的topic名稱
describe

2.4 修改topic

bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 10 
bin/kafka-topics.sh --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --alter --topic test0 --partitions 3 ## Kafka分區(qū)數(shù)量只允許增加,不允許減少

2.5 刪除topic

默認(rèn)情況下Kafka的Topic不能直接刪除的,需要進(jìn)行相關(guān)參數(shù)配置
bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181

默認(rèn)情況下,刪除是標(biāo)記刪除,沒(méi)有實(shí)際刪除這個(gè)Topic;如果運(yùn)行刪除Topic,可以通過(guò)如下兩種方式:
方式一:通過(guò)delete命令刪除后,手動(dòng)將本地磁盤以及zk上的相關(guān)topic的信息刪除即可
方式二:配置server.properties文件,給定參數(shù)delete.topic.enable=true,重啟kafka服務(wù),此時(shí)執(zhí)行delete命令表示允許進(jìn)行Topic的刪除

3.啟動(dòng)生產(chǎn)者發(fā)送消息

 ./bin/kafka-console-producer.sh --broker-list 192.168.162.239:9092 --topic test0
生產(chǎn)者

生產(chǎn)者部分參數(shù)

屬性 默認(rèn)值 說(shuō)明
metadata.broker.list 啟動(dòng)時(shí)producer查詢brokers的列表,可以是集群中所有brokers的一個(gè)子集。注意,這個(gè)參數(shù)只是用來(lái)獲取topic的元信息用,producer會(huì)從元信息中挑選合適的broker并與之建立socket連接。格式是:host1:port1,host2:port2。
request.timeout.ms 10000 Broker等待ack的超時(shí)時(shí)間,若等待時(shí)間超過(guò)此值,會(huì)返回客戶端錯(cuò)誤信息。
producer.type sync 同步異步模式。async表示異步,sync表示同步。如果設(shè)置成異步模式,可以允許生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會(huì)極大的提高broker性能,推薦設(shè)置為異步。
serializer.class kafka.serializer.DefaultEncoder 序列號(hào)類,.默認(rèn)序列化成 byte[] 。
key.serializer.class Key的序列化類,默認(rèn)同上。
partitioner.class kafka.producer.DefaultPartitioner Partition類,默認(rèn)對(duì)key進(jìn)行hash。
compression.codec none 指定producer消息的壓縮格式,可選參數(shù)為: “none”, “gzip” and “snappy”。
compressed.topics null 啟用壓縮的topic名稱。若上面參數(shù)選擇了一個(gè)壓縮格式,那么壓縮僅對(duì)本參數(shù)指定的topic有效,若本參數(shù)為空,則對(duì)所有topic有效。
message.send.max.retries 3 Producer發(fā)送失敗時(shí)重試次數(shù)。若網(wǎng)絡(luò)出現(xiàn)問(wèn)題,可能會(huì)導(dǎo)致不斷重試。
queue.buffering.max.ms 5000 啟用異步模式時(shí),producer緩存消息的時(shí)間。比如我們?cè)O(shè)置成1000時(shí),它會(huì)緩存1秒的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會(huì)造成時(shí)效性的降低。
queue.buffering.max.messages 10000 采用異步模式時(shí)producer buffer 隊(duì)列里最大緩存的消息數(shù)量,如果超過(guò)這個(gè)數(shù)值,producer就會(huì)阻塞或者丟掉消息。
queue.enqueue.timeout.ms -1 當(dāng)達(dá)到上面參數(shù)值時(shí)producer阻塞等待的時(shí)間。如果值設(shè)置為0,buffer隊(duì)列滿時(shí)producer不會(huì)阻塞,消息直接被丟掉。若值設(shè)置為-1,producer會(huì)被阻塞,不會(huì)丟消息。
batch.num.messages 200 采用異步模式時(shí),一個(gè)batch緩存的消息數(shù)量。達(dá)到這個(gè)數(shù)量值時(shí)producer才會(huì)發(fā)送消息。

4.啟動(dòng)消費(fèi)者接收消息

 ./bin/kafka-console-consumer.sh  --zookeeper 192.168.162.52:2181,192.168.162.235:2181,192.168.162.239:2181 --topic test0 --from-beginning
消費(fèi)者

消費(fèi)者部分參數(shù)

屬性 默認(rèn)值 說(shuō)明
group.id Consumer的組ID,相同goup.id的consumer屬于同一個(gè)組。
zookeeper.connect Consumer的zookeeper連接串,要和broker的配置一致。
consumer.id null 如果不設(shè)置會(huì)自動(dòng)生成。
socket.timeout.ms 30 * 1000 網(wǎng)絡(luò)請(qǐng)求的socket超時(shí)時(shí)間。實(shí)際超時(shí)時(shí)間由max.fetch.wait + socket.timeout.ms 確定。
socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests.
fetch.message.max.bytes 1024 * 1024 查詢topic-partition時(shí)允許的最大消息大小。consumer會(huì)為每個(gè)partition緩存此大小的消息到內(nèi)存,因此,這個(gè)參數(shù)可以控制consumer的內(nèi)存使用量。這個(gè)值應(yīng)該至少比server允許的最大消息大小大,以免producer發(fā)送的消息大于consumer允許的消息。
num.consumer.fetchers 1 The number fetcher threads used to fetch data.
auto.commit.enable true 如果此值設(shè)置為true,consumer會(huì)周期性的把當(dāng)前消費(fèi)的offset值保存到zookeeper。當(dāng)consumer失敗重啟之后將會(huì)使用此值作為新開始消費(fèi)的值。
auto.commit.interval.ms 60 * 1000 Consumer提交offset值到zookeeper的周期。
queued.max.message.chunks 2 用來(lái)被consumer消費(fèi)的message chunks 數(shù)量, 每個(gè)chunk可以緩存fetch.message.max.bytes大小的數(shù)據(jù)量。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容