TaskFlow API and branching

I am relatively new to Airflow and started using the TaskFlow API recently. How do you do conditionals or branching with the TaskFlow API?

I have something like this set up, where each of the functions called is a separate task. If the transform and load succeed, I want to archive the data item; if an error or empty result is returned, I want to send a notification.

if load(transform(item)):
    archive()
else:
    notify()

Hello @hawk1278, thanks for reaching out!

I would suggest setting up notifications in case of failures using callbacks (on_failure_callback) or email notifications; please see this guide.
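For example, here is a minimal sketch of a failure callback applied to every task via default_args (the notify_on_failure function, its body, and the DAG name pipeline_with_alerts are illustrative placeholders, not part of Airflow itself):

    from datetime import datetime

    from airflow.decorators import dag, task


    def notify_on_failure(context):
        # Airflow passes the task context to the callback;
        # "task_instance" and "exception" are standard context keys.
        ti = context["task_instance"]
        print(f"Task {ti.task_id} failed: {context.get('exception')}")


    @dag(
        schedule_interval="@daily",
        start_date=datetime(2022, 1, 1),
        catchup=False,
        default_args={"on_failure_callback": notify_on_failure},
    )
    def pipeline_with_alerts():
        @task
        def load():
            raise ValueError("something went wrong")  # fails, triggering the callback

        load()


    dag_pipeline_with_alerts = pipeline_with_alerts()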

Below you can see how to use branching with the TaskFlow API. The random_fun task randomly returns True or False and, based on the returned value, the branching task decides whether to follow true_branch or false_branch.

    from airflow.decorators import task


    @task
    def random_fun():
        import random

        return random.randrange(-10, 10) > 0

    @task.branch()
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task()
    def true_branch():
        print("True")

    @task()
    def false_branch():
        print("False")

    branching(random_fun()) >> [true_branch(), false_branch()]
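
Applied to your transform/load/archive/notify flow, a rough sketch could look like this (the task bodies and the "my_item" input are placeholders; I'm assuming load returns a truthy value on success and a falsy or empty one otherwise):

    @task
    def transform(item):
        # placeholder transform logic
        return item

    @task
    def load(data):
        # placeholder load logic; return something truthy on success,
        # falsy/empty on error
        return bool(data)

    @task.branch()
    def check_result(result):
        # follow the archive task on success, otherwise send a notification
        return "archive" if result else "notify"

    @task
    def archive():
        print("archiving item")

    @task
    def notify():
        print("sending notification")

    check_result(load(transform("my_item"))) >> [archive(), notify()]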

@magdagultekin I like your solution, but if you have a dummy task called “start”, created with the dummy operator, that must run before the random_fun task, what would be your approach for a less chaotic task definition?

from airflow.operators.dummy import DummyOperator

start = DummyOperator(task_id="start")

start >> random_fun()

@nelsoncardenas, that’s exactly how I’d do it! There’s no other way to create a dummy task unless I misunderstood your question.

Let’s say that you’d like to add start and end dummy tasks and have the dependencies set as follows:
start >> branching(random_fun()) >> [true_branch(), false_branch()] >> end
In this case, for the last task you’d need to change the trigger rule, which by default is set to all_success; please see this guide.

end = EmptyOperator(task_id="end", trigger_rule="all_done")

On a side note - DummyOperator has been renamed to EmptyOperator.
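
For reference, the import paths (assuming Airflow 2.3+ for EmptyOperator):

from airflow.operators.dummy import DummyOperator  # deprecated alias
from airflow.operators.empty import EmptyOperator  # preferred since Airflow 2.3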


@magdagultekin Thanks for the advice, and especially for the EmptyOperator tip, I had forgotten about it.

But I created the DAG and it doesn’t generate the right graph in the UI.

[screenshot of the generated graph]

Here is the DAG file:

from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@task
def random_fun():
    import random

    return random.randrange(-10, 10) > 0


@task.branch()
def branching(x):
    if x is True:
        return "true_branch"
    else:
        return "false_branch"


@task()
def true_branch():
    print("True")


@task()
def false_branch():
    print("False")


@dag(schedule_interval="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def example_forum_branching():

    start = EmptyOperator(task_id="start", trigger_rule="all_done")
    end = EmptyOperator(task_id="end", trigger_rule="all_done")

    start >> branching(random_fun()) >> [true_branch(), false_branch()] >> end


dag_example_forum_branching = example_forum_branching()

@nelsoncardenas, oh I see - your question is more around dependencies between decorated and traditional operators that consume XComs (see this issue).

In this case, I’d suggest the following:

1. use xcom_pull in the branching task:
@task.branch()
def branching(**kwargs):
    x = kwargs["ti"].xcom_pull(task_ids="random_fun")
    if x is True:
        return "true_branch"
    else:
        return "false_branch"
2. change the dependencies (full revised DAG below):
start >> random_fun() >> branching() >> [true_branch(), false_branch()] >> end
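
Putting both changes together, a sketch of your full DAG with only the branching signature and the dependency line changed:

from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@task
def random_fun():
    import random

    return random.randrange(-10, 10) > 0


@task.branch()
def branching(**kwargs):
    # pull random_fun's return value from XCom instead of taking it as an argument
    x = kwargs["ti"].xcom_pull(task_ids="random_fun")
    if x is True:
        return "true_branch"
    else:
        return "false_branch"


@task()
def true_branch():
    print("True")


@task()
def false_branch():
    print("False")


@dag(schedule_interval="@daily", start_date=datetime(2022, 1, 1), catchup=False)
def example_forum_branching():

    start = EmptyOperator(task_id="start", trigger_rule="all_done")
    end = EmptyOperator(task_id="end", trigger_rule="all_done")

    start >> random_fun() >> branching() >> [true_branch(), false_branch()] >> end


dag_example_forum_branching = example_forum_branching()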

This should result in a graph like this:
[screenshot of the resulting graph]

On a separate note, if you want to create identical EmptyOperators (differing only in task_id), you can create them with a list comprehension:

start, end = [EmptyOperator(task_id=tid, trigger_rule="all_done") for tid in ["start", "end"]]

@magdagultekin thanks for your answer. Actually, this is the approach I used :sweat_smile:
