標(biāo)簽: crontab 調(diào)度
雖然現(xiàn)在越來越多的開源機(jī)器學(xué)習(xí)工具支持分布式訓(xùn)練,但分布式機(jī)器學(xué)習(xí)平臺(tái)的搭建和運(yùn)維的門檻通常是比較高的。另外一方面,有一些業(yè)務(wù)場(chǎng)景的訓(xùn)練數(shù)據(jù)其實(shí)并不是很大,在一臺(tái)普通的開發(fā)機(jī)上訓(xùn)練個(gè)把小時(shí)足矣。單機(jī)的機(jī)器學(xué)習(xí)工具使用起來通常要比分布式平臺(tái)好用很多。
特征工程在機(jī)器學(xué)習(xí)任務(wù)中占了很大的一部分比重,使用hive sql這樣的高級(jí)語(yǔ)言處理起來比較方面和快捷。因此,通常特征工程、樣本構(gòu)建都在離線分布式集群(hive集群)上完成,訓(xùn)練任務(wù)在數(shù)據(jù)量不大時(shí)可以在gateway機(jī)器上完成。這就涉及到幾個(gè)問題:
- gateway機(jī)器上的daily訓(xùn)練任務(wù)什么時(shí)候開始執(zhí)行?
- 模型訓(xùn)練結(jié)束,并對(duì)新數(shù)據(jù)做出預(yù)測(cè)后如何把數(shù)據(jù)上傳到分布式集群?
- 如何通知后置任務(wù)開始執(zhí)行?
對(duì)于第一個(gè)問題,理想的解決方案是公司大數(shù)據(jù)平臺(tái)的調(diào)度系統(tǒng)能夠調(diào)度某臺(tái)具體gateway上部署的任務(wù),并且可以獲取任務(wù)執(zhí)行的狀態(tài),在任務(wù)執(zhí)行成功后可以自動(dòng)調(diào)度后置任務(wù)。然而,有時(shí)候調(diào)度系統(tǒng)還沒有這么智能的時(shí)候,就需要我們自己想辦法解決了。Crontab是Unix和類Unix的操作系統(tǒng)中用于設(shè)置周期性被執(zhí)行的指令的工具。使用Crontab可以每天定時(shí)啟動(dòng)任務(wù)。美中不足在于必須要自己檢查前置任務(wù)是否已經(jīng)結(jié)束,也就是說我們要的數(shù)據(jù)有沒有產(chǎn)出,同時(shí)還要有一個(gè)讓后置任務(wù)等待當(dāng)前任務(wù)結(jié)束的機(jī)制。
檢查前置任務(wù)是否已經(jīng)結(jié)束
如果前置任務(wù)是hive任務(wù),那么結(jié)束標(biāo)志通常是一個(gè)hive表產(chǎn)生了特定分區(qū),我們只需要檢查這個(gè)分區(qū)是否存在就可以了。有個(gè)問題需要注意的是,可能在hive任務(wù)執(zhí)行過程中分區(qū)已經(jīng)產(chǎn)生,但任務(wù)沒有完全結(jié)束前數(shù)據(jù)還沒有寫完,這個(gè)時(shí)候啟動(dòng)后續(xù)任務(wù)是不正確。解決辦法就是在任務(wù)結(jié)束時(shí)為當(dāng)前表添加一個(gè)空的“標(biāo)志分區(qū)”,比如原來的分區(qū)是“pt=20170921”,我們可以添加一個(gè)空的分區(qū)“pt=20170921.done”(分區(qū)字段的類型為string時(shí)可用),或者“pt=-20170921”(分區(qū)字段的類型為int時(shí)可用)。然后,crontab調(diào)度的后置任務(wù)需要檢查這個(gè)“標(biāo)志分區(qū)”是否存在。
function log_info()
{
if [ "$LOG_LEVEL" != "WARN" ] && [ "$LOG_LEVEL" != "ERROR" ]
then
echo "`date +"%Y-%m-%d %H:%M:%S"` [INFO] ($$)($USER): $*";
fi
}
function check_partition() {
log_info "function [$FUNCNAME] begin"
#table,dt
temp=`hive -e "show partitions $1"`
echo $temp|grep -wq "$2"
if [ $? -eq 0 ];then
log_info "$1 parition $2 exists, ok"
return 0
else
log_info "$1 parition $2 doesn't exists"
return 1
fi
log_info "function [$FUNCNAME] end"
}
如果前置任務(wù)是MapReduce或者Spark任務(wù),那么結(jié)束標(biāo)志通常是在HDFS上產(chǎn)出了一個(gè)特定的路徑,后置任務(wù)只需要檢查這個(gè)特定路徑是否存在就可以。
## 功能: 檢查給定的文件或目錄在hadoop上是否存在
## $1 文件或者目錄, 不支持*號(hào)通配符
## $? return 0 if file exist, none-0 otherwise
function hadoop_check_file_exist()
{
## check params
if [ $# -ne 1 ]
then
log_info "Unexpected params for hadoop_check_file_exist() function! Usage: hadoop_check_file_exist <dir_or_file>";
return 1;
fi
## do it
log_info "${HADOOP_EXEC} --config ${HADOOP_CONF} fs -test -e $1"
${HADOOP_EXEC} --config ${HADOOP_CONF} fs -test -e "$1"
local ret=$?
if [ $ret -eq 0 ]
then
log_info "$1 does exist on Hadoop"
return 0;
else
log_info "($ret)$1 does NOT exist on Hadoop"
return 2;
fi
return 0;
}
其實(shí),hive任務(wù)的表的內(nèi)容也是存儲(chǔ)在HDFS上,因此也可以用檢查HDFS路徑的方法,來判斷前置hive任務(wù)是否已經(jīng)結(jié)束。可以用下面命令查看hive表對(duì)應(yīng)的HDFS路徑。
hive -e "desc formatted $tablename;"
循環(huán)等待前置任務(wù)結(jié)束
當(dāng)前置任務(wù)還沒有結(jié)束時(shí),需要循環(huán)等待。有兩種方法,一種是自己在Bash腳本里寫代碼,如下:
hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
while [ $? -ne 0 ]
do
local hh=`date '+%H'`
if [ $hh -gt 23 ]
then
echo "timeout, partition still not exist"
exit 1
fi
log_info "$hbase_dir/$table_name/pt=-$bizdate doesn't exist, wait for a while"
sleep 5m
hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
done
第二種方法,是利用crontab的周期性調(diào)度功能。比如可以讓crontab每隔5分鐘調(diào)度一次任務(wù)。這個(gè)時(shí)候需要注意的是,可能前一次調(diào)度的進(jìn)程還沒有執(zhí)行結(jié)束,后一次調(diào)度就已經(jīng)開始。這個(gè)時(shí)候可以使用linux flock文件鎖實(shí)現(xiàn)任務(wù)鎖定,解決沖突。
flock [-sxon][-w #] file [-c] command
-s, --shared: 獲得一個(gè)共享鎖
-x, --exclusive: 獲得一個(gè)獨(dú)占鎖
-u, --unlock: 移除一個(gè)鎖,通常是不需要的,腳本執(zhí)行完會(huì)自動(dòng)丟棄鎖
-n, --nonblock: 如果沒有立即獲得鎖,直接失敗而不是等待
-w, --timeout: 如果沒有立即獲得鎖,等待指定時(shí)間
-o, --close: 在運(yùn)行命令前關(guān)閉文件的描述符號(hào)。用于如果命令產(chǎn)生子進(jìn)程時(shí)會(huì)不受鎖的管控
-c, --command: 在shell中運(yùn)行一個(gè)單獨(dú)的命令
-h, --help 顯示幫助
-V, --version: 顯示版本
其中,file是一個(gè)空文件即可。比如,crontab文件可以這樣寫:
*/5 6-23 * * * flock -xn /tmp/pop_score.lock -c 'bash /home/xudong.yang/pop_score/train/main.sh -T -p -c >/dev/null 2>&1'
如果使用這種方法,腳本里面檢查前置任務(wù)沒有結(jié)束時(shí)就直接退出當(dāng)前進(jìn)程即可,像下面這樣:
hadoop_check_file_exist "$hbase_dir/$table_name/pt=-$bizdate"
if [ $? -ne 0 ]; then
log_info "$hbase_dir/$table_name/pt=-$bizdate doesn't exist, wait for a while"
exit 1
fi
雖然文件鎖能解決crontab調(diào)度沖突的問題,但是我們只希望腳本被成功執(zhí)行一次,任務(wù)結(jié)束之后,后續(xù)的調(diào)度直接退出。還有一種情況需要考慮,有可能crontab調(diào)度的任務(wù)的正在運(yùn)行,這個(gè)時(shí)候我們自己也手動(dòng)啟動(dòng)了同樣的任務(wù),如何避免這樣的沖突呢?
無非就是要有個(gè)標(biāo)記任務(wù)已經(jīng)成功運(yùn)行或者正在運(yùn)行標(biāo)識(shí)能夠在腳本里讀取,如何做到呢?對(duì)就是在指定目錄下建立特定名稱的空文件。在腳本開始的時(shí)候堅(jiān)持標(biāo)記文件是否存在,存在就直接退出。在任務(wù)正常運(yùn)行結(jié)束的時(shí)候touch成功執(zhí)行的標(biāo)記。結(jié)構(gòu)大概如下:
# 變量定義等
......
[ -f $data_home/$bizdate/DONE ] && { log_info "DONE file exists, exit" >> $log_file_path; exit 0; }
[ -f $data_home/$bizdate/RUNNING ] && { log_info "RUNNING file exists, exit" >> $log_file_path; exit 0; }
touch $data_home/$bizdate/RUNNING
trap "rm -f $data_home/$bizdate/RUNNING; echo Bye." EXIT QUIT ABRT INT HUP TERM
# do something here
......
if [ -f $data_home/$bizdate/RUNNING ]
then
mv $data_home/$bizdate/RUNNING $data_home/$bizdate/DONE
else
touch $data_home/$bizdate/DONE
fi
exit 0;
有了RUNNING標(biāo)記就不怕手動(dòng)執(zhí)行任務(wù)時(shí)和crontab調(diào)度沖突了;有了DONE標(biāo)記就不怕每天的任務(wù)被調(diào)度多次了。
從分布式集群下載數(shù)據(jù)
從hdfs下載數(shù)據(jù)的函數(shù):
## 功能: 將hadoop上的文件下載到本地并merge到一個(gè)文件中
## $1 hadoop葉子目錄 或 文件名--支持通配符 (*)
## $2 本地文件名
## $? return 0 if success, none-0 otherwise
function hadoop_getmerge()
{
## check params
if [ $# -ne 2 ]
then
log_info "Unexpected params for hadoop_getmerge() function! Usage: hadoop_getmerge <hadoop_file> <local_file>";
return 1;
fi
if [ -f $2 ]
then
log_info "Can not do hadoop_getmerge because local file $2 already exists!"
return 2;
fi
## do it
${HADOOP_EXEC} --config ${HADOOP_CONF} fs -getmerge $1 $2;
if [ $? -ne 0 ]
then
log_info "Do hadoop_getmerge FAILED! Source: $1, target: $2";
return 3;
else
log_info "Do hadoop_getmerge OK! Source: $1, target: $2";
return 0;
fi
return 0;
}
HIVE表里的數(shù)據(jù)也可以先找到對(duì)應(yīng)的HDFS目錄,然后用上面的函數(shù)下載數(shù)據(jù),唯一需要注意的是,HIVE表必須stored as textfile,否則下載下來的數(shù)據(jù)不可用。
萬(wàn)一hive表不是已文本文件的格式存儲(chǔ)的怎么辦呢?不要緊,還是有辦法的,如下:
mkdir -p $data_home/$bizdate/raw
declare sql="
set hive.support.quoted.identifiers=None;
insert overwrite local directory '$data_home/$bizdate/raw'
row format delimited fields terminated by '\t'
select \`(pt)?+.+\` from $table_name where pt=$bizdate;
"
log_info $sql
$hive -e "$sql"
上傳數(shù)據(jù)到分布式集群
模型訓(xùn)練和預(yù)測(cè)之后,必須把預(yù)測(cè)數(shù)據(jù)上傳到分布式集群,以便后續(xù)處理。
local create_table_sql="
create table if not exists $target_table_name (
......
)
partitioned by (pt int)
row format delimited fields terminated by '\t'
lines terminated by '\n'
stored as textfile;
"
log_info $create_table_sql
$hive -e "$create_table_sql"
local upload_sql="load data local inpath '$data_home/$bizdate/$predict_file' into table $target_table_name partition(pt=${bizdate});"
log_info $upload_sql
$hive -e "$upload_sql"
通知后置任務(wù)開始執(zhí)行
其實(shí)crontab沒法通知后置任務(wù)當(dāng)前任務(wù)已經(jīng)結(jié)束,那怎么辦呢?
把真正的后置任務(wù)加一個(gè)前置依賴任務(wù),而這個(gè)依賴任務(wù)是部署在調(diào)度系統(tǒng)上的一個(gè)shell任務(wù),該任務(wù)的前置任務(wù)是crontab調(diào)度任務(wù)的前置任務(wù),并且這個(gè)任務(wù)做的唯一一件事情就是循環(huán)檢查crontab調(diào)度任務(wù)的數(shù)據(jù)有沒有產(chǎn)出,已經(jīng)產(chǎn)出就結(jié)束,沒有產(chǎn)出就sleep一小段時(shí)間之后再繼續(xù)檢查。
check_partition $table_name $bizdate
while [ $? -ne 0 ]
do
sleep 5m
hh=`date '+%H'`
if [ $hh -gt 23 ]
then
echo "timeout, partition still not exist"
exit 1
fi
check_partition $table_name $bizdate
done
那些年,我們踩過的坑
一、crontab調(diào)度任務(wù)不會(huì)自動(dòng)export環(huán)境變量
開始的時(shí)候,手動(dòng)執(zhí)行腳本正常運(yùn)行,但是crontab調(diào)度每次都會(huì)在hadoop fs -test -e $path命令執(zhí)行出錯(cuò),表現(xiàn)為明明$path已經(jīng)存在,但指令總是返回1,而不是0 。經(jīng)過苦苦排查之后才發(fā)現(xiàn),hadoop依賴的環(huán)境變量JAVA_HOME和HADOOP_HOME沒有在腳本里導(dǎo)入。而用戶在終端里登錄到服務(wù)器上時(shí),這2個(gè)環(huán)境變量是自動(dòng)導(dǎo)入的。所以,務(wù)必記得在腳本開始的時(shí)候就導(dǎo)入環(huán)境變量:
#!/bin/bash
export JAVA_HOME=/usr/local/jdk
export HADOOP_HOME=...
二、crontab調(diào)度任務(wù)不能寫太多標(biāo)準(zhǔn)輸出,否則任務(wù)會(huì)在某個(gè)時(shí)刻自動(dòng)中斷
這個(gè)也挺坑的,務(wù)必記得在crontab的指令里重定向標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤到一個(gè)文件里,或者重定向到unix的黑洞/dev/null里。
*/5 6-23 * * * flock -xn /tmp/pop_score.lock -c 'bash /home/xudong.yang/pop_score/train/main.sh -T -p -c >/dev/null 2>&1'
這里推薦在腳本使用tee命令同時(shí)輸出日志到終端和文件,這樣用戶手動(dòng)執(zhí)行的時(shí)候可以直接在終端里看到程序的執(zhí)行情況,crontab調(diào)度里可以查看日志文件來排查問題。當(dāng)然,輸出到終端的部分,在使用crontab調(diào)度時(shí)需要重定向到黑洞里。
main | tee -a $log_file_path 2>&1
if [ ${PIPESTATUS[0]} -ne 0 ]
then
log_error "run failed."
exit 1;
fi