airflow 動態(tài)創(chuàng)建task

airflow 動態(tài)創(chuàng)建task

通過http接口獲取一個列表結(jié)果,遍歷列表值,每條記錄動態(tài)創(chuàng)建一個task

實現(xiàn)方式

動態(tài)創(chuàng)建task需要寫兩個dag實現(xiàn),auto_rebuild_cube通過http的task獲取到需要遍歷的列表,提取name到xcom中。
第二個dag文件auto_build 通過 XCom.get_one 方法指定dag文件和execution_date,其中execution_date因為需要指定,所以我這里通過pendulum.now('Asia/Shanghai')直接拿的當前時間。

文件:auto_rebuild_cube

# -*- coding: utf-8 -*-
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.http_operator import SimpleHttpOperator

from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import json

import common
from common import china_days_ago

result = []

dag = DAG(
    dag_id='auto_rebuild_cube',
    default_args=common.default_args,
    start_date=china_days_ago(1),
    description='獲取遍歷列表',
    schedule_interval="9 * * * *",
)

get_all_cubes = SimpleHttpOperator(
    task_id="get_all_cubes",
    endpoint='kylin/api/cubes',
    headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
    data={"limit": "100"},
    method='GET',
    xcom_push=True,
    http_conn_id=common.global_kylin_http_id
)


def multitasking_task(**kwargs):
    xcom = kwargs['task_instance'].xcom_pull(task_ids="get_all_cubes")
    for data in json.loads(xcom):
        result.append(data['name'])
    kwargs['task_instance'].xcom_push(key="cubeNames", value=result)


multi_task = PythonOperator(
    task_id='multi_task',
    python_callable=multitasking_task,
    dag=dag,
    provide_context=True
)

push_task = BashOperator(
    task_id='push_task',
    bash_command="echo {{ task_instance.xcom_pull(key='cubeNames') }} ",
    dag=dag
)

trigger_build_cube = TriggerDagRunOperator(
    task_id='trigger_build_cube',
    trigger_dag_id="build_cube",
    python_callable=common.conditionally_trigger,
    params={'condition_param': True, 'message': '獲取列表成功,即將開始動態(tài)創(chuàng)建task'},
    dag=dag
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

latest_only >> get_all_cubes >> multi_task >> push_task >> trigger_build_cube

文件:auto_cube

import json

from airflow import DAG
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum

import common
from common import china_days_ago

dag = DAG(dag_id='build_cube',
          default_args=common.default_args,
          start_date=china_days_ago(1),
          schedule_interval=None)


def get_data():
    execution_date = pendulum.now('Asia/Shanghai')
    print("the execution_date is {}", execution_date)
    cube_names = XCom.get_one(dag_id='auto_rebuild_cube', key='cubeNames', execution_date=execution_date,
                              include_prior_dates=True)
    print("cubeNames is {}", cube_names)
    return cube_names


def multitasking_task(data):
    return SimpleHttpOperator(
        task_id="rebuild_cube_{}".format(data),
        endpoint='kylin/api/cubes/{}/rebuild'.format(data),
        headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
        data=json.dumps({"buildType": "BUILD"}),
        method='PUT',
        http_conn_id=common.global_kylin_http_id
    )


start = DummyOperator(
    task_id="start",
    dag=dag
)

end = DummyOperator(
    task_id="end",
    dag=dag
)

for data in get_data():
    start >> [multitasking_task(data)] >> end

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 做一個官方教程的搬運工,純粹為了自己過一遍腦子。 1.工作流定義示例 下面是定義工作流的示例代碼: 這個Airfl...
    路小漫閱讀 19,998評論 1 6
  • 介紹 到目前為止,我們已經(jīng)看到了很多Gradle構(gòu)建的屬性,并且知道了怎么去執(zhí)行Tasks。這一章,會更多的了解這...
    None_Ling閱讀 1,773評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,616評論 19 139
  • Java 常見英語單詞 (1.0 版本) 1. Java 基礎(chǔ)常見英語詞匯(70 個) OO: object-or...
    Nemo359閱讀 2,637評論 0 0
  • 夜鶯2517閱讀 128,172評論 1 9

友情鏈接更多精彩內(nèi)容