以下錯誤,想必在做Spark的DateSet操作時一定是見過吧?
Error:(58, 17) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
peopleDF.map(row => row.get(0)).show()
這是因為在作map轉(zhuǎn)換時需要指定一個轉(zhuǎn)換的Encorder,在Scala代碼中是通過隱式轉(zhuǎn)換進(jìn)行的,而在Java代碼中則需要在代碼中指明。
為了更好理解寫了一個Java的代碼供學(xué)習(xí)加深理解。
代碼如下:
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().master("local[2]").appName("DataSetEncoderExample").getOrCreate();
List<String> data = Arrays.asList("chen", "li", "huang");
//創(chuàng)建DataSet的時候指明數(shù)據(jù)是String類型
Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
/***操作一:**/
//map操作:把string類型的變換成string類型
//此時MapFunction<String, String> 這兩個地方都應(yīng)該是String
Dataset<String> dsString2String = ds.map((MapFunction<String, String>) v -> "Hi," + v, Encoders.STRING());
dsString2String.show();
/***操作二:**/
//map操作:把string類型的變換成int類型
//注意此時MapFunction<String, Integer> 這兩個地方的類型變化
//第一個類型String為原來的DataSet的類型,第二個類型為輸出的類型
Dataset<Integer> dsString2Int = ds.map(new MapFunction<String, Integer>(){
@Override
public Integer call(String value) throws Exception {
return value.length();
}
}, Encoders.INT());
dsString2Int.show();
/***操作三:**/
//map操作:把string類型的變換成自定義的對象類型
//注意此時MapFunction<String, People> 這兩個地方的類型變化
//第一個類型String為原來的DataSet的類型,第二個類型People為輸出的類型
Encoder<People> peopleEncoder = Encoders.kryo(People.class);
Dataset<People> dsString2Object = ds.map(new MapFunction<String, People>(){
@Override
public People call(String value) throws Exception {
return new People(value, value.length());
}
}, peopleEncoder);
dsString2Object.show();
dsString2Object.map((MapFunction<People, String>) item -> item.getName(), Encoders.STRING()).show();
/***操作四:**/
//map操作:把string類型的變換成Row對象類型
//注意此時MapFunction<String, Row> 這兩個地方的類型變化
//第一個類型String為原來的DataSet的類型,第二個類型Row為輸出的類型
// Encoder<Row> rowEncoder = Encoders.kryo(Row.class);
Encoder<Row> rowEncoder = Encoders.javaSerialization(Row.class);
Dataset<Row> dsString2Row = ds.map(
(MapFunction<String, Row>) value -> RowFactory.create(value, value.length())
, rowEncoder);
dsString2Row.show();
dsString2Row.map((MapFunction<Row, String>) item -> item.getString(0), Encoders.STRING()).show();
spark.stop();
}
上述代碼中需要創(chuàng)建一下Java類
public class People {
String name;
Integer age;
public People( String name, Integer age){
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}