Ahhh. I think I misunderstood the original description. I thought the `condition_check` came before the `previous_dagrun_task`. However, based on your clarification it sounds like it is the opposite. Here are the task dependencies as I understand them, with one additional task, `do_nothing`, that does nothing:

    previous_dagrun_task >> condition_check
    condition_check >> do_nothing
    condition_check >> task_a >> dag_complete_task
The two routes are:

- When the condition passes, it goes down the `task_a` route.
- When the condition fails, it goes down the `do_nothing` route.

When one route is taken, the other is skipped.
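As a minimal sketch of that routing (plain Python, not Airflow; the string task ids are just illustrative), the branch callable effectively returns exactly one downstream task id, and Airflow skips the branch that was not returned:

```python
def choose_route(condition_passed):
    """Mimic a BranchPythonOperator callable: return the task_id of the
    route to follow; the other downstream task gets skipped by Airflow."""
    return 'task_a' if condition_passed else 'do_nothing'
```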
As for the states that `previous_dagrun_task` permits, I think including both `State.SUCCESS` and `State.SKIPPED` would complicate the logic and require more safeguards. It also doesn't account for the case where the dagrun fails within the `dagrun_timeout` constraint: the most probable point of failure is `task_a`, which means `dag_complete_task` would end up in a failed state too. If `dag_complete_task` from the previous dagrun failed, then the current `previous_dagrun_task` will never succeed, which in turn means the current `dag_complete_task` won't succeed either.
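To illustrate why that deadlocks the pipeline, here is a minimal pure-Python simulation (no Airflow required; the function and state names are hypothetical) of successive dagruns where each run only proceeds if the previous run's `dag_complete_task` succeeded:

```python
def simulate_runs(task_a_outcomes):
    """Simulate successive dagruns. A run proceeds only if the previous
    run's dag_complete_task succeeded; dag_complete_task succeeds only
    when task_a succeeds in that run."""
    results = []
    prev_complete_ok = True  # pretend the very first run has no predecessor
    for task_a_ok in task_a_outcomes:
        if not prev_complete_ok:
            # previous_dagrun_task never succeeds, so this run stalls,
            # and prev_complete_ok stays False for every later run too
            results.append('stalled')
            continue
        if task_a_ok:
            results.append('success')
            prev_complete_ok = True
        else:
            # task_a failed, so dag_complete_task failed as well
            results.append('failed')
            prev_complete_ok = False
    return results
```

One failed `task_a` stalls every subsequent run: `simulate_runs([True, False, True, True])` yields `['success', 'failed', 'stalled', 'stalled']`.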
Let me elaborate on my previous answer, since I think I missed a few important details. When I said you should use the `ExternalTaskSensor`, I meant you need to utilize `ExternalTaskSensor`'s check logic inside `BranchPythonOperator`'s python callable. The dependency is similar to what you had described, with a small adjustment.
    from datetime import datetime, timedelta

    from airflow.models import DAG
    from airflow.sensors.external_task_sensor import ExternalTaskSensor
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
    from airflow.utils.state import State


    def some_work():
        pass


    def condition_check(**context):
        # Insert your additional check logic here
        check_bool = True

        # Poke once for the previous dagrun's task_a
        previous_task_a = ExternalTaskSensor(
            task_id='previous_task_a',
            external_dag_id=dag.dag_id,
            external_task_id=task_a.task_id,
            allowed_states=[State.SUCCESS],
            execution_delta=timedelta(minutes=30),
        )
        previous_task_a_succeeded = previous_task_a.poke(context)

        # Poke once for the previous dagrun's do_nothing
        previous_do_nothing = ExternalTaskSensor(
            task_id='previous_do_nothing',
            external_dag_id=dag.dag_id,
            external_task_id=do_nothing.task_id,
            allowed_states=[State.SUCCESS],
            execution_delta=timedelta(minutes=30),
        )
        previous_do_nothing_succeeded = previous_do_nothing.poke(context)

        if check_bool and (previous_task_a_succeeded or previous_do_nothing_succeeded):
            return task_a.task_id
        else:
            return do_nothing.task_id


    dag = DAG(dag_id='my_dag', start_date=datetime(2020, 1, 1))

    condition_check = BranchPythonOperator(
        task_id='condition_check',
        python_callable=condition_check,
        provide_context=True,
        dag=dag,
    )

    do_nothing = DummyOperator(
        task_id='do_nothing',
        dag=dag,
    )

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=some_work,
        dag=dag,
    )

    condition_check >> do_nothing
    condition_check >> task_a
Using a python operator to check both your original condition and the previous states of the tasks, I think this covers all the requirements you listed previously. I am checking whether the previous dagrun's two tasks, `do_nothing` and `task_a`, are in a success state.

| do_nothing | task_a |
|------------|--------|
| T          | F      |
| F          | T      |
| F          | F      |
There are three scenarios, as shown in the table above.

- `do_nothing` passed, which means `task_a` was skipped. The next dagrun executes as normal.
- `do_nothing` was skipped and `task_a` passed. The next dagrun executes as normal.
- `do_nothing` was skipped and `task_a` failed. The next dagrun is skipped.

Hopefully that clarifies what I meant in my previous answer.
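The three rows can be encoded as a small pure-Python helper (hypothetical names, no Airflow needed) that mirrors the decision the `condition_check` callable makes for the next dagrun:

```python
def next_run_branch(prev_do_nothing_ok, prev_task_a_ok, check_bool=True):
    """Mirror the condition_check callable: proceed to task_a only when
    the extra check passes and either previous route succeeded."""
    if check_bool and (prev_do_nothing_ok or prev_task_a_ok):
        return 'task_a'
    return 'do_nothing'
```

Rows one and two route the next run to `task_a`; row three, where `task_a` failed, routes it to `do_nothing`.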
*The code may contain typos, but theoretically it should work.