Our system pulls generated reports from a retailer with an unreliable reporting system. Basically, we submit the report and check back every five minutes in the hope that it has finished. Sometimes these reports never finish. We have our DAGs set up to quit after 23 hours so they can start pulling all reports again an hour later. If this timeout is reached, is there a way to call a final cleanup function before the DAG stops executing?
Hi @Tgoad, thanks for reaching out!
You can achieve this with trigger rules; please see this guide.
After the task that checks if the report is finished, you can add one more task with a final cleanup. If you set trigger_rule="all_done" in your last task, it will run once all upstream tasks are done with their execution, no matter if they fail or succeed.
Please see an example below:
from airflow.decorators import task


@task(retries=1)
def task_that_fails():
    import sys
    import time

    time.sleep(10)
    sys.exit(1)  # non-zero exit code so the task is marked as failed


@task(trigger_rule="all_done")
def final_task():
    print("Run no matter what")


task_that_fails() >> final_task()
I appreciate the response, but with a DAG-level timeout that task won’t run when the timeout is reached, as it is also a standard task in the DAG and subject to the timeout. I am essentially looking for the functionality of the task’s on_failure_callback, but for a timeout being reached.
@Tgoad, you can set a timeout at the task level (using execution_timeout), so after that particular task fails, the next task (with trigger_rule="all_done") will still run.
I need the DAG's total run time to not exceed 23 hours, rather than that of the individual tasks. Report submissions get staggered because of scheduling and the number of reports, and I want a strict cutoff no matter when the actual report was submitted.
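One way to reconcile a strict run-wide cutoff with task-level failure handling might be to have the polling task enforce the deadline itself, measured from the start of the run rather than from when the task began. The task then fails like any other task, so a downstream task with trigger_rule="all_done" still gets to run. Below is a minimal sketch in plain Python; poll_until_deadline, ReportDeadlineExceeded, and is_ready are hypothetical names, and wiring run_start to the DAG run's start date is an assumption left out here.

```python
import time
from datetime import datetime, timedelta, timezone


class ReportDeadlineExceeded(Exception):
    """Raised when the run-wide cutoff passes before the report is ready."""


def poll_until_deadline(
    is_ready,                              # hypothetical callable: True once the report is done
    run_start,                             # start of the whole run, not of this task
    max_runtime=timedelta(hours=23),
    interval=timedelta(minutes=5),
    now=lambda: datetime.now(timezone.utc),
    sleep=time.sleep,
):
    """Poll is_ready() until it returns True or the run-wide deadline passes."""
    deadline = run_start + max_runtime
    while True:
        if is_ready():
            return True
        remaining = deadline - now()
        if remaining <= timedelta(0):
            raise ReportDeadlineExceeded(
                f"report not ready by {deadline.isoformat()}"
            )
        # Never sleep past the deadline, even if a full interval would.
        sleep(min(interval, remaining).total_seconds())
```

Because the deadline is derived from run_start, a report submitted late in the run simply gets less polling time, which matches the "strict cutoff no matter when the report was submitted" requirement.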
@Tgoad, you can have one more DAG with two (or more) tasks:
- ExternalTaskSensor that will wait for a different DAG (or a task in an external DAG) to complete - not necessarily succeed - for a specific logical date; in your case this would be the DAG that pulls the reports.
- Your cleanup function.
Here’s an example:
A. trigger_dag randomly fails.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

default_args = {
    "retries": 0,
}

with DAG(
    dag_id="trigger_dag",
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    default_args=default_args,
) as dag:

    @task()
    def random_fun():
        import random
        import sys

        # Fail roughly half of the runs to simulate an unreliable job
        if random.randrange(-10, 10) <= 0:
            sys.exit(1)  # non-zero exit code so the task is marked as failed
        else:
            print("Positive")

    random_fun()
B. triggered_dag runs only if trigger_dag fails for a specific logical date; otherwise the sensor times out and the DAG run fails.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    "retries": 0,
}

with DAG(
    dag_id="triggered_dag",
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    default_args=default_args,
) as dag:
    # Waits for the whole external DAG run with the same logical date
    check_external_task = ExternalTaskSensor(
        task_id="check_external_task",
        external_dag_id="trigger_dag",  # Ensure this equals the dag_id of the external DAG
        allowed_states=["failed"],
        mode="reschedule",
        timeout=30,
    )

    # Placeholder for the cleanup task
    finish = EmptyOperator(task_id="finish")

    check_external_task >> finish
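Since the original question asks for something like on_failure_callback but for a timeout, a DAG-level callback may also be worth trying. My understanding is that Airflow 2 calls the DAG's on_failure_callback when dagrun_timeout is exceeded and puts "timed_out" in context["reason"]; please treat both as assumptions to verify on your version. The sketch below only shows the callback logic in plain Python; cleanup_reports and on_dag_failure are hypothetical names.

```python
cleaned_runs = []  # stand-in for real side effects, for illustration only


def cleanup_reports(run_id):
    """Hypothetical cleanup, e.g. cancelling report requests that never finished."""
    cleaned_runs.append(run_id)


def on_dag_failure(context):
    """Run cleanup only when the DAG run failed because it timed out.

    Assumes the callback context carries a "reason" key set to "timed_out"
    and a "run_id" key; both are assumptions about the Airflow context.
    """
    if context.get("reason") != "timed_out":
        return False
    cleanup_reports(context.get("run_id"))
    return True
```

It would be attached to the report-pulling DAG with something like DAG(..., dagrun_timeout=timedelta(hours=23), on_failure_callback=on_dag_failure). The return value is only there to make the function easy to check by hand; Airflow does not use it.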