消息系統(tǒng)的用戶從更嚴格的冪等生產(chǎn)者語義中獲益良多,即每個消息寫將被精確地持久化一次,沒有重復(fù)和數(shù)據(jù)丟失——即使在客戶端重試或代理失敗的情況下也是如此。這些更強的語義不僅使編寫應(yīng)用程序更容易,而且擴展了可以使用給定消息傳遞系統(tǒng)的應(yīng)用程序的空間。
然而,冪等Producer并不為跨多個主題分區(qū)的寫提供保證。為此,需要更強的事務(wù)保證。能夠自動寫入多個主題分區(qū)。
在原子性上,我們指的是跨topicpartition將一組消息作為一個單元提交的能力:要么提交所有消息,要么不提交
流處理應(yīng)用程序是“consu -transform- production”任務(wù)的管道,當流的重復(fù)處理不可接受時,絕對需要事務(wù)保證。因此,將事務(wù)保證添加到Kafka(一個流平臺)使其不僅對流處理更有用,而且對其他各種應(yīng)用程序也更有用。
在本文檔中,我們提出了將事務(wù)引入Kafka的建議。我們將只關(guān)注面臨變化的用戶:客戶端API的變化,我們將引入的新配置,以及保證的總結(jié)。我們還概述了基本的數(shù)據(jù)流,它總結(jié)了我們將在事務(wù)中引入的所有新rpc。設(shè)計細節(jié)在單獨的文檔中給出。
簡單介紹一下Transaction和Streams
在上一節(jié)中,我們提到事務(wù)的主要動機是在Kafka流中只啟用一次處理。我們有必要再深入研究一下這個用例,
回想一下,使用Kafka Streams的數(shù)據(jù)轉(zhuǎn)換通常通過多個stream processors進行,每個處理器由Kafka主題連接。這個設(shè)置被稱為流拓撲,基本上是一個DAG,其中流處理器是節(jié)點,連接Kafka主題是頂點。這種模式是所有流架構(gòu)的典型模式。您可以在這里閱讀更多關(guān)于Kafka streams架構(gòu)的內(nèi)容。
因此,Kafka流的事務(wù)本質(zhì)上將包含輸入消息、本地狀態(tài)存儲的更新和輸出消息。在事務(wù)中包含輸入偏移量會促使將“sendOffsets”API添加到生產(chǎn)者接口,如下所述。進一步的細節(jié)將在單獨的KIP中呈現(xiàn)。
Public Interfaces
Producer API changes
生產(chǎn)者將獲得5個新方法(initTransactions、beginTransaction、sendoffset、commitTransaction、abortTransaction),并更新send方法以拋出一個新的異常。詳情如下:
public interface Producer<K,V> extends Closeable {
/**
* Needs to be called before any of the other transaction methods. Assumes that
* the transactional.id is specified in the producer configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer
* are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* @throws IllegalStateException if the TransactionalId for the producer is not set
* in the configuration.
*/
void initTransactions() throws IllegalStateException;
/**
* Should be called before the start of each new transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void beginTransaction() throws ProducerFencedException;
/**
* Sends a list of consumed offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* consumed only if the transaction is committed successfully.
*
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/**
* Commits the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void commitTransaction() throws ProducerFencedException;
/**
* Aborts the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void abortTransaction() throws ProducerFencedException;
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
*
* @param record The record to send
* @return A future which will eventually contain the response information
*
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
The OutOfOrderSequence Exception
如果代理檢測到數(shù)據(jù)丟失,生產(chǎn)者將引發(fā)OutOfOrderSequenceException。換句話說,如果它接收到的序列號大于它所期望的序列號。這個異常將在將來返回并傳遞給回調(diào)(如果有的話)。這是一個致命的異常,以后對產(chǎn)生器方法(如send、beginTransaction、commitTransaction等)的調(diào)用將引發(fā)IlegalStateException
public class KafkaTransactionsExample {
public static void main(String args[]) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Note that the ‘transactional.id’ configuration _must_ be specified in the
// producer config in order to use transactions.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
// We need to initialize transactions once per producer instance. To use transactions,
// it is assumed that the application id is specified in the config with the key
// transactional.id.
//
// This method will recover or abort transactions initiated by previous instances of a
// producer with the same app id. Any other transactional messages will report an error
// if initialization was not performed.
//
// The response indicates success or failure. Some failures are irrecoverable and will
// require a new producer instance. See the documentation for TransactionMetadata for a
// list of error codes.
producer.initTransactions();
while(true) {
ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
if (!records.isEmpty()) {
// Start a new transaction. This will begin the process of batching the consumed
// records as well
// as an records produced as a result of processing the input records.
//
// We need to check the response to make sure that this producer is able to initiate
// a new transaction.
producer.beginTransaction();
// Process the input records and send them to the output topic(s).
List<ProducerRecord<String, String>> outputRecords = processRecords(records);
for (ProducerRecord<String, String> outputRecord : outputRecords) {
producer.send(outputRecord);
}
// To ensure that the consumed and produced messages are batched, we need to commit
// the offsets through
// the producer and not the consumer.
//
// If this returns an error, we should abort the transaction.
sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
// Now that we have consumed, processed, and produced a batch of messages, let's
// commit the results.
// If this does not report success, then the transaction will be rolled back.
producer.endTransaction();
}
}
}
}
New Configurations
Broker configs
- transactional.id.timeout.ms
transaction coordinator主動終止 producer transactionalId之前等待的ms中的最大時間量(該事務(wù)協(xié)調(diào)器沒有收到來自它的任何事務(wù)狀態(tài)更新)
默認值為604800000(7天)。 - max.transaction.timeout.ms
事務(wù)允許的最大超時。如果客戶端請求的事務(wù)時間超過這個值,那么代理將在InitPidRequest中返回InvalidTransactionTimeout錯誤。這可以防止客戶端超時過大,這會阻止用戶從事務(wù)中包含的主題進行讀取。
默認值是900000(15分鐘)。這是需要發(fā)送消息事務(wù)的時間段的保守上限。
Default is 900000 (15 min). This is a conservative upper bound on the period of time a transaction of messages will need to be sent.
transaction.state.log.replication.factor
事務(wù)狀態(tài)主題的副本數(shù)。
Default: 3transaction.state.log.num.partitions
事務(wù)狀態(tài)主題的分區(qū)數(shù)。
Default: 50transaction.state.log.min.isr
線事務(wù)狀態(tài)主題的每個分區(qū)的insync副本的最小數(shù)量。
Default: 2transaction.state.log.segment.bytes
事務(wù)狀態(tài)主題的段大小
Default: 104857600 bytes.
Producer configs
- enable.idempotence
是否啟用等冪性(默認為false)。如果禁用,生產(chǎn)者將不會在produce請求中設(shè)置PID字段,并且當前生產(chǎn)者交付語義將生效。注意,必須啟用冪等性才能使用事務(wù)。
當啟用idempotence時,我們強制acks=all,retries > 1,并且max.inflight. request.per.connection =1。如果這些配置沒有這些值,我們就不能保證冪等性。如果應(yīng)用程序沒有顯式地覆蓋這些設(shè)置,當啟用冪等性時,生產(chǎn)者將設(shè)置
acks=all, retries=Integer.MAX_VALUE,max.inflight.requests.per.connection=1
transaction.timeout.ms
事務(wù)協(xié)調(diào)器在主動終止正在進行的事務(wù)之前等待來自生產(chǎn)者的事務(wù)狀態(tài)更新的ms中的最大時間量。
這個配置值將與InitPidRequest一起發(fā)送到事務(wù)協(xié)調(diào)器。如果該值大于max.transaction.timeout.ms在BROKER設(shè)置的ms,請求將失敗,并出現(xiàn)InvalidTransactionTimeout錯誤
默認是60000。這使得事務(wù)不會阻塞下游消費超過一分鐘,這在實時應(yīng)用程序中通常是允許的。
transactional.id
用于事務(wù)傳遞的TransactionalId。這支持跨多個生產(chǎn)者會話的可靠性語義,因為它允許客戶端保證使用相同TransactionalId的事務(wù)在啟動任何新事務(wù)之前已經(jīng)完成。如果不提供TransactionalId,則生產(chǎn)者僅限于冪等交付
注意,啟用。如果配置了TransactionalId,則必須啟用冪等性。
默認值為空,這意味著不能使用事務(wù)。
Consumer configs
isolation.level
以下是可能的值(默認為read_uncommitted):
read_uncommitted:消費uncommitted和committed的消息 in offset ordering.
read_committed:只消費non-transactional messages(非開啟事務(wù)的消息和)和committed transactional messages in offset order.未提交的消息對consumer不可見,只有在事務(wù)結(jié)束后,消息才對consumer可見。為了維持in offset ordering,這個設(shè)置意味著我們必須緩沖消費者中的消息,直到我們看到給定事務(wù)中的所有消息
Proposed Changes
Summary of Guarantees
- Idempotent Producer Guarantees
為了實現(xiàn)冪等生產(chǎn)者語義,我們引入了生產(chǎn)者id(后面成為為PID)和Kafka消息的sequence numbers的概念。在initialization過程中,每個新生產(chǎn)者都會被分配一個惟一的PID。PID分配對用戶是完全透明的
對于給定的PID,序列號將從零開始并單調(diào)遞增,每個主題分區(qū)產(chǎn)生一個序列號。在發(fā)送給Broker的每個消息上,Produer將遞增序列號。Broker在內(nèi)存中維護從每個PID接收到的每個TopicPartition的sequence numbers(<PID,TopicPartition>)。
- 如果一個produce request的
sequence numbers大于PID/TopicPartition上次提交 committed message,BROKER將拒絕該請求。 -
較低的sequence numbers消息會導(dǎo)致重復(fù)錯誤,生產(chǎn)者可以忽略該錯誤。 - 具有
較高sequence numbers的消息會導(dǎo)致序列錯誤,這表示一些消息已經(jīng)丟失,并且是致命的
這確保了,即使producer在失敗時必須retry requests,每條消息都將被精確地保存在日志中一次。此外,由于為new instance of a producer分配惟一的PID,因此我們只能保證在單個producer 會話中實現(xiàn)冪等生產(chǎn)。
這些冪等生產(chǎn)者語義對于無狀態(tài)應(yīng)用程序(如度量跟蹤和審計)可能非常有用。
- Transactional Guarantees
Transactional,可以自動對多個TopicPartitions的寫入保證事務(wù),例如。作為一個單元,對這些主題分區(qū)的所有寫操作都將成功或失敗
因為offsets topic記錄了consumer 消費的進度,因此利用了上述功能,使應(yīng)用程序能夠?qū)onsumed and produced 的消息批次綁定到到一個原子單元中
只有當整個“consu -transform- production”被完整地執(zhí)行時,才可以認為一次完整的事務(wù)完成。
另外,有狀態(tài)的應(yīng)用也可以保證重啟后從斷點處繼續(xù)處理,也即事務(wù)恢復(fù)。
為了實現(xiàn)這種效果,應(yīng)用程序必須提供一個穩(wěn)定的(重啟后不變)唯一的ID,也即Transaction ID。Transactin ID與PID可能一一對應(yīng)。區(qū)別在于Transaction ID由用戶提供,而PID是內(nèi)部的實現(xiàn)對用戶透明
當提供這樣一個TransactionalId時,Kafka將保證:
1.跨Session的數(shù)據(jù)冪等發(fā)送。當具有相同Transaction ID的新的Producer實例被創(chuàng)建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作。
2.跨Session的事務(wù)恢復(fù)。如果某個應(yīng)用實例宕機,新的實例可以保證任何未完成的舊的事務(wù)要么Commit要么Abort,使得新實例從一個正常狀態(tài)開始工作。
需要注意的是,上述的事務(wù)保證是從Producer的角度去考慮的。從Consumer的角度來看,該保證會相對弱一些。尤其是不能保證所有被某事務(wù)Commit過的所有消息都被一起消費,因為:
- 對于壓縮的Topic而言,同一事務(wù)的某些消息可能被其它版本覆蓋
- 事務(wù)包含的消息可能分布在多個Segment中(即使在同一個Partition內(nèi)),當老的Segment被刪除時,該事務(wù)的部分數(shù)據(jù)可能會丟失
- Consumer在一個事務(wù)內(nèi)可能通過seek方法訪問任意Offset的消息,從而可能丟失部分消息
- Consumer可能并不需要消費某一事務(wù)內(nèi)的所有Partition,因此它將永遠不會讀取組成該事務(wù)的所有消息
關(guān)鍵概念
為了實現(xiàn)Transaction,確保一組消息是自動的produce和consumer,我們引入了幾個新概念:
我們將引入一個稱為事務(wù)協(xié)調(diào)器的實體對象。與group coordinator類似,每個生產(chǎn)者都被分配一個transaction coordinator,所有分配pid和管理事務(wù)的邏輯都由transaction coordinator完成。
我們引入控制消息的概念。這些是寫進用戶主題的特殊消息,由client
處理,但對用戶透明。例如,可以使用它們讓broker向consumer表明以前獲取的消息是否已經(jīng)原子提交。在此之前已經(jīng)提出了控制消息。
我們引入了TransactionalId的概念,使用戶能夠以持久的方式標識producer。具有相同TransactionalId的生產(chǎn)者的不同實例將能夠resume(或abort)前一個實例實例化的任何事務(wù)
我們引入了producer epoch的概念,它使我們能夠確保具有給定TransactionalId的producer只有一個合法的active實例,從而使我們能夠在發(fā)生故障時維護保證事務(wù)。
除了上面的新概念之外,我們還引入了新的請求類型、現(xiàn)有請求的新版本和核心消息格式的新版本,以支持事務(wù)。所有這些的細節(jié)將在其他文檔介紹。
完整事務(wù)過程


1. 找到 transaction coordinator -- the FindCoordinatorRequest
由于Transaction Coordinator是分配PID和管理事務(wù)的核心,因此Producer要做的第一件事情就是通過向任意一個Broker發(fā)送FindCoordinator請求找到Transaction Coordinator的位置。
-
獲取 producer Id -- the InitPidRequest
找到Transaction Coordinator后,具有冪等特性的Producer必須發(fā)起InitPidRequest請求以獲取PID。
- 2.1 如果事務(wù)特性被開啟,InitPidRequest會發(fā)送給Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有該Transaction ID的InitPidRequest請求,它將會把該<TransactionID, PID>存入Transaction Log,如上圖中步驟2.1所示。這樣可保證該對應(yīng)關(guān)系被持久化,從而保證即使Transaction Coordinator宕機該對應(yīng)關(guān)系也不會丟失。這使我們能夠?qū)ransactionalId的相同PID返回去實例化 producer,從而能夠恢復(fù)或中止以前不完整的事務(wù)
除了返回PID外,InitPidRequest還會執(zhí)行如下任務(wù):
增加該PID對應(yīng)的epoch。具有相同PID但epoch小于該epoch的其它Producer(如果有)新開啟的事務(wù)將被拒絕。
恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)(如果有)。
注意:InitPidRequest的處理過程是同步阻塞的。一旦該調(diào)用正確返回,生產(chǎn)者就可以發(fā)送數(shù)據(jù)并啟動新的事務(wù)。
- 2.2 另外,如果事務(wù)特性未開啟,InitPidRequest可發(fā)送至任意Broker,則分配一個新的PID,并且該生產(chǎn)者在單個會話中只使用冪等語義和事務(wù)語義。
啟動Transaction – The beginTransaction() API
Kafka從0.11.0.0版本開始,提供beginTransaction()方法用于開啟一個事務(wù)。調(diào)用該方法后,Producer本地會記錄已經(jīng)開啟了事務(wù),但Transaction Coordinator只有在Producer發(fā)送第一條消息后才認為事務(wù)已經(jīng)開啟。The consume-transform-produce loop
這一階段,包含了整個事務(wù)的數(shù)據(jù)處理過程,并且包含了多種請求。
- 4.1 AddPartitionsToTxnRequest
注冊partition
一個Producer可能會給多個<Topic, Partition>發(fā)送數(shù)據(jù),給一個新的<Topic, Partition>發(fā)送數(shù)據(jù)前,它需要先向Transaction Coordinator發(fā)送AddPartitionsToTxnRequest。
Transaction Coordinator會將該<Transaction, TopicPartition>存于Transaction Log內(nèi),并將其狀態(tài)置為BEGIN,如上圖中步驟4.1所示。有了該信息后,我們才可以在后續(xù)步驟中為每個Topic, Partition>寫入commit或abort標記(如上圖中步驟5.2所示)。
另外,如果該<Topic, Partition>為該事務(wù)中第一個<Topic, Partition>,Transaction Coordinator還會啟動對該事務(wù)的計時(每個事務(wù)都有自己的超時時間
- 4.2 ProduceRequest
Producer通過一個或多個ProduceRequest發(fā)送一系列消息。除了應(yīng)用數(shù)據(jù)外,該請求還包含了PID,epoch,和Sequence Number
4.2a
4.3 AddOffsetCommitsToTxnRequest
Producer有一個新的KafkaProducer.sendOffsetsToTransactionAPI方法,它支持批量處理consumed and produced 的消息。這個方法接受Map<TopicPartitions、OffsetAndMetadata>和一個groupId參數(shù)。
該方法先判斷在當前事務(wù)中該方法是否已經(jīng)被調(diào)用并傳入了相同的Group ID。若是,直接跳到下一步;若不是,則向Transaction Coordinator發(fā)送AddOffsetsToTxnRequests請求,Transaction Coordinator將對應(yīng)的所有<Topic, Partition>存于Transaction Log中,并將其狀態(tài)記為BEGIN,該方法會阻塞直到收到響應(yīng).in step 4.2a。
4.4 TxnOffsetCommitRequest 提交消費偏移
作為sendOffsetsToTransaction方法的一部分,在處理完AddOffsetsToTxnRequest后,Producer也會發(fā)送TxnOffsetCommit請求給Consumer Coordinator從而將本事務(wù)包含的與讀操作相關(guān)的各<Topic, Partition>的Offset持久化到內(nèi)部的__consumer_offsets主題中,step 4.3a
在此過程中,Consumer Coordinator會通過PID和對應(yīng)的epoch來驗證是否應(yīng)該允許該Producer的該請求。
這里需要注意:
寫入__consumer_offsets的Offset信息在當前事務(wù)Commit前對外是不可見的。也即在當前事務(wù)被Commit前,可認為該Offset尚未Commit,偏移量在外部是不可見的,也即對應(yīng)的消息尚未被完成處理。在事務(wù)提交之前,
Consumer Coordinator并不會立即更新緩存中相應(yīng)<Topic, Partition>的Offset,因為此時這些更新操作尚未被COMMIT或ABORT。
5. Committing or Aborting a Transaction 提交或終止事務(wù)
一旦數(shù)據(jù)被寫入,用戶必須調(diào)用KafkaProducer.endTransaction方法或者KafkaProducer.abortTransaction方法開始提交或中止事務(wù)
5.1 EndTxnRequest
commitTransaction方法使得Producer寫入的數(shù)據(jù)對下游Consumer可見。abortTransaction方法通過Transaction Marker將Producer寫入的數(shù)據(jù)標記為Aborted狀態(tài)。下游的Consumer如果將isolation.level設(shè)置為READ_COMMITTED,則它讀到被Abort的消息后直接將其丟棄而不會返回給客戶程序,也即被Abort的消息對應(yīng)用程序不可見。
無論是Commit還是Abort,Producer都會發(fā)送EndTxnRequest請求(附加指示事務(wù)是Commit還是Abort的數(shù)據(jù))給Transaction Coordinator
收到該請求后,Transaction Coordinator會進行如下操作
- 將PREPARE_COMMIT或PREPARE_ABORT消息寫入Transaction Log,
step 5.1a所示 - 通過WriteTxnMarkerRequest以Transaction Marker的形式將COMMIT或ABORT command消息寫入用戶數(shù)據(jù)日志以及Offset Log中,如上圖中步驟5.2所示
- 最后將 COMMITTED (or ABORTED) message寫入Transaction Log中,如上圖中步驟5.3所示
5.2 WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest由Transaction Coordinator發(fā)送給當前事務(wù)涉及到的每個<Topic, Partition>的Leader。在接收到這個請求時,每個BROKER都將向LOG寫入一個COMMIT(PID)或ABORT(PID)控制消息,step 5.2a。
該控制消息向Broker以及Consumer表明對應(yīng)PID的消息被Commit了還是被Abort了。
在此之前,consumer將緩沖具有PID的message,直到讀取相應(yīng)的COMMIT或ABORT消息,然后分別交付或drop消息。
這里要注意,如果事務(wù)也涉及到__consumer_offsets,即該事務(wù)中有消費數(shù)據(jù)的操作且將該消費的Offset存于__consumer_offsets中,Transaction Coordinator也需要向該內(nèi)部Topic的各Partition的Leader發(fā)送WriteTxnMarkerRequest從而寫入COMMIT(PID)或COMMIT(PID)控制信息step 5.2a on the left
5.3 Writing the final Commit or Abort Message 寫最終的commit或abort消息
在所有commit或abort控制消息寫入datalog之后,事務(wù)協(xié)調(diào)器最終將COMMITTED or ABORTED message 寫入transaction log,表明事務(wù)已完成step in 5.3。此時,可以刪除事務(wù)日志中與事務(wù)相關(guān)的大多數(shù)消息。
此時,Transaction Log中所有關(guān)于該事務(wù)的消息全部可以移除。當然,由于Kafka內(nèi)數(shù)據(jù)是Append Only的,不可直接更新和刪除,這里說的移除只是將其標記為null從而在Log Compact時不再保留。
另外,COMPLETE_COMMIT或COMPLETE_ABORT的寫入并不需要得到所有RREPLICAS的ACK,因為如果該消息丟失,可以根據(jù)事務(wù)協(xié)議重發(fā)。
我們只需要保留已完成事務(wù)的PID和時間戳,這樣我們就可以最終刪除為producer 產(chǎn)生的TransactionalId->PID mapping。請參閱下面的過期pid一節(jié)。
補充說明,如果參與該事務(wù)的某些<Topic, Partition>在被寫入Transaction Marker前不可用,它對READ_COMMITTED的Consumer不可見,但不影響其它可用<Topic, Partition>的COMMIT或ABORT。在該<Topic, Partition>恢復(fù)可用后,Transaction Coordinator會重新根據(jù)PREPARE_COMMIT或PREPARE_ABORT向該<Topic, Partition>發(fā)送Transaction Marker。
總結(jié)
PID與Sequence Number的引入實現(xiàn)了寫操作的冪等性
寫操作的冪等性結(jié)合At Least Once語義實現(xiàn)了單一Session內(nèi)的Exactly Once語義
Transaction Marker與PID提供了識別消息是否應(yīng)該被讀取的能力,從而實現(xiàn)了事務(wù)的隔離性
Offset的更新標記了消息是否被讀取,從而將對讀操作的事務(wù)處理轉(zhuǎn)換成了對寫(Offset)操作的事務(wù)處理
Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對應(yīng)的消息與一組讀操作(如果有)對應(yīng)的Offset的更新進行同樣的標記(即Transaction Marker)來實現(xiàn)事務(wù)中涉及的所有讀寫操作同時對外可見或同時對外不可見
Kafka只提供對Kafka本身的讀寫操作的事務(wù)性,不提供包含外部系統(tǒng)的事務(wù)性
參考自
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka#TransactionalMessaginginKafka-ProducerIDsandstategroups
https://www.confluent.io/blog/transactions-apache-kafka/
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://mp.weixin.qq.com/s/HBN-rSYRozNOsjMmk5cjLw
http://www.itdecent.cn/p/f77ade3f41fd