1. 前言
近期計劃做一個任務(wù)調(diào)度系統(tǒng),于是,重拾airflow,借機(jī)深入學(xué)習(xí)下。
主要調(diào)研和測試具體使用方法、能否滿足我們的項目需求,以及可能存在哪些坑。
不了解airflow的朋友,可以參考我的上篇文章:
Python - Airflow任務(wù)調(diào)度系統(tǒng)初識
簡單回顧一下兩組關(guān)鍵名詞:
- Dag -> DagRun(Dag Instance)
- Operator -> Task -> Task Instance
Core Concepts — Airflow Documentation (apache.org)
2. 使用方法
2.1. 編寫Dag文件,并測試

- 梳理實(shí)際用戶需求,是否存在選擇分支,是否存在人工審核步驟,是否存在任務(wù)重跑,等
- 通過不同的Operator,實(shí)現(xiàn)一系列Task,以及任務(wù)的入?yún)?出參、任務(wù)間通信、任務(wù)間依賴等(附件采用PythonOperator,實(shí)現(xiàn)了一個ETL的過程,采用xcom通信)
- 在Test環(huán)境,測試Dag文件,是否符合預(yù)期
比較常用的兩個Operator:
- PythonOperator:可以通過args、kwargs設(shè)置Task運(yùn)行時參數(shù),通過xcom完成Task間通信
- BranchOperator:可以在運(yùn)行時,決定走哪個任務(wù)分支,比如時間、服務(wù)器資源、上個任務(wù)的輸出、當(dāng)前Dag的狀態(tài)等
坑
不采用json、yaml描述dag,便于airflow實(shí)現(xiàn)功能更加豐富的任務(wù)流,以及更自由的配置。
但是,如果讓非軟件開發(fā)人員編寫Dag文件,無疑壓力巨大。因此,需要人工審核。
如果是個2C軟件,則可能需要jinja2來實(shí)時render一個dag.py文件。
Best Practices — Airflow Documentation (apache.org)
2.2. 上傳Dag文件到scheduler和worker節(jié)點(diǎn)

Dag文件目錄,需要在scheduler和worker機(jī)器上各存儲一份,這個是由Airflow系統(tǒng)架構(gòu)決定。
采用的方法,可以:
- nas盤,掛載到同一個集群的不同機(jī)器上(最好在一個局域網(wǎng)內(nèi))
- ansible手動push到服務(wù)器節(jié)點(diǎn)
- 在服務(wù)器節(jié)點(diǎn),啟動腳本,實(shí)時或定時同步oss/gitlab文件到本地
坑
如果Dag文件很多的話,文件的管理,可能需要namespace、mainfest等。
另外,是否需要先同步到worker節(jié)點(diǎn),再同步到scheduler節(jié)點(diǎn)。
2.3. trigger dag
- 自動trigger,例如cron、upstream-task
- 手動trigger,通過命令行、airflow browser、airflow restful api
2.4. backfill
如果dag的下面兩個參數(shù)為True,則dag一啟動,則會查看過去的dagrun,如果存在未執(zhí)行的dagrun,則回填。
所以,一般情況下,這兩個參數(shù)需要手動設(shè)置為False。
depends_on_past=False
catchup=False
3. 日常維護(hù)
- metadata數(shù)據(jù)過多,會導(dǎo)致系統(tǒng)響應(yīng)緩慢,通過airflow browser就可以明顯感覺到,需要手動清理db
- 任務(wù)告警與相關(guān)責(zé)任人的管理
- 任務(wù)流上線流程,需要嚴(yán)格測試,并增加權(quán)限控制
- 任務(wù)流在生產(chǎn)環(huán)境運(yùn)行時,需要引入pool的概念,來隔離不同任務(wù)流之間的資源競爭
- Scheduler只能跑一份,所以存在單點(diǎn)故障風(fēng)險。需要借助第三方組件airflow-scheduler-failover-controller實(shí)現(xiàn)Scheduler的高可用
4. 參考
微信公眾號【三幫大數(shù)據(jù)】的系列文章大數(shù)據(jù)調(diào)度平臺Airflow
5. 附,ETL腳本
import json
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
"cp_pipeline_1",
default_args={"retries": 2},
description="computing pipeline",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
depends_on_past=False,
catchup=False,
tags=["computing-platform"],
) as dag:
def extract(*args, **kwargs):
print('extract kwargs', kwargs)
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
return {'extract', 'over'} # return值也會進(jìn)入到xcom,downstream的task也可以通過xcom獲取
def transform(*args, **kwargs):
print('transform kwargs', kwargs)
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
def load(*args, **kwargs):
print('load kwargs', kwargs)
ti = kwargs["ti"]
total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
total_order_value = json.loads(total_value_string)
print('load result', total_order_value)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
op_args=[],
op_kwargs=dict(a1=1,b1='bb',c1=False),
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
op_args=[],
op_kwargs=dict(a2=1,b2='bb',c2=False),
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
op_args=[],
op_kwargs=dict(a3=1,b3='bb',c3=False),
)
extract_task >> transform_task >> load_task