scala直接調(diào)用shell腳本是不行的,但是可以利用java調(diào)用shell腳本然后在spark代碼中引入java代碼實(shí)現(xiàn)。
- 參考:java代碼調(diào)用shell腳本
- shell腳本必須在spark的driver端調(diào)用,在worker端只能處理數(shù)據(jù)。因此必須在spark的DAG引擎開始或者結(jié)束以及兩個(gè)job之間調(diào)用shell腳本。
- 根據(jù)以上前提,spark執(zhí)行shell腳本代碼只能在以下幾個(gè)位置:
- SparkContext創(chuàng)建之前和創(chuàng)建之后但是創(chuàng)建RDD之前。
- 每一個(gè)job的Spark的action函數(shù)執(zhí)行之后以及下一個(gè)job的transformation函數(shù)執(zhí)行之前。
- 最后一個(gè)job的action函數(shù)執(zhí)行結(jié)束之后。
demo
- 先寫一個(gè)java類,擁有一個(gè)調(diào)用shell腳本的方法。
public class MyJavaClass {
public void executeShell(String shpath) {
try {
Process ps = Runtime.getRuntime().exec(shpath);
ps.waitFor();
BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
StringBuffer sb = new StringBuffer();
String line;
while ((line = br.readLine()) != null) {
sb.append(line).append("\n");
}
String result = sb.toString();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- spark程序的主類
object SparkWordCount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("wordCount").setMaster("yarn")
val sc=new SparkContext(conf)
val rdd=sc.textFile("hdfs:///test/Hamlet.txt")
val rdd1=rdd.flatMap(x=>x.split(" "))
.filter(x=>x.size>1)
.map(x=>(x,1))
.reduceByKey(_+_)
.map{case(x,y)=>(y,x)}
rdd1.sortByKey(false)
.map{case(a,b)=>(b,a)}
.saveAsTextFile("hdfs:///test/output.txt")//saveAsTextFile是個(gè)action,真正開始提交job,
// 調(diào)用shell腳本
val shpath = "/data/spark-runshell.sh";
val javaClass = new MyJavaClass()
val addResult = javaClass.executeShell(shpath)
println(addResult);
}
}
-
注意:在maven工程中,scala包里不能寫java類,否則編譯不通過,必須將java類放在java包里才能編譯通過。
maven工程.png - jar包提交到spark集群
#!/bin/bash
spark-submit \
--master yarn \
--class com.kouyy.test.SparkWordCount /data/sparkdemo-1.0-SNAPSHOT.jar

服務(wù)器上spark程序Jar包及運(yùn)行腳本.png
- spark-runshell.sh內(nèi)容
#!/bin/bash
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
echo 'spark調(diào)用shell腳本運(yùn)行成功'
-
運(yùn)行spark程序結(jié)果
運(yùn)行成功.png

調(diào)用shell腳本成功且在driver端運(yùn)行.png

