airflow概念

[toc]

Airflow平臺(tái)是一個(gè)描述、執(zhí)行、監(jiān)控工作流的工具。

DAGs

DAG(a Directed Acyclic Graph 有向無(wú)環(huán)圖)是Airflow中一組需要運(yùn)行任務(wù)組合,DAG以反應(yīng)任務(wù)間關(guān)系和依賴的方式組織。
Dag描述你的工作流是如何運(yùn)行。DAGs是由在Airflow的DAG_FOLDER目錄下的一些標(biāo)準(zhǔn)Python文件定義的。Airflow會(huì)通過(guò)執(zhí)行每一個(gè)文件中的代碼來(lái)動(dòng)態(tài)的建立DAG對(duì)象。每一個(gè)DAGs有任意數(shù)量的任務(wù)組成。通常,一個(gè)DAGs有相應(yīng)的簡(jiǎn)單邏輯的工作流。

有效范圍(Scope)

Airflow會(huì)載入來(lái)自DAGfile文件中可以導(dǎo)入的所有DAG對(duì)象。這意味著DAG必須是全局的,比如下面兩個(gè)DAGs,只有dag_1會(huì)被加載,另外一個(gè)只是本地有效范圍。

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

有時(shí)會(huì)非常有用。例如,SubDagOperator(子DAG操作)通常的模式是把子DAG放在一個(gè)函數(shù)里面,這樣Airflow不會(huì)把子DAG當(dāng)作一個(gè)獨(dú)立DAG而加載子DAG。

默認(rèn)參數(shù)

default_args的參數(shù)傳遞給DAG,就會(huì)傳遞給這個(gè)DAG所有的operator。因此一些共同的參數(shù)傳遞給很多個(gè)operator時(shí)不需要設(shè)置很多次。

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'Airflow'
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

Operators(任務(wù)處理器)

DAGs描述工作流怎么運(yùn)行,Operators確定實(shí)際上干什么。在工作流中operator(處理器)描述一個(gè)任務(wù)。
任務(wù)處理器:

  • BashOperator: 執(zhí)行bash命令。
  • PythonOperator:調(diào)用任意數(shù)量的Python函數(shù)。
  • EmailOperator:發(fā)送郵件。
  • SimpleHttpOperator:發(fā)送HTTP請(qǐng)求。
  • MySqlOperator、SqliteOperator、PostgresOperator、MsSqlOperator、OracleOperator、JdbcOperator等等:執(zhí)行sql命令。
  • Sensor:等一定的時(shí)間、文件、數(shù)據(jù)庫(kù)行等
    另外除了這些基本任務(wù)處理器,還有更多的處理器: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator。任務(wù)處理器只有被指派給DAG之后才會(huì)被airflow加載。

DAG Assignment(DAG 指派器)

在operator在明確創(chuàng)建,dag指派,或者其他operator指引的實(shí)際, DAG Assignment才會(huì)創(chuàng)建。

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

Tasks(任務(wù))

一旦任務(wù)處理器實(shí)例化后,它會(huì)被成為任務(wù)。實(shí)例化成具體的值時(shí),調(diào)用抽象的任務(wù)處理器,參數(shù)化任務(wù)變成DAG中一個(gè)點(diǎn)。

Task Instances(任務(wù)實(shí)例)

任務(wù)實(shí)例化代表一個(gè)任務(wù)具體的運(yùn)行,由DAG、任務(wù)、時(shí)間的組合成。任務(wù)實(shí)例化也有一個(gè)明確的狀態(tài),狀態(tài)如 “running”, “success”, “failed”, “skipped”, “up for retry”等等。

Task LifeCycle(任務(wù)生命周期)

一個(gè)任務(wù)從開(kāi)始到結(jié)束中間由幾個(gè)過(guò)程。在Airflow的UI中(圖形和樹(shù)界面),這些階段采用不同的顏色代表不同的階段。


生命狀態(tài)不同的顏色表示

一個(gè)正常的工作流會(huì)經(jīng)歷如下階段:
1、no status (調(diào)度器創(chuàng)建空的任務(wù)實(shí)例)
2、queued (調(diào)度其把任務(wù)放在隊(duì)列里)
3、running (worker選擇任務(wù)和執(zhí)行任務(wù))
4、success (任務(wù)完成)

Workflows(工作流)

有些術(shù)語(yǔ)概念容易混淆。

  • DAG:一些有順序的工作的描述。
  • Operator(任務(wù)處理器):一類執(zhí)行一些工作的模版。
  • Task(任務(wù)):任務(wù)處理器的參數(shù)化實(shí)例。
  • Task Instance: 1、任務(wù)需要被指定給DAG;2、一個(gè)運(yùn)行的DAG相關(guān)的狀態(tài)。
    DAGs和Operator創(chuàng)建任務(wù)實(shí)例,創(chuàng)建復(fù)雜的工作流。

其他功能

Airflow除了上面的核心對(duì)象,還有一些并發(fā)訪問(wèn)資源,相互通信,條件執(zhí)行等高級(jí)特征的行為。

Hooks

Hooks是面向Hive, S3, MySQL, Postgres, HDFS和 Pig其他平臺(tái)和數(shù)據(jù)庫(kù)的接口。Hooks盡可能的實(shí)現(xiàn)了一些共同的接口,跟任務(wù)處理器構(gòu)造塊類似。它也用airflow.models.connection.Connection模型搜索主機(jī)名字和認(rèn)證信息。

Pools(airflow池)

當(dāng)太多任務(wù)同時(shí)處理時(shí),系統(tǒng)會(huì)崩潰。Airflow池面對(duì)任意數(shù)量的任務(wù)時(shí)會(huì)限制執(zhí)行任務(wù)的并發(fā)數(shù)。在用戶界面中通過(guò)airflow池的名字和指定數(shù)量的工作槽來(lái)管理airflow池列表。任務(wù)實(shí)例化時(shí)pool參數(shù)使任務(wù)和已存在的pools相關(guān)聯(lián)。

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

觸發(fā)規(guī)則

盡管通常工作流觸發(fā)任務(wù)需要上流的任務(wù)都成功,但是Airflow提供來(lái)更加復(fù)雜的依賴設(shè)置。

  • all_success: 默認(rèn)。所有上流的任務(wù)都成功。
  • all_failed: 所有上流任務(wù)都失?。╝ failed or upstream_failed state)。
  • all_done: 所有上流任務(wù)都執(zhí)行了。
  • one_failed: 至少上流任務(wù)一個(gè)失敗了,不需要等所有上流任務(wù)都執(zhí)行完。
  • one_success: 至少上流任務(wù)一個(gè)成功了,不需要等所有上流任務(wù)都執(zhí)行完。
  • none_failed:所有上流任務(wù)沒(méi)有失敗。上流任務(wù)成功或者被跳過(guò)。 、
  • none_skipped: 上流任務(wù)沒(méi)有跳過(guò)的。上流任務(wù)是success, failed, or upstream_failed state之一。
  • dummy: 依賴只是顯示。

備注
翻譯自:https://airflow.apache.org/docs/stable/concepts.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文將介紹 Airflow 這一款優(yōu)秀的調(diào)度工具。主要包括 Airflow 的服務(wù)構(gòu)成、Airflow 的 Web...
    a7f00a9019ae閱讀 64,129評(píng)論 6 42
  • 聲明:本文轉(zhuǎn)自我的個(gè)人博客,有興趣的可以查看原文。轉(zhuǎn)發(fā)請(qǐng)注明來(lái)源。 最近工作需要,使用airflow搭建了公司的E...
    此星爺非彼星爺閱讀 40,137評(píng)論 3 19
  • 在快速啟動(dòng)部分中設(shè)置很簡(jiǎn)單,構(gòu)建生產(chǎn)級(jí)環(huán)境需要更多的工作,下面來(lái)了解一下。 1. 設(shè)置配置選項(xiàng) 第一次運(yùn)行Airf...
    路小漫閱讀 9,838評(píng)論 0 3
  • 脈脈清輝明月夜,晃晃燈下著詩(shī)文 不眠總憶從前事,北望京城思故人
    夢(mèng)裡不知曾是客閱讀 241評(píng)論 0 2
  • 中國(guó)文化博大精深,源遠(yuǎn)流長(zhǎng),漢字就是中國(guó)文化的一種載體,雖然從小學(xué)習(xí)漢語(yǔ),但是下面的段子,我想說(shuō)這是逼死老外的節(jié)奏...
    沫沫原夏閱讀 6,161評(píng)論 2 9

友情鏈接更多精彩內(nèi)容