Consumer 與 partition
其實topic中的partition被分配到某個consumer上,也就是某個consumer訂閱了某個partition,而不是message。所以在同一時間點上,訂閱到同一個partition的consumer必然屬于不同的Consumer Group。
在官方網(wǎng)站上,給出了這樣一張圖:
一個kafka cluster中的某個topic,有4個partition。有兩個consumer group (A and B)訂閱了該topic。
Consumer Group A有2個partition:p0、p1,Consumer Group B有4個partition:c3,c4,c5,c6。
經(jīng)過分區(qū)分配后,consumer與partition的訂閱關(guān)系如下:
Topic 中的4個partition在Consumer Group A中的分配情況如下:
C1 訂閱p0,p3
C2 訂閱p1,p2
Topic 中的4個partition在Consumer Group B中的分配情況如下:
C3 訂閱p0
C4 訂閱p3
C5 訂閱p1
C6 訂閱p2
New Consumer Configs
| NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
|---|---|---|---|---|---|
| bootstrap.servers | 用于建立到Kafka集群的初始連接的主機/端口對列表??蛻舳藢⑹褂盟蟹?wù)器,而不管在這里指定哪些服務(wù)器用于引導(dǎo) - 該列表僅影響用于發(fā)現(xiàn)全套服務(wù)器的初始主機。這個清單應(yīng)該在表格中host1:port1,host2:port2,...。由于這些服務(wù)器僅用于初始連接以發(fā)現(xiàn)完整的群集成員資格(可能會動態(tài)更改),因此此列表不必包含整套服務(wù)器(但可能需要多個服務(wù)器,以防服務(wù)器關(guān)閉) 。 | list | high | ||
| key.deserializer | 實現(xiàn)org.apache.kafka.common.serialization.Deserializer接口的密鑰的反序列化器類。 | class | high | ||
| value.deserializer | 用于實現(xiàn)org.apache.kafka.common.serialization.Deserializer接口的值的反序列化器類。 | class | high | ||
| fetch.min.bytes | 服務(wù)器為獲取請求返回的最小數(shù)據(jù)量。如果沒有足夠的數(shù)據(jù)可用,請求將等待那么多的數(shù)據(jù)在應(yīng)答請求之前積累。1字節(jié)的默認設(shè)置意味著只要有一個字節(jié)的數(shù)據(jù)可用,或者提取請求超時等待數(shù)據(jù)到達,就會立即應(yīng)答提取請求。將其設(shè)置為大于1的值將導(dǎo)致服務(wù)器等待大量的數(shù)據(jù)累積,這可以稍稍提高服務(wù)器吞吐量,但需要花費一些額外的延遲時間。 當consumer向一個broker發(fā)起fetch請求時,broker返回的records的大小最小值。如果broker中數(shù)據(jù)量不夠的話會wait,直到數(shù)據(jù)大小滿足這個條件。取值范圍是:[0, Integer.Max],默認值是1。默認值設(shè)置為1的目的是:使得consumer的請求能夠盡快的返回。 |
int | 1 | [0,...] | high |
| group.id | 標識此消費者所屬的消費者群組的唯一字符串。如果消費者通過使用subscribe(topic)基于卡夫卡的偏移量管理策略來使用組管理功能,則此屬性是必需的 | string | "" | high | |
| heartbeat.interval.ms | 心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。這個值必須設(shè)置的小于session.timeout.ms,因為:當Consumer由于某種原因不能發(fā)Heartbeat到coordinator時,并且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內(nèi)的其它的consumer上。通常設(shè)置的值要低于session.timeout.ms的1/3。 | int | 3000 | high | |
| max.partition.fetch.bytes | 一次fetch請求,從一個partition中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。 broker、topic都會對producer發(fā)給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。 | int | 1048576 | [0,...] | high |
| session.timeout.ms | 使用Kafka的組管理設(shè)施時,用于檢測消費者失敗的超時。消費者定期發(fā)送心跳來向經(jīng)紀人表明其活躍度。如果代理在該會話超時到期之前沒有收到心跳,那么代理將從該組中刪除該消費者并啟動重新平衡。請注意,該值必須在允許的范圍內(nèi) Consumer session 過期時間。這個值必須設(shè)置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。 |
int | 10000 | high | |
| ssl.key.password | 密鑰存儲文件中的私鑰密碼。這對于客戶端是可選的。 | password | null | high | |
| ssl.keystore.location | 密鑰存儲文件的位置。這對客戶端是可選的,可以用于客戶端的雙向認證。 | string | null | high | |
| ssl.keystore.password | 密鑰存儲文件的商店密碼。這對客戶端是可選的,只有在配置了ssl.keystore.location時才需要。 | password | null | high | |
| ssl.truststore.location | 信任存儲文件的位置。 | string | null | high | |
| ssl.truststore.password | 信任存儲文件的密碼。如果密碼未設(shè)置,信任庫的訪問仍然可用,但完整性檢查被禁用。 | password | null | high | |
| auto.offset.reset | 這個配置項,是告訴Kafka Broker在發(fā)現(xiàn)kafka在沒有初始offset,或者當前的offset是一個不存在的值(如果一個record被刪除,就肯定不存在了)時,該如何處理。它有4種處理方式: ● earliest: 自動將偏移量重置為最早的偏移量 ● latest:自動將偏移量重置為最新的偏移量 ● none: 如果邊更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發(fā)現(xiàn)有這樣的offset。 ● anything else: 如果不是上述3種,只拋出異常給consumer。 |
string | latest | [latest, earliest, none] | medium |
| connections.max.idle.ms | 連接空閑超時時間。因為consumer只與broker有連接(coordinator也是一個broker),所以這個配置的是consumer到broker之間的。 | long | 540000 | medium | |
| enable.auto.commit | Consumer 在commit offset時有兩種模式:自動提交,手動提交。手動提交在前面已經(jīng)說過。自動提交:是Kafka Consumer會在后臺周期性的去commit。默認值是true。 | boolean | true | medium | |
| exclude.internal.topics | 內(nèi)部主題(如偏移)的記錄是否應(yīng)該暴露給消費者。如果設(shè)置為true從內(nèi)部主題接收記錄的唯一方法是訂閱它 | boolean | true | medium | |
| fetch.max.bytes | 一次fetch請求,從一個broker中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。 broker、topic都會對producer發(fā)給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。 | int | 52428800 | [0,...] | medium |
| isolation.level | 控制如何閱讀事務(wù)處理的消息。如果設(shè)置為read_committed,consumer.poll()將僅返回已提交的事務(wù)消息。如果設(shè)置為read_uncommitted'(默認),consumer.poll()將返回所有的消息,甚至是已經(jīng)中止的事務(wù)消息。非交易消息將在任一模式下無條件返回。 消息將始終以偏移順序返回。因此,在 read_committed模式下,consumer.poll()將只返回到最后一個穩(wěn)定偏移量(LSO)的消息,這比第一個打開事務(wù)的偏移量小。特別是在屬于正在進行的交易的消息之后出現(xiàn)的任何消息將被扣留,直到相關(guān)的交易完成。因此,read_committed消費者在飛行交易中將無法讀取高水印。 而且,當進入時 read_committed the seekToEnd method will return the LSO |
string | read_uncommitted | [read_committed, read_uncommitted] | medium |
| max.poll.interval.ms | 在使用消費者組管理時,調(diào)用poll()之間的最大延遲。這提出了消費者在獲取更多記錄之前可以閑置的時間量的上界。如果在此超時到期之前未調(diào)用poll(),則認為使用者失敗,并且組將重新平衡以將分區(qū)重新分配給其他成員。 | int | 300000 | [1,...] | medium |
| max.poll.records | 在一次調(diào)用poll()中返回的最大記錄數(shù)。 | int | 500 | [1,...] | medium |
| partition.assignment.strategy | 當使用組管理時,客戶端將用于在客戶實例之間分配分區(qū)所有權(quán)的分區(qū)分配策略的類名 | list | class org.apache.kafka.clients.consumer.RangeAssignor | medium | |
| receive.buffer.bytes | 讀取數(shù)據(jù)時使用的TCP接收緩沖區(qū)(SO_RCVBUF)的大小。如果值為-1,則將使用操作系統(tǒng)默認值。 | int | 65536 | [-1,...] | medium |
| request.timeout.ms | 配置控制客戶端等待請求響應(yīng)的最長時間。如果在超時過去之前未收到響應(yīng),則客戶端將在必要時重新發(fā)送請求,或者如果重試耗盡,則請求失敗。 | int | 305000 | [0,...] | medium |
| sasl.jaas.config | 用于JAAS配置文件使用的格式的SASL連接的JAAS登錄上下文參數(shù)。 這里描述JAAS配置文件格式。 值的格式是:'(=)*;' | password | null | medium | |
| sasl.kerberos.service.name | Kafka運行的Kerberos主體名稱。這可以在Kafka的JAAS配置或Kafka的配置中定義。 | string | null | medium | |
| sasl.mechanism | 用于客戶端連接的SASL機制。這可能是安全提供者可用的任何機制。GSSAPI是默認的機制。 | string | GSSAPI | medium | |
| security.protocol | 用于與經(jīng)紀人溝通的協(xié)議。有效值為:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | medium | |
| send.buffer.bytes | 發(fā)送數(shù)據(jù)時要使用的TCP發(fā)送緩沖區(qū)(SO_SNDBUF)的大小。如果值為-1,則將使用操作系統(tǒng)默認值。 | int | 131072 | [-1,...] | medium |
| ssl.enabled.protocols | 啟用了SSL連接的協(xié)議列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
| ssl.keystore.type | 密鑰存儲文件的文件格式。這對于客戶端是可選的。 | string | JKS | medium | |
| ssl.protocol | 用于生成SSLContext的SSL協(xié)議。默認設(shè)置是TLS,在大多數(shù)情況下這是很好的。最近的JVM中允許的值是TLS,TLSv1.1和TLSv1.2。較舊的JVM中可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不鼓勵使用SSL。 | string | TLS | medium | |
| ssl.provider | 用于SSL連接的安全提供程序的名稱。默認值是JVM的默認安全提供程序。 | string | null | medium | |
| ssl.truststore.type | 信任存儲文件的文件格式。 | string | JKS | medium | |
| auto.commit.interval.ms | 自動提交間隔。范圍:[0,Integer.MAX],默認值是 5000 (5 s) 消費者偏移的頻率以毫秒為單位自動提交給Kafka,如果enable.auto.commit設(shè)置為true。 |
int | 5000 | [0,...] | low |
| check.crcs | 自動檢查消耗的記錄的CRC32。這可以確保沒有在線或磁盤損壞的消息發(fā)生。這個檢查會增加一些開銷,所以在尋求極高性能的情況下可能會被禁用。 | boolean | true | low | |
| client.id | 發(fā)出請求時傳遞給服務(wù)器的id字符串。這樣做的目的是通過允許在服務(wù)器端請求日志中包含一個邏輯應(yīng)用程序的名字來跟蹤請求的來源,而不僅僅是ip / port。 | string | "" | low | |
| fetch.max.wait.ms | Fetch請求發(fā)給broker后,在broker中可能會被阻塞的(當topic中records的總size小于fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。 | int | 500 | [0,...] | low |
| interceptor.classes | 用作攔截器的類的列表。實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口允許你攔截(也可能是變異)消費者收到的記錄。默認情況下,沒有攔截器。 | list | null | low | |
| metadata.max.age.ms | 以毫秒為單位的時間段之后,即使我們沒有看到任何分區(qū)領(lǐng)導(dǎo)變化,以主動發(fā)現(xiàn)任何新的代理或分區(qū),我們強制更新元數(shù)據(jù)。 | long | 300000 | [0,...] | low |
| metric.reporters | 用作度量記錄的類的列表。實現(xiàn)org.apache.kafka.common.metrics.MetricsReporter接口允許插入將被通知新度量創(chuàng)建的類。JmxReporter始終包含在注冊JMX統(tǒng)計信息中。 | list | "" | low | |
| metrics.num.samples | 維持用于計算度量的樣本數(shù)量。 | int | 2 | [1,...] | low |
| metrics.recording.level | 指標的最高記錄級別。 | string | INFO | [INFO, DEBUG] | low |
| metrics.sample.window.ms | 計算指標樣本的時間窗口。 | long | 30000 | [0,...] | low |
| reconnect.backoff.max.ms | 重新連接到重復(fù)連接失敗的代理程序時要等待的最長時間(以毫秒為單位)。如果提供的話,每個主機的退避將以指數(shù)方式增加,對于每個連續(xù)的連接失敗,達到這個最大值。計算后退增加后,增加20%隨機抖動以避免連接風(fēng)暴。 | long | 1000 | [0,...] | low |
| reconnect.backoff.ms | 嘗試重新連接到給定主機之前等待的基本時間。這避免了在一個緊密的循環(huán)中重復(fù)連接到主機。該退避適用于客戶端向經(jīng)紀人的所有連接嘗試。 | long | 50 | [0,...] | low |
| retry.backoff.ms | 嘗試重試對給定主題分區(qū)的失敗請求之前等待的時間量。這樣可以避免在某些故障情況下重復(fù)發(fā)送請求。 | long | 100 | [0,...] | low |
| sasl.kerberos.kinit.cmd | Kerberos kinit命令路徑。 | string | /usr/bin/kinit | low | |
| sasl.kerberos.min.time.before.relogin | 登錄線程在刷新嘗試之間的休眠時間 | long | 60000 | low | |
| sasl.kerberos.ticket.renew.jitter | 隨機抖動增加到更新時間的百分比。 | double | 0.05 | low | |
| sasl.kerberos.ticket.renew.window.factor | 登錄線程將休眠,直到已經(jīng)到達從上次刷新到票證到期的指定窗口時間因子,屆時它將嘗試更新票證。 | double | 0.8 | low | |
| ssl.cipher.suites | 密碼套件列表。這是用于使用TLS或SSL網(wǎng)絡(luò)協(xié)議來協(xié)商網(wǎng)絡(luò)連接的安全設(shè)置的認證,加密,MAC和密鑰交換算法的命名組合。默認情況下,所有可用的密碼套件都受支持。 | list | null | low | |
| ssl.endpoint.identification.algorithm | 使用服務(wù)器證書驗證服務(wù)器主機名的端點識別算法。 | string | null | low | |
| ssl.keymanager.algorithm | 密鑰管理器工廠用于SSL連接的算法。默認值是為Java虛擬機配置的密鑰管理器工廠算法。 | string | SunX509 | low | |
| ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG實現(xiàn)。 | string | null | low | |
| ssl.trustmanager.algorithm | 信任管理器工廠用于SSL連接的算法。默認值是為Java虛擬機配置的信任管理器工廠算法。 | string | PKIX | low |