I want to create a DAG that gets a list of jobs from a DB and, for each job in the list, triggers a second DAG (or executes the same group of tasks). The second DAG runs have to be executed serially, not in parallel: when the run for the first job finishes, the run for the next one starts.
Because of this, I cannot use Dynamic Task Mapping, as it executes the mapped tasks in parallel.
@task
def get_commands_groups_per_job(**kwargs):
    ...  # returns the list of command groups read from the DB (body omitted)


@dag(
)
def trigger_job(**kwargs):
    commands_groups = get_commands_groups_per_job()
    cgs = XComArg("get_commands_groups_per_job")
    groups = []
    for command_group in cgs:  # this is the loop that fails
        cg = command_group['command_group_id']
        job = command_group['job_id']
        with TaskGroup(group_id=tg_id) as my_group:
            update_commands_group_status_running = SimpleHttpOperator()
            trigger = TriggerDagRunOperator(wait_for_completion=True)
        groups.append(my_group)
    # chain the groups so that each one starts only after the previous one finishes
    flow = groups[0]
    for i in range(1, len(groups)):
        flow = flow >> groups[i]
When I try to do this, I get this error: TypeError: 'XComArg' object is not iterable
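As far as I understand it, the loop fails because the task result is only a placeholder while the DAG file is parsed; the real list exists only after the task has run. Below is a minimal sketch of that idea in plain Python. The `LazyResult` class, its `resolve` method and the sample data are hypothetical, written only for illustration; this is not Airflow's `XComArg` implementation, just a stand-in that refuses iteration in the same way the error message suggests.

```python
class LazyResult:
    """Hypothetical stand-in for a task result that is not known at parse time."""

    def __init__(self, task_id):
        self.task_id = task_id

    def __iter__(self):
        # No concrete list exists yet, so iterating is refused outright.
        raise TypeError(f"{type(self).__name__!r} object is not iterable")

    def resolve(self, runtime_values):
        # At run time the placeholder is swapped for the value the task produced.
        return runtime_values[self.task_id]


def try_parse_time_loop(ref):
    """Return the error message produced by looping over the placeholder."""
    try:
        for _ in ref:
            pass
    except TypeError as exc:
        return str(exc)
    return None


ref = LazyResult("get_commands_groups_per_job")
parse_error = try_parse_time_loop(ref)

# Made-up sample data, standing in for what the task would return at run time.
sample = {"get_commands_groups_per_job": [{"job_id": 1, "command_group_id": 10}]}
resolved = ref.resolve(sample)
```

If that picture is right, a `for` loop in the DAG body cannot know how many groups to create, because the list it would iterate over does not exist until the first task has actually run.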