I am relatively new to Airflow and recently started using the TaskFlow API. How do you do conditionals or branching with the TaskFlow API?
I have something like the setup below, where each of the functions called is a separate task. If the transform and load succeed, I want to archive the data item. If an error occurs or an empty result is returned, I want to send a notification.
```python
# Pseudocode: each function is a separate task
if load(transform(item)):
    archive()
else:
    notify()
```
Hello @hawk1278, thanks for reaching out!
For failures, I would suggest setting up notifications using callbacks (`on_failure_callback`) or email notifications; please see this guide.
Below you can see how to use branching with the TaskFlow API. Task `random_fun` randomly returns `True` or `False`, and based on the returned value, task `branching` decides whether to follow `true_branch` or `false_branch`.
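```python
from airflow.decorators import task

@task
def random_fun():
    import random
    # Randomly True or False
    return random.randrange(-10, 10) > 0

@task.branch()
def branching(x):
    # Return the task_id of the branch to follow; the other branch is skipped
    if x is True:
        return "true_branch"
    else:
        return "false_branch"

@task()
def true_branch():
    print("True")

@task()
def false_branch():
    print("False")

# Call the tasks inside a DAG (e.g. a @dag-decorated function)
branching(random_fun()) >> [true_branch(), false_branch()]
```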
@magdagultekin I like your solution, but suppose you have a dummy task called "start", created with the DummyOperator, that must run before the `random_fun` task. What would be your approach for a less chaotic task definition?
```python
start = DummyOperator(task_id="start")
start >> random_fun()
```
@nelsoncardenas, that’s exactly how I’d do it! There’s no other way to create a dummy task unless I misunderstood your question.
Let’s say that you’d like to add `start` and `end` dummy tasks and have the dependencies set as follows:
```python
start >> branching(random_fun()) >> [true_branch(), false_branch()] >> end
```
In this case, you’d need to change the trigger rule of the last task, which is set to `all_success` by default; please see this guide.
```python
from airflow.operators.empty import EmptyOperator

end = EmptyOperator(task_id="end", trigger_rule="all_done")
```
On a side note, `DummyOperator` has been renamed to `EmptyOperator` (imported from `airflow.operators.empty`).
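To see why `end` needs a different trigger rule, here is a toy model of three Airflow trigger rules. It is an illustration only, not Airflow's implementation, and the function `would_run` is hypothetical. After a branch, one path succeeds and the other is skipped. `all_success` treats the skipped path as "not successful", so Airflow skips `end`.

```python
# Toy model of three Airflow trigger rules; an illustration, NOT Airflow's code.
FAILED = {"failed", "upstream_failed"}
TERMINAL = {"success", "skipped"} | FAILED


def would_run(trigger_rule, upstream_states):
    """Return True if a task with `trigger_rule` would run given its upstream states."""
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; a skipped one is not enough.
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_done":
        # Every upstream task only has to be finished, whatever the outcome.
        return all(s in TERMINAL for s in upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        # Nothing failed upstream and at least one upstream task succeeded.
        return not any(s in FAILED for s in upstream_states) and any(
            s == "success" for s in upstream_states
        )
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# After the branch, one path succeeded and the other was skipped.
after_branch = ["success", "skipped"]
print(would_run("all_success", after_branch))  # False: in Airflow, end is skipped
print(would_run("all_done", after_branch))  # True
print(would_run("none_failed_min_one_success", after_branch))  # True
```

Note that `all_done` would also run `end` if a branch failed. `none_failed_min_one_success` is a stricter built-in rule that is often used to join branches.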
@magdagultekin Thanks for the advice, and especially for the reminder about `EmptyOperator`; I forgot about it.
However, the DAG I created doesn’t generate the right graph in the UI.
Here is the DAG file:
```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator

@task
def random_fun():
    import random
    return random.randrange(-10, 10) > 0

@task.branch()
def branching(x):
    if x is True:
        return "true_branch"
    else:
        return "false_branch"

@task()
def true_branch():
    print("True")

@task()
def false_branch():
    print("False")

@dag(schedule_interval="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def example_forum_branching():
    start = EmptyOperator(task_id="start", trigger_rule="all_done")
    end = EmptyOperator(task_id="end", trigger_rule="all_done")
    start >> branching(random_fun()) >> [true_branch(), false_branch()] >> end

dag_example_forum_branching = example_forum_branching()
```
@nelsoncardenas, oh I see: your question is more about dependencies between decorated and traditional operators that consume XComs (see this issue).
In this case, I’d suggest the following:
- use `xcom_pull` in the `branching` task:
```python
@task.branch()
def branching(**kwargs):
    # Pull random_fun's return value from XCom instead of taking it as an argument
    x = kwargs["ti"].xcom_pull(task_ids="random_fun")
    if x is True:
        return "true_branch"
    else:
        return "false_branch"
```
- change the dependencies:
```python
start >> random_fun() >> branching() >> [true_branch(), false_branch()] >> end
```
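This should result in a graph where `start` runs first, then `random_fun`, then `branching`, which leads to either `true_branch` or `false_branch`, both of which feed into `end`.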
On a separate note, if you want to create identical `EmptyOperators` (with different `task_ids`), you can use a list comprehension:
```python
start, end = [EmptyOperator(task_id=tid, trigger_rule="all_done") for tid in ["start", "end"]]
```
@magdagultekin Thanks for your answer. Actually, this is the approach I used.
Suppose the `true_branch` task needs to receive data when the branch selects it. How can we pass that data? Something like `return "true_branch"(data)`?