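Many articles about Kafka say that Kafka only guarantees ordering within a single partition. So how does Kafka actually enforce that ordering?
On the producer side, the mutePartition and unmutePartition methods of RecordAccumulator work together to achieve it.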
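// Records which tps still have an unfinished RecordBatch, so that each tp's batches stay in order.
// When a tp's RecordBatch is about to be sent, the tp is added to muted;
// once that RecordBatch has completed, the tp is removed from muted.
private final Set<TopicPartition> muted;
public void mutePartition(TopicPartition tp) { muted.add(tp); }
public void unmutePartition(TopicPartition tp) { muted.remove(tp); }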
The check in RecordAccumulator.ready (pseudocode):
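public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // for each partition `part` that has queued batches, with `leader` = its leader node:
    if (!readyNodes.contains(leader) && !muted.contains(part)) { /* decide whether part's first batch is ready to send */ }
}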
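In `if (!readyNodes.contains(leader) && !muted.contains(part))`, if muted contains this tp, the tp is skipped. Its leader may be known and its RecordBatch may otherwise be sendable, but it will not be sent, because the previous RecordBatch for this tp has not completed yet.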
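To make the condition concrete, here is a minimal, self-contained Java model of that loop. It is not Kafka's code. Partitions are plain Strings such as "a-0", and nodes are Integer ids. The real readiness tests (full batch, linger.ms elapsed, retry backoff) are collapsed into one hypothetical hasSendableBatch flag. ReadySketch and its parameter names are illustrative only.

```java
import java.util.*;

// Hypothetical, simplified model of the muted check in RecordAccumulator.ready().
// leaderOf: tp -> leader node id; hasSendableBatch: stand-in for Kafka's real
// readiness tests (batch full, linger.ms elapsed, backoff), reduced to a boolean.
class ReadySketch {
    static Set<Integer> readyNodes(Map<String, Integer> leaderOf,
                                   Map<String, Boolean> hasSendableBatch,
                                   Set<String> muted) {
        Set<Integer> readyNodes = new HashSet<>();
        for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
            String part = e.getKey();
            Integer leader = e.getValue();
            if (leader == null) continue; // unknown leader: simply skipped in this sketch
            // Same shape as the Kafka condition: skip nodes already marked ready
            // and partitions that still have an unfinished batch in flight.
            if (!readyNodes.contains(leader) && !muted.contains(part)
                    && hasSendableBatch.getOrDefault(part, false)) {
                readyNodes.add(leader);
            }
        }
        return readyNodes;
    }
}
```

In this model, suppose a-0 (leader 1) is muted and a-1 (leader 2) has a sendable batch. Then only node 2 comes back as ready. Node 1 is not marked ready on behalf of a-0.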
The check in RecordAccumulator.drain (pseudocode):
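public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    // for each node, and each partition tp whose leader is that node:
    if (!muted.contains(tp)) { /* take the first RecordBatch from tp's deque */ }
}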
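When drain regroups the records in RecordAccumulator by node, `if (!muted.contains(tp))` checks once more whether each tp is in muted, and a muted tp is still not selected for sending. This second check is needed because a node can be marked ready by a different, unmuted partition that it also leads.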
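A similarly simplified model of drain shows why the second check matters. It uses hypothetical names: DrainSketch, and String batches such as "a-1#1". In the example, node 1 is ready because of a-1, but a-0 is muted, so a-0 keeps its batch even though node 1 also leads it. The real method also enforces maxSize and retry backoff, which this sketch omits.

```java
import java.util.*;

// Hypothetical, simplified model of RecordAccumulator.drain(): for every ready
// node, take at most one batch (the oldest) from each partition it leads,
// skipping muted partitions. maxSize and backoff handling are omitted.
class DrainSketch {
    static Map<Integer, List<String>> drain(Set<Integer> readyNodes,
                                            Map<String, Integer> leaderOf,
                                            Map<String, Deque<String>> batches,
                                            Set<String> muted) {
        Map<Integer, List<String>> drained = new HashMap<>();
        for (Integer node : readyNodes) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
                String tp = e.getKey();
                // Only partitions led by this node, and never a muted one.
                if (!node.equals(e.getValue()) || muted.contains(tp)) continue;
                Deque<String> deque = batches.get(tp);
                if (deque != null && !deque.isEmpty()) {
                    ready.add(deque.pollFirst()); // oldest batch first keeps per-tp order
                }
            }
            drained.put(node, ready);
        }
        return drained;
    }
}
```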
Sender has a field, guaranteeMessageOrder, that controls whether ordering within a single partition is preserved.
It is set in the KafkaProducer constructor:
this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);
public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }
guaranteeMessageOrder = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1;
So when using the producer, we control guaranteeMessageOrder through the max.in.flight.requests.per.connection setting: it is true only when that setting is 1.
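Here is a small sketch of how the flag follows from the setting. Only the property key max.in.flight.requests.per.connection is real; it is the value of ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION. OrderingConfig is a hypothetical helper. It repeats the `== 1` comparison from the constructor and falls back to the producer default of 5 when the key is absent.

```java
import java.util.Properties;

// Hypothetical helper mirroring how the KafkaProducer constructor derives
// Sender's guaranteeMessageOrder flag. Only the property key is real.
class OrderingConfig {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    static boolean guaranteeMessageOrder(Properties props) {
        // The producer default for this setting is 5; use it when the key is unset.
        int maxInFlight = Integer.parseInt(props.getProperty(MAX_IN_FLIGHT, "5"));
        return maxInFlight == 1;
    }
}
```

In application code, the same Properties object would also carry bootstrap.servers and the serializers, and would be passed to `new KafkaProducer<>(props)`.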
Both mutePartition and unmutePartition are called from Sender.
mutePartition is called in Sender.run:
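// batches is the Map<Integer, List<RecordBatch>> returned by this.accumulator.drain(...)
if (guaranteeMessageOrder) {
    // Add the topicPartition of every batch about to be sent to muted
    for (List<RecordBatch> batchList : batches.values()) {
        for (RecordBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}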
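When sending, the tp of every RecordBatch about to be submitted is added to muted. The next time a RecordBatch from that tp needs to be sent, it will not be selected as long as muted still contains the tp.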
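The effect of muting across send rounds can be simulated in a few lines. This is a hypothetical model, not the Sender; MuteRoundSketch and the String batches are illustrative. Each sendRound() takes the head batch of every unmuted partition and mutes that partition. complete() stands in for handling the broker's successful response. As a result, at most one batch per partition is ever on the wire.

```java
import java.util.*;

// Hypothetical simulation of muting when guaranteeMessageOrder is true:
// sendRound() drains the head batch of every unmuted partition and mutes it;
// complete() models a successful response and unmutes the partition.
class MuteRoundSketch {
    final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    final Set<String> muted = new HashSet<>();
    final Map<String, String> inFlight = new HashMap<>(); // tp -> batch on the wire

    void append(String tp, String batch) {
        queues.computeIfAbsent(tp, k -> new ArrayDeque<>()).addLast(batch);
    }

    List<String> sendRound() {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : queues.entrySet()) {
            String tp = e.getKey();
            if (muted.contains(tp) || e.getValue().isEmpty()) continue;
            String batch = e.getValue().pollFirst();
            muted.add(tp);           // plays the role of mutePartition(tp)
            inFlight.put(tp, batch);
            sent.add(batch);
        }
        return sent;
    }

    void complete(String tp) {
        inFlight.remove(tp);
        muted.remove(tp);            // plays the role of unmutePartition(tp)
    }
}
```

For example, with b1 and b2 queued for a-0 and c1 for a-1, the first round sends b1 and c1. A second round sends nothing until complete("a-0") lets b2 go out.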
When the server's response is handled (in Sender.completeBatch), the tp is removed from muted:
if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);
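Unmuting alone does not cover the case where a batch fails. The sketch below assumes, based on the client version discussed here, that a retriable failure puts the batch back at the head of its partition's deque before the partition is unmuted. That way, the retried batch is the next one drained for that tp. CompletionSketch is a hypothetical name, and the retry decision is reduced to a boolean.

```java
import java.util.*;

// Hypothetical model of response handling with guaranteeMessageOrder = true.
// Assumption: a retriable failure re-enqueues the batch at the HEAD of its
// partition's deque, and the partition is unmuted on both success and retry.
class CompletionSketch {
    final Map<String, Deque<String>> queues = new HashMap<>();
    final Set<String> muted = new HashSet<>();

    void onResponse(String tp, String batch, boolean retriableError) {
        if (retriableError) {
            // Head, not tail: the failed batch goes out again before later ones.
            queues.computeIfAbsent(tp, k -> new ArrayDeque<>()).addFirst(batch);
        }
        muted.remove(tp); // unmutePartition(tp): the next drain may pick tp again
    }
}
```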
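Summary: for the producer version analyzed here, guaranteeing ordering within a single partition requires max.in.flight.requests.per.connection=1. With a larger value, several batches for the same partition can be in flight at once. If an earlier one fails and is retried, a later one may already have been written, and the partition ends up out of order.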
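A toy simulation shows why the value must be 1. ReorderSketch is hypothetical, not Kafka code. One partition's batches are sent in windows of up to maxInFlight. Batches listed in failOnce fail on their first attempt and are re-enqueued at the head. With a window of 1, the broker log keeps the send order. With a window of 2, b2 is written while b1 is still waiting to be retried.

```java
import java.util.*;

// Hypothetical simulation, not Kafka code: one partition, a broker log, and
// batches whose FIRST attempt fails with a retriable error. Each iteration
// sends up to maxInFlight batches from the head of the deque before any
// response arrives, then processes the responses in send order.
class ReorderSketch {
    static List<String> brokerLog(List<String> batches, int maxInFlight, Set<String> failOnce) {
        Deque<String> deque = new ArrayDeque<>(batches);
        Set<String> failed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!deque.isEmpty()) {
            List<String> window = new ArrayList<>();
            while (window.size() < maxInFlight && !deque.isEmpty())
                window.add(deque.pollFirst());
            List<String> retry = new ArrayList<>();
            for (String b : window) {
                if (failOnce.contains(b) && failed.add(b)) {
                    retry.add(b);   // first attempt fails; resend later
                } else {
                    log.add(b);     // broker appends the batch
                }
            }
            // Re-enqueue failed batches at the head, keeping their relative order.
            for (int i = retry.size() - 1; i >= 0; i--)
                deque.addFirst(retry.get(i));
        }
        return log;
    }
}
```

Take [b1, b2, b3] with b1 failing once. A window of 1 yields [b1, b2, b3], while a window of 2 yields [b2, b1, b3].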