I have the following scenario:
             |----->Task1----|                  |---->Task3---|
start task-->|               |-->Merge Task --->|             |----->End Task
             |----->Task2----|                  |---->Task4---|
Currently Task1, Task2, Task3 and Task4 are ShortCircuitOperators. When either Task1 or Task2 is short-circuited, all the downstream tasks are skipped.
My requirement, however, is to stop the skipped state from propagating past Merge Task to Task3 and Task4,
because I want Task3 and Task4 to run no matter what happens upstream.
Is there a way to achieve this? I want to keep the dependencies in place as depicted in the DAG.
Welcome to our forum @hrudayanath!
You should look into Trigger Rules!
As stated in the documentation,
One must be aware of the interaction between trigger rules and skipped tasks in schedule level. Skipped tasks will cascade through trigger rules all_success and all_failed but not all_done, one_failed, one_success, none_failed, none_failed_or_skipped, none_skipped and dummy.
To prevent the skipped state from cascading through the rest of the DAG, you will need to update the trigger rule of Task3 and Task4 to one of the rules where the skip is not propagated. Based on your description, you probably want all_done. With the all_done trigger rule, those tasks will run regardless of the state of their upstream tasks, as long as the upstream tasks are done (they have to finish execution).
Here is an example where b will always execute as long as a has finished executing.
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

a = DummyOperator(
    task_id='a',
    dag=dag
)

# all_done: run b once a has finished, whatever state a ended in
b = DummyOperator(
    task_id='b',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Make b downstream of a so the trigger rule has an upstream to evaluate
a >> b
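
To see how this plays out on the DAG from the question, here is a rough, plain-Python toy model of skip propagation. It is not Airflow code and does not use Airflow's scheduler: the `UPSTREAM` map, the `resolve` function and the lowercase task names are all made up for illustration, and only the `all_success` and `all_done` rules are modelled. It also ignores operator-specific skipping, such as a ShortCircuitOperator marking downstream tasks as skipped itself.

```python
from typing import Dict, List

# Hypothetical upstream map for the DAG in the question.
# Keys are listed in topological order (parents before children).
UPSTREAM: Dict[str, List[str]] = {
    "start": [],
    "task1": ["start"],
    "task2": ["start"],
    "merge": ["task1", "task2"],
    "task3": ["merge"],
    "task4": ["merge"],
    "end": ["task3", "task4"],
}


def resolve(
    upstream: Dict[str, List[str]],
    rules: Dict[str, str],
    forced: Dict[str, str],
) -> Dict[str, str]:
    """Toy evaluation of final task states.

    rules  maps task -> trigger rule ("all_success" if absent).
    forced maps task -> a state the task ends in by itself,
           e.g. {"task1": "skipped"} for a short-circuited task.
    """
    states: Dict[str, str] = {}
    for task, parents in upstream.items():
        if task in forced:
            states[task] = forced[task]
            continue
        parent_states = [states[p] for p in parents]
        rule = rules.get(task, "all_success")
        if rule == "all_done":
            # Every parent has already reached a final state here,
            # so the task runs whatever those states are.
            states[task] = "success"
        elif any(s in ("failed", "upstream_failed") for s in parent_states):
            states[task] = "upstream_failed"
        elif "skipped" in parent_states:
            # all_success lets the skipped state cascade.
            states[task] = "skipped"
        else:
            states[task] = "success"
    return states


# Default rules: the skip from task1 cascades all the way to end.
default_run = resolve(UPSTREAM, {}, {"task1": "skipped"})

# all_done on task3 and task4: the cascade stops after merge.
fixed_run = resolve(
    UPSTREAM,
    {"task3": "all_done", "task4": "all_done"},
    {"task1": "skipped"},
)

print(default_run)
print(fixed_run)
```

In this model Merge Task itself is still skipped in the second run, because it keeps the default rule; only Task3 and Task4 ignore the upstream state, and End Task runs because both of its parents succeeded.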