kafka dynamic config

??在kafka 1.1版本增加了dynamic config,即這些配置可以熱生效,無需重啟broker。
??支持的參數(shù)如下:

For entity-type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.compaction.lag.ms
        max.message.bytes
        message.downconversion.enable
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
For entity-type 'brokers':
        advertised.listeners
        background.threads
        compression.type
        follower.replication.throttled.rate
        leader.replication.throttled.rate
        listener.security.protocol.map
        listeners
        log.cleaner.backoff.ms
        log.cleaner.dedupe.buffer.size
        log.cleaner.delete.retention.ms
        log.cleaner.io.buffer.load.factor
        log.cleaner.io.buffer.size
        log.cleaner.io.max.bytes.per.second
        log.cleaner.max.compaction.lag.ms
        log.cleaner.min.cleanable.ratio
        log.cleaner.min.compaction.lag.ms
        log.cleaner.threads
        log.cleanup.policy
        log.flush.interval.messages
        log.flush.interval.ms
        log.index.interval.bytes
        log.index.size.max.bytes
        log.message.downconversion.enable
        log.message.timestamp.difference.max.ms
        log.message.timestamp.type
        log.preallocate
        log.retention.bytes
        log.retention.ms
        log.roll.jitter.ms
        log.roll.ms
        log.segment.bytes
        log.segment.delete.delay.ms
        max.connections
        max.connections.per.ip
        max.connections.per.ip.overrides
        message.max.bytes
        metric.reporters
        min.insync.replicas
        num.io.threads
        num.network.threads
        num.recovery.threads.per.data.dir
        num.replica.fetchers
        principal.builder.class
        replica.alter.log.dirs.io.max.bytes.per.second
        sasl.enabled.mechanisms
        sasl.jaas.config
        sasl.kerberos.kinit.cmd
        sasl.kerberos.min.time.before.relogin
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.service.name
        sasl.kerberos.ticket.renew.jitter
        sasl.kerberos.ticket.renew.window.factor
        sasl.login.refresh.buffer.seconds
        sasl.login.refresh.min.period.seconds
        sasl.login.refresh.window.factor
        sasl.login.refresh.window.jitter
        sasl.mechanism.inter.broker.protocol
        smart.extend.enable
        ssl.cipher.suites
        ssl.client.auth
        ssl.enabled.protocols
        ssl.endpoint.identification.algorithm
        ssl.key.password
        ssl.keymanager.algorithm
        ssl.keystore.location
        ssl.keystore.password
        ssl.keystore.type
        ssl.protocol
        ssl.provider
        ssl.secure.random.implementation
        ssl.trustmanager.algorithm
        ssl.truststore.location
        ssl.truststore.password
        ssl.truststore.type
        unclean.leader.election.enable
For entity-type 'users':
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        producer_byte_rate
        request_percentage
For entity-type 'clients':
        consumer_byte_rate
        producer_byte_rate
        request_percentage
Entity types 'users' and 'clients' may
      be specified together to update
      config for clients of a specific
      user.

??那么具體是什么怎么實現(xiàn)的呢?

修改配置流程

??通過命令行發(fā)送AlterConfigsRequest請求到server端。server端使用handleAlterConfigsRequest進行處理,然后依次調(diào)用方法adminManager.alterConfigs-->alterBrokerConfigs-->changeBrokerConfig--> zk.changeEntityConfig將要修改的配置設置到對應的zk節(jié)點下

修改后broker觸發(fā)生效流程

??動態(tài)配置主要實現(xiàn)在DynamicConfigManager類, 在broker啟動時,會調(diào)用DynamicConfigManager.startup方法,Begin watching for config changes。
??接“修改配置流程”,相關zk節(jié)點變更后,會觸發(fā)ZkNodeChangeNotificationListener--->processNotification-->processEntityConfigChangeVersion,監(jiān)聽到變動后,進行相應的處理。會調(diào)用BrokerConfigHandler、ClientIdConfigHandler、TopicConfigHandler、UserConfigHandler四個handler的processConfigChanges-->updateBrokerConfig-->updateCurrentConfig。
??至此配置已經(jīng)check完畢,分為(oldConfig, newConfig),下一步進行配置的更新,調(diào)用BrokerReconfigurable.reconfigure(oldConfig, newConfig)。
??BrokerReconfigurable具體有5種實現(xiàn)類


BrokerReconfigurable
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容