[toc]
Airflow平臺(tái)是一個(gè)描述、執(zhí)行、監(jiān)控工作流的工具。
DAGs
DAG(a Directed Acyclic Graph 有向無(wú)環(huán)圖)是Airflow中一組需要運(yùn)行任務(wù)組合,DAG以反應(yīng)任務(wù)間關(guān)系和依賴的方式組織。
Dag描述你的工作流是如何運(yùn)行。DAGs是由在Airflow的DAG_FOLDER目錄下的一些標(biāo)準(zhǔn)Python文件定義的。Airflow會(huì)通過(guò)執(zhí)行每一個(gè)文件中的代碼來(lái)動(dòng)態(tài)的建立DAG對(duì)象。每一個(gè)DAGs有任意數(shù)量的任務(wù)組成。通常,一個(gè)DAGs有相應(yīng)的簡(jiǎn)單邏輯的工作流。
有效范圍(Scope)
Airflow會(huì)載入來(lái)自DAGfile文件中可以導(dǎo)入的所有DAG對(duì)象。這意味著DAG必須是全局的,比如下面兩個(gè)DAGs,只有dag_1會(huì)被加載,另外一個(gè)只是本地有效范圍。
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
有時(shí)會(huì)非常有用。例如,SubDagOperator(子DAG操作)通常的模式是把子DAG放在一個(gè)函數(shù)里面,這樣Airflow不會(huì)把子DAG當(dāng)作一個(gè)獨(dú)立DAG而加載子DAG。
默認(rèn)參數(shù)
default_args的參數(shù)傳遞給DAG,就會(huì)傳遞給這個(gè)DAG所有的operator。因此一些共同的參數(shù)傳遞給很多個(gè)operator時(shí)不需要設(shè)置很多次。
default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'Airflow'
}
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow
Operators(任務(wù)處理器)
DAGs描述工作流怎么運(yùn)行,Operators確定實(shí)際上干什么。在工作流中operator(處理器)描述一個(gè)任務(wù)。
任務(wù)處理器:
- BashOperator: 執(zhí)行bash命令。
- PythonOperator:調(diào)用任意數(shù)量的Python函數(shù)。
- EmailOperator:發(fā)送郵件。
- SimpleHttpOperator:發(fā)送HTTP請(qǐng)求。
- MySqlOperator、SqliteOperator、PostgresOperator、MsSqlOperator、OracleOperator、JdbcOperator等等:執(zhí)行sql命令。
- Sensor:等一定的時(shí)間、文件、數(shù)據(jù)庫(kù)行等
另外除了這些基本任務(wù)處理器,還有更多的處理器: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator。任務(wù)處理器只有被指派給DAG之后才會(huì)被airflow加載。
DAG Assignment(DAG 指派器)
在operator在明確創(chuàng)建,dag指派,或者其他operator指引的實(shí)際, DAG Assignment才會(huì)創(chuàng)建。
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
Tasks(任務(wù))
一旦任務(wù)處理器實(shí)例化后,它會(huì)被成為任務(wù)。實(shí)例化成具體的值時(shí),調(diào)用抽象的任務(wù)處理器,參數(shù)化任務(wù)變成DAG中一個(gè)點(diǎn)。
Task Instances(任務(wù)實(shí)例)
任務(wù)實(shí)例化代表一個(gè)任務(wù)具體的運(yùn)行,由DAG、任務(wù)、時(shí)間的組合成。任務(wù)實(shí)例化也有一個(gè)明確的狀態(tài),狀態(tài)如 “running”, “success”, “failed”, “skipped”, “up for retry”等等。
Task LifeCycle(任務(wù)生命周期)
一個(gè)任務(wù)從開(kāi)始到結(jié)束中間由幾個(gè)過(guò)程。在Airflow的UI中(圖形和樹(shù)界面),這些階段采用不同的顏色代表不同的階段。

一個(gè)正常的工作流會(huì)經(jīng)歷如下階段:
1、no status (調(diào)度器創(chuàng)建空的任務(wù)實(shí)例)
2、queued (調(diào)度其把任務(wù)放在隊(duì)列里)
3、running (worker選擇任務(wù)和執(zhí)行任務(wù))
4、success (任務(wù)完成)
Workflows(工作流)
有些術(shù)語(yǔ)概念容易混淆。
- DAG:一些有順序的工作的描述。
- Operator(任務(wù)處理器):一類執(zhí)行一些工作的模版。
- Task(任務(wù)):任務(wù)處理器的參數(shù)化實(shí)例。
- Task Instance: 1、任務(wù)需要被指定給DAG;2、一個(gè)運(yùn)行的DAG相關(guān)的狀態(tài)。
DAGs和Operator創(chuàng)建任務(wù)實(shí)例,創(chuàng)建復(fù)雜的工作流。
其他功能
Airflow除了上面的核心對(duì)象,還有一些并發(fā)訪問(wèn)資源,相互通信,條件執(zhí)行等高級(jí)特征的行為。
Hooks
Hooks是面向Hive, S3, MySQL, Postgres, HDFS和 Pig其他平臺(tái)和數(shù)據(jù)庫(kù)的接口。Hooks盡可能的實(shí)現(xiàn)了一些共同的接口,跟任務(wù)處理器構(gòu)造塊類似。它也用airflow.models.connection.Connection模型搜索主機(jī)名字和認(rèn)證信息。
Pools(airflow池)
當(dāng)太多任務(wù)同時(shí)處理時(shí),系統(tǒng)會(huì)崩潰。Airflow池面對(duì)任意數(shù)量的任務(wù)時(shí)會(huì)限制執(zhí)行任務(wù)的并發(fā)數(shù)。在用戶界面中通過(guò)airflow池的名字和指定數(shù)量的工作槽來(lái)管理airflow池列表。任務(wù)實(shí)例化時(shí)pool參數(shù)使任務(wù)和已存在的pools相關(guān)聯(lián)。
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
觸發(fā)規(guī)則
盡管通常工作流觸發(fā)任務(wù)需要上流的任務(wù)都成功,但是Airflow提供來(lái)更加復(fù)雜的依賴設(shè)置。
- all_success: 默認(rèn)。所有上流的任務(wù)都成功。
- all_failed: 所有上流任務(wù)都失?。╝ failed or upstream_failed state)。
- all_done: 所有上流任務(wù)都執(zhí)行了。
- one_failed: 至少上流任務(wù)一個(gè)失敗了,不需要等所有上流任務(wù)都執(zhí)行完。
- one_success: 至少上流任務(wù)一個(gè)成功了,不需要等所有上流任務(wù)都執(zhí)行完。
- none_failed:所有上流任務(wù)沒(méi)有失敗。上流任務(wù)成功或者被跳過(guò)。 、
- none_skipped: 上流任務(wù)沒(méi)有跳過(guò)的。上流任務(wù)是success, failed, or upstream_failed state之一。
- dummy: 依賴只是顯示。