---
title: Azkaban Learning
date: 2017-01-11 11:54:03
tags: [Azkaban, Scheduling System, Big Data Components]
categories: "Scheduling System"
---
Azkaban
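Keywords: Azkaban introduction, big-data job scheduling system
This article is intended for readers who already have some familiarity with Azkaban. It is recommended to skim the following first:
Azkaban documentation: http://azkaban.github.io/azkaban/docs/latest/#overview
Qiangzige's source code analysis: https://my.oschina.net/qiangzigege/blog/653198
(Parts of the content below are excerpted from the two links above.)
Azkaban source code: git clone https://github.com/azkaban/azkaban.git
Introduction to Azkaban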
Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.
Azkaban's system architecture consists of three main components:
- WebServer: exposes a RESTful API and handles dispatching and scheduling jobs;
- ExecServer: exposes an API to the WebServer and executes jobs;
- MySQL: data storage, used for data sharing and partial state synchronization between the WebServer and the ExecServer.

In multi-executor mode, a more detailed architecture diagram looks like the following (the MySQL database is omitted):

It is simple and intuitive.
WebServer
Exposing the RESTful API
In the azkaban-webserver project, the Servlets exposed to clients are easy to find. The most important ones are:
- ExecutorServlet: endpoints for executing a flow immediately, cancelling or pausing a flow, and fetching flow or node logs
- ScheduleServlet: endpoints for setting schedules, setting SLA alert rules, and fetching schedule information
- HistoryServlet: endpoints for viewing flow execution history
- ProjectManagerServlet: endpoints for uploading and downloading project zip files, deleting projects, and fetching a flow's DAG
Dispatching jobs
ExecutorManager is mainly responsible for this part. Every flow, whether executed immediately or by a schedule, is submitted through the method submitExecutableFlow(ExecutableFlow exflow, String userId).
In this method, in multi-executor mode the execution instance is first put into the dispatch queue; in single-executor mode, dispatch is called immediately.
```java
if (isMultiExecutorMode()) {
  // Take MultiExecutor route
  executorLoader.addActiveExecutableReference(reference);
  queuedFlows.enqueue(exflow, reference);
} else {
  // assign only local executor we have
  Executor choosenExecutor = activeExecutors.iterator().next();
  executorLoader.addActiveExecutableReference(reference);
  try {
    dispatch(reference, exflow, choosenExecutor);
  } catch (ExecutorManagerException e) {
    executorLoader.removeActiveExecutableReference(reference.getExecId());
    throw e;
  }
}
```
In multi-executor mode, the execution instance is placed in the dispatch queue, which the QueueProcessorThread thread processes periodically.
```java
/* Method responsible for processing the non-dispatched flows */
private void processQueuedFlows(long activeExecutorsRefreshWindow,
    int maxContinuousFlowProcessed) throws InterruptedException,
    ExecutorManagerException {
  long lastExecutorRefreshTime = 0;
  Pair<ExecutionReference, ExecutableFlow> runningCandidate;
  int currentContinuousFlowProcessed = 0;
  while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
    ExecutionReference reference = runningCandidate.getFirst();
    ExecutableFlow exflow = runningCandidate.getSecond();
    long currentTime = System.currentTimeMillis();
    // if we have dispatched more than maxContinuousFlowProcessed or
    // it has been more than activeExecutorsRefreshWindow millisec since we
    // refreshed
    if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
        || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
      // Refresh executorInfo for all activeExecutors
      refreshExecutors();
      lastExecutorRefreshTime = currentTime;
      currentContinuousFlowProcessed = 0;
    }
    /**
     * <pre>
     * TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
     * Currently we try each queued flow once to infer a global busy state
     * Possible improvements:-
     * 1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
     * 2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
     * taking out all the filters which do not depend on the flow but are still being part of Selector.
     * Assumptions:-
     * 1. no one else except QueueProcessor is updating ExecutableFlow update time
     * 2. re-attempting a flow (which has been tried before) is considered as all executors are busy
     * </pre>
     */
    if (exflow.getUpdateTime() > lastExecutorRefreshTime) {
      // put back in the queue
      queuedFlows.enqueue(exflow, reference);
      long sleepInterval =
          activeExecutorsRefreshWindow
              - (currentTime - lastExecutorRefreshTime);
      // wait till next executor refresh
      sleep(sleepInterval);
    } else {
      exflow.setUpdateTime(currentTime);
      // process flow with current snapshot of activeExecutors
      selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
    }
    // do not count failed flow processing (flows still in queue)
    if (queuedFlows.getFlow(exflow.getExecutionId()) == null) {
      currentContinuousFlowProcessed++;
    }
  }
}
```
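The loop above mixes two refresh rules with the dispatch itself. To make the rules easier to see, here is a minimal sketch that isolates them as pure functions. `RefreshPolicy` and its method names are hypothetical; the logic mirrors the conditions in the quoted `processQueuedFlows`, assuming all times are epoch milliseconds.

```java
/**
 * Hypothetical, simplified model of the refresh rules in processQueuedFlows.
 * Not an Azkaban class; all times are epoch milliseconds.
 */
final class RefreshPolicy {
  private RefreshPolicy() {}

  // Refresh executor stats when the window has elapsed or after a batch of dispatches.
  static boolean shouldRefresh(long now, long lastRefresh, long windowMs,
                               int processedSinceRefresh, int maxContinuous) {
    return now - lastRefresh > windowMs || processedSinceRefresh >= maxContinuous;
  }

  // A flow whose update time is newer than the last refresh was already tried against
  // the current executor snapshot; the quoted code treats that as "all executors busy".
  static boolean alreadyTriedSinceRefresh(long flowUpdateTime, long lastRefresh) {
    return flowUpdateTime > lastRefresh;
  }

  // How long to sleep until the next refresh is due.
  static long sleepUntilNextRefresh(long now, long lastRefresh, long windowMs) {
    return windowMs - (now - lastRefresh);
  }
}
```

For example, with a 5000 ms window, a flow that was already attempted after the last refresh is put back into the queue and the thread sleeps for the rest of the window instead of spinning over the queue.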
The selectExecutorAndDispatchFlow method first selects an executor (the selection logic is implemented in an interesting way); once an executor is chosen, it likewise calls dispatch to send the flow.
```java
/* process flow with a snapshot of available Executors */
private void selectExecutorAndDispatchFlow(ExecutionReference reference,
    ExecutableFlow exflow, Set<Executor> availableExecutors)
    throws ExecutorManagerException {
  synchronized (exflow) {
    Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
    if (selectedExecutor != null) {
      try {
        dispatch(reference, exflow, selectedExecutor);
      } catch (ExecutorManagerException e) {
        logger.warn(String.format(
            "Executor %s responded with exception for exec: %d",
            selectedExecutor, exflow.getExecutionId()), e);
        handleDispatchExceptionCase(reference, exflow, selectedExecutor,
            availableExecutors);
      }
    } else {
      handleNoExecutorSelectedCase(reference, exflow);
    }
  }
}
```
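The quoted code delegates the actual choice to `selectExecutor`. As far as I can tell, Azkaban's selector first filters out unsuitable executors and then ranks the survivors with comparators; the sketch below models only that filter-then-rank idea. `ExecutorStats`, its fields, and the particular filter and ordering are illustrative assumptions, not Azkaban's real criteria or weights.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical snapshot of an executor's load; field names are illustrative.
record ExecutorStats(String host, int remainingFlowCapacity, long freeMemoryMb, int assignedFlows) {}

final class ExecutorSelectorSketch {
  private ExecutorSelectorSketch() {}

  // Drop executors that cannot take the flow, then pick the "greatest" survivor.
  static Optional<ExecutorStats> select(List<ExecutorStats> candidates,
                                        Predicate<ExecutorStats> filter,
                                        Comparator<ExecutorStats> preference) {
    return candidates.stream().filter(filter).max(preference);
  }

  // Example filter: spare flow slots and at least minFreeMemoryMb of free memory.
  static Predicate<ExecutorStats> hasCapacity(long minFreeMemoryMb) {
    return e -> e.remainingFlowCapacity() > 0 && e.freeMemoryMb() >= minFreeMemoryMb;
  }

  // Example ranking: fewer assigned flows first, then more free memory.
  static final Comparator<ExecutorStats> LEAST_LOADED =
      Comparator.comparingInt(ExecutorStats::assignedFlows).reversed()
          .thenComparingLong(ExecutorStats::freeMemoryMb);
}
```

An empty `Optional` here plays the role of the `handleNoExecutorSelectedCase` branch in the quoted method. Keeping filters and ranking separate makes it easy to add or reweight criteria without touching the dispatch loop.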
Because the WebServer and the ExecServer share data through MySQL, the dispatch logic is very simple: it sends an HTTP request carrying the execId and a few other parameters, and all other data the executor needs is read from and written to the database.
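A sketch of how small such a dispatch request can be. It assumes (this is not taken from the quoted code) that the executor serves an `/executor` endpoint reading `action`, `execid` and `user` query parameters; host and port are placeholders. Only identifiers are sent, because the executor loads everything else from MySQL by execId.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class DispatchSketch {
  private DispatchSketch() {}

  // Builds the request URI. The "/executor" path and parameter names are assumptions.
  static URI executeUri(String host, int port, int execId, String user) {
    String query = "action=execute"
        + "&execid=" + execId
        + "&user=" + URLEncoder.encode(user, StandardCharsets.UTF_8);
    return URI.create("http://" + host + ":" + port + "/executor?" + query);
  }

  // Sends the request and returns the raw response body (e.g. a small JSON status).
  static String dispatch(HttpClient client, String host, int port, int execId, String user)
      throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(executeUri(host, port, execId, user)).GET().build();
    return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
  }
}
```

`URLEncoder` applies form encoding, so a space in the user name becomes `+`. Keeping the payload to identifiers means a retry or a re-dispatch to another executor costs only one small request.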
Scheduling jobs
Scheduling is one of the most important features of any scheduling system, and one of the more complex modules in Azkaban. Externally, scheduling is exposed through ScheduleManager, whose data structure is Schedule; internally it is implemented by TriggerManager, whose data structure is Trigger.
All schedule information is passed in through ScheduleManager.scheduleFlow. Its parameters include the schedule id, project id, project name, flow name, status, first schedule timestamp, time zone, schedule period, last modification time, next execution time, submit time, and submitting user. For a schedule, the key pieces of information are the first schedule time and the period.
```java
public Schedule scheduleFlow(final int scheduleId, final int projectId,
    final String projectName, final String flowName, final String status,
    final long firstSchedTime, final DateTimeZone timezone,
    final ReadablePeriod period, final long lastModifyTime,
    final long nextExecTime, final long submitTime, final String submitUser)
```
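Since the first schedule time and the period are the key inputs, the next execution time can be derived from them alone. This sketch uses `java.time` in place of the Joda-Time `DateTimeZone`/`ReadablePeriod` types in the signature above; `nextExecTime` is a hypothetical helper, not Azkaban's implementation. Adding a `Period` to a `ZonedDateTime` works on local date-time, so a daily schedule keeps the same wall-clock time across DST changes.

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;

final class NextExecSketch {
  private NextExecSketch() {}

  /** Returns the first firing time strictly after {@code now}. */
  static ZonedDateTime nextExecTime(ZonedDateTime firstSchedTime, TemporalAmount period,
                                    Instant now) {
    // Guard against zero or negative periods, which would loop forever.
    if (!firstSchedTime.plus(period).isAfter(firstSchedTime)) {
      throw new IllegalArgumentException("period must be positive: " + period);
    }
    ZonedDateTime next = firstSchedTime;
    // Step forward one period at a time until the firing time lies in the future.
    while (!next.toInstant().isAfter(now)) {
      next = next.plus(period);
    }
    return next;
  }
}
```

The loop is linear in the number of missed periods, which is fine for a sketch; a production version would jump ahead arithmetically for fixed-length durations.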
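Following scheduleFlow further down, we see that it calls TriggerBasedScheduleLoader.insertSchedule. This method first converts the Schedule into a Trigger and then puts the Trigger into TriggerManager. The scheduleToTrigger method is concise and clever; it is left for readers to study and is not analyzed in detail here.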
```java
@Override
public void insertSchedule(Schedule s) throws ScheduleManagerException {
  Trigger t = scheduleToTrigger(s);
  try {
    triggerManager.insertTrigger(t, t.getSubmitUser());
    s.setScheduleId(t.getTriggerId());
  } catch (TriggerManagerException e) {
    throw new ScheduleManagerException("Failed to insert new schedule!", e);
  }
}
```
Next, let's see what happens when the Trigger is put into TriggerManager. As shown below, triggerLoader first writes it to the database, and then it is added to the runnerThread thread.
```java
public void insertTrigger(Trigger t) throws TriggerManagerException {
  synchronized (syncObj) {
    try {
      triggerLoader.addTrigger(t);
    } catch (TriggerLoaderException e) {
      throw new TriggerManagerException(e);
    }
    runnerThread.addTrigger(t);
    triggerIdMap.put(t.getTriggerId(), t);
  }
}
```
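The ordering in `insertTrigger` matters: the trigger is persisted before it is handed to the in-memory scanner, so a database failure leaves nothing half-registered. The sketch below reproduces only that ordering; `TriggerStore` and `TriggerRegistrySketch` are hypothetical names, and a trigger is reduced to a string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TriggerLoader (the MySQL-backed store).
interface TriggerStore {
  int add(String trigger); // returns the generated trigger id
}

final class TriggerRegistrySketch {
  private final Object syncObj = new Object();
  private final TriggerStore store;
  private final Map<Integer, String> triggerIdMap = new HashMap<>();

  TriggerRegistrySketch(TriggerStore store) {
    this.store = store;
  }

  // Persist first so the trigger survives a restart; only then make it visible in memory.
  // If the store throws, the exception propagates and nothing is registered.
  int insert(String trigger) {
    synchronized (syncObj) {
      int id = store.add(trigger);
      triggerIdMap.put(id, trigger);
      return id;
    }
  }

  boolean isRegistered(int id) {
    synchronized (syncObj) {
      return triggerIdMap.containsKey(id);
    }
  }
}
```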
The rest is straightforward: runnerThread, a TriggerScannerThread, periodically checks whether each Trigger should fire (onTriggerTrigger) or expire (onTriggerExpire).
```java
private void checkAllTriggers() throws TriggerManagerException {
  long now = System.currentTimeMillis();
  // sweep through the rest of them
  for (Trigger t : triggers) {
    try {
      scannerStage = "Checking for trigger " + t.getTriggerId();
      boolean shouldSkip = true;
      if (shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
        int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
        if (justFinishedFlows.containsKey(execId)) {
          logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
          shouldSkip = false;
        }
      }
      if (shouldSkip && t.getNextCheckTime() > now) {
        shouldSkip = false;
      }
      // Note: as quoted, shouldSkip only controls the log line below;
      // the trigger is checked whether or not it is "skipped".
      if (shouldSkip) {
        logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
      }
      if (logger.isDebugEnabled()) {
        logger.info("Checking trigger " + t.getTriggerId());
      }
      if (t.getStatus().equals(TriggerStatus.READY)) {
        if (t.triggerConditionMet()) {
          onTriggerTrigger(t);
        } else if (t.expireConditionMet()) {
          onTriggerExpire(t);
        }
      }
      if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
        removeTrigger(t);
      } else {
        t.updateNextCheckTime();
      }
    } catch (Throwable th) {
      // skip this trigger, moving on to the next one
      logger.error("Failed to process trigger with id : " + t.getTriggerId(), th);
    }
  }
}
```
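Stripped of logging and the SLA-specific early check, one scan pass boils down to: fire READY triggers whose condition holds, otherwise expire them if their expire condition holds, then drop expired ones, without letting one failing trigger abort the pass. `SimpleTrigger` and `TriggerScannerSketch` below are hypothetical simplifications, not Azkaban classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical, minimal trigger model; not Azkaban's Trigger class.
final class SimpleTrigger {
  enum Status { READY, EXPIRED }

  final LongPredicate triggerCondition; // e.g. now >= next scheduled time
  final LongPredicate expireCondition;  // e.g. now > end time
  final Runnable action;                // e.g. submit an ExecutableFlow
  Status status = Status.READY;

  SimpleTrigger(LongPredicate triggerCondition, LongPredicate expireCondition, Runnable action) {
    this.triggerCondition = triggerCondition;
    this.expireCondition = expireCondition;
    this.action = action;
  }
}

final class TriggerScannerSketch {
  private final List<SimpleTrigger> triggers = new ArrayList<>();

  void add(SimpleTrigger t) {
    triggers.add(t);
  }

  int size() {
    return triggers.size();
  }

  // One scan pass over all triggers at time `now`.
  void checkAll(long now) {
    for (SimpleTrigger t : triggers) {
      try {
        if (t.status == SimpleTrigger.Status.READY) {
          if (t.triggerCondition.test(now)) {
            t.action.run(); // stays READY; re-arming is left to the predicate
          } else if (t.expireCondition.test(now)) {
            t.status = SimpleTrigger.Status.EXPIRED;
          }
        }
      } catch (RuntimeException e) {
        // Skip this trigger and keep scanning, like the catch block in the quoted code.
        System.err.println("Failed to process trigger: " + e);
      }
    }
    // Remove after iterating to avoid ConcurrentModificationException.
    triggers.removeIf(t -> t.status == SimpleTrigger.Status.EXPIRED);
  }
}
```

To run this periodically, one option is `ScheduledExecutorService.scheduleWithFixedDelay`; since the sketch's list is not thread-safe, `add` and `checkAll` should then run on the same thread or be synchronized.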
When a Trigger fires, it calls its action's doAction(). For a scheduling Trigger the action is usually an ExecuteFlowAction, whose doAction method is shown below. It does two things: first, it builds the execution instance ExecutableFlow; second, if the schedule has SLA alert rules, it builds an SLA trigger for each of them.
Once the execution instance is built, it calls executorManager.submitExecutableFlow(exflow, submitUser) to dispatch the flow, so this path converges with the dispatch flow described above. It is not analyzed further here.
```java
@Override
public void doAction() throws Exception {
  if (projectManager == null || executorManager == null) {
    throw new Exception("ExecuteFlowAction not properly initialized!");
  }
  Project project = projectManager.getProject(projectId);
  if (project == null) {
    logger.error("Project to execute " + projectId + " does not exist!");
    throw new RuntimeException("Error finding the project to execute "
        + projectId);
  }
  Flow flow = project.getFlow(flowName);
  if (flow == null) {
    logger.error("Flow " + flowName + " cannot be found in project "
        + project.getName());
    throw new RuntimeException("Error finding the flow to execute "
        + flowName);
  }
  ExecutableFlow exflow = new ExecutableFlow(project, flow);
  exflow.setSubmitUser(submitUser);
  exflow.addAllProxyUsers(project.getProxyUsers());
  if (executionOptions == null) {
    executionOptions = new ExecutionOptions();
  }
  if (!executionOptions.isFailureEmailsOverridden()) {
    executionOptions.setFailureEmails(flow.getFailureEmails());
  }
  if (!executionOptions.isSuccessEmailsOverridden()) {
    executionOptions.setSuccessEmails(flow.getSuccessEmails());
  }
  exflow.setExecutionOptions(executionOptions);
  try {
    executorManager.submitExecutableFlow(exflow, submitUser);
    logger.info("Invoked flow " + project.getName() + "." + flowName);
  } catch (ExecutorManagerException e) {
    throw new RuntimeException(e);
  }
  // deal with sla
  if (slaOptions != null && slaOptions.size() > 0) {
    int execId = exflow.getExecutionId();
    for (SlaOption sla : slaOptions) {
      logger.info("Adding sla trigger " + sla.toString() + " to execution "
          + execId);
      SlaChecker slaFailChecker =
          new SlaChecker("slaFailChecker", sla, execId);
      Map<String, ConditionChecker> slaCheckers =
          new HashMap<String, ConditionChecker>();
      slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
      Condition triggerCond =
          new Condition(slaCheckers, slaFailChecker.getId()
              + ".isSlaFailed()");
      // if whole flow finish before violate sla, just expire
      SlaChecker slaPassChecker =
          new SlaChecker("slaPassChecker", sla, execId);
      Map<String, ConditionChecker> expireCheckers =
          new HashMap<String, ConditionChecker>();
      expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
      Condition expireCond =
          new Condition(expireCheckers, slaPassChecker.getId()
              + ".isSlaPassed()");
      List<TriggerAction> actions = new ArrayList<TriggerAction>();
      List<String> slaActions = sla.getActions();
      for (String act : slaActions) {
        if (act.equals(SlaOption.ACTION_ALERT)) {
          SlaAlertAction slaAlert =
              new SlaAlertAction("slaAlert", sla, execId);
          actions.add(slaAlert);
        } else if (act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
          KillExecutionAction killAct =
              new KillExecutionAction("killExecution", execId);
          actions.add(killAct);
        }
      }
      Trigger slaTrigger =
          new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond,
              actions);
      slaTrigger.getInfo().put("monitored.finished.execution",
          String.valueOf(execId));
      slaTrigger.setResetOnTrigger(false);
      slaTrigger.setResetOnExpire(false);
      logger.info("Ready to put in the sla trigger");
      triggerManager.insertTrigger(slaTrigger);
      logger.info("Sla inserted.");
    }
  }
}
```
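Each SLA trigger built above pairs a fail condition (`isSlaFailed()`), which fires the alert or kill actions, with a pass condition (`isSlaPassed()`), which expires the trigger when the flow finishes in time. Below is a minimal sketch of that decision, assuming a simple "finish before the deadline" SLA; the real `SlaChecker` may distinguish several kinds of SLA, which are not modeled here, and `SlaSketch` is hypothetical.

```java
// Hypothetical model of the fail/pass conditions of one SLA trigger.
final class SlaSketch {
  enum Outcome { FIRE, EXPIRE, WAIT }

  private SlaSketch() {}

  /**
   * @param deadlineMs   flow start time plus the SLA duration
   * @param now          current time
   * @param finishTimeMs finish time of the flow, or -1 while it is still running
   */
  static Outcome evaluate(long deadlineMs, long now, long finishTimeMs) {
    boolean finished = finishTimeMs >= 0;
    if (finished && finishTimeMs <= deadlineMs) {
      return Outcome.EXPIRE; // like isSlaPassed(): finished in time, drop the trigger
    }
    if (finished || now > deadlineMs) {
      return Outcome.FIRE;   // like isSlaFailed(): late, run the alert/kill actions
    }
    return Outcome.WAIT;     // still running and the deadline has not passed
  }
}
```

Because the quoted code calls `setResetOnTrigger(false)` and `setResetOnExpire(false)`, such a trigger is presumably evaluated to a single FIRE or EXPIRE and is not re-armed afterwards.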
WebServer summary
The following diagram gives a brief summary:

ExecServer
Exposing the RESTful API
Multi-executor deployment has been supported since Azkaban 3.0. A single executor is relatively simple and exposes only a few APIs to the WebServer, mainly:
- ExecutorServlet: endpoints for executing, cancelling, and pausing flows and for querying logs.
Executing jobs
Here is a brief look at how an executor runs a flow. In ExecutorServlet, every execution request goes through the handleAjaxExecute method, which simply passes the execution id on to FlowRunnerManager:
```java
private void handleAjaxExecute(HttpServletRequest req,
    Map<String, Object> respMap, int execId) throws ServletException {
  try {
    flowRunnerManager.submitFlow(execId);
  } catch (ExecutorManagerException e) {
    e.printStackTrace();
    logger.error(e);
    respMap.put(RESPONSE_ERROR, e.getMessage());
  }
}
```
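FlowRunnerManager submits the flow for execution through submitFlow. It first loads the execution instance ExecutableFlow from the database by execId, then prepares the execution directory with setupFlow(flow), then creates a FlowRunner, and finally submits it to a thread pool with executorService.submit(runner).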
```java
public void submitFlow(int execId) throws ExecutorManagerException {
  // Load file and submit
  if (runningFlows.containsKey(execId)) {
    throw new ExecutorManagerException("Execution " + execId
        + " is already running.");
  }
  ExecutableFlow flow = null;
  flow = executorLoader.fetchExecutableFlow(execId);
  if (flow == null) {
    throw new ExecutorManagerException("Error loading flow with exec "
        + execId);
  }
  // Sets up the project files and execution directory.
  setupFlow(flow);
  // Setup flow runner
  FlowWatcher watcher = null;
  ExecutionOptions options = flow.getExecutionOptions();
  if (options.getPipelineExecutionId() != null) {
    Integer pipelineExecId = options.getPipelineExecutionId();
    FlowRunner runner = runningFlows.get(pipelineExecId);
    if (runner != null) {
      watcher = new LocalFlowWatcher(runner);
    } else {
      watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
    }
  }
  int numJobThreads = numJobThreadPerFlow;
  if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
    try {
      int numJobs =
          Integer.valueOf(options.getFlowParameters().get(
              FLOW_NUM_JOB_THREADS));
      if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
          .isProjectWhitelisted(flow.getProjectId(),
              WhitelistType.NumJobPerFlow))) {
        numJobThreads = numJobs;
      }
    } catch (Exception e) {
      throw new ExecutorManagerException(
          "Failed to set the number of job threads "
              + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
              + " for flow " + execId, e);
    }
  }
  FlowRunner runner =
      new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
  runner.setFlowWatcher(watcher)
      .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
      .setValidateProxyUser(validateProxyUser)
      .setNumJobThreads(numJobThreads).addListener(this);
  configureFlowLevelMetrics(runner);
  // Check again.
  if (runningFlows.containsKey(execId)) {
    throw new ExecutorManagerException("Execution " + execId
        + " is already running.");
  }
  // Finally, queue the sucker.
  runningFlows.put(execId, runner);
  try {
    // The executorService already has a queue.
    // The submit method below actually returns an instance of FutureTask,
    // which implements interface RunnableFuture, which extends both
    // Runnable and Future interfaces
    Future<?> future = executorService.submit(runner);
    // keep track of this future
    submittedFlows.put(future, runner.getExecutionId());
    // update the last submitted time.
    this.lastFlowSubmittedDate = System.currentTimeMillis();
  } catch (RejectedExecutionException re) {
    throw new ExecutorManagerException(
        "Azkaban server can't execute any more flows. "
            + "The number of running flows has reached the system configured limit. "
            + "Please notify Azkaban administrators");
  }
}
```
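The admission control in `submitFlow` (refuse a duplicate `execId`, and report `RejectedExecutionException` once the pool is saturated) can be reproduced with a bounded `ThreadPoolExecutor`. In this hypothetical sketch, `putIfAbsent` replaces the check-twice pattern above with one atomic check, and the limit is the thread count plus the queue capacity; `FlowSubmitterSketch` is not an Azkaban class.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FlowSubmitterSketch {
  private final Map<Integer, Runnable> runningFlows = new ConcurrentHashMap<>();
  private final ThreadPoolExecutor pool;

  FlowSubmitterSketch(int maxThreads, int queueCapacity) {
    // The default AbortPolicy throws RejectedExecutionException when threads and queue are full.
    pool = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity));
  }

  Future<?> submit(int execId, Runnable flowRunner) {
    // Atomic duplicate check instead of "check, build, check again".
    if (runningFlows.putIfAbsent(execId, flowRunner) != null) {
      throw new IllegalStateException("Execution " + execId + " is already running.");
    }
    try {
      return pool.submit(() -> {
        try {
          flowRunner.run();
        } finally {
          runningFlows.remove(execId); // free the id once the flow is done
        }
      });
    } catch (RejectedExecutionException e) {
      runningFlows.remove(execId);
      throw new IllegalStateException("The number of running flows has reached the limit.", e);
    }
  }

  void shutdown() {
    pool.shutdown();
  }
}
```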
FlowRunner itself implements Runnable, and its run method calls runFlow, shown below. The method traverses the DAG level by level, following its dependency structure, and submits each job for execution in turn.
```java
private void runFlow() throws Exception {
  logger.info("Starting flows");
  runReadyJob(this.flow);
  updateFlow();
  while (!flowFinished) {
    synchronized (mainSyncObj) {
      if (flowPaused) {
        try {
          mainSyncObj.wait(CHECK_WAIT_MS);
        } catch (InterruptedException e) {
        }
        continue;
      } else {
        if (retryFailedJobs) {
          retryAllFailures();
        } else if (!progressGraph()) {
          try {
            mainSyncObj.wait(CHECK_WAIT_MS);
          } catch (InterruptedException e) {
          }
        }
      }
    }
  }
  logger.info("Finishing up flow. Awaiting Termination");
  executorService.shutdown();
  updateFlow();
  logger.info("Finished Flow");
}
```
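`runFlow` keeps calling `progressGraph` until the flow is finished; in effect, a job becomes runnable once all of its upstream jobs are done. The synchronous sketch below groups a DAG into such "waves", a level-by-level variant of Kahn's topological sort. It is a hypothetical simplification: judging from the loop above, Azkaban re-checks the graph as jobs finish rather than waiting for a whole wave, and it runs jobs asynchronously.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DagWavesSketch {
  private DagWavesSketch() {}

  /**
   * Groups jobs into waves: each wave holds the jobs whose dependencies all finished
   * in earlier waves. deps maps each job to its upstream jobs; every job must be a key.
   */
  static List<List<String>> waves(Map<String, List<String>> deps) {
    Set<String> done = new LinkedHashSet<>();
    List<List<String>> result = new ArrayList<>();
    while (done.size() < deps.size()) {
      List<String> ready = new ArrayList<>();
      for (Map.Entry<String, List<String>> e : deps.entrySet()) {
        if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
          ready.add(e.getKey());
        }
      }
      if (ready.isEmpty()) {
        throw new IllegalArgumentException("Cycle or missing dependency among " + deps.keySet());
      }
      // In Azkaban these jobs would be submitted as JobRunners and finish later.
      done.addAll(ready);
      result.add(ready);
    }
    return result;
  }
}
```

For an `extract -> {clean, stats} -> report` flow, this yields `[[extract], [clean, stats], [report]]`; the jobs in the middle wave could run in parallel on the flow's job threads.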
For each individual job, a JobRunner is finally constructed to execute it.
```java
private void runExecutableNode(ExecutableNode node) throws IOException {
  // Collect output props from the job's dependencies.
  prepareJobProperties(node);
  node.setStatus(Status.QUEUED);
  JobRunner runner = createJobRunner(node);
  logger.info("Submitting job '" + node.getNestedId() + "' to run.");
  try {
    executorService.submit(runner);
    activeJobRunners.add(runner);
  } catch (RejectedExecutionException e) {
    logger.error(e);
  }
}

private JobRunner createJobRunner(ExecutableNode node) {
  // Load job file.
  File path = new File(execDir, node.getJobSource());
  JobRunner jobRunner =
      new JobRunner(node, path.getParentFile(), executorLoader,
          jobtypeManager);
  if (watcher != null) {
    jobRunner.setPipeline(watcher, pipelineLevel);
  }
  if (validateUserProxy) {
    jobRunner.setValidatedProxyUsers(proxyUsers);
  }
  jobRunner.setDelayStart(node.getDelayedExecution());
  jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
  jobRunner.addListener(listener);
  if (JobCallbackManager.isInitialized()) {
    jobRunner.addListener(JobCallbackManager.getInstance());
  }
  configureJobLevelMetrics(jobRunner);
  return jobRunner;
}
```
When each JobRunner runs, it looks up the matching plugin in the plugin module to load the job type. Each job type has its own run method, and the job is finally executed by calling it. For the different job types, see Azkaban's built-in job types and the Hadoop-related job types implemented in the azkaban-plugins project.
```java
try {
  job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
} catch (JobTypeManagerException e) {
  logger.error("Failed to build job type", e);
  return false;
}
```
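Conceptually, `buildJobExecutor` is a registry lookup keyed by the job's `type` property (as in `type=command` in a `.job` file). The sketch shows that lookup with a map of factories. `SimpleJob`, `JobTypeRegistrySketch` and the factory signature are hypothetical and much simpler than Azkaban's plugin loading, which, as I understand it, also deals with per-plugin class loaders and configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiFunction;

// Hypothetical minimal job abstraction, not Azkaban's job interface.
interface SimpleJob {
  String run();
}

final class JobTypeRegistrySketch {
  private final Map<String, BiFunction<String, Properties, SimpleJob>> factories = new HashMap<>();

  // Called once per installed plugin, e.g. "command", "java", "hadoopJava".
  void register(String type, BiFunction<String, Properties, SimpleJob> factory) {
    factories.put(type, factory);
  }

  // Look up the plugin by the job's "type" property and build an executor for it.
  SimpleJob buildJobExecutor(String jobId, Properties props) {
    String type = props.getProperty("type");
    BiFunction<String, Properties, SimpleJob> factory = type == null ? null : factories.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("Job " + jobId + " has unknown or missing type: " + type);
    }
    return factory.apply(jobId, props);
  }
}
```

A failed lookup corresponds to the `JobTypeManagerException` branch above, where the JobRunner logs the error and gives up on the job.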
Azkaban Plugin
Azkaban's plugin mechanism makes it easy to add new plugin types and thereby support more kinds of jobs. Azkaban's Hadoop plugins can be found in this repository:
git clone https://github.com/azkaban/azkaban-plugins.git
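Plugin implementation
The plugin class hierarchy is shown in the diagram below. Each plugin job runs in its own separate process. ProcessJob is the class responsible for launching the process; JavaProcessJob extends it and specializes it for Java processes; the Hadoop plugins in turn each extend JavaProcessJob. To implement your own plugin type, you only need to extend JavaProcessJob and invoke the plugin's Wrapper class from the subclass. See the code for the details.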
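The layering described above (a base class that runs a command line in a child process, and a Java subclass that only decides which `java` command to run) can be sketched as follows. `ProcessJobSketch`, `JavaProcessJobSketch` and the `com.example.MyJobWrapper` main class are hypothetical names used for illustration only.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Base class: runs some command line in its own OS process.
class ProcessJobSketch {
  protected List<String> command() {
    return List.of("echo", "hello");
  }

  // A separate process means a crashing job cannot take down the executor JVM.
  final int run() throws IOException, InterruptedException {
    Process p = new ProcessBuilder(command()).inheritIO().start();
    return p.waitFor();
  }
}

// Java specialization: only decides which "java ..." command to launch.
class JavaProcessJobSketch extends ProcessJobSketch {
  private final String classpath;
  private final String maxHeap;
  private final String mainClass; // e.g. a plugin's Wrapper class
  private final List<String> args;

  JavaProcessJobSketch(String classpath, String maxHeap, String mainClass, List<String> args) {
    this.classpath = classpath;
    this.maxHeap = maxHeap;
    this.mainClass = mainClass;
    this.args = args;
  }

  @Override
  protected List<String> command() {
    List<String> cmd = new ArrayList<>();
    // Reuse the JVM that runs the executor to launch the child process.
    cmd.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
    cmd.add("-Xmx" + maxHeap);
    cmd.add("-cp");
    cmd.add(classpath);
    cmd.add(mainClass);
    cmd.addAll(args);
    return cmd;
  }
}
```

A new plugin in this scheme would subclass `JavaProcessJobSketch` and pass its own wrapper class as `mainClass`, which matches the extension route the paragraph above recommends.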
