Airflow版本 2.0.1
根據(jù)官網(wǎng)提示,airflow 支持hook函數(shù),可以在DAG運(yùn)行開始之前 根據(jù)我們觸發(fā)的參數(shù) 指定隊(duì)列
https://github.com/apache/airflow/discussions/14987
新建 airflow_local_settings.py 文件, 放在${AIRFLOW_HOME}/config/airflow_local_settings.py
無(wú)需重啟scheduler 即可加載。
def task_instance_mutation_hook(task_instance):
dag_run = task_instance.get_dagrun()
conf = dag_run.conf
# print(conf)
# conf為 trigger_dag 的 --conf 參數(shù)
if conf['key4']:
if 'change_queue' in conf:
return
else:
task_instance.queue = conf['key4']
conf["change_queue"] = 'true'
上面代碼表示,在DAG 剛剛要運(yùn)行的時(shí)候,會(huì)執(zhí)行上面的hook函數(shù) 拿到conf 根據(jù)conf中的 key4 重新執(zhí)行DAG的queue。
DAG觸發(fā)參數(shù)為
airflow dags trigger 'dag_test_bash' -r `uuidgen` --conf '{"key4":"test_param"}'
傳入的參數(shù) key4 的value 會(huì)賦值到 task_instance.queue 也就是在DAG運(yùn)行前 為其動(dòng)態(tài)分配了隊(duì)列,實(shí)際生產(chǎn)中 可以用該種方式去路由DAG到指定ip的機(jī)器去運(yùn)行,我們可以在每臺(tái)機(jī)器起自己IP+業(yè)務(wù)前綴的worker,然后在這里指定,如指定queue為 prod_10.0.0.1的queue。