Python - Airflow再會

1. 前言

近期計劃做一個任務(wù)調(diào)度系統(tǒng),于是,重拾airflow,借機(jī)深入學(xué)習(xí)下。
主要調(diào)研和測試具體使用方法、能否滿足我們的項目需求,以及可能存在哪些坑。

不了解airflow的朋友,可以參考我的上篇文章:
Python - Airflow任務(wù)調(diào)度系統(tǒng)初識

簡單回顧一下兩組關(guān)鍵名詞:

  • Dag -> DagRun(Dag Instance)
  • Operator -> Task -> Task Instance

Core Concepts — Airflow Documentation (apache.org)

2. 使用方法

2.1. 編寫Dag文件,并測試

image.png
  • 梳理實(shí)際用戶需求,是否存在選擇分支,是否存在人工審核步驟,是否存在任務(wù)重跑,等
  • 通過不同的Operator,實(shí)現(xiàn)一系列Task,以及任務(wù)的入?yún)?出參、任務(wù)間通信、任務(wù)間依賴等(附件采用PythonOperator,實(shí)現(xiàn)了一個ETL的過程,采用xcom通信)
  • 在Test環(huán)境,測試Dag文件,是否符合預(yù)期

比較常用的兩個Operator:

  • PythonOperator:可以通過args、kwargs設(shè)置Task運(yùn)行時參數(shù),通過xcom完成Task間通信
  • BranchOperator:可以在運(yùn)行時,決定走哪個任務(wù)分支,比如時間、服務(wù)器資源、上個任務(wù)的輸出、當(dāng)前Dag的狀態(tài)等


不采用json、yaml描述dag,便于airflow實(shí)現(xiàn)功能更加豐富的任務(wù)流,以及更自由的配置。
但是,如果讓非軟件開發(fā)人員編寫Dag文件,無疑壓力巨大。因此,需要人工審核。
如果是個2C軟件,則可能需要jinja2來實(shí)時render一個dag.py文件。

Best Practices — Airflow Documentation (apache.org)

2.2. 上傳Dag文件到scheduler和worker節(jié)點(diǎn)

image.png

Dag文件目錄,需要在scheduler和worker機(jī)器上各存儲一份,這個是由Airflow系統(tǒng)架構(gòu)決定。
采用的方法,可以:

  • nas盤,掛載到同一個集群的不同機(jī)器上(最好在一個局域網(wǎng)內(nèi))
  • ansible手動push到服務(wù)器節(jié)點(diǎn)
  • 在服務(wù)器節(jié)點(diǎn),啟動腳本,實(shí)時或定時同步oss/gitlab文件到本地


如果Dag文件很多的話,文件的管理,可能需要namespace、mainfest等。
另外,是否需要先同步到worker節(jié)點(diǎn),再同步到scheduler節(jié)點(diǎn)。

2.3. trigger dag

  • 自動trigger,例如cron、upstream-task
  • 手動trigger,通過命令行、airflow browser、airflow restful api

2.4. backfill

如果dag的下面兩個參數(shù)為True,則dag一啟動,則會查看過去的dagrun,如果存在未執(zhí)行的dagrun,則回填。
所以,一般情況下,這兩個參數(shù)需要手動設(shè)置為False。

depends_on_past=False
catchup=False

3. 日常維護(hù)

  • metadata數(shù)據(jù)過多,會導(dǎo)致系統(tǒng)響應(yīng)緩慢,通過airflow browser就可以明顯感覺到,需要手動清理db
  • 任務(wù)告警與相關(guān)責(zé)任人的管理
  • 任務(wù)流上線流程,需要嚴(yán)格測試,并增加權(quán)限控制
  • 任務(wù)流在生產(chǎn)環(huán)境運(yùn)行時,需要引入pool的概念,來隔離不同任務(wù)流之間的資源競爭
  • Scheduler只能跑一份,所以存在單點(diǎn)故障風(fēng)險。需要借助第三方組件airflow-scheduler-failover-controller實(shí)現(xiàn)Scheduler的高可用

4. 參考

5. 附,ETL腳本

import json
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


with DAG(
    "cp_pipeline_1",
    default_args={"retries": 2},
    description="computing pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    depends_on_past=False,
    catchup=False,
    tags=["computing-platform"],
) as dag:

    def extract(*args, **kwargs):
        print('extract kwargs', kwargs)
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)
        return {'extract', 'over'}  # return值也會進(jìn)入到xcom,downstream的task也可以通過xcom獲取

    def transform(*args, **kwargs):
        print('transform kwargs', kwargs)
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    def load(*args, **kwargs):
        print('load kwargs', kwargs)
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)
        print('load result', total_order_value)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        op_args=[],
        op_kwargs=dict(a1=1,b1='bb',c1=False),
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
        op_args=[],
        op_kwargs=dict(a2=1,b2='bb',c2=False),
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
        op_args=[],
        op_kwargs=dict(a3=1,b3='bb',c3=False),
    )

    extract_task >> transform_task >> load_task
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容