Flink 當Lambda表達式使用 java 泛型的時候, 由于泛型擦除的存在, 需要顯示的聲明類型信息

網上找資料復習學習的時候留意點:
報錯

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(Flink01_WordCount_Batch.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
    at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:701)
    at com.keke.day01.Flink01_WordCount_Batch.main(Flink01_WordCount_Batch.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:351)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:523)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:147)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:216)
    at com.keke.day01.Flink01_WordCount_Batch.main(Flink01_WordCount_Batch.java:32)

案例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Flink01_WordCount_Batch {

    public static void main(String[] args) throws Exception {

        //1.獲取執(zhí)行環(huán)境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();



        //2.讀取文件數據
        DataSource<String> input = env.readTextFile("input2");

        //3.壓平
        FlatMapOperator<String, String> wordDS = input.flatMap(new MyFlatMapFunc());

        //4.將單詞轉換為元組
//        MapOperator<String, Tuple2<String, Integer>> wordToOneDS = wordDS.map((MapFunction<String, Tuple2<String, Integer>>) value -> {
//            return new Tuple2<>(value, 1);
//            //return Tuple2.of(value, 1);
//        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        MapOperator<String, Tuple2<String, Integer>> wordToOneDS = wordDS.map((MapFunction<String, Tuple2<String, Integer>>) value -> {
            return new Tuple2<>(value, 1);
            //return Tuple2.of(value, 1);
        }); //當Lambda表達式使用 java 泛型的時候, 由于泛型擦除的存在, 需要顯示的聲明類型信息

        //5.分組
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = wordToOneDS.groupBy(0);

        //6.聚合
        AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1);

        //7.打印結果
        result.print();

    }

    //自定義實現壓平操作的類
    public static class MyFlatMapFunc implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            //按照空格切割
            String[] words = value.split(" ");
            //遍歷words,寫出一個個的單詞
            for (String word : words) {
                out.collect(word);
            }
        }
    }

}

當Lambda表達式使用 java 泛型的時候, 由于泛型擦除的存在, 需要顯示的聲明類型信息

returns(Types.TUPLE(Types.STRING, Types.INT)) 
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容