Spark job server使用調(diào)研

Job Server概述

Spark-jobserver 提供了一個(gè) RESTful 接口來(lái)提交和管理 spark 的 jobs、jars 和 job contexts。此項(xiàng)目包含了完整的 Spark job server 的項(xiàng)目,包括單元測(cè)試和項(xiàng)目部署腳本。
“Spark as Service”:針對(duì) job 和 contexts 的各個(gè)方面提供了 REST 風(fēng)格的 api 接口進(jìn)行管理

  • 支持 SparkSQL、Hive、Streaming Contexts/jobs 以及定制 job contexts
  • 通過(guò)集成 Apache Shiro 來(lái)支持 LDAP 權(quán)限驗(yàn)證
  • 支持亞秒級(jí)別低延遲的任務(wù)通過(guò)長(zhǎng)期運(yùn)行的 job contexts
  • 可以通過(guò)結(jié)束 context 來(lái)停止運(yùn)行的作業(yè)(job)
  • 分割 jar 上傳步驟以提高 job 的啟動(dòng)
  • 異步和同步的 job API,其中同步 API 對(duì)低延時(shí)作業(yè)非常有效
  • 支持 Standalone Spark 和 Mesos、yarn
  • Job 和 jar 信息通過(guò)一個(gè)可插拔的 DAO 接口來(lái)持久化
  • 命名 RDD 以緩存,并可以通過(guò)該名稱獲取 RDD。這樣可以提高作業(yè)間 RDD 的共享和重用
  • 支持 Scala 2.10 版本和 2.11 版本
    git地址: git@github.com:spark-jobserver/spark-jobserver.git

編譯、打包、部署

從github中clone此項(xiàng)目的代碼,此處選擇jobserver-0.6.2-spark-1.6.1版本分支。
首先根據(jù)目標(biāo)平臺(tái)來(lái)創(chuàng)建相應(yīng)的配置文件:
$JOBSERVER_HOME/config目錄下已存在一些配置模板,可以復(fù)用這些模板并對(duì)其中的配置項(xiàng)做相應(yīng)的調(diào)整。
在conf目錄下創(chuàng)建datacloud.conf及datacloud.sh文件,修改其中的配置項(xiàng)。

$JOBSERVER_HOME/bin/server_package datacloud //編譯打包
打包成功后,拷貝出job-server.tar.gz到目標(biāo)運(yùn)行平臺(tái),應(yīng)該部署安裝spark的服務(wù)器環(huán)境中。

啟動(dòng)、停止服務(wù)

$JOBSERVER_HOME/bin/server_start 啟動(dòng)服務(wù),默認(rèn)監(jiān)聽端口為8090,可在啟動(dòng)前修改datacloud.conf進(jìn)行配置。
$JOBSERVER_HOME/bin/server_stop停止服務(wù),注意服務(wù)停止后,常駐context將停止運(yùn)行,因此,重啟jobserver需要重新創(chuàng)建常駐context。

定制job Project

添加依賴:

resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"
libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.6.2" % "provided"
libraryDependencies += "spark.jobserver" %% "job-server-extras" % "0.6.2" % "provided"```

通過(guò)job server來(lái)提交的job,必須實(shí)現(xiàn)SparkJob相關(guān)的接口,這是jobserver復(fù)用context機(jī)制的前提:

object SampleJob extends SparkJob {
override def runJob(sc:SparkContext, jobConfig: Config): Any = ???
override def validate(sc:SparkContext, config: Config): SparkJobValidation = ???
}```
runJob定義一個(gè)job的具體實(shí)現(xiàn)邏輯。
validate在job執(zhí)行之前做參數(shù)進(jìn)行驗(yàn)證,驗(yàn)證通過(guò)后才會(huì)調(diào)用runjob方法。

Context管理

GET /contexts - 列出當(dāng)前所有context
POST /contexts/<name> - 創(chuàng)建context
DELETE /contexts/<name> - 停止一個(gè)context,并且停止所有在其運(yùn)行的作業(yè)。
PUT /contexts?reset=reboot - 刪除所有context,并且從配置中重新重建context。

POST創(chuàng)建context時(shí),可以對(duì)context資源進(jìn)行配置:
dependent-jar-uris=file:///path/a.jar,file:///path/b.jar 此參數(shù)用以對(duì)依賴jar包進(jìn)行配置
num-cpu-cores=10 配置cpu資源
memory-per-node=512m 配置內(nèi)存資源
上述配置會(huì)覆蓋datacloud.conf相同的配置項(xiàng),可配置項(xiàng)可參考job-server/src/main/resources/application.conf

Job管理

GET /jobs - 列出所有job,以及job執(zhí)行狀態(tài),包括等待中、運(yùn)行中、已完成
POST /jobs - 啟動(dòng)新作業(yè),參數(shù)?sync=true標(biāo)志同步等待作業(yè)計(jì)算結(jié)果
GET /jobs/<jobId> - 獲取指定job的執(zhí)行狀態(tài)和結(jié)果
DELETE /jobs/<jobId> - kill特定job
GET /jobs/<jobId>/config - 獲取job相關(guān)的配置信息

Job Server使用

臨時(shí)異步方式提交作業(yè)

此方式會(huì)臨時(shí)創(chuàng)建context,作業(yè)執(zhí)行完成后,context被刪除。

curl -d "input.string = a b c a b see" '172.16.31.63:8092/jobs?appName=test&classPath=spark.jobserver.WordCountExample
{
  "status": "STARTED",
  "result": {
    "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4",
    "context": "b7ea0eb5-spark.jobserver.WordCountExample"
  }
}```
因?yàn)槭钱惒椒绞教峤蛔鳂I(yè),需要主動(dòng)檢查作業(yè)的執(zhí)行結(jié)果:

curl localhost:8090/jobs/5453779a-f004-45fc-a11d-a39dae0f9bf4
{
"duration": "6.341 secs",
"classPath": "spark.jobserver.WordCountExample",
"startTime": "2015-10-16T03:17:03.127Z",
"context": "b7ea0eb5-spark.jobserver.WordCountExample",
"result": {
"a": 2,
"b": 2,
"c": 1,
"see": 1
},
"status": "FINISHED",
"jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4"
}```

臨時(shí)同步方式提交作業(yè)

添加sync參數(shù),值為true

curl -d "input.string = a b c a b see" '172.16.31.63:8092/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true'```
請(qǐng)求的response中將包含計(jì)算結(jié)果數(shù)據(jù)

{
"result": {
"b": 2,
"a": 2,
"see": 1,
"c": 1
}
}```

常駐context同步方式提交作業(yè)

首先創(chuàng)建一個(gè)常駐context

curl -d "" '172.16.31.63:8092/contexts/test-context?num-cpu-cores=4&memory-per-node=512m'

基于此context來(lái)創(chuàng)建一個(gè)job

curl -d "input.string = a b c a b see" '172.16.31.63:8092/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'

常駐Context集群已為其分配資源,并且會(huì)一直處于運(yùn)行狀態(tài),提交作業(yè)時(shí),將會(huì)復(fù)用此Context進(jìn)行計(jì)算。
如果Jobserver服務(wù)進(jìn)行重啟,之前的Context將會(huì)終止,其占用資源將被釋放。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容