Submitting Spark Jobs from Code

Spark is run in Standalone mode here; other deploy modes are untested.


1. The Spark job

1.1 Jar
hdfs:/home/mr/example/spark-example-1.0.jar
1.2 Main class
org.learn.example.jobs.SparkJob

public class SparkJob implements Serializable {
    public static void main(String[] args) {
        // The first argument is the fully qualified name of the job class to run.
        String fullClassName = args[0];
        String[] strArr = fullClassName.split("\\.");
        String appName = strArr[strArr.length - 1];

        SparkSession session = SparkSession.builder().appName("SparkJob_" + appName).getOrCreate();

        try {
            // Instantiate the job class reflectively and invoke its run(SparkSession) method.
            Class<?> clazz = Class.forName(fullClassName);
            Method method = clazz.getDeclaredMethod("run", SparkSession.class);
            method.invoke(clazz.getDeclaredConstructor().newInstance(), session);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.stop();
        }
    }
}
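The reflective dispatch in `main` can be exercised without a cluster. Below is a Spark-free sketch of the same pattern; `Job`, `Greeter`, and `dispatch` are hypothetical names used for illustration only, not part of the original code.

```java
// Spark-free sketch of the reflective dispatch pattern used by SparkJob:
// load a class by name, instantiate it, and invoke a known method reflectively.
import java.lang.reflect.Method;

public class ReflectionDemo {
    public interface Job {
        String run(String input);
    }

    public static class Greeter implements Job {
        @Override
        public String run(String input) {
            return "hello " + input;
        }
    }

    // Mirrors SparkJob's handling of its first command-line argument,
    // with String standing in for SparkSession.
    public static String dispatch(String fullClassName, String input) throws Exception {
        Class<?> clazz = Class.forName(fullClassName);
        Method method = clazz.getDeclaredMethod("run", String.class);
        return (String) method.invoke(clazz.getDeclaredConstructor().newInstance(), input);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(dispatch("ReflectionDemo$Greeter", "world")); // prints "hello world"
    }
}
```

Note that `Class.forName` needs the binary name, so a nested class is addressed as `Outer$Inner`, just as the real `SparkJob` needs the fully qualified name of the job class.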
1.3 Arguments passed to the main class

The single argument is the fully qualified name of the actual job class, which SparkJob dispatches to via reflection:

org.learn.example.jobs.WordCount

public class WordCount implements ISparkJob {
    @Override
    public void run(SparkSession session) {
        JavaRDD<String> javaRDD = session.createDataset(Arrays.asList("aaa bbb", "bbb ccc", "aaa"), Encoders.STRING()).javaRDD();

        javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                Iterator<String> iterator = Arrays.asList(line.split(" ")).iterator();
                return iterator;
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).saveAsTextFile("/home/example/result");
    }
}
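The `ISparkJob` interface that `WordCount` implements is not shown in the source. A minimal version consistent with `SparkJob`'s reflective lookup of `run(SparkSession)` would look like the following sketch (the `Serializable` bound is an assumption, mirroring `SparkJob` itself):

```java
// Hypothetical reconstruction of ISparkJob; the source does not show it.
// All it must declare is the run(SparkSession) method that SparkJob looks up.
import java.io.Serializable;
import org.apache.spark.sql.SparkSession;

public interface ISparkJob extends Serializable {
    void run(SparkSession session);
}
```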

2. Submitting the job above to the Spark cluster

2.1 Submitting with SparkSubmit.main
Note that SparkSubmit is an internal Spark class rather than a stable public API, and on failure it may call System.exit, terminating the calling JVM.
String[] args = {
        "--master", "spark://11.11.11.11:6066,22.22.22.22:6066",
        "--deploy-mode", "cluster",
        "--executor-memory", "1G",
        "--total-executor-cores", "2",
        "--class", "org.learn.example.jobs.SparkJob",
        "hdfs:/home/mr/example/spark-example-1.0.jar",
        "org.learn.example.jobs.WordCount"
};
SparkSubmit.main(args);
2.2 Submitting with SparkLauncher
SparkLauncher spawns a spark-submit child process from the configured Spark home and returns a handle for monitoring the application.
try {
    SparkLauncher launcher = new SparkLauncher()
            .setSparkHome("/home/spark")
            .setConf("spark.driver.memory", "1G")
            .setConf("spark.executor.memory", "1G")
            .setConf("spark.executor.cores", "1")
            .setConf("spark.cores.max", "2")
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .setDeployMode("cluster")
            .redirectOutput(new File("/home/example/logs/launch.log"))
            .setAppResource("hdfs:/home/mr/example/spark-example-1.0.jar")
            .setMainClass("org.learn.example.jobs.SparkJob")
            .addAppArgs("org.learn.example.jobs.WordCount");

    // The listener receives callbacks as the application moves through its lifecycle.
    SparkAppHandle appHandle = launcher.startApplication(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            // Called on state transitions such as CONNECTED, SUBMITTED, RUNNING, FINISHED.
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            // Called when application info (e.g. the application ID) becomes available.
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}
2.3 Submitting with RestSubmissionClient

This approach returns the submission response, so the result can be inspected:

try {
    String appResource = "hdfs:/home/mr/example/spark-example-1.0.jar";
    String mainClass = "org.learn.example.jobs.SparkJob";
    String[] args = {
            "org.learn.example.jobs.WordCount"
    };
    SparkConf sparkConf = new SparkConf()
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .set("spark.executor.cores", "1")
            .set("spark.submit.deployMode", "cluster")
            .set("spark.executor.memory", "1G")
            .set("spark.cores.max", "2")
            .set("spark.app.name", ""); // the real app name is set by SparkJob at runtime

    // Note: this must be a scala.collection.immutable.HashMap, not java.util.HashMap
    CreateSubmissionResponse response = (CreateSubmissionResponse) RestSubmissionClient.run(appResource, mainClass,
                    args, sparkConf, new HashMap<String, String>());

    logger.info("======> response: " + response.toJson());
} catch (Exception e) {
    e.printStackTrace();
}
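RestSubmissionClient is itself a private Spark API; under the hood it POSTs a JSON body to the standalone master's REST endpoint (`/v1/submissions/create` on port 6066), which can also be called directly with any HTTP client. The sketch below only builds that JSON payload as a string; the field names follow Spark's standalone REST submission protocol as commonly documented, and the Spark version and property values are illustrative assumptions:

```java
// Illustrative construction of the JSON body for Spark's standalone REST
// submission endpoint (POST http://<master>:6066/v1/submissions/create).
// Field names follow the CreateSubmissionRequest message; values are examples.
public class RestPayloadDemo {
    public static String buildCreateSubmissionJson(String appResource, String mainClass, String jobArg) {
        return "{"
                + "\"action\":\"CreateSubmissionRequest\","
                + "\"clientSparkVersion\":\"2.4.0\","           // assumed Spark version
                + "\"appResource\":\"" + appResource + "\","
                + "\"mainClass\":\"" + mainClass + "\","
                + "\"appArgs\":[\"" + jobArg + "\"],"
                + "\"environmentVariables\":{},"
                + "\"sparkProperties\":{"
                +     "\"spark.app.name\":\"SparkJob\","
                +     "\"spark.submit.deployMode\":\"cluster\","
                +     "\"spark.master\":\"spark://11.11.11.11:6066\""
                + "}}";
    }

    public static void main(String[] args) {
        System.out.println(buildCreateSubmissionJson(
                "hdfs:/home/mr/example/spark-example-1.0.jar",
                "org.learn.example.jobs.SparkJob",
                "org.learn.example.jobs.WordCount"));
    }
}
```

Going through the raw endpoint avoids linking against Spark internals at compile time, at the cost of hand-maintaining the request format.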