Kafka日志本身是由多個日志段組成(log segment)。一個日志是一個FileMessageSet,它包含了日志數(shù)據(jù)以及OffsetIndex對象,該對象使用位移來讀取日志數(shù)據(jù)
*** borker配置就是指配置server.properties文件 ***
最小配置
通常情況下需要在減壓縮kafka后,修改config/server.properties 配置文件中的以下兩項
log.dirs = kafka-logs
zookeeper.connect = localhost:9092
listeners = PLAINTEXT://ip:9092
log.dirs 指的是kafka的log Data保存的目錄,默認為Null。如果不指定log Data會保存到log.dir設(shè)置的目錄中,log.dir默認為/tmp/kafka-logs。需要保證啟動KafKaServer的用戶對log.dirs或log.dir設(shè)置的目錄具有讀與寫的權(quán)限。
zookeeper.connect 指的是zookeeper集群的地址,可以是多個,多個之間用逗號分割hostname1:port1,hostname2:port2,hostname3:port3
listeners 監(jiān)聽列表(以逗號分隔 不同的協(xié)議(如plaintext,trace,ssl、不同的IP和端口))
kafka提供的borker配置
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
配置詳細說明
名稱| 說明 | 類型 | 默認值 | 有效值 | 重要性
----|------|---- |----|----|----|----
zookeeper.connect | zookeeper集群的地址,可以是多個,多個之間用逗號分割 |string | localhost:2181 | ip1:port1,ip2:port2 | 高
zookeeper.connection.timeout.ms | 客戶端在建立通zookeeper連接中的最大等待時間 | int | null| 6000 | 高
zookeeper.session.timeout.ms | ZooKeeper的session的超時時間,如果在這段時間內(nèi)沒有收到ZK的心跳,則會被認為該Kafka server掛掉了。如果把這個值設(shè)置得過低可能被誤認為掛掉,如果設(shè)置得過高,如果真的掛了,則需要很長時間才能被server得知 | int | 6000 | | 高
zookeeper.sync.time.ms | 一個ZK follower能落后leader的時間 | int | 2000 | | 高
listeners | 監(jiān)聽列表(以逗號分隔 不同的協(xié)議(如plaintext,trace,ssl、不同的IP和端口)),hostname如果設(shè)置為0.0.0.0則綁定所有的網(wǎng)卡地址;如果hostname為空則綁定默認的網(wǎng)卡。如果沒有配置則默認為java.net.InetAddress.getCanonicalHostName() | string | null | 如:PLAINTEXT://myhost:9092,TRACE://:9091 或 PLAINTEXT://0.0.0.0:9092, | 高
host.name | 。如果設(shè)置了它,會僅綁定這個地址。如果沒有設(shè)置,則會綁定所有的網(wǎng)絡(luò)接口,并提交一個給ZK。不推薦使用 只有當(dāng)listeners沒有設(shè)置時才有必要使用。| string | "' | 如:"localhost" | 高
port | server用來接受client連接的端口。不推薦使用,使用listeners配置項代替;只有在listeners沒有配置時才使用。 | int | 9092 | | 高
advertised.host.name | 會將hostname通知給生產(chǎn)者和消費者,在多網(wǎng)卡時需要設(shè)置該值為另一個ip地址。如果沒有設(shè)置該值,則返回 配置項host.name設(shè)置的值,如果host.name沒有設(shè)置則返回java.net.InetAddress.getCanonicalHostName()不推薦使用 只有當(dāng)advertised.listeners或listeners沒有設(shè)置時才有必要使用。 | string | null | | 高
advertised.listeners | 設(shè)置不同于listeners配置的監(jiān)聽列表即不同于listeners設(shè)置的網(wǎng)卡地址及端口;如果沒有配置,會使用listeners的值 | string | null | | 高
advertised.port | 分發(fā)這個端口給所有的producer,consumer和其他broker來建立連接。如果此端口跟server綁定的端口不同,則才有必要設(shè)置。不推薦使用 只有當(dāng)advertised.listeners或listeners沒有設(shè)置時才有必要使用。 | int| null | | 高
auto.create.topics.enable | 是否允許自動創(chuàng)建topic。如果設(shè)為true,那么produce,consume或者fetch metadata一個不存在的topic時,就會自動創(chuàng)建一個默認replication factor和partition number的topic。| boolean| true | | 高
background.threads | 一些后臺任務(wù)處理的線程數(shù),例如過期消息文件的刪除等,一般情況下不需要去做修改 | int | 10 | | 高
broker.id | 每一個broker在集群中的唯一表示,要求是正數(shù)。當(dāng)該服務(wù)器的IP地址發(fā)生改變時,broker.id沒有變化,則不會影響consumers的消息情況。| int | -1 | | 高
compression.type | 指定topic的壓縮類型。除了支持'gzip', 'snappy', 'lz4'外,還支持"uncompressed(不壓縮)"以及producer(由producer來指定) | string | producer | | 高
delete.topic.enable | 是否啟動刪除topic。如果設(shè)置為false,你在刪除topic的時候無法刪除,但是會打上一個你將刪除該topic的標(biāo)記,等到你修改這一屬性的值為true后重新啟動Kafka集群的時候,集群自動將那些標(biāo)記刪除的topic刪除掉,對應(yīng)的log.dirs目錄下的topic目錄和數(shù)據(jù)也會被刪除。而將這一屬性設(shè)置為true之后,你就能成功刪除你想要刪除的topic了 | boolean | false || 高
auto.leader.rebalance.enable | 一個后臺線程會周期性的自動嘗試,為所有的broker的每個partition平衡leadership,使kafka的leader均衡。| boolean| true | | 高
leader.imbalance.check.interval.seconds | 檢查leader是否均衡的時間間隔(秒) | long | 300 | | 高
leader.imbalance.per.broker.percentage | 每個broker允許的不平衡的leader的百分比。如果每個broker超過了這個百分比,復(fù)制控制器會重新平衡leadership。| int | 10 | | 高
log.flush.interval.messages | 數(shù)據(jù)flush(sync)到硬盤前之前累積的消息條數(shù),因為磁盤IO操作是一個慢操作,但又是一個”數(shù)據(jù)可靠性"的必要手段,所以此參數(shù)的設(shè)置,需要在"數(shù)據(jù)可靠性"與"性能"之間做必要的權(quán)衡.如果此值過大,將會導(dǎo)致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導(dǎo)致"fsync"的次數(shù)較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導(dǎo)致沒有fsync的消息丟失 | long | 9223372036854775807 | | 高
log.flush.interval.ms | 當(dāng)達到下面的時間(ms)時,執(zhí)行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。| long | null | | 高
log.flush.offset.checkpoint.interval.ms | 記錄上次把log刷到磁盤的時間點的頻率,用來日后的recovery。通常不需要改變 | long | 60000 | | 高
log.flush.scheduler.interval.ms | 檢查是否需要固化到硬盤的時間間隔 | long | 9223372036854775807 | | 高
log.retention.bytes| topic每個分區(qū)的最大文件大小,一個topic的大小限制 = 分區(qū)數(shù)log.retention.bytes。-1沒有大小限log.retention.bytes和log.retention.minutes任意一個達到要求,都會執(zhí)行刪除,會被topic創(chuàng)建時的指定參數(shù)覆蓋 | loong | -1 || 高
log.retention.hours | 日志保存時間,默認為7天(168小時)。超過這個時間會根據(jù)policy處理數(shù)據(jù)。bytes和minutes無論哪個先達到都會觸發(fā) | int | 168 | | 高
log.retention.minutes | 數(shù)據(jù)存儲的最大時間超過這個時間會根據(jù)log.cleanup.policy設(shè)置的策略處理數(shù)據(jù),也就是消費端能夠多久去消費數(shù)據(jù)
log.retention.bytes和log.retention.minutes任意一個達到要求,都會執(zhí)行刪除,會被topic創(chuàng)建時的指定參數(shù)覆蓋 | int | null | | 高
log.roll.hous | 當(dāng)達到下面時間,會強制新建一個segment。這個參數(shù)會在日志segment沒有達到log.segment.bytes設(shè)置的大小,也會強制新建一個segment會 | int | 168 | | 高
log.roll.jitter.{ms,hours} | 從logRollTimeMillis抽離的jitter最大數(shù)目 | int | 0 || 高
log.segment.bytes | topic partition的日志存放在某個目錄下諸多文件中,這些文件將partition的日志切分成一段一段的;這個屬性就是每個文件的最大尺寸;當(dāng)尺寸達到這個數(shù)值時,就會創(chuàng)建新文件。此設(shè)置可以由每個topic基礎(chǔ)設(shè)置時進行覆蓋 | long | 1G=102410241024 | | 高
log.segment.delet.delay.ms | 刪除文件系統(tǒng)上文件的等待時間,默認是1分鐘 | long | 6000 | | 高
message.max.bytes | 表示一個服務(wù)器能夠接收處理的消息的最大字節(jié)數(shù),注意這個值producer和consumer必須設(shè)置一致,且不要大于fetch.message.max.bytes屬性的值該值默認是1000012字節(jié),大概900KB
| int | 1000012 | | 高
min.insync.replicas | 該屬性規(guī)定了最小的ISR數(shù)。當(dāng)producer設(shè)置request.required.acks為all或-1時,指定副本(replicas)的最小數(shù)目(必須確認每一個repica的寫數(shù)據(jù)都是成功的),如果這個數(shù)目沒有達到,producer會產(chǎn)生異常。| int | 1 || 高
num.io.threads | 服務(wù)器用來處理請求的I/O線程的數(shù)目;這個線程數(shù)目至少要等于硬盤的個數(shù)。| int | 8 | | 高
num.network.threads | 服務(wù)器用來處理網(wǎng)絡(luò)請求的網(wǎng)絡(luò)線程數(shù)目;一般你不需要更改這個屬性 | int | 3 || 高
num.recovery.threads.per.data.dir | 每數(shù)據(jù)目錄用于日志恢復(fù)啟動和關(guān)閉沖洗時的線程數(shù)量 | int | 1 | | 高
num.replica.fetchers | 從leader進行復(fù)制消息的線程數(shù),增大這個數(shù)值會增加follower的IO | int | 1 | | 高
offset.metadata.max.bytes | 允許client(消費者)保存它們元數(shù)據(jù)(offset)的最大的數(shù)據(jù)量 | int | 4096(4kb) | | | 高
offsets.commit.required.acks | 在offset commit可以接受之前,需要設(shè)置確認的數(shù)目,一般不需要更改 | int | -1 || 高
offsets.commit.timeout.ms | offset commit會延遲直至此超時或所需的副本數(shù)都收到offset commit,這類似于producer請求的超時 | int | 5000 | | 高
offsets.load.buffer.size | 此設(shè)置對應(yīng)于offset manager在讀取緩存offset segment的批量大小(以字節(jié)為單位). | int | 5242880 | | 高
offsets.retention.check.interval.ms | offset管理器檢查陳舊offsets的頻率 | long | 600000(10分鐘) | | 高
offsets.topic.num.partitions | 偏移的提交topic的分區(qū)數(shù)目。 由于目前不支持部署之后改變,我們建議您使用生產(chǎn)較高的設(shè)置(例如,100-200) | int | 50 | | 高
offsets.topic.replication.factor | 復(fù)制因子的offset提交topic。較高的設(shè)置(例如三個或四個),建議以確保更高的可用性。如果offset topic創(chuàng)建時,broker比復(fù)制因子少,offset topic將以較少的副本創(chuàng)建。| short | 3 | | 高
offsets.topic.segment.bytes | offset topic的Segment大小。因為它使用壓縮的topic,所有Sgment的大小應(yīng)該保持小一點,以促進更快的日志壓實和負載 | int | 104857600 || 高
queued.max.requests | 在網(wǎng)絡(luò)線程(network threads)停止讀取新請求之前,可以排隊等待I/O線程處理的最大請求個數(shù)。若是等待IO的請求超過這個數(shù)值,那么會停止接受外部消息 | int | 500 | | 高
quota.consumer.default | 以clientid或consumer group區(qū)分的consumer端每秒可以抓取的最大byte | long | 9223372036854775807 | | 高
quota.producer.default | producer端每秒可以產(chǎn)生的最大byte | long | 9223372036854775807 | | 高
replica.fetch.max.bytes | replicas每次獲取數(shù)據(jù)的最大字節(jié)數(shù) | int | 1048576 | | 高
replica.fetch.min.bytes | fetch的最小數(shù)據(jù)尺寸,如果leader中尚未同步的數(shù)據(jù)不足此值,將會阻塞,直到滿足條件 | int | 1 || 高
replica.fetch.wait.max.ms | replicas同leader之間通信的最大等待時間,失敗了會重試。這個值須小于replica.lag.time.max.ms,以防止低吞吐量主題ISR頻繁收縮 | int | 500 || 高
replica.high.watermark.checkpoint.interval.ms | 每一個replica存儲自己的high watermark到磁盤的頻率,用來日后的recovery | int | 5000|| 高
replica.socket.timeout.ms | 復(fù)制數(shù)據(jù)過程中,replica發(fā)送給leader的網(wǎng)絡(luò)請求的socket超時時間,至少等于replica.fetch.wait.max.ms | int | 30000 | | 高
replica.socket.receive.buffer.bytes | 復(fù)制過程leader接受請求的buffer大小 | int | 65536(641024) | | 高
replica.lag.time.max.ms | replicas響應(yīng)partition leader的最長等待時間,若是超過這個時間,就將replicas列入ISR(in-sync replicas),并認為它是死的,不會再加入管理中 | long | 10000 || 高
replica.lag.max.messages | 如果follower落后與leader太多,將會認為此follower[或者說partition relicas]已經(jīng)失效。</br> 通常,在follower與leader通訊時,因為網(wǎng)絡(luò)延遲或者鏈接斷開,總會導(dǎo)致replicas中消息同步滯后</br>如果消息之后太多,leader將認為此follower網(wǎng)絡(luò)延遲較大或者消息吞吐能力有限,將會把此replicas遷移到其他follower中.</br>在broker數(shù)量較少,或者網(wǎng)絡(luò)不足的環(huán)境中,建議提高此值. | int | 4000 || 高
request.timeout.ms | producer等待響應(yīng)的最長時間,如果超時將重發(fā)幾次,最終報錯 | int | 30000 || 高
socket.receive.buffer.bytes | socket用于接收網(wǎng)絡(luò)請求的緩存大小 | int | 102400 || 高
socket.request.max.bytes | server能接受的請求的最大的大小,這是為了防止server跑光內(nèi)存,不能大于Java堆的大小。 | int | 104857600(10010241024) || 高
socket.send.buffer.bytes | server端用來處理socket連接的SO_SNDBUFF緩沖大小 | int | 102400 || 高
controller.socket.timeout.ms | partition管理控制器進行備份時,socket的超時時間 | int | 30000 || 高
controller.message.queue.size | partition leader與replicas數(shù)據(jù)同步時,消息的隊列大小 | int | 10|| 高
num.partitions | 每個topic的分區(qū)個數(shù),若是在topic創(chuàng)建時候沒有指定的話會被topic創(chuàng)建時的指定參數(shù)覆蓋 | int | 1 | 推薦設(shè)為8 | 高
log.index.interval.bytes | 當(dāng)執(zhí)行一次fetch后,需要一定的空間掃描最近的offset,設(shè)置的越大越好,但是也更耗內(nèi)存一般使用默認值就可以 | int | 4096 || 中
log.index.size.max.bytes | 每個log segment的最大尺寸。注意,如果log尺寸達到這個數(shù)值,即使尺寸沒有超過log.segment.bytes限制,也需要產(chǎn)生新的log segment。| int | 10485760 || 中
fetch.purgatory.purge.interval.requests | 非立即答復(fù)請求放入purgatory中,當(dāng)?shù)竭_或超出interval時認為request complete | int | 1000 || 中
producer.purgatory.purge.interval.requests | producer請求清除時間 | int | 1000 ||中
default.replication.factor | 一個topic ,默認分區(qū)的replication個數(shù) ,不能大于集群中broker的個數(shù)。| int | 1||中
group.max.session.timeout.ms | 注冊consumer允許的最大超時時間 | int | 300000 | |中
group.min.session.timeout.ms | 注冊consumer允許的最小超時時間 | int | 6000 | |中
inter.broker.protocol.version | broker協(xié)議版本 | string | 0.10.0 | |中
log.cleaner.backoff.ms | 檢查log是否需要clean的時間間隔 | long | 15000 | |中
log.cleaner.dedupe.buffer.size | 日志壓縮去重時候的緩存空間,在空間允許的情況下,越大越好 | long | 134217728 | |中
log.cleaner.delete.retention.ms |保存時間;保存壓縮日志的最長時間;也是客戶端消費消息的最長時間,同log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù),一個控制壓縮后的數(shù)據(jù);會被topic創(chuàng)建時的指定時間覆蓋。| long | 86400000(一天) | |中
log.cleaner.enable | 是否啟動壓縮日志,當(dāng)這個屬性設(shè)置為false時,一旦日志的保存時間或者大小達到上限時,就會被刪除;如果設(shè)置為true,則當(dāng)保存屬性達到上限時,就會進行壓縮 | boolean | false || 中
log.cleaner.threads | 日志壓縮運行的線程數(shù) | int | 1 || 中
log.cleaner.io.buffer.load.factor | 日志清理中hash表的擴大因子,一般不需要修改 | double | 0.9 || 中
log.cleaner.io.buffer.size | log cleaner清除過程中針對日志進行索引化以及精簡化所用到的緩存大小。最好設(shè)置大點,以提供充足的內(nèi)存 | int | 524288|| 中
log.cleaner.io.max.bytes.per.second | 進行l(wèi)og compaction時,log cleaner可以擁有的最大I/O數(shù)目。這項設(shè)置限制了cleaner,以避免干擾活動的請求服務(wù)。 | double | 1.7976931348623157E308|| 中
log.cleaner.min.cleanable.ratio | 這項配置控制log compactor試圖清理日志的頻率(假定[log compaction]是打開的)。默認避免清理壓縮超過50%的日志。這個比率綁定了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%)。更高的比率則意味著浪費消耗更少,也就可以更有效的清理更多的空間。這項設(shè)置在每個topic設(shè)置中可以覆蓋 | double |0.5 || 中
log.preallocate | 是否預(yù)創(chuàng)建新文件,windows推薦使用 | boolean | false || 中
log.retention.check.interval.ms | 檢查日志分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。 | long | 300000 || 中
max.connections.per.ip | 一個broker允許從每個ip地址連接的最大數(shù)目 | int | 2147483647=Int.MaxValue || 中
max.connections.per.ip.overrides | 每個IP或主機名覆蓋連接的默認最大數(shù)量 | string | "" || 中
replica.fetch.backoff.ms | 復(fù)制數(shù)據(jù)時失敗等待時間 | int | 1000 || 中
reserved.broker.max.id | broker可以使用的最大ID值 | int |1000|| 中
topic level 配置
broker級別的參數(shù)可以由topic級別的覆寫,不是所有的broker參數(shù)在topic級別都有對應(yīng)值
以下是topic-level的配置選項。server的默認配置在Server Default Property列下給出了,設(shè)定這些默認值不會改變原有的設(shè)置
| Property | Default | Server Default Property | Description |
|---|---|---|---|
| cleanup.policy | delete | log.cleanup.policy | 要么是”delete“要么是”compact“; 這個字符串指明了針對舊日志部分的利用方式;默認方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達時。”compact“將會進行日志壓縮 |
| delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | 對于壓縮日志保留的最長時間,也是客戶端消費消息的最長時間,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù),一個控制壓縮后的數(shù)據(jù)。此項配置可以在topic創(chuàng)建時的置頂參數(shù)覆蓋 |
| flush.messages | none | log.flush.interval.messages | 此項配置指定時間間隔:強制進行fsync日志。例如,如果這個選項設(shè)置為1,那么每條消息之后都需要進行fsync,如果設(shè)置為5,則每5條消息就需要進行一次fsync。一般來說,建議你不要設(shè)置這個值。此參數(shù)的設(shè)置,需要在"數(shù)據(jù)可靠性"與"性能"之間做必要的權(quán)衡.如果此值過大,將會導(dǎo)致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導(dǎo)致"fsync"的次數(shù)較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導(dǎo)致沒有fsync的消息丟失. |
| flush.ms | None | log.flush.interval.ms | 此項配置用來置頂強制進行fsync日志到磁盤的時間間隔;例如,如果設(shè)置為1000,那么每1000ms就需要進行一次fsync。一般不建議使用這個選項 |
| index.interval.bytes | 4096 | log.index.interval.bytes | 默認設(shè)置保證了我們每4096個字節(jié)就對消息添加一個索引,更多的索引使得閱讀的消息更加靠近,但是索引規(guī)模卻會由此增大;一般不需要改變這個選項 |
| max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最大尺寸。注意如果你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的消息。 |
| min.cleanable.dirty.ratio | 0.5 | in.cleanable.dirty.ratio | 此項配置控制log壓縮器試圖進行清除日志的頻率。默認情況下,將避免清除壓縮率超過50%的日志。這個比率避免了最大的空間浪費 |
| min.insync.replicas | 1 | min.insync.replicas | 當(dāng)producer設(shè)置 acks為-1時,min.insync.replicas指定replicas的最小數(shù)目(必須確認每一個repica的寫數(shù)據(jù)都是成功的),如果這個數(shù)目沒有達到,producer會產(chǎn)生異常。 |
| retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策略,這項配置就是指在刪除日志之前,日志所能達到的最大尺寸。默認情況下,沒有尺寸限制而只有時間限制 |
| retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策略,這項配置就是指刪除日志前日志保存的時間。 |
| segment.bytes | 1GB | log.segment.bytes | kafka中l(wèi)og日志是分成一塊塊存儲的,此配置是指log日志劃分成塊的大小 |
| segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有關(guān)offsets和文件位置之間映射的索引文件的大小;一般不需要修改這個配置 |
| segment.ms | 7 days | log.roll.hours | 即使log的分塊(segment)文件沒有達到需要刪除、壓縮的大小,一旦log 的時間達到這個上限,就會強制新建一個log分塊文件 |
| segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |