Integrating Kafka with Flink's DataStream API

In real-time processing, the data source we work with in practice is usually Kafka, so let's look at how to integrate Kafka with Flink.

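Flink provides a dedicated Kafka connector for reading from and writing to Kafka topics. When consuming from Kafka, Flink does not rely on the offsets tracked by the Kafka consumer group to guarantee exactly-once semantics. Instead, Flink tracks the offsets internally and stores them in its checkpoints, so after a failure the job resumes from the offsets recorded in the last completed checkpoint. Kafka partitions are read in parallel: each partition is assigned to exactly one parallel instance (subtask) of the Flink source.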
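To make the offset-tracking idea concrete, here is a toy model in plain Scala. It is not Flink code and every name in it is hypothetical: the "checkpoint" holds the next offset to read for each partition together with the downstream state (a word count), and recovery restores both. Restoring only the offsets while keeping the state reached after the failure would count some records twice, which is why the offsets are stored in the same checkpoint as the state.

```scala
// Toy model (hypothetical names, not Flink code): offsets and operator state
// are snapshotted together, and recovery restores both.
object OffsetCheckpointSketch {
  // What a "checkpoint" holds: next offset per partition plus the word counts.
  final case class Snapshot(offsets: Map[Int, Long], counts: Map[String, Int])

  // Read each partition from its stored offset up to `limit` (exclusive; a missing
  // entry means "read to the end"), counting words along the way.
  def process(logs: Map[Int, Vector[String]], from: Snapshot, limit: Map[Int, Long]): Snapshot = {
    var offsets = from.offsets
    var counts = from.counts
    for (p <- logs.keys.toSeq.sorted) {
      val log = logs(p)
      val end = math.min(limit.getOrElse(p, log.size.toLong), log.size.toLong)
      var o = offsets.getOrElse(p, 0L)
      while (o < end) {
        val w = log(o.toInt)
        counts = counts.updated(w, counts.getOrElse(w, 0) + 1)
        o += 1
      }
      offsets = offsets.updated(p, o)
    }
    Snapshot(offsets, counts)
  }

  def main(args: Array[String]): Unit = {
    // Two partitions of a topic, addressed by offset.
    val logs = Map(0 -> Vector("a", "b", "c", "d"), 1 -> Vector("a", "a"))
    // Process part of the input, then take a checkpoint (offsets + counts together).
    val checkpoint = process(logs, Snapshot(Map.empty, Map.empty), Map(0 -> 2L, 1 -> 1L))
    // Keep processing, then "fail" before the next checkpoint completes.
    val lost = process(logs, checkpoint, Map(0 -> 3L, 1 -> 2L))
    // Correct recovery: restore offsets AND counts from the checkpoint, replay the rest.
    val recovered = process(logs, checkpoint, Map.empty)
    // Naive recovery: rewind offsets but keep the post-failure counts.
    val naive = process(logs, lost.copy(offsets = checkpoint.offsets), Map.empty)
    println(recovered.counts)
    println(naive.counts)
  }
}
```

In the naive variant, the records read between the checkpoint and the failure ("c" and the second "a" of partition 1) are counted twice; the correct recovery counts every record once.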

Official Flink documentation on the Kafka connector:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html

Step 1: Add the dependencies

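```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<!-- Scala DataStream API and RocksDB state backend, both used by the code in this article -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
```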

Step 2: Use Kafka as a Flink source

In practice, Kafka is most commonly used as a Flink source.

Create the Kafka topic

With the Kafka cluster installed and running, run the following commands on node01 to create a topic named test:

```shell
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
```
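The topic above has 3 partitions. Flink's Kafka consumer spreads partitions over its parallel source subtasks round-robin, starting from an index derived from the topic name. The sketch below re-implements that rule in plain Scala as we understand it from `KafkaTopicPartitionAssigner` in Flink 1.x; treat it as illustrative and check it against the connector source for your Flink version. The object and method names are hypothetical.

```scala
// Illustrative re-implementation (hypothetical names) of the partition-to-subtask
// rule used by Flink 1.x's Kafka consumer; not the connector's own code.
object PartitionAssignmentSketch {
  // Index of the source subtask that reads `partition` of `topic`.
  def assign(topic: String, partition: Int, parallelism: Int): Int = {
    // Start index derived from the topic name, so different topics start on different subtasks.
    val startIndex = ((topic.hashCode * 31) & 0x7FFFFFFF) % parallelism
    // Partitions are then laid out round-robin from that start index.
    (startIndex + partition) % parallelism
  }

  // Partitions owned by each subtask; a subtask with an empty list stays idle.
  def plan(topic: String, partitions: Int, parallelism: Int): Map[Int, Seq[Int]] =
    (0 until parallelism).map { s =>
      s -> (0 until partitions).filter(p => assign(topic, p, parallelism) == s)
    }.toMap

  def main(args: Array[String]): Unit = {
    println(plan("test", partitions = 3, parallelism = 3)) // one partition per subtask
    println(plan("test", partitions = 3, parallelism = 4)) // one subtask gets nothing
  }
}
```

Because each partition goes to exactly one subtask, setting the source parallelism higher than the partition count only adds idle subtasks.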

Implementation:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FlinkKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions (TypeInformation) for the Scala API
    import org.apache.flink.api.scala._

    // Checkpoint configuration
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // At least 500 ms between the end of one checkpoint and the start of the next
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // Discard a checkpoint that does not finish within 60 s
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // At most one checkpoint in flight at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Keep the externalized checkpoint when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaSource: FlinkKafkaConsumer011[String] =
      new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    // Commit offsets back to Kafka when a checkpoint completes (useful for monitoring;
    // recovery itself uses the offsets stored in the checkpoint)
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    // State backend: RocksDB with incremental checkpoints stored on HDFS
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka/checkpoints", true))

    val result: DataStream[String] = env.addSource(kafkaSource)
    result.print()
    env.execute()
  }
}
```
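In the source job, `enableCheckpointing(100)` asks for a checkpoint every 100 ms, but `setMinPauseBetweenCheckpoints(500)` requires at least 500 ms between the end of one checkpoint and the start of the next, so checkpoints never actually start 100 ms apart. The simplified model below (hypothetical names, not Flink's checkpoint coordinator) computes start times assuming every checkpoint takes a fixed duration and only one runs at a time, as `setMaxConcurrentCheckpoints(1)` requires.

```scala
// Simplified model (hypothetical names) of how checkpoint interval and min pause interact.
object CheckpointTimingSketch {
  // Next start: no earlier than one interval after the previous start,
  // and no earlier than `minPause` ms after the previous checkpoint finished.
  def nextStart(prevStart: Long, prevEnd: Long, interval: Long, minPause: Long): Long =
    math.max(prevStart + interval, prevEnd + minPause)

  // Start times of the first `n` checkpoints, each assumed to take `duration` ms.
  def schedule(n: Int, interval: Long, minPause: Long, duration: Long): Seq[Long] =
    Iterator.iterate(0L)(start => nextStart(start, start + duration, interval, minPause)).take(n).toList

  def main(args: Array[String]): Unit = {
    // Values from the source job: interval 100 ms, min pause 500 ms; 50 ms per checkpoint assumed.
    println(schedule(4, interval = 100, minPause = 500, duration = 50)) // List(0, 550, 1100, 1650)
  }
}
```

With these settings the min pause dominates, so the effective spacing is the checkpoint duration plus 500 ms.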

Produce data into Kafka

On node01, run the following commands to produce data into Kafka from the command line:

```shell
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
```

Step 3: Use Kafka as a Flink sink

Kafka can also serve as a Flink sink: the results produced by Flink are written to Kafka.
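When the sink is configured with `FlinkKafkaProducer011.Semantic.EXACTLY_ONCE`, the producer writes through Kafka transactions that are committed in two phases tied to checkpoints: records go into an open transaction, the transaction is pre-committed (flushed) when the checkpoint barrier reaches the sink, and it is committed only after Flink reports the whole checkpoint as complete. The toy model below, in plain Scala with hypothetical names (it is not Flink's `TwoPhaseCommitSinkFunction`), shows why a `read_committed` consumer sees data only after a checkpoint completes.

```scala
import scala.collection.mutable

// Toy model of a two-phase-commit sink (hypothetical names; NOT Flink's
// TwoPhaseCommitSinkFunction or FlinkKafkaProducer011).
object TwoPhaseCommitSketch {
  final class Sink {
    // Records a read_committed consumer would see.
    val committed: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
    // The currently open "transaction".
    private var current: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
    // Transactions pre-committed at a checkpoint, keyed by checkpoint id.
    private val preCommitted = mutable.Map.empty[Long, mutable.ArrayBuffer[String]]

    // Normal processing: write into the open transaction.
    def invoke(record: String): Unit = current += record

    // Phase 1, on the checkpoint barrier: pre-commit the open transaction, start a new one.
    def snapshotState(checkpointId: Long): Unit = {
      preCommitted(checkpointId) = current
      current = mutable.ArrayBuffer.empty[String]
    }

    // Phase 2, once the whole checkpoint has completed: commit everything up to that id.
    def notifyCheckpointComplete(checkpointId: Long): Unit =
      preCommitted.keys.filter(_ <= checkpointId).toList.sorted.foreach { id =>
        committed ++= preCommitted.remove(id).get
      }

    // Failure before the checkpoint completes: uncommitted data is discarded,
    // and the source replays from the last completed checkpoint.
    def fail(): Unit = {
      preCommitted.clear()
      current = mutable.ArrayBuffer.empty[String]
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new Sink
    sink.invoke("a"); sink.invoke("b")
    sink.snapshotState(1)
    sink.invoke("c")
    println(sink.committed) // empty: checkpoint 1 has not completed yet
    sink.notifyCheckpointComplete(1)
    println(sink.committed) // a, b
    sink.snapshotState(2)
    sink.fail()             // checkpoint 2 never completes
    println(sink.committed) // still a, b; "c" will be written again after replay
  }
}
```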

Implementation:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions (TypeInformation) for the Scala API
    import org.apache.flink.api.scala._

    // Checkpoint configuration: the exactly-once producer commits its Kafka
    // transactions when a checkpoint completes
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // State backend
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    // Read lines from the socket server on node01:9000
    val text = env.socketTextStream("node01", 9000)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "kafka_group1")
    // FlinkKafkaProducer011 sets transaction.timeout.ms to 1 hour by default, but the broker's
    // transaction.max.timeout.ms defaults to 15 minutes, so the broker would reject the producer.
    // Solution 1: lower the producer's transaction timeout to 15 minutes
    prop.setProperty("transaction.timeout.ms", (60000 * 15).toString)
    // Solution 2: raise transaction.max.timeout.ms in the broker configuration instead

    // Simpler constructor without a Semantic argument (at-least-once by default):
    // val myProducer = new FlinkKafkaProducer011[String]("node01:9092", topic, new SimpleStringSchema())

    // Producer with exactly-once semantics
    val myProducer = new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
    text.addSink(myProducer)
    env.execute("FlinkKafkaSink")
  }
}
```
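The comments in the sink code name two fixes for a transaction-timeout mismatch. According to the Flink Kafka connector documentation, FlinkKafkaProducer011 sets `transaction.timeout.ms` to 1 hour by default, while Kafka brokers cap it with `transaction.max.timeout.ms`, which defaults to 15 minutes, and a producer asking for more is refused. The sketch below encodes that rule with hypothetical helper names (`acceptedByBroker`, `coversRecovery`); it only does arithmetic and does not talk to Kafka.

```scala
// Hypothetical helper encoding the transaction-timeout rule; pure arithmetic, no Kafka calls.
object TransactionTimeoutCheck {
  // Kafka broker default for transaction.max.timeout.ms (15 minutes).
  val BrokerDefaultMaxMs: Long = 15L * 60 * 1000
  // Default transaction.timeout.ms set by FlinkKafkaProducer011 (1 hour).
  val FlinkProducerDefaultMs: Long = 60L * 60 * 1000

  // The broker rejects a transactional producer whose timeout exceeds its cap.
  def acceptedByBroker(producerTimeoutMs: Long, brokerMaxMs: Long = BrokerDefaultMaxMs): Boolean =
    producerTimeoutMs <= brokerMaxMs

  // Kafka aborts transactions left open longer than the timeout, so the timeout should
  // cover the checkpoint interval plus the worst-case time to restart after a failure.
  def coversRecovery(producerTimeoutMs: Long, checkpointIntervalMs: Long, maxRestartMs: Long): Boolean =
    producerTimeoutMs > checkpointIntervalMs + maxRestartMs

  def main(args: Array[String]): Unit = {
    println(acceptedByBroker(FlinkProducerDefaultMs))                                       // false
    println(acceptedByBroker(60000L * 15))                                                  // true (solution 1)
    println(acceptedByBroker(FlinkProducerDefaultMs, brokerMaxMs = FlinkProducerDefaultMs)) // true (solution 2)
  }
}
```

Solution 2 corresponds to raising `transaction.max.timeout.ms` in the brokers' `server.properties` so that the producer's 1-hour default is accepted.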

Start a socket server to send data

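On node01, run the following command, then type lines into it to send them to the socket. Start it before submitting the job, because `socketTextStream` connects to node01:9000 when the job starts.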

```shell
nc -lk 9000
```

Start a Kafka consumer

On node01, run the following command from the Kafka installation directory to start a console consumer and read the data:

```shell
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092 --topic test
```

© Copyright belongs to the author. For reprints or content partnerships, please contact the author.
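Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform and provides only information storage services.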
