Flink消費(fèi)kafka如何獲取每條消息對(duì)應(yīng)的topic

1.首先自定義個(gè) KafkaDeserializationSchema

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    //nextElement 是否表示流的最后一條元素,我們要設(shè)置為 false ,因?yàn)槲覀冃枰?msg 源源不斷的被消費(fèi)
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }
    
    @Override
    // 反序列化 kafka 的 record,我們直接返回一個(gè) tuple2<kafkaTopicName,kafkaMsgValue>
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }
    
    @Override
    //告訴 Flink 我輸入的數(shù)據(jù)類(lèi)型, 方便 Flink 的類(lèi)型推斷
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

2.使用自定義的 KafkaDeserializationSchema 進(jìn)行消費(fèi)

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        
        FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
        kafkaConsumer.setStartFromEarliest();
        env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
                System.out.println("topic==== " + value.f0);
            }
        });
        
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 【Android 動(dòng)畫(huà)】 動(dòng)畫(huà)分類(lèi)補(bǔ)間動(dòng)畫(huà)(Tween動(dòng)畫(huà))幀動(dòng)畫(huà)(Frame 動(dòng)畫(huà))屬性動(dòng)畫(huà)(Property ...
    Rtia閱讀 6,380評(píng)論 1 38
  • 今天看了一部戰(zhàn)爭(zhēng)電影。 平時(shí)很少看電影、看劇。今天才知道,原來(lái)最近很火的、朋友圈刷屏的《長(zhǎng)安十二時(shí)辰》,是電視劇。...
    Fang2023閱讀 199評(píng)論 0 0
  • -o 的意思是 輸出到文件而非標(biāo)準(zhǔn)輸出,作用等于>。 -n 表示最多輸出多少行文件。
    VanJordan閱讀 280評(píng)論 0 0
  • 今日驚蟄。 《月令七十二候集解》中說(shuō):“二月節(jié),萬(wàn)物出乎震,震為雷,故曰驚蟄。是蟄蟲(chóng)驚而出走矣?!?驚蟄的意思是天...
    風(fēng)之舞555閱讀 1,035評(píng)論 12 31

友情鏈接更多精彩內(nèi)容