Airflow: wait for previous dag run to complete

I want to set up a dag, and there are a few cases I would like to address while creating it.

  1. The next run of the dag should be skipped if the current run is still executing or has failed. I am using catchup=False and max_active_runs=1 for this. Do I also need wait_for_downstream?

Hi @milindnandankar! Welcome to the forum and thank you for reaching out :smiley:

Utilizing catchup and max_active_runs is the correct intuition for creating the environment you want, though I don’t think it is possible to skip dagruns under all of the given conditions.

catchup set to False will prevent new runs from being created whenever they would exceed max_active_runs, which is set at 1. This won’t, however, stop Airflow from creating a new dagrun if the latest dagrun failed before the next scheduled run.
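For reference, a minimal sketch of that configuration (the dag_id and schedule here are placeholders):

from datetime import datetime

from airflow.models import DAG

# Sketch: catchup=False prevents missed runs from being backfilled and
# max_active_runs=1 ensures only one dagrun executes at a time.
dag = DAG(
    dag_id='my_dag',
    start_date=datetime(2020, 1, 1),
    schedule_interval='*/30 * * * *',
    catchup=False,
    max_active_runs=1,
)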

wait_for_downstream, if set to True, makes each task instance wait until the tasks immediately downstream of that task’s previous instance have succeeded. However, this means the current dagrun is still created regardless of the state of the previous dagrun; its tasks merely wait rather than skip, which isn’t what you need I don’t think.
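To illustrate (a sketch only; the task names are placeholders), wait_for_downstream is set per task, commonly through default_args:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Sketch: with wait_for_downstream=True, the current run's instance of
# extract will not start until the previous run's extract and the task
# immediately downstream of it (load) have succeeded.
default_args = {'wait_for_downstream': True}

dag = DAG(
    dag_id='wait_example',
    start_date=datetime(2020, 1, 1),
    default_args=default_args,
)

extract = DummyOperator(task_id='extract', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

extract >> load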

The ask is to skip the next dagrun when any of the following situations occurs:

  1. the current dagrun is still running after the time of the next scheduled run
  2. the current dagrun failed after the time of the next scheduled run
  3. the current dagrun failed before the next scheduled run

For scenarios 1 and 2, the next dagrun is already skipped with catchup set to False and max_active_runs set at 1, though it is more accurate to say the dagruns missed during the execution of the latest dagrun are skipped. I’m not sure if you count this as skipping the next run for scenario 2.

For scenario 3, since the latest dagrun failed before the next scheduled run, catchup is not involved. Airflow will still create a dagrun regardless, and I don’t think there is anything that will prevent it from doing so.

After some consideration, the best course of action is to have a mechanism for stopping execution depending on the state of the latest dagrun. However, that is difficult to achieve with stock Airflow. I won’t go into too much detail, but essentially you need to query the metadata database for that information, which would require you to create a custom sensor.
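For what it’s worth, a rough, untested sketch of such a custom sensor might look like this (the class name is made up, and the query assumes Airflow 1.10’s models):

from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State


class PreviousDagRunSensor(BaseSensorOperator):
    """Hypothetical sensor: succeeds only if the most recent earlier
    run of this dag did not fail."""

    @provide_session
    def poke(self, context, session=None):
        previous_run = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == self.dag_id,
                DagRun.execution_date < context['execution_date'],
            )
            .order_by(DagRun.execution_date.desc())
            .first()
        )
        # The very first run has no predecessor, so let it through.
        return previous_run is None or previous_run.state != State.FAILED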

Another possible solution would be to use the ExternalTaskSensor to check the state of a task from the previous dagrun, which you can target by setting execution_delta to the time difference between scheduled runs. This solution, however, would require catchup to be set to True so that there is always a previous run to check on. Here is an example of what it would look like:

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

# Waits for this same task in the previous dagrun (30 minutes earlier)
# to have succeeded before the current run proceeds.
previous_dagrun_task_sensor = ExternalTaskSensor(
    task_id='previous_dagrun_task',
    external_dag_id='my_dag',
    external_task_id='previous_dagrun_task',
    allowed_states=[State.SUCCESS],
    execution_delta=timedelta(minutes=30),
    dag=dag  # assumes a DAG object named `dag` is in scope
)

Of course, I could be misinterpreting your specifications, so let me know if that helps otherwise provide more details so I can think of some approach for you!


Hi @Alan, thanks a lot for the detailed description!

All of it makes sense. I was exploring the option of ExternalTaskSensor but got confused by execution_delta, since I am planning to skip the current run if the previous one is still running. I think you made a valid point about setting catchup to True so that there is always a previous run to check on.

I have one more scenario which I did not mention earlier. The first task in my dag checks a particular condition and, if the condition is met, skips all the successive tasks. In that case, previous_dagrun_task will also be skipped. Should I add State.SKIPPED as well to allowed_states?

previous_dagrun_task_sensor = ExternalTaskSensor(
    task_id='previous_dagrun_task',
    external_dag_id='my_dag',
    external_task_id='previous_dagrun_task',
    allowed_states=[State.SUCCESS, State.SKIPPED],
    execution_delta=timedelta(minutes=30),
    dag=dag
)

Let me know your thoughts on this

I see. As I understand it, the first task is condition_check, followed by previous_dagrun_task. There are two scenarios.

  1. condition_check fails and the rest of the downstream tasks are set as upstream_failed.
  2. condition_check passes and the rest of the downstream tasks are set as skipped.

This results in nothing getting done, so I might be misunderstanding something.

I think the sensor needs to come first, as a canary test for the previous dagrun.

  1. If the sensor passes, then the previous run passed, meaning the pipeline continues onward.
  2. If the sensor fails, then the previous run never passed, meaning you should skip the rest of the pipeline.

This indicates that you don’t need a sensor to check continuously, because you only need to check once, which you can do with the proper parameters. However, you have two possible paths after the check: one is to skip and the other is to continue. You cannot do that with just a sensor, but you can with a BranchPythonOperator, where depending on the check you return the appropriate task id (see the sketch below).
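For illustration only, a minimal sketch of that branching pattern (the task ids and the check are placeholders):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(dag_id='branch_example', start_date=datetime(2020, 1, 1))


def choose_route():
    # Placeholder check; in practice this would inspect the previous
    # dagrun's state and return the task_id of the branch to follow.
    previous_run_ok = True
    return 'continue_pipeline' if previous_run_ok else 'skip_pipeline'


branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_route,
    dag=dag,
)

continue_pipeline = DummyOperator(task_id='continue_pipeline', dag=dag)
skip_pipeline = DummyOperator(task_id='skip_pipeline', dag=dag)

branch >> [continue_pipeline, skip_pipeline]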

Hopefully that makes sense! If not, I can draft up a fuller example to demonstrate.

@Alan, thanks!

Understood how BranchPythonOperator works, but I still wanted to understand why ExternalTaskSensor won’t work here.

Basically I have four tasks:

  • previous_dagrun_task: checks the status of dag_complete_task from the previous dagrun
  • condition_check: checks the condition; if true, triggers task_a, otherwise skips it
  • task_a: does the actual work
  • dag_complete_task: dummy task to mark the dag as complete

If the condition is true, then task_a and dag_complete_task will run and be marked success; if the condition is false, task_a and dag_complete_task will be skipped. In that scenario, won’t allowed_states=[State.SUCCESS, State.SKIPPED] work with ExternalTaskSensor? Is it not valid?
I couldn’t find any example for this scenario, but I believe technically it should work.
Let me know your thoughts.

Ahhh, I think I misunderstood the original description. I thought condition_check came before previous_dagrun_task, but it sounds like it is the opposite based on your clarification. Here are the task dependencies as I understand them, with an additional task, do_nothing, that does nothing:

previous_dagrun_task >> condition_check
condition_check >> do_nothing
condition_check >> task_a >> dag_complete_task

The two routes are

  • When the condition passes, it goes down the task_a route
  • When the condition fails, it goes down the do_nothing route

When one route is taken, the other is skipped.

As for the allowed states that previous_dagrun_task permits, I think if both State.SUCCESS and State.SKIPPED are included, it might complicate the logic and require more safeguards.

This doesn’t account for when the dagrun fails within the dagrun_timeout constraint, where the most probable point of failure would be task_a, meaning dag_complete_task will end up in a failed state too. If dag_complete_task from the previous dagrun failed, then the current previous_dagrun_task will never succeed, which means the current dag_complete_task won’t succeed either.


Let me elaborate on my previous answer, since I think I missed a few important details. When I said you should use the ExternalTaskSensor, I meant you can utilize ExternalTaskSensor's poke method inside BranchPythonOperator's python callable. The dependencies are similar to what you described, with a small adjustment.

from datetime import datetime, timedelta

from airflow.models import DAG

from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

from airflow.utils.state import State


def some_work():
    pass


def condition_check_callable(**context):
    # Insert additional check logic here
    check_bool = True

    # Check whether task_a succeeded in the previous dagrun
    previous_task_a = ExternalTaskSensor(
        task_id='previous_task_a',
        external_dag_id=dag.dag_id,
        external_task_id=task_a.task_id,
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(minutes=30),
    )
    previous_task_a_succeeded = previous_task_a.poke(context)

    # Check whether do_nothing succeeded in the previous dagrun
    previous_do_nothing = ExternalTaskSensor(
        task_id='previous_do_nothing',
        external_dag_id=dag.dag_id,
        external_task_id=do_nothing.task_id,
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(minutes=30),
    )
    previous_do_nothing_succeeded = previous_do_nothing.poke(context)

    if check_bool and (previous_task_a_succeeded or previous_do_nothing_succeeded):
        return task_a.task_id
    else:
        return do_nothing.task_id


dag = DAG(
    dag_id='my_dag',
    start_date=datetime(2020, 1, 1),
    schedule_interval=timedelta(minutes=30),  # matches the sensors' execution_delta
)

condition_check = BranchPythonOperator(
    task_id='condition_check',
    python_callable=condition_check_callable,
    provide_context=True,
    dag=dag
)

do_nothing = DummyOperator(
    task_id='do_nothing',
    dag=dag
)

task_a = PythonOperator(
    task_id='task_a',
    python_callable=some_work,
    dag=dag
)

condition_check >> do_nothing
condition_check >> task_a

Using a python operator to run the original check logic as well as to inspect the previous states of the tasks, I think this covers all the requirements you listed previously. The callable checks whether the state of each of the previous two tasks, do_nothing and task_a, is success.

| do_nothing | task_a |
| --- | --- |
| T | F |
| F | T |
| F | F |

There are three scenarios, as shown in the table above.

  1. do_nothing passed, which means task_a was skipped. The next dagrun executes as normal.
  2. do_nothing was skipped and task_a passed. The next dagrun executes as normal.
  3. do_nothing was skipped and task_a failed. The next dagrun is skipped.

Hopefully that clarifies what I meant in my previous answer.

*The code may contain typos but theoretically it should work. :crossed_fingers: