1. 首先自定义一个 KafkaDeserializationSchema
/**
 * Deserializes raw Kafka records into {@code Tuple2<topicName, messageValue>} pairs, so that
 * downstream operators can tell which topic each message came from.
 *
 * <p>The record value is decoded as UTF-8 text. A record whose value is {@code null} (for
 * example a Kafka tombstone) produces a tuple whose second field is {@code null}. Such a
 * record no longer throws a {@link NullPointerException}.
 */
public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;

    /**
     * Reports whether {@code nextElement} marks the end of the stream.
     *
     * <p>Always returns {@code false}, so messages keep being consumed indefinitely.
     *
     * @param nextElement the element that was just deserialized (ignored)
     * @return always {@code false}
     */
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    /**
     * Converts a raw Kafka record into a {@code (topicName, messageValue)} tuple.
     *
     * @param record the raw record as read from Kafka
     * @return a tuple of the record's topic and its value decoded as UTF-8; the value field is
     *     {@code null} when the record carries no value
     * @throws Exception declared for compatibility with the interface contract
     */
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] payload = record.value();
        // A record without a value (e.g. a tombstone) has a null payload; decoding it directly would NPE.
        String value = payload == null
                ? null
                : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        return new Tuple2<>(record.topic(), value);
    }

    /**
     * Declares the produced type to Flink, a tuple of two strings, so Flink does not have to
     * infer it from the erased generic signature.
     *
     * @return type information for {@code Tuple2<String, String>}
     */
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}
2. 使用自定义的 KafkaDeserializationSchema 进行消费
/**
 * Entry point. Consumes topic {@code "test"} with {@link CustomKafkaDeserializationSchema},
 * starting from the earliest offset, and prints the source topic of every record.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the Flink job fails
 */
public static void main(String[] args) throws Exception {
    // Kafka client settings for the consumer.
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "test");

    FlinkKafkaConsumer<Tuple2<String, String>> topicSource =
            new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), kafkaProps);
    // Read every partition from its earliest available offset.
    topicSource.setStartFromEarliest();

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv
            .addSource(topicSource)
            .flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
                // Side effect only: print the topic; nothing is emitted downstream.
                @Override
                public void flatMap(Tuple2<String, String> record, Collector<Object> collector) throws Exception {
                    System.out.println("topic==== " + record.f0);
                }
            });

    // execute program
    streamEnv.execute("Flink Streaming Java API Skeleton");
}