Flink 1.9 FlinkKafkaProducer 使用 EXACTLY_ONCE 錯誤記錄

使用flink FlinkKafkaProducer 往kafka寫入數(shù)據(jù)的時候要求使用EXACTLY_ONCE語義
本以為本以為按照官網(wǎng)寫一個就完事,但是卻報錯了

代碼

package com.meda.test

import org.apache.flink.streaming.connectors.kafka.{ FlinkKafkaProducer, KafkaSerializationSchema}

//創(chuàng)建一個DataStream
val dStream: DataStream[MapDt] = ...

//kafka配置
val kafkaPro:Properties = ...
  
//創(chuàng)建FlinkKafkaProducer 指定EXACTLY_ONCE
val kafkaSink: FlinkKafkaProducer[ResultDt] = new FlinkKafkaProducer[ResultDt]("top[ic", new ResultDtSerialization("flink-topic-lillcol"), kafkaPro, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)


case class ResultDt(id: String, date_h: String, star: String, end: String, watermark: String, pv: Long, uv: Long)

class ResultDtSerialization(topic: String) extends KafkaSerializationSchema[ResultDt] {
  override def serialize(t: ResultDt, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](topic, t.toString.getBytes())
  }
}

遇到問題

當(dāng)FlinkKafkaProducer.Semantic指定為FlinkKafkaProducer.Semantic.AT_LEAST_ONCE時,執(zhí)行沒有問題。

當(dāng)FlinkKafkaProducer.Semantic指定為FlinkKafkaProducer.Semantic.EXACTLY_ONCE時,執(zhí)行報下面的錯誤:

org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:984)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
    at java.lang.Thread.run(Thread.java:748)
[INFO ] 2019-12-24 15:25:35,212 method:org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1324)

錯誤大意是:
事務(wù)超時大于broker允許的最大值(transaction.max.timeout.ms)
一開始想都沒想去修改transaction.max.timeout.ms的值,但是沒有解決問題。


解決辦法

官網(wǎng)關(guān)于Kafka Producers and Fault Tolerance有一段說明

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. 
This property will not allow to set transaction timeouts for the producers larger than it’s value.
FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.

Kafka brokers 默認(rèn)的最大事務(wù)超時(transaction.max.timeout.ms)為15 minutes
生產(chǎn)者設(shè)置事務(wù)超時不允許大于這個值,這個屬性不允許為大于其值的
但是在默認(rèn)的情況下,F(xiàn)linkKafkaProducer011設(shè)置事務(wù)超時屬性(transaction.timeout.ms)為1 hour, 超過默認(rèn)transaction.max.timeout.ms15 minutes。
因此在使用EXACTLY_ONCE語義的時候需要增大transaction.max.timeout.ms的值。

按照個和說法我只要transaction.max.timeout.ms增加到大于1 hour(即3 600 000ms)以上就可以了,但是經(jīng)過測試還是不行。
最后通過修改生產(chǎn)者的事務(wù)超時屬性transaction.timeout.ms解決問題

transaction.timeout.ms從1hour降到5 minutes 成功解決問題。

//增加配置屬性操作如下:
kafkaPro.setProperty("transaction.timeout.ms",1000*60*5+"")

本文原創(chuàng)文章,轉(zhuǎn)載請注明出處?。?!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容