如何重跑 Cloud Composer 上的任務
Retry Cloud Composer Task
想要重跑 Cloud Composer 上的任務可以透過 Airflow Cli 來清除任務狀態, 迫使 Airflow 重新排程這個任務
清除指令, 如下
airflow tasks clear -s ${EXECUTION_DATE} -t ${TASK_ID} -d -y ${DAG_ID}
註:EXECUTION_DATE 的格式是以 ISO-8601 來表示, 所以會長這樣 "2022-07-26T00:00:00+00:00"
由於 Cloud Composer 會將 Airflow 部署在 GKE 上, 若要執行上面的指令, 必須
1. 取得 GKE 得 crendential
2. 找到 Airflow 管理工具的 Pod 執行 Airflow 指令
另一個方法是
使用 gcloud composer 來將指令傳給 Airflow
EXECUTION_DATE ="2022-07-26T00:00:00+00:00"
DAG_ID="sysops-daily-backup"
TASK_ID ="^result_chk$"
gcloud composer environments run \
--location="us-central1" \
--project="stage" sysops tasks clear -- {DAG_ID} -s {EXECUTION_DATE} -t {TASK_ID} -y -d
-d 是連下游任務的狀態都清除
方法二:在 AirFlow DAG 中使用 retry 機制來清除上游任務的狀態
def clear_upstream_task(context):
execution_date = context.get("execution_date")
clear_tasks = BashOperator(
task_id='clear_tasks',
bash_command=f'airflow tasks clear -s {execution_date} -t {task_id} -y -d {DAG_ID}'
)
return clear_tasks.execute(context=context)
with DAG('clear_upstream_task',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
) as dag:
t3 = BashOperator(
task_id='t3',
bash_command='exit 123',
on_failure_callback=clear_upstream_task
)
留言
張貼留言