使用crontab調(diào)度hadoop任務(wù)和機(jī)器學(xué)習(xí)任務(wù)的正確姿勢(shì)

標(biāo)簽: crontab 調(diào)度


雖然現(xiàn)在越來越多的開源機(jī)器學(xué)習(xí)工具支持分布式訓(xùn)練,但分布式機(jī)器學(xué)習(xí)平臺(tái)的搭建和運(yùn)維的門檻通常是比較高的。另外一方面,有一些業(yè)務(wù)場(chǎng)景的訓(xùn)練數(shù)據(jù)其實(shí)并不是很大,在一臺(tái)普通的開發(fā)機(jī)上訓(xùn)練個(gè)把小時(shí)足矣。單機(jī)的機(jī)器學(xué)習(xí)工具使用起來通常要比分布式平臺(tái)好用很多。

特征工程在機(jī)器學(xué)習(xí)任務(wù)中占了很大的一部分比重,使用hive sql這樣的高級(jí)語(yǔ)言處理起來比較方面和快捷。因此,通常特征工程、樣本構(gòu)建都在離線分布式集群(hive集群)上完成,訓(xùn)練任務(wù)在數(shù)據(jù)量不大時(shí)可以在gateway機(jī)器上完成。這就涉及到幾個(gè)問題:

  1. gateway機(jī)器上的daily訓(xùn)練任務(wù)什么時(shí)候開始執(zhí)行?
  2. 模型訓(xùn)練結(jié)束,并對(duì)新數(shù)據(jù)做出預(yù)測(cè)后如何把數(shù)據(jù)上傳到分布式集群?
  3. 如何通知后置任務(wù)開始執(zhí)行?

對(duì)于第一個(gè)問題,理想的解決方案是公司大數(shù)據(jù)平臺(tái)的調(diào)度系統(tǒng)能夠調(diào)度某臺(tái)具體gateway上部署的任務(wù),并且可以獲取任務(wù)執(zhí)行的狀態(tài),在任務(wù)執(zhí)行成功后可以自動(dòng)調(diào)度后置任務(wù)。然而,有時(shí)候調(diào)度系統(tǒng)還沒有這么智能的時(shí)候,就需要我們自己想辦法解決了。Crontab是Unix和類Unix的操作系統(tǒng)中用于設(shè)置周期性被執(zhí)行的指令的工具。使用Crontab可以每天定時(shí)啟動(dòng)任務(wù)。美中不足在于必須要自己檢查前置任務(wù)是否已經(jīng)結(jié)束,也就是說我們要的數(shù)據(jù)有沒有產(chǎn)出,同時(shí)還要有一個(gè)讓后置任務(wù)等待當(dāng)前任務(wù)結(jié)束的機(jī)制。

檢查前置任務(wù)是否已經(jīng)結(jié)束

如果前置任務(wù)是hive任務(wù),那么結(jié)束標(biāo)志通常是一個(gè)hive表產(chǎn)生了特定分區(qū),我們只需要檢查這個(gè)分區(qū)是否存在就可以了。有個(gè)問題需要注意的是,可能在hive任務(wù)執(zhí)行過程中分區(qū)已經(jīng)產(chǎn)生,但任務(wù)沒有完全結(jié)束前數(shù)據(jù)還沒有寫完,這個(gè)時(shí)候啟動(dòng)后續(xù)任務(wù)是不正確。解決辦法就是在任務(wù)結(jié)束時(shí)為當(dāng)前表添加一個(gè)空的“標(biāo)志分區(qū)”,比如原來的分區(qū)是“pt=20170921”,我們可以添加一個(gè)空的分區(qū)“pt=20170921.done”(分區(qū)字段的類型為string時(shí)可用),或者“pt=-20170921”(分區(qū)字段的類型為int時(shí)可用)。然后,crontab調(diào)度的后置任務(wù)需要檢查這個(gè)“標(biāo)志分區(qū)”是否存在。

function log_info()
{
    if [ "$LOG_LEVEL" != "WARN" ] && [ "$LOG_LEVEL" != "ERROR" ]
    then
        echo "`date +"%Y-%m-%d %H:%M:%S"` [INFO] ($$)($USER): $*";
    fi
}

function check_partition() {
   log_info "function [$FUNCNAME] begin"
   #table,dt
   temp=`hive -e "show partitions $1"`
   echo $temp|grep -wq "$2"
   if [ $? -eq 0 ];then
       log_info "$1 parition $2 exists, ok"
       return 0
   else
       log_info "$1 parition $2 doesn't exists"
       return 1
   fi  
   log_info "function [$FUNCNAME] end"
}

如果前置任務(wù)是MapReduce或者Spark任務(wù),那么結(jié)束標(biāo)志通常是在HDFS上產(chǎn)出了一個(gè)特定的路徑,后置任務(wù)只需要檢查這個(gè)特定路徑是否存在就可以。

## 功能: 檢查給定的文件或目錄在hadoop上是否存在
## $1 文件或者目錄, 不支持*號(hào)通配符
## $? return 0 if file exist, none-0 otherwise
function hadoop_check_file_exist()
{
    ## check params
    if [ $# -ne 1 ]
    then
        log_info "Unexpected params for hadoop_check_file_exist() function! Usage: hadoop_check_file_exist <dir_or_file>";
        return 1;
    fi

    ## do it
    log_info "${HADOOP_EXEC} --config ${HADOOP_CONF} fs -test -e $1"
    ${HADOOP_EXEC} --config ${HADOOP_CONF} fs -test -e "$1"
    local ret=$?
    if [ $ret -eq 0 ]
    then
        log_info "$1 does exist on Hadoop"
        return 0;
    else
        log_info "($ret)$1 does NOT exist on Hadoop"
        return 2;
    fi
    return 0;
}

其實(shí),hive任務(wù)的表的內(nèi)容也是存儲(chǔ)在HDFS上,因此也可以用檢查HDFS路徑的方法,來判斷前置hive任務(wù)是否已經(jīng)結(jié)束。可以用下面命令查看hive表對(duì)應(yīng)的HDFS路徑。

hive -e "desc formatted $tablename;"

循環(huán)等待前置任務(wù)結(jié)束

當(dāng)前置任務(wù)還沒有結(jié)束時(shí),需要循環(huán)等待。有兩種方法,一種是自己在Bash腳本里寫代碼,如下:

  hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
  while [ $? -ne 0 ] 
  do  
    local hh=`date '+%H'`
    if [ $hh -gt 23 ]
    then
        echo "timeout, partition still not exist"
        exit 1
    fi  
    log_info "$hbase_dir/$table_name/pt=-$bizdate doesn't exist, wait for a while"
    sleep 5m
    hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
  done 

第二種方法,是利用crontab的周期性調(diào)度功能。比如可以讓crontab每隔5分鐘調(diào)度一次任務(wù)。這個(gè)時(shí)候需要注意的是,可能前一次調(diào)度的進(jìn)程還沒有執(zhí)行結(jié)束,后一次調(diào)度就已經(jīng)開始。這個(gè)時(shí)候可以使用linux flock文件鎖實(shí)現(xiàn)任務(wù)鎖定,解決沖突。

flock [-sxon][-w #] file [-c] command
-s, --shared:    獲得一個(gè)共享鎖
-x, --exclusive: 獲得一個(gè)獨(dú)占鎖
-u, --unlock:    移除一個(gè)鎖,通常是不需要的,腳本執(zhí)行完會(huì)自動(dòng)丟棄鎖
-n, --nonblock:  如果沒有立即獲得鎖,直接失敗而不是等待
-w, --timeout:   如果沒有立即獲得鎖,等待指定時(shí)間
-o, --close:     在運(yùn)行命令前關(guān)閉文件的描述符號(hào)。用于如果命令產(chǎn)生子進(jìn)程時(shí)會(huì)不受鎖的管控
-c, --command:   在shell中運(yùn)行一個(gè)單獨(dú)的命令
-h, --help       顯示幫助
-V, --version:   顯示版本

其中,file是一個(gè)空文件即可。比如,crontab文件可以這樣寫:

*/5 6-23 * * * flock -xn /tmp/pop_score.lock -c 'bash /home/xudong.yang/pop_score/train/main.sh -T -p -c >/dev/null 2>&1'

如果使用這種方法,腳本里面檢查前置任務(wù)沒有結(jié)束時(shí)就直接退出當(dāng)前進(jìn)程即可,像下面這樣:

  hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
  if [ $? -ne 0 ]; then
    log_info "$hbase_dir/$table_name/pt=-$bizdate doesn't exist, wait for a while"
    exit 1
  fi

雖然文件鎖能解決crontab調(diào)度沖突的問題,但是我們只希望腳本被成功執(zhí)行一次,任務(wù)結(jié)束之后,后續(xù)的調(diào)度直接退出。還有一種情況需要考慮,有可能crontab調(diào)度的任務(wù)的正在運(yùn)行,這個(gè)時(shí)候我們自己也手動(dòng)啟動(dòng)了同樣的任務(wù),如何避免這樣的沖突呢?

無非就是要有個(gè)標(biāo)記任務(wù)已經(jīng)成功運(yùn)行或者正在運(yùn)行標(biāo)識(shí)能夠在腳本里讀取,如何做到呢?對(duì)就是在指定目錄下建立特定名稱的空文件。在腳本開始的時(shí)候堅(jiān)持標(biāo)記文件是否存在,存在就直接退出。在任務(wù)正常運(yùn)行結(jié)束的時(shí)候touch成功執(zhí)行的標(biāo)記。結(jié)構(gòu)大概如下:

# 變量定義等
......
[ -f $data_home/$bizdate/DONE ] && { log_info "DONE file exists, exit" >> $log_file_path; exit 0; }
[ -f $data_home/$bizdate/RUNNING ] && { log_info "RUNNING file exists, exit" >> $log_file_path; exit 0; }

touch $data_home/$bizdate/RUNNING
trap "rm -f $data_home/$bizdate/RUNNING; echo Bye." EXIT QUIT ABRT INT HUP TERM
# do something here
......
if [ -f $data_home/$bizdate/RUNNING ]
then
    mv $data_home/$bizdate/RUNNING $data_home/$bizdate/DONE
else
    touch $data_home/$bizdate/DONE
fi
exit 0;

有了RUNNING標(biāo)記就不怕手動(dòng)執(zhí)行任務(wù)時(shí)和crontab調(diào)度沖突了;有了DONE標(biāo)記就不怕每天的任務(wù)被調(diào)度多次了。

從分布式集群下載數(shù)據(jù)

從hdfs下載數(shù)據(jù)的函數(shù):

## 功能: 將hadoop上的文件下載到本地并merge到一個(gè)文件中
## $1 hadoop葉子目錄 或 文件名--支持通配符 (*)
## $2 本地文件名
## $? return 0 if success, none-0 otherwise
function hadoop_getmerge()
{
    ## check params
    if [ $# -ne 2 ]
    then
        log_info "Unexpected params for hadoop_getmerge() function! Usage: hadoop_getmerge <hadoop_file> <local_file>";
        return 1;
    fi

    if [ -f $2 ]
    then
        log_info "Can not do hadoop_getmerge because local file $2 already exists!"
        return 2;
    fi

    ## do it
    ${HADOOP_EXEC} --config ${HADOOP_CONF} fs -getmerge $1 $2;
    if [ $? -ne 0 ]
    then
        log_info "Do hadoop_getmerge FAILED! Source: $1, target: $2";
        return 3;
    else
        log_info "Do hadoop_getmerge OK! Source: $1, target: $2";
        return 0;
    fi

    return 0;
}

HIVE表里的數(shù)據(jù)也可以先找到對(duì)應(yīng)的HDFS目錄,然后用上面的函數(shù)下載數(shù)據(jù),唯一需要注意的是,HIVE表必須stored as textfile,否則下載下來的數(shù)據(jù)不可用。
萬(wàn)一hive表不是已文本文件的格式存儲(chǔ)的怎么辦呢?不要緊,還是有辦法的,如下:

  mkdir -p $data_home/$bizdate/raw
  declare sql="
    set hive.support.quoted.identifiers=None;
    insert overwrite local directory '$data_home/$bizdate/raw'
    row format delimited fields terminated by '\t'
    select \`(pt)?+.+\` from $table_name where pt=$bizdate; 
  "
  log_info $sql
  $hive -e "$sql"

上傳數(shù)據(jù)到分布式集群

模型訓(xùn)練和預(yù)測(cè)之后,必須把預(yù)測(cè)數(shù)據(jù)上傳到分布式集群,以便后續(xù)處理。

  local create_table_sql="
    create table if not exists $target_table_name (
        ......
    )
    partitioned by (pt int)
    row format delimited fields terminated by '\t' 
    lines terminated by '\n' 
    stored as textfile;
  "
  log_info $create_table_sql
  $hive -e "$create_table_sql"

  local upload_sql="load data local inpath '$data_home/$bizdate/$predict_file' into table $target_table_name partition(pt=${bizdate});"
  log_info $upload_sql
  $hive -e "$upload_sql"

通知后置任務(wù)開始執(zhí)行

其實(shí)crontab沒法通知后置任務(wù)當(dāng)前任務(wù)已經(jīng)結(jié)束,那怎么辦呢?

把真正的后置任務(wù)加一個(gè)前置依賴任務(wù),而這個(gè)依賴任務(wù)是部署在調(diào)度系統(tǒng)上的一個(gè)shell任務(wù),該任務(wù)的前置任務(wù)是crontab調(diào)度任務(wù)的前置任務(wù),并且這個(gè)任務(wù)做的唯一一件事情就是循環(huán)檢查crontab調(diào)度任務(wù)的數(shù)據(jù)有沒有產(chǎn)出,已經(jīng)產(chǎn)出就結(jié)束,沒有產(chǎn)出就sleep一小段時(shí)間之后再繼續(xù)檢查。

check_partition $table_name $bizdate
while [ $? -ne 0 ] 
do
  sleep 5m
  hh=`date '+%H'`
  if [ $hh -gt 23 ]
  then
      echo "timeout, partition still not exist"
      exit 1
  fi  
  check_partition $table_name $bizdate
done

那些年,我們踩過的坑

一、crontab調(diào)度任務(wù)不會(huì)自動(dòng)export環(huán)境變量

開始的時(shí)候,手動(dòng)執(zhí)行腳本正常運(yùn)行,但是crontab調(diào)度每次都會(huì)在hadoop fs -test -e $path命令執(zhí)行出錯(cuò),表現(xiàn)為明明$path已經(jīng)存在,但指令總是返回1,而不是0 。經(jīng)過苦苦排查之后才發(fā)現(xiàn),hadoop依賴的環(huán)境變量JAVA_HOME和HADOOP_HOME沒有在腳本里導(dǎo)入。而用戶在終端里登錄到服務(wù)器上時(shí),這2個(gè)環(huán)境變量是自動(dòng)導(dǎo)入的。所以,務(wù)必記得在腳本開始的時(shí)候就導(dǎo)入環(huán)境變量:

#!/bin/bash
export JAVA_HOME=/usr/local/jdk
export HADOOP_HOME=...

二、crontab調(diào)度任務(wù)不能寫太多標(biāo)準(zhǔn)輸出,否則任務(wù)會(huì)在某個(gè)時(shí)刻自動(dòng)中斷

這個(gè)也挺坑的,務(wù)必記得在crontab的指令里重定向標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤到一個(gè)文件里,或者重定向到unix的黑洞/dev/null里。

*/5 6-23 * * * flock -xn /tmp/pop_score.lock -c 'bash /home/xudong.yang/pop_score/train/main.sh -T -p -c >/dev/null 2>&1'

這里推薦在腳本使用tee命令同時(shí)輸出日志到終端和文件,這樣用戶手動(dòng)執(zhí)行的時(shí)候可以直接在終端里看到程序的執(zhí)行情況,crontab調(diào)度里可以查看日志文件來排查問題。當(dāng)然,輸出到終端的部分,在使用crontab調(diào)度時(shí)需要重定向到黑洞里。

main | tee -a $log_file_path 2>&1
if [ ${PIPESTATUS[0]} -ne 0 ]
then
    log_error "run failed."
    exit 1;
fi
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容