spark 數(shù)據(jù)源 擴(kuò)展 之 excel

提問(wèn):spark 數(shù)據(jù)源有幾種擴(kuò)展方式?

答:三種,兩種是基于datasource v1的,第三種是datasource v2的實(shí)現(xiàn);不推薦前兩種方法,實(shí)現(xiàn)有點(diǎn)復(fù)雜
推薦使用第三種。本文將在后面針對(duì)datasourceV2做下讀取excel的實(shí)現(xiàn),如要擴(kuò)展其他數(shù)據(jù)源,本文也可做為參考。
注意:本篇是用java實(shí)現(xiàn)的,其實(shí)用scala實(shí)現(xiàn)會(huì)更加合適,尤其對(duì)datasource v1用java寫(xiě)會(huì)有很多坑

下面是擴(kuò)展數(shù)據(jù)源接口的偽代碼:
方法一,基于datasource v1的實(shí)現(xiàn)之一,實(shí)現(xiàn)FileFormat接口

public class ExcelDataSource implements FileFormat, Serializable {
......
}

方法二,基于datasource v1的實(shí)現(xiàn)之二,實(shí)現(xiàn)RelationProvider接口,以及SchemaRelationProvider 接口

public class ExcelRelationProvider implements RelationProvider, SchemaRelationProvider {
}

方法三,實(shí)現(xiàn)DataSourceV2接口, 以及相關(guān)的ReadSupport接口(WriteSupport根據(jù)需要,如需寫(xiě)數(shù)據(jù)則實(shí)現(xiàn))

public class ExcelDataSourceV2 implements DataSourceV2, ReadSupport, Serializable {
......
}

提問(wèn):其中原理是什么?

答:奉上代碼,自行查看

  • 方法一和方法二在spark源碼中的調(diào)用位置,見(jiàn)DataSource 類的 resolveRelation方法,此處給出偽代碼,可以看出針對(duì)不同的接口,進(jìn)行了不同的處理,如下:
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
      // TODO: Throw when too much is given.
      case (dataSource: SchemaRelationProvider, Some(schema)) =>
        .......
      case (dataSource: RelationProvider, None) =>
        .......
      case (_: SchemaRelationProvider, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $className.")
      case (dataSource: RelationProvider, Some(schema)) =>
        .......
      case (format: FileFormat, _)          
        .......
    }
......
  }
  • 方法三接口在spark源碼中的調(diào)用位置,見(jiàn)DataFrameReader類的 load(paths: String*)方法,偽代碼如下:
def load(paths: String*): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    if (classOf[DataSourceV2].isAssignableFrom(cls)) { 
      //針對(duì)datasourceV2的處理
      ......
    } else {
      //針對(duì)dataSouceV1的處理
      loadV1Source(paths: _*)
    }
  }

提問(wèn):可否給個(gè)完整的實(shí)現(xiàn)例子?

答:當(dāng)然,下面給出excel針對(duì)dataSource v2的實(shí)現(xiàn)。

  • ExcelDataSourceV2.java,主體代碼均寫(xiě)在這個(gè)類里,也可以根據(jù)需要拆開(kāi)。
package self.robin.examples.spark.sources.v2.excel;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import self.robin.examples.spark.sources.SheetIterator;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import static self.robin.examples.spark.sources.SparkWorkbookHelper.*;

/**
 * @Description: ...
 * @Author: Li Yalei - Robin
 * @Date: 2021/2/8 10:20
 */
public class ExcelDataSourceV2 implements DataSourceV2, ReadSupport, Serializable {

    @Override
    public DataSourceReader createReader(DataSourceOptions options) {
        return new ExcelDataSourceV2Reader(SerializableOptions.of(options));
    }

    class ExcelDataSourceV2Reader implements DataSourceReader, Serializable {

        private SerializableOptions options;

        private volatile StructType schema;

        private Collection<String> paths;

        ExcelDataSourceV2Reader(SerializableOptions options) {
            this.options = options;
            init();
        }

        private void init(){
            Optional<String> pathOpt = options.get("path");
            if (!pathOpt.isPresent()) {
                throw new RuntimeException("path 不能為空");
            }
            paths = StringUtils.getStringCollection(pathOpt.get(), ",");
        }

        /**
         * 解析傳進(jìn)來(lái)的schema信息
         */
        public void buildStructType(Map<String, String> map) {
            List<StructField> fieldList = new ArrayList<>();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                StructField structField = new StructField(entry.getKey(),
                        new CatalystSqlParser(new SQLConf()).parseDataType(entry.getValue()),
                        true, Metadata.empty());

                fieldList.add(structField);
            }
            this.schema = new StructType(fieldList.toArray(new StructField[0]));
        }

        @Override
        public StructType readSchema() {

            if (this.schema != null && !this.schema.isEmpty()) {
                return this.schema;
            }
            Optional<String> schemaOpt = options.get("schema");

            if(schemaOpt.isPresent()){
                Map<String, String> map = new Gson().fromJson(schemaOpt.get(), LinkedHashMap.class);
                buildStructType(map);
            }else {
                tryParseColsFromFiles();
            }
            return this.schema;
        }

        private void tryParseColsFromFiles(){
            boolean header = options.getBoolean("header", false);
            //嘗試從excel解析header, 構(gòu)造StructType
            //默認(rèn)取第一個(gè)表單
            //要求所有excel表單中的列必須一樣多
            List<String> colNames = new ArrayList<>();
            int size = paths.stream().map(path -> {
                try {
                    Workbook wb = createWorkbook(path, getConfiguration());
                    //默認(rèn)取第一個(gè)表單
                    List<String> cols = getColumnNames(wb.getSheetAt(0), 1, 1, header);
                    //保存首個(gè)解析出的列名
                    if(colNames.isEmpty()){
                        colNames.addAll(cols);
                    }
                    //要求所有excel表單中的列必須一樣多
                    return cols.size();
                } catch (IOException e) {
                    e.printStackTrace();
                    return -1;
                }
            }).collect(Collectors.toSet()).size();

            if(size!=1){
                //說(shuō)明所有excel文件的列不一致
                throw new RuntimeException("提供的excel文件中表單的列不一致,請(qǐng)檢查");
            }

            Map<String, String> map = new LinkedHashMap<>();
            for (String col : colNames) {
                map.put(col, "String");
            }
            buildStructType(map);
        }


        @Override
        public List<DataReaderFactory<Row>> createDataReaderFactories() {

            SerializableConfiguration serConfig = new SerializableConfiguration(getConfiguration());
            boolean header = options.getBoolean("header", false);

            return paths.parallelStream().map(path -> new DataReaderFactory<Row>() {

                @Override
                public DataReader<Row> createDataReader() {
                    return new WorkbookReader(header, path, serConfig);
                }
            }).collect(Collectors.toList());
        }

        /**
         * 獲取提交的配置信息
         * @return
         */
        private Configuration getConfiguration(){
            SparkSession spark = SparkSession.getActiveSession().get();
            Configuration config = spark.sparkContext().hadoopConfiguration();
            return config;
        }
    }

    class WorkbookReader implements DataReader<Row>, Serializable {

        /**
         * 是否第一行是表格頭
         */
        private boolean header;
        /**
         * 文件路徑
         */
        private String path;

        /**
         * excel
         */
        private Workbook workbook;

        private SheetIterator sheetIterator;

        /**
         * excel文件的path信息,以及表單中數(shù)據(jù)的位置信息
         * @param header 首行是否是表頭
         * @param path 文件路徑
         * @param configuration hadoop
         */
        public WorkbookReader(boolean header, String path,
                              SerializableConfiguration configuration) {
            this.header = header;
            this.path = path;
            if (path == null || path.equals("")) {
                throw new RuntimeException("path is null");
            }
            init(configuration.value());
        }

        /**
         * 因?yàn)榇颂幋a不在driver端運(yùn)行,所以不能 SparkSession.getActiveSession()
         */
        private void init(Configuration conf) {
            try {
                this.workbook = createWorkbook(path, conf);
                this.sheetIterator = new SheetIterator(header, this.workbook.iterator());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean next() throws IOException {
            return sheetIterator.hasNext();
        }

        @Override
        public Row get() {
            Object[] values = cellValuesInRow(this.sheetIterator.next());
            return new GenericRow(values);
        }

        @Override
        public void close() throws IOException {
            if (this.workbook != null) {
                this.workbook.close();
            }
        }
    }

}

  • SerializableOptions.java, 這個(gè)類可以理解為與DataSourceOptions完全一樣,只不過(guò)實(shí)現(xiàn)了Serializable接口,因?yàn)槲倚枰獙⑷雲(yún)⒗^續(xù)往下傳,所以需要類可序列化,僅做參數(shù)封裝用,可以用Map替換掉。
package self.robin.examples.spark.sources.v2.excel;

import org.apache.spark.sql.sources.v2.DataSourceOptions;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
 * @Description: ...
 * @Author: Robin-Li
 * @DateTime: 2021-02-08 22:02
 */
public class SerializableOptions implements Serializable {

    private final Map<String, String> keyLowerCasedMap;

    private String toLowerCase(String key) {
        return key.toLowerCase(Locale.ROOT);
    }

    public static SerializableOptions of(DataSourceOptions options){
        return new SerializableOptions(options.asMap());
    }

    public SerializableOptions(Map<String, String> originalMap) {
        keyLowerCasedMap = new HashMap<>(originalMap.size());
        for (Map.Entry<String, String> entry : originalMap.entrySet()) {
            keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
        }
    }

    public Map<String, String> asMap() {
        return new HashMap<>(keyLowerCasedMap);
    }

    /**
     * Returns the option value to which the specified key is mapped, case-insensitively.
     */
    public Optional<String> get(String key) {
        return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
    }

    /**
     * Returns the boolean value to which the specified key is mapped,
     * or defaultValue if there is no mapping for the key. The key match is case-insensitive
     */
    public boolean getBoolean(String key, boolean defaultValue) {
        String lcaseKey = toLowerCase(key);
        return keyLowerCasedMap.containsKey(lcaseKey) ?
                Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
    }

    /**
     * Returns the integer value to which the specified key is mapped,
     * or defaultValue if there is no mapping for the key. The key match is case-insensitive
     */
    public int getInt(String key, int defaultValue) {
        String lcaseKey = toLowerCase(key);
        return keyLowerCasedMap.containsKey(lcaseKey) ?
                Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
    }

    /**
     * Returns the long value to which the specified key is mapped,
     * or defaultValue if there is no mapping for the key. The key match is case-insensitive
     */
    public long getLong(String key, long defaultValue) {
        String lcaseKey = toLowerCase(key);
        return keyLowerCasedMap.containsKey(lcaseKey) ?
                Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
    }

    /**
     * Returns the double value to which the specified key is mapped,
     * or defaultValue if there is no mapping for the key. The key match is case-insensitive
     */
    public double getDouble(String key, double defaultValue) {
        String lcaseKey = toLowerCase(key);
        return keyLowerCasedMap.containsKey(lcaseKey) ?
                Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
    }
}
  • SheetIterator.java 這個(gè)類,提供對(duì)row可遍歷的sheet迭代器,如果一個(gè)excel文件有多個(gè)sheet,支持前一個(gè)sheet取完,自動(dòng)遍歷下一個(gè)sheet,直到所有的sheet所有的row都遍歷一遍,很方便對(duì)吧。
package self.robin.examples.spark.sources;

import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;

import java.io.Serializable;
import java.util.Iterator;

/**
 * @Description: ...
 * @Author: Li Yalei - Robin
 * @Date: 2021/2/8 19:16
 */
public class SheetIterator implements Iterator<Row>, Serializable {

    /** 是否首行是header */
    private boolean header;

    private Iterator<Sheet> sheetIterator;

    private Iterator<Row> rowIterator;

    public SheetIterator(boolean header, Iterator<Sheet> sheetIterator){
        this.header = header;
        this.sheetIterator = sheetIterator;
    }

    @Override
    public boolean hasNext() {
        if(this.rowIterator==null || !this.rowIterator.hasNext()){
            if(this.sheetIterator==null || !this.sheetIterator.hasNext()){
                //sheetIterator is null OR sheetIterator is empty
                return false;
            }
            this.rowIterator = this.sheetIterator.next().rowIterator();
            if(header){
                //首行是標(biāo)題
                this.rowIterator.next();
            }
        }
        return this.rowIterator.hasNext();
    }

    @Override
    public Row next() {
        return rowIterator.next();
    }

}

  • SparkWorkbookHelper.java 最后貼上我用到的工具類。
package self.robin.examples.spark.sources;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.apache.spark.sql.execution.datasources.CodecStreams;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * @Description: ...
 * @Author: Li Yalei - Robin
 * @Date: 2021/2/20 17:59
 */
public interface SparkWorkbookHelper {

    /**
     * 獲取表單獵頭的表單集合
     * @param sheet 表單對(duì)象
     * @param startRow 從第幾行開(kāi)始(大于等于1)
     * @param startCol 從第幾列開(kāi)始(大于等于1)
     * @param header 首行是否是標(biāo)題行(首行即為startRow那行)
     * @return 表單列頭集合
     */
    static List<String> getColumnNames(Sheet sheet, int startRow, int startCol, boolean header){
        if(sheet==null){
            return new ArrayList<>();
        }

        Iterator<Row> rowIte = sheet.rowIterator();
        int rowIndex = startRow;

        while (--rowIndex>0 && rowIte.hasNext()) {
            rowIte.next();
        }

        if(!rowIte.hasNext()){
            return new ArrayList<>();
        }
        Row row = rowIte.next();
        int colIndex = startCol;

        Iterator<Cell> colIte = row.iterator();
        while (--colIndex>0 && colIte.hasNext()){
            colIte.next();
        }

        List<String> cols = new ArrayList<>();
        while (colIte.hasNext()){
            if(header){
                cols.add(colIte.next().getStringCellValue());
            } else {
                cols.add("col_"+(colIte.next().getColumnIndex()+1));
            }
        }
        return cols;
    }

    /**
     * 根據(jù) path 和 hadoop Configuration 創(chuàng)建 workbook對(duì)象
     * @param path
     * @param conf
     * @return wb
     */
    static Workbook createWorkbook(String path, Configuration conf) throws IOException {

        InputStream inputStream = CodecStreams.createInputStreamWithCloseResource(conf, new Path(path));

        try(InputStream is = inputStream){
            if (path.endsWith(".xls")) {
                return new HSSFWorkbook(is);
            } else if (path.endsWith(".xlsx")) {
                return new XSSFWorkbook(is);
            } else {
                throw new IOException("File format is not supported");
            }
        }
    }


    /**
     * 提取row中的cell的值
     * @param row
     * @return 返回cell值的數(shù)組
     */
    static Object[] cellValuesInRow(org.apache.poi.ss.usermodel.Row row) {
        Iterator<Cell> cellIte = row.cellIterator();

        List cellBuffer = new ArrayList();
        Cell cell;
        while (cellIte.hasNext()) {
            cell = cellIte.next();
            switch (cell.getCellTypeEnum()) {
                case NUMERIC:
                    cellBuffer.add(cell.getNumericCellValue());
                    break;
                case BOOLEAN:
                    cellBuffer.add(cell.getBooleanCellValue());
                    break;
                case STRING:
                    cellBuffer.add(cell.getStringCellValue());
                    break;
                case BLANK:
                    cellBuffer.add(null);
                    break;
                default:
                    throw new RuntimeException("unSupport cell type " + cell.getCellTypeEnum());
            }
        }
        return cellBuffer.toArray();
    }
}
  • 運(yùn)行
@Test
private void test(){
        String path = "file:/C:/Users/liyalei/Downloads/test.xlsx";
        SparkSession spark = SparkSession.builder().master("local[2]").appName("local test").getOrCreate();

        String dataSource = ExcelDataSourceV2.class.getName();

        Map<String, String> schemaMap = new HashMap<>();
        schemaMap.put("a", "String");
        schemaMap.put("b", "String");
        schemaMap.put("c", "String");
        schemaMap.put("d", "String");

        Dataset<Row> rows = spark.read().format(dataSource)
                //可選: 指定 schema 信息
//                .option("schema", new Gson().toJson(schemaMap))
                //必填:是否有表頭
                .option("header", true)
                //必填:文件路徑,多個(gè)路徑用逗號(hào)分隔
                .load(path);

        rows.show();
    }

寫(xiě)在最后:上面的實(shí)現(xiàn)中,讀取excel使用的poi的工具包,但是poi對(duì)于稍大的excel文件就會(huì)oom,所有推薦使用ali 的 EasyExcel工具包替換掉,這個(gè)后面有空了再貼上EasyExcel的改版,暫時(shí)請(qǐng)讀者自行改造。

最后補(bǔ)上一個(gè)對(duì)datasource v1的粗略實(shí)現(xiàn)的例子,細(xì)節(jié)地方讀者根據(jù)需要自行補(bǔ)充完整;需要自取

package self.robin.examples.spark.sources.excel;

import lombok.val;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.FileFormat$class;
import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.SerializableConfiguration;
import scala.Function1;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.mutable.ListBuffer;
import scala.runtime.AbstractFunction1;
import self.robin.examples.spark.sources.SparkWorkbookHelper;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * @Description: ...
 * @Author: Li Yalei - Robin
 * @Date: 2020/12/22 21:14
 */
public class ExcelDataSource implements FileFormat, Serializable {


    @Override
    public Option<StructType> inferSchema(SparkSession sparkSession, Map<String, String> options, Seq<FileStatus> files) {
        ExcelOptions xlsxOptions = new ExcelOptions(options);

        //TODO: 此處 schema 的解析未做詳細(xì)實(shí)現(xiàn)
        StructType structType = new StructType().add("aa", DataTypes.StringType.typeName())
                .add("bb", DataTypes.StringType.typeName());
        return Option.apply(structType);
    }

    @Override
    public boolean supportBatch(SparkSession sparkSession, StructType dataSchema) {
        return false;
    }

    @Override
    public OutputWriterFactory prepareWrite(SparkSession sparkSession, Job job, Map<String, String> options, StructType dataSchema) {
        throw new RuntimeException("unImplement OutputWriterFactory");
    }

    @Override
    public Option<Seq<String>> vectorTypes(StructType requiredSchema, StructType partitionSchema, SQLConf sqlConf) {
        throw new RuntimeException("unImplement vectorTypes");
    }

    @Override
    public Function1<PartitionedFile, Iterator<InternalRow>> buildReaderWithPartitionValues(SparkSession sparkSession, StructType dataSchema,
                                                                                            StructType partitionSchema,
                                                                                            StructType requiredSchema, Seq<Filter> filters,
                                                                                            Map<String, String> options,
                                                                                            Configuration hadoopConf) {


        return FileFormat$class.buildReaderWithPartitionValues(this, sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf);
    }

    @Override
    public boolean isSplitable(SparkSession sparkSession, Map<String, String> options, Path path) {
        return false;
    }

    @Override
    public Function1<PartitionedFile, Iterator<InternalRow>> buildReader(SparkSession sparkSession,
                                                                         StructType dataSchema,
                                                                         StructType partitionSchema,
                                                                         StructType requiredSchema,
                                                                         Seq<Filter> filters,
                                                                         Map<String, String> options,
                                                                         Configuration hadoopConf) {
        //TODO verify schema
        val xlsxOptions = new ExcelOptions(options);
        Broadcast<SerializableConfiguration> broadcastedHadoopConf = JavaSparkContext.fromSparkContext(sparkSession.sparkContext())
                .broadcast(new SerializableConfiguration(hadoopConf));

        return new InternalFunction1(requiredSchema, broadcastedHadoopConf, xlsxOptions);
    }

    /**
     * 此內(nèi)部類只為了能夠序列化用
     */
    class InternalFunction1 extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>>
            implements Serializable {

        private StructType requiredSchema;
        private Broadcast<SerializableConfiguration> hadoopConf;
        private ExcelOptions xlsxOptions;

        public InternalFunction1(StructType requiredSchema, Broadcast<SerializableConfiguration> hadoopConf, ExcelOptions xlsxOptions) {
            this.requiredSchema = requiredSchema;
            this.hadoopConf = hadoopConf;
            this.xlsxOptions = xlsxOptions;
        }

        @Override
        public Iterator<InternalRow> apply(PartitionedFile file) {
            Configuration config = hadoopConf.getValue().value();

            try(Workbook wb = SparkWorkbookHelper.createWorkbook(file.filePath(), config)) {
                return readFile(xlsxOptions, config, wb, requiredSchema);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    /**
     * read file
     *
     * @param requiredSchema
     * @param hadoopConf
     * @return
     */
    public Iterator<InternalRow> readFile(ExcelOptions options, Configuration hadoopConf, Workbook workbook, StructType requiredSchema) {

        ListBuffer<InternalRow> rowListBuffer = new ListBuffer();

        int sheetNbr = workbook.getNumberOfSheets();
        for (int i = 0; i < sheetNbr; i++) {
            Sheet sheet = workbook.getSheetAt(i);
            java.util.Iterator<Row> rowIte = sheet.rowIterator();

            Row row;
            while (rowIte.hasNext()) {
                row = rowIte.next();
                java.util.Iterator<Cell> cellIte = row.cellIterator();

                List cellBuffer = new ArrayList();
                Cell cell;
                while (cellIte.hasNext()) {
                    cell = cellIte.next();
                    switch (cell.getCellTypeEnum()) {
                        case NUMERIC:
                            cellBuffer.add(cell.getNumericCellValue());
                            break;
                        case BOOLEAN:
                            cellBuffer.add(cell.getBooleanCellValue());
                            break;
                        case STRING:
                            cellBuffer.add(UTF8String.fromString(cell.getStringCellValue()));
                            break;
                        case BLANK:
                            cellBuffer.add(null);
                            break;
                        default:
                            throw new RuntimeException("unSupport cell type");
                    }
                }
                InternalRow internalRow = InternalRow.fromSeq(JavaConversions.asScalaBuffer(cellBuffer).toSeq());
                rowListBuffer.$plus$eq(internalRow);
            }
        }
        return rowListBuffer.iterator();
    }
}

如果需要完整的項(xiàng)目,請(qǐng)移步至:https://github.com/Lahar-bigdata/fast-examples/tree/main/spark/src/main/java/self/robin/examples/spark/sources/v2/excel

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容