spark程序中調(diào)用shell腳本

scala直接調(diào)用shell腳本是不行的,但是可以利用java調(diào)用shell腳本然后在spark代碼中引入java代碼實(shí)現(xiàn)。

  • 參考:java代碼調(diào)用shell腳本
  • shell腳本必須在spark的driver端調(diào)用,在worker端只能處理數(shù)據(jù)。因此必須在spark的DAG引擎開始或者結(jié)束以及兩個(gè)job之間調(diào)用shell腳本。
  • 根據(jù)以上前提,spark執(zhí)行shell腳本代碼只能在以下幾個(gè)位置:
  1. SparkContext創(chuàng)建之前和創(chuàng)建之后但是創(chuàng)建RDD之前。
  2. 每一個(gè)job的Spark的action函數(shù)執(zhí)行之后以及下一個(gè)job的transformation函數(shù)執(zhí)行之前。
  3. 最后一個(gè)job的action函數(shù)執(zhí)行結(jié)束之后。

demo

  • 先寫一個(gè)java類,擁有一個(gè)調(diào)用shell腳本的方法。
public class MyJavaClass {

    public void executeShell(String shpath) {
        try {
            Process ps = Runtime.getRuntime().exec(shpath);
            ps.waitFor();
            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
            StringBuffer sb = new StringBuffer();
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append("\n");
            }
            String result = sb.toString();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • spark程序的主類
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("wordCount").setMaster("yarn")
    val sc=new SparkContext(conf)

    val rdd=sc.textFile("hdfs:///test/Hamlet.txt")
    val rdd1=rdd.flatMap(x=>x.split(" "))
      .filter(x=>x.size>1)
      .map(x=>(x,1))
      .reduceByKey(_+_)
      .map{case(x,y)=>(y,x)}
    rdd1.sortByKey(false)
      .map{case(a,b)=>(b,a)}
      .saveAsTextFile("hdfs:///test/output.txt")//saveAsTextFile是個(gè)action,真正開始提交job,
    // 調(diào)用shell腳本
    val shpath = "/data/spark-runshell.sh";
    val javaClass = new MyJavaClass()
    val addResult = javaClass.executeShell(shpath)
    println(addResult);
  }
}
  • 注意:在maven工程中,scala包里不能寫java類,否則編譯不通過,必須將java類放在java包里才能編譯通過。


    maven工程.png
  • jar包提交到spark集群
#!/bin/bash
spark-submit \
--master yarn \
--class com.kouyy.test.SparkWordCount /data/sparkdemo-1.0-SNAPSHOT.jar
服務(wù)器上spark程序Jar包及運(yùn)行腳本.png
  • spark-runshell.sh內(nèi)容
#!/bin/bash
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
  • 運(yùn)行spark程序結(jié)果


    運(yùn)行成功.png
調(diào)用shell腳本成功且在driver端運(yùn)行.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容