Airflow dynamic task group mapping with XCom and loop

Hi Friends, @magdagultekin

I am trying to create Airflow dynamic tasks and task groups using XCom values returned by another dynamically mapped task.
The sample code is given below, but it raises an AttributeError.

Basically, there are 2 dictionaries [d1, d2] and there should be 2 task groups (1 for each).

Please suggest how to fix this.

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.decorators import task
from airflow import XComArg
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.task_group import TaskGroup


@task(task_id="generate_data")
def create_data(val):
    d1 = {"name": "Santanu", "city": val}
    d2 = {"name": "Ghosh", "city": val}
    return [d1, d2]


@task(task_id="show_data")
def display_data(dat, **kwargs):
    ti = kwargs["ti"]
    ti.xcom_push(key=f"{ti.task_id}_{ti.map_index}", value=dat)


default_args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1}

dag_name_id = 'af_233_demo_12'

with DAG(
        dag_id=dag_name_id,
        default_args=default_args,
        schedule_interval='@daily',
        catchup=False
) as dag:

    START = BashOperator(task_id="start", bash_command='echo "starting batch pipeline"', do_xcom_push=False)
    STOP = BashOperator(task_id="stop", bash_command='echo "stopping batch pipeline"', trigger_rule=TriggerRule.NONE_SKIPPED, do_xcom_push=False)

    with TaskGroup(group_id="file_process") as tg1:
        display_data.expand(dat=create_data(val="Kolkata"))

    # this is where the AttributeError seems to come from: reading back
    # the mapped "show_data" XComs and looping over them at parse time
    dd = XComArg("show_data")

    groups = []
    for d in dd:
        with TaskGroup(group_id=f"data_process_{d['name']}") as tg2:
            @task
            def print_data(v):
                print(v)
            print_data(d)
        groups.append(tg2)

    START >> tg1 >> groups >> STOP

Thanking you
Santanu

Hi @sanutopia, thanks for reaching out!

The best solution would be to create TaskGroups with expand() - however, it’s a work in progress (see this PR).
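
Once that lands, mapping a whole TaskGroup should look roughly like this - a sketch based on the in-progress API, so details may change before release:

import pendulum

from airflow import DAG
from airflow.decorators import task, task_group

with DAG(dag_id="mapped_task_groups",
         start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),
         schedule_interval="@daily",
         catchup=False,
         ) as dag:

    @task
    def create_data(val):
        return [{"name": "Santanu", "city": val}, {"name": "Ghosh", "city": val}]

    @task
    def print_data(v):
        print(v)

    # one group instance would be created per element returned by create_data()
    @task_group
    def data_process(d):
        print_data(d)

    data_process.expand(d=create_data(val="Kolkata"))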

You can subscribe to apache/airflow repository to receive notifications when new releases are published (see guides here).


Hi @magdagultekin

Thanks for the reply.

I would appreciate it if you could share any alternative approach that is currently available in Airflow 2+.

Thanking you
Santanu

Hi @magdagultekin,

To be more precise, I am trying to run multiple DataFusion Start Pipeline tasks depending on the number of arguments (a list of dicts) returned by the previous PythonOperator task, which in turn reads the parameter values from a BigQuery table.

The flow should look something like this:

start → read_bq[3] → [df_1, df_2, df_3] → stop
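
For context, the read_bq part would be something like this (a rough sketch, with hypothetical project and table names):

from airflow.decorators import task

@task
def read_bq():
    # hypothetical BigQuery table holding one row of runtime parameters per pipeline
    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.query(
        "SELECT pipeline_name, city FROM `my_project.my_dataset.df_params`"
    ).result()
    return [dict(row) for row in rows]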

Any help or suggestion is appreciated.

Regards,
Santanu

Hi @sanutopia,

If you create tasks dynamically with dynamic task mapping, they will run in parallel the way you described (start >> read_bq[3] >> [df_1, df_2, df_3] >> stop) even without a TaskGroup.

Please see an example below - would this work for you for the time being, until you can create TaskGroups with expand()?

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

VALUE = "my_city"

with DAG(dag_id="dynamic_task_mapping",
         start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),
         schedule_interval="@daily",
         catchup=False,
         ) as dag:

    start, end = [EmptyOperator(task_id=tid, trigger_rule="all_done") for tid in ["start", "end"]]

    @task
    def create_data(val):
        # the returned list is the mapping source: one mapped task instance per element
        d1 = {"name": "Santanu", "city": val}
        d2 = {"name": "Ghosh", "city": val}
        return [d1, d2]

    @task
    def print_data(data):
        print(data)

    # use a new name so the task function isn't shadowed by its XComArg
    created_data = create_data(val=VALUE)

    start >> created_data >> print_data.expand(data=created_data) >> end
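
The same pattern also works with classic operators via partial() / expand(), so for your DataFusion case it could look roughly like this (a sketch to drop inside a DAG block like the one above; the operator is the Google provider's CloudDataFusionStartPipelineOperator, and the instance / pipeline names are hypothetical):

from airflow.decorators import task
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

@task
def read_bq():
    # stand-in for your BigQuery lookup: one runtime-args dict per pipeline run
    return [{"arg": "a"}, {"arg": "b"}, {"arg": "c"}]

CloudDataFusionStartPipelineOperator.partial(
    task_id="start_df_pipeline",
    pipeline_name="my_pipeline",   # hypothetical
    instance_name="my_instance",   # hypothetical
    location="us-central1",        # hypothetical
).expand(runtime_args=read_bq())

Each mapped task instance then starts one pipeline run with its own runtime_args.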

Hi @magdagultekin,

Thanks for your reply. Yes, it is helpful.

Regards,
Santanu
