Something to watch out for when studying from examples found online:
Error:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(Flink01_WordCount_Batch.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
	at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:701)
	at com.keke.day01.Flink01_WordCount_Batch.main(Flink01_WordCount_Batch.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:351)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:523)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:147)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:216)
	at com.keke.day01.Flink01_WordCount_Batch.main(Flink01_WordCount_Batch.java:32)
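The "Caused by" part can be observed without Flink at all. The plain-JDK sketch below is only an illustration: java.util.function.Function and Map.Entry stand in for Flink's MapFunction and Tuple2, and the class and method names are made up. It asks reflection whether each function object's runtime class still carries generic type arguments. An anonymous class has its full generic signature recorded by javac, while a lambda's runtime class (on a standard JDK) implements only the raw interface, so even an explicit cast on the lambda leaves nothing for a type extractor to read. This is roughly why Flink asks for returns(...) or an anonymous class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Function;

class LambdaErasureDemo {
    // Anonymous class: the class file records Function<String, Map.Entry<String, Integer>>
    static Function<String, Map.Entry<String, Integer>> anonymousVersion() {
        return new Function<String, Map.Entry<String, Integer>>() {
            @Override
            public Map.Entry<String, Integer> apply(String word) {
                return new AbstractMap.SimpleEntry<>(word, 1);
            }
        };
    }

    // Lambda with an explicit cast, mirroring the cast in the Flink example;
    // the cast only exists at compile time and is not stored on the lambda's class
    static Function<String, Map.Entry<String, Integer>> lambdaVersion() {
        return (Function<String, Map.Entry<String, Integer>>) word -> new AbstractMap.SimpleEntry<>(word, 1);
    }

    // True if any implemented interface is reported with its type arguments
    static boolean hasGenericTypeArgs(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type args: " + hasGenericTypeArgs(anonymousVersion()));
        System.out.println("lambda keeps type args: " + hasGenericTypeArgs(lambdaVersion()));
    }
}
```

On a standard JDK the first line prints true and the second prints false.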
Example:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class Flink01_WordCount_Batch {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input file
        DataSource<String> input = env.readTextFile("input2");
        // 3. Flatten each line into individual words
        FlatMapOperator<String, String> wordDS = input.flatMap(new MyFlatMapFunc());
        // 4. Map each word to a (word, 1) tuple
        // Without returns(...), this lambda fails at runtime with the InvalidTypesException shown above:
        // wordDS.map((MapFunction<String, Tuple2<String, Integer>>) value -> new Tuple2<>(value, 1));
        // When a lambda uses Java generics, type erasure drops Tuple2's type parameters,
        // so the result type has to be declared explicitly with returns(...)
        MapOperator<String, Tuple2<String, Integer>> wordToOneDS = wordDS
                .map((MapFunction<String, Tuple2<String, Integer>>) value -> new Tuple2<>(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // 5. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = wordToOneDS.groupBy(0);
        // 6. Aggregate: sum the counts (tuple field 1)
        AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1);
        // 7. Print the result
        result.print();
    }

    // Custom FlatMapFunction that splits each line into words
    public static class MyFlatMapFunc implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // Split the line on spaces
            String[] words = value.split(" ");
            // Emit each word individually
            for (String word : words) {
                out.collect(word);
            }
        }
    }
}
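When a lambda expression uses Java generics, type erasure removes the generic type information, so it has to be declared explicitly by calling returns(...) on the result of the map call:
.returns(Types.TUPLE(Types.STRING, Types.INT))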
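returns(...) also has an overload that takes a TypeHint, e.g. .returns(new TypeHint<Tuple2<String, Integer>>(){}) with org.apache.flink.api.common.typeinfo.TypeHint. The trailing {} creates an anonymous subclass, and unlike a lambda, an anonymous subclass keeps its generic superclass in the class file. The plain-Java sketch below shows that "super type token" idea with a hypothetical TypeToken class; it is not Flink's implementation, and Map.Entry stands in for Tuple2.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

class TypeTokenDemo {
    // The {} makes an anonymous subclass of TypeToken<Map.Entry<String, Integer>>
    static Type entryType() {
        return new TypeToken<Map.Entry<String, Integer>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(entryType());
    }
}

// Hypothetical minimal type token, illustrating the idea behind Flink's TypeHint
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // The anonymous subclass's generic superclass is TypeToken<actual type>,
        // so the type argument survives erasure and can be read back here
        Type superType = getClass().getGenericSuperclass();
        this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

The design choice is the same one the error message suggests: move the generic type into a class declaration (an anonymous class), where the compiler records it, instead of relying on a lambda.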