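In real-time processing, the data source we use in practice is usually Kafka, so let's look at how to integrate Flink with Kafka.
Flink provides a dedicated Kafka connector for reading from and writing to Kafka topics. When Flink consumes from Kafka, it does not rely solely on the offsets committed by the Kafka consumer group to guarantee exactly-once semantics. Instead, Flink tracks the offsets internally and stores them in its checkpoints. Flink also reads the partitions of a topic in parallel: the partitions are distributed across the parallel subtasks of the source, so each partition is consumed by one subtask.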
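The relationship between partitions and source parallelism can be illustrated with a small sketch. This is a simplified model that assumes a plain modulo assignment; Flink's actual assigner also shifts the starting subtask by a hash of the topic name, so the concrete subtask numbers may differ. The object name PartitionAssignmentSketch and the function assign are hypothetical names used only for illustration.

```scala
object PartitionAssignmentSketch {
  // Simplified model (assumption): partition p is read by subtask p % parallelism.
  // Returns, for every subtask index, the list of partitions it would read.
  def assign(numPartitions: Int, parallelism: Int): Map[Int, List[Int]] = {
    val grouped = (0 until numPartitions).toList.groupBy(_ % parallelism)
    (0 until parallelism).map(s => s -> grouped.getOrElse(s, Nil)).toMap
  }

  def main(args: Array[String]): Unit = {
    // A topic with 3 partitions, read with different source parallelism
    for (parallelism <- Seq(2, 3, 4)) {
      println(s"parallelism=$parallelism -> ${assign(3, parallelism)}")
    }
  }
}
```

With 3 partitions, a parallelism of 3 gives each subtask exactly one partition, a parallelism of 2 makes one subtask read two partitions, and a parallelism of 4 leaves one subtask idle.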
Official documentation for the Flink Kafka connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html
Step 1: Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
Step 2: Use Kafka as a Flink source
In practice, Kafka is most often used as a source for Flink.
Create the Kafka topic
Install and start the Kafka cluster, then run the following commands on node01 to create a Kafka topic named test:
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
Implementation:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object FlinkKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions (TypeInformation) for the Scala API
    import org.apache.flink.api.scala._

    // Checkpoint configuration
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaSource: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    // Commit the offsets back to Kafka whenever a checkpoint completes
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    // Set the state backend (requires the flink-statebackend-rocksdb_2.11 module on the classpath);
    // the second argument enables incremental checkpoints
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka/checkpoints", true))

    val result: DataStream[String] = env.addSource(kafkaSource)
    result.print()
    env.execute()
  }
}
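Because Flink keeps the offsets in its own checkpoints, it can help to make the start position explicit for the case where the job starts without restored state. The following is a minimal sketch, assuming the same topic, broker address and group id as above; the object name StartPositionSketch and the method buildConsumer are hypothetical. The start position set here applies only when the job is not restored from a checkpoint or savepoint; on restore, the offsets stored in the state take precedence.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object StartPositionSketch {
  // Builds a consumer for the "test" topic with an explicit start position.
  def buildConsumer(): FlinkKafkaConsumer011[String] = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "con1")

    val consumer = new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), prop)
    // Default behaviour: start from the offsets committed for the consumer group
    consumer.setStartFromGroupOffsets()
    // Alternatives for a fresh start:
    // consumer.setStartFromEarliest()  // read the topic from the beginning
    // consumer.setStartFromLatest()    // read only records arriving from now on
    consumer
  }
}
```

The returned consumer can be passed to env.addSource in the same way as kafkaSource in the job above.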
Produce data to Kafka
Run the following commands on node01 to produce data to Kafka from the shell:
cd /kkb/install/kafka_2.11-1.1.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
Step 3: Use Kafka as a Flink sink
Kafka can also be used as a Flink sink, so that the data processed by Flink is written back to Kafka.
Implementation:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions (TypeInformation) for the Scala API
    import org.apache.flink.api.scala._

    // Checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Set the state backend (requires the flink-statebackend-rocksdb_2.11 module on the classpath)
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val text = env.socketTextStream("node01", 9000)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    prop.setProperty("group.id", "kafka_group1")

    // FlinkKafkaProducer011 defaults to a transaction timeout of 1 hour, while Kafka brokers
    // by default allow at most 15 minutes, so one of the two settings has to be adjusted.
    // Option 1: lower the transaction timeout of FlinkKafkaProducer011 (here: 15 minutes)
    prop.setProperty("transaction.timeout.ms", (60000 * 15).toString)
    // Option 2: raise the maximum transaction timeout (transaction.max.timeout.ms) on the Kafka brokers

    // FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    // Use the constructor that supports exactly-once semantics
    val myProducer = new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    text.addSink(myProducer)
    env.execute("StreamingFromCollectionScala")
  }
}
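The transaction timeout setting in the sink can be checked with simple arithmetic. The sketch below assumes the documented defaults (15 minutes for the broker's transaction.max.timeout.ms and 1 hour for the transaction timeout of FlinkKafkaProducer011); the object TransactionTimeoutCheck and its members are hypothetical names for illustration, and a cluster with non-default settings needs its own values.

```scala
object TransactionTimeoutCheck {
  // Assumed default of the Kafka broker setting transaction.max.timeout.ms (15 minutes)
  val brokerDefaultMaxTimeoutMs: Long = 15L * 60 * 1000
  // Assumed default transaction.timeout.ms used by FlinkKafkaProducer011 (1 hour)
  val flinkProducerDefaultTimeoutMs: Long = 60L * 60 * 1000

  // The broker accepts a producer only if its transaction timeout
  // does not exceed the broker's maximum.
  def acceptedByBroker(producerTimeoutMs: Long,
                       brokerMaxMs: Long = brokerDefaultMaxTimeoutMs): Boolean =
    producerTimeoutMs > 0 && producerTimeoutMs <= brokerMaxMs

  def main(args: Array[String]): Unit = {
    println(s"Flink default accepted:   ${acceptedByBroker(flinkProducerDefaultTimeoutMs)}")
    println(s"60000 * 15 ms accepted:   ${acceptedByBroker(60000L * 15)}")
  }
}
```

Under these assumptions the producer default is rejected, while a value of 60000 * 15 ms sits exactly at the broker limit and is accepted.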
Start the socket service and send data
Run the following command on node01, then type lines into it to send data through the socket:
nc -lk 9000
Start a Kafka consumer
Run the following command on node01 to start a Kafka consumer and read the data:
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092 --topic test
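When the sink writes with exactly-once semantics, records become final only once the Kafka transaction is committed, and a consumer reads uncommitted records as well unless its isolation level is changed. As a hedged sketch, a downstream consumer written against kafka-clients could be configured as follows; the object ReadCommittedConsumerProps and its build method are hypothetical names, and the group id in the usage note is made up for illustration.

```scala
import java.util.Properties

object ReadCommittedConsumerProps {
  // Builds consumer properties that only expose records from committed transactions.
  def build(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // The Kafka default is read_uncommitted
    props.setProperty("isolation.level", "read_committed")
    props
  }

  def main(args: Array[String]): Unit = {
    println(build("node01:9092,node02:9092", "verify_group"))
  }
}
```

The resulting Properties object can be passed to the constructor of org.apache.kafka.clients.consumer.KafkaConsumer from the kafka-clients dependency added in step 1.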