When writing data to Kafka with Flink's FlinkKafkaProducer, I needed EXACTLY_ONCE semantics.
I assumed that following the official documentation would be enough, but instead I hit an error.
Code
package com.meda.test

import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Create a DataStream
val dStream: DataStream[MapDt] = ...
// Kafka configuration
val kafkaPro: Properties = ...
// Create a FlinkKafkaProducer with EXACTLY_ONCE semantics
val kafkaSink: FlinkKafkaProducer[ResultDt] = new FlinkKafkaProducer[ResultDt](
  "topic",
  new ResultDtSerialization("flink-topic-lillcol"),
  kafkaPro,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

case class ResultDt(id: String, date_h: String, star: String, end: String, watermark: String, pv: Long, uv: Long)

class ResultDtSerialization(topic: String) extends KafkaSerializationSchema[ResultDt] {
  override def serialize(t: ResultDt, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](topic, t.toString.getBytes())
  }
}
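The schema above simply writes each record's toString as the Kafka record value. A minimal stdlib-only sketch of that encoding, with no Flink or Kafka dependencies (the case class is copied from the post; serializeValue is a stand-in helper, not Flink API):

```scala
// Stand-in for the KafkaSerializationSchema above: encode a record
// as the UTF-8 bytes of its case-class toString representation.
case class ResultDt(id: String, date_h: String, star: String, end: String,
                    watermark: String, pv: Long, uv: Long)

def serializeValue(dt: ResultDt): Array[Byte] = dt.toString.getBytes("UTF-8")

val bytes = serializeValue(ResultDt("1", "2019-12-24 15", "s", "e", "wm", 10L, 3L))
// Decoding the bytes recovers the case-class toString form.
val decoded = new String(bytes, "UTF-8")
```

Note that toString-based encoding is easy to debug but hard to evolve; a JSON or Avro serializer is usually a better long-term choice.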
The problem
With FlinkKafkaProducer.Semantic set to FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, the job ran without problems.
With FlinkKafkaProducer.Semantic set to FlinkKafkaProducer.Semantic.EXACTLY_ONCE, it failed with the following error:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:984)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
at java.lang.Thread.run(Thread.java:748)
[INFO ] 2019-12-24 15:25:35,212 method:org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1324)
The gist of the error:
the transaction timeout is larger than the maximum value allowed by the broker (transaction.max.timeout.ms).
Without thinking it through, my first move was to change transaction.max.timeout.ms, but that did not solve the problem.
Solution
The official documentation has the following note under Kafka Producers and Fault Tolerance:
Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes.
This property will not allow to set transaction timeouts for the producers larger than it’s value.
FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
In other words:
Kafka brokers default the maximum transaction timeout (transaction.max.timeout.ms) to 15 minutes,
and producers are not allowed to set a transaction timeout larger than that value.
By default, however, FlinkKafkaProducer011 sets the producer's transaction timeout (transaction.timeout.ms) to 1 hour, which exceeds the broker default transaction.max.timeout.ms of 15 minutes.
So transaction.max.timeout.ms should be increased before using EXACTLY_ONCE semantics.
Going by that, raising transaction.max.timeout.ms above 1 hour (3,600,000 ms) should have been enough, but in my tests it still failed.
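For reference, the broker-side limit the docs refer to is a static setting in each broker's server.properties; raising it would look like this (the value here is illustrative, matching the 1-hour producer default):

```properties
# server.properties on each Kafka broker (illustrative value).
# Must be >= the largest transaction.timeout.ms any producer will request.
transaction.max.timeout.ms=3600000
```

Changing it requires a broker restart, which is one reason adjusting the producer side instead can be more practical.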
What finally worked was changing the producer-side transaction timeout, transaction.timeout.ms:
lowering transaction.timeout.ms from 1 hour to 5 minutes solved the problem.
// Add the property like this:
kafkaPro.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
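To make the constraint explicit, here is a small stdlib-only sketch that builds the producer Properties and guards against requesting a transaction timeout above the broker's limit. KafkaTxConfig and producerProps are hypothetical names for illustration, and the broker limit is assumed to be the 15-minute default:

```scala
import java.util.Properties

// Hypothetical helper: build a producer config whose transaction.timeout.ms
// stays within the broker's transaction.max.timeout.ms (15 min by default).
object KafkaTxConfig {
  // Assumed broker-side default for transaction.max.timeout.ms.
  val BrokerMaxTimeoutMs: Long = 15 * 60 * 1000L

  def producerProps(bootstrap: String, txTimeoutMs: Long): Properties = {
    require(txTimeoutMs <= BrokerMaxTimeoutMs,
      s"transaction.timeout.ms ($txTimeoutMs) exceeds broker transaction.max.timeout.ms ($BrokerMaxTimeoutMs)")
    val p = new Properties()
    p.setProperty("bootstrap.servers", bootstrap)
    p.setProperty("transaction.timeout.ms", txTimeoutMs.toString)
    p
  }
}

// 5 minutes is safely below the broker's 15-minute default.
val props = KafkaTxConfig.producerProps("localhost:9092", 5 * 60 * 1000L)
```

Failing fast in your own code like this surfaces the mismatch at job submission instead of as a KafkaException deep inside the producer.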
This is an original article; please credit the source when reposting!