Kafka 1.1 added dynamic config: these settings take effect immediately, with no broker restart required.
The supported parameters are as follows:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
smart.extend.enable
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
producer_byte_rate
request_percentage
Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user.
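So how exactly is this implemented?
Config modification flow
The command-line tool sends an AlterConfigsRequest to the server. The server handles it in handleAlterConfigsRequest, which then calls, in order, adminManager.alterConfigs --> alterBrokerConfigs --> changeBrokerConfig --> zk.changeEntityConfig, writing the changed config under the corresponding ZooKeeper node.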
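The write path above can be modeled in a few lines. This is a simplified sketch, not Kafka's code: `FakeZk` is a hypothetical in-memory stand-in for ZooKeeper, the function names only mirror the call chain in the text, and the node paths and JSON `version` fields are assumptions about the ZooKeeper layout.

```python
import json


class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper (illustration only)."""

    def __init__(self):
        self.nodes = {}  # path -> JSON string
        self._seq = 0    # counter for sequential notification nodes

    def set_data(self, path, data):
        self.nodes[path] = data

    def create_sequential(self, prefix, data):
        path = f"{prefix}{self._seq:010d}"
        self._seq += 1
        self.nodes[path] = data
        return path


def change_entity_config(zk, entity_type, entity_name, configs):
    """Persist the new config, then leave a change notification for watchers."""
    entity_path = f"{entity_type}/{entity_name}"
    # Assumed layout: the config lives under /config/<entity type>/<entity name>.
    zk.set_data(f"/config/{entity_path}", json.dumps({"version": 1, "config": configs}))
    # Assumed layout: the notification only names the entity that changed.
    return zk.create_sequential(
        "/config/changes/config_change_",
        json.dumps({"version": 2, "entity_path": entity_path}),
    )


def handle_alter_configs_request(zk, broker_id, configs):
    """The call chain from the text, collapsed into a single step."""
    return change_entity_config(zk, "brokers", str(broker_id), configs)


zk = FakeZk()
notification = handle_alter_configs_request(zk, 0, {"log.cleaner.threads": "2"})
```

Writing the config and the notification as two separate nodes means a watcher only has to observe one parent path, whichever entity was changed.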
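How the broker applies the change
Dynamic config is implemented mainly in the DynamicConfigManager class. When the broker starts, it calls DynamicConfigManager.startup to begin watching for config changes.
Continuing from the modification flow above: once the relevant ZooKeeper node changes, ZkNodeChangeNotificationListener --> processNotification --> processEntityConfigChangeVersion is triggered, and the change is handled according to its entity type. This invokes processConfigChanges on one of four handlers (BrokerConfigHandler, ClientIdConfigHandler, TopicConfigHandler, UserConfigHandler); for broker configs the chain continues with updateBrokerConfig --> updateCurrentConfig.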
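The dispatch step can be sketched as a lookup from entity type to handler. The class and method names below are hypothetical Python stand-ins for the Kafka classes named in the text, and the `stored` dict plays the role of the config kept in ZooKeeper.

```python
class RecordingHandler:
    """Hypothetical handler that only records what it was asked to apply."""

    def __init__(self):
        self.calls = []

    def process_config_changes(self, entity_name, configs):
        self.calls.append((entity_name, dict(configs)))


class ConfigChangeDispatcher:
    """Simplified model: route a change notification to a per-entity-type handler."""

    def __init__(self, handlers, load_config):
        self.handlers = handlers        # entity type -> handler
        self.load_config = load_config  # callable(entity_type, entity_name) -> dict

    def process_notification(self, entity_path):
        entity_type, _, entity_name = entity_path.partition("/")
        handler = self.handlers.get(entity_type)
        if handler is None:
            raise ValueError(f"unknown entity type: {entity_type}")
        # The notification only names the entity; the config is read from the store.
        configs = self.load_config(entity_type, entity_name)
        handler.process_config_changes(entity_name, configs)


stored = {("brokers", "0"): {"log.cleaner.threads": "2"}}
handlers = {t: RecordingHandler() for t in ("brokers", "clients", "topics", "users")}
dispatcher = ConfigChangeDispatcher(handlers, lambda t, n: stored.get((t, n), {}))
dispatcher.process_notification("brokers/0")
```

Only the handler registered for `brokers` sees the change; the other three are untouched.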
At this point the config has been validated and is available as a pair (oldConfig, newConfig). The next step applies the update by calling BrokerReconfigurable.reconfigure(oldConfig, newConfig).
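A minimal sketch of the validate-then-reconfigure pattern follows. `ThreadPoolSketch` and `update_current_config` are hypothetical illustrations, assuming each reconfigurable component declares which keys it cares about; the real BrokerReconfigurable implementations are more involved.

```python
class ThreadPoolSketch:
    """Hypothetical reconfigurable component, loosely modeled on a resizable thread pool."""

    reconfigurable_configs = {"num.io.threads"}

    def __init__(self, size):
        self.size = size

    def validate_reconfiguration(self, new_config):
        if int(new_config["num.io.threads"]) < 1:
            raise ValueError("num.io.threads must be at least 1")

    def reconfigure(self, old_config, new_config):
        # Only act when the value this component owns has actually changed.
        if old_config["num.io.threads"] != new_config["num.io.threads"]:
            self.size = int(new_config["num.io.threads"])


def update_current_config(current, overrides, reconfigurables):
    """Validate every affected component first, then apply; return the new config."""
    new_config = {**current, **overrides}
    changed = {k for k in new_config if current.get(k) != new_config[k]}
    affected = [r for r in reconfigurables if r.reconfigurable_configs & changed]
    for r in affected:
        r.validate_reconfiguration(new_config)
    for r in affected:
        r.reconfigure(current, new_config)
    return new_config


pool = ThreadPoolSketch(size=8)
current = {"num.io.threads": "8", "log.cleaner.threads": "1"}
current = update_current_config(current, {"num.io.threads": "16"}, [pool])
```

Running all validations before any `reconfigure` call means a rejected value leaves every component on its old settings.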
BrokerReconfigurable has five concrete implementation classes.
