Hi Friends, @ magdagultekin
I am trying to create Airflow dynamic tasks and task groups using XCom values returned by another dynamic task.
The sample code is given below. But I am getting Attribute Error.
Basically, there are 2 dictionaries [d1, d2] and there should be 2 task groups (1 for each).
Please provide suggestion on how to fix this.
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.decorators import task
from airflow import XComArg
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.task_group import TaskGroup
@task(task_id="generate_data")
def create_data(val):
d1 = {"name": "Santanu", "city": val}
d2 = {"name": "Ghosh", "city": val}
return [d1, d2]
@task(task_id="show_data")
def display_data(dat, **kwargs):
ti = kwargs["ti"]
ti.xcom_push(key=f"{ti.task_id}_{ti.map_index}", value=dat)
default_args = {
'owner': 'Airflow',
'start_date': days_ago(1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1}
dag_name_id = 'af_233_demo_12'
with DAG(
dag_id=dag_name_id,
default_args=default_args,
schedule_interval='@daily',
catchup=False
) as dag:
START = BashOperator(task_id="start", bash_command='echo "starting batch pipeline"', do_xcom_push=False)
STOP = BashOperator(task_id="stop", bash_command='echo "stopping batch pipeline"', trigger_rule=TriggerRule.NONE_SKIPPED, do_xcom_push=False)
with TaskGroup(group_id="file_process") as tg1:
display_data.expand(dat=create_data(val="Kolkata"))
dd = XComArg("show_data")
groups = []
for d in dd:
with TaskGroup(group_id=f"data_process_{d['name']}") as tg2:
@task
def print_data(v):
print(v)
print_data(d)
groups.append(tg2)
START >> tg1 >> groups >> STOP
Thanking you
Santanu