Run the next task group only if the same task group from the previous DAG Run has finished

Hi everyone,

I have a DAG with a lot of task groups (50+). Every task group has the same set of tasks, but each works with a different table, and the tables are independent of each other. I would like to process each table as fast as possible, but I cannot start the next processing of a table until the current one has finished.

I use the wait_for_downstream parameter, but there is one case where it does not work for my DAG.
Let’s say I have the following structure:

@task_group(group_id="A")
def typical_tg_for_table_A():
    task_1 >> task_2 >> task_3 >> task_4 >> task_5 >> task_8
    task_3 >> task_6 >> task_7 >> task_8

@task_group(group_id="B")
def typical_tg_for_table_B():
    task_1 >> task_2 >> task_3 >> task_4 >> task_5 >> task_8
    task_3 >> task_6 >> task_7 >> task_8

Inputs:

  1. task_1 has the parameter wait_for_downstream=True;
  2. task_1 can be skipped as a result of its own execution, which leads to all other tasks, and the task group itself, being skipped (by design and business needs);
  3. I have to backfill 30+ DAG Runs and I don’t want to slow down the DAG Runs for task_group_A if task_group_B has not finished yet;
  4. task_3 cannot run in parallel with task_3 from the previous DAG Run;
  5. task_3 also cannot start until task_5 and task_7 from the previous DAG Run are finished (this is why I cannot set wait_for_downstream=True for task_3: its immediate downstream tasks are task_4 and task_6).

Problem:
Let’s say I have 3 DAG Runs named 1, 2 and 3, and we focus on the slowest task group (B):

  1. task_2 in DAG Run 1 completes successfully;
  2. task_3 in DAG Run 1 starts, and task_1 in DAG Run 2 starts as well;
  3. task_1 in DAG Run 2 is skipped → all other tasks in DAG Run 2 are skipped too;
  4. DAG Run 3 starts, and task_3 from DAG Run 1 and task_3 from DAG Run 3 end up running in parallel, which is not acceptable.

How can I avoid this situation? It looks like I should run task_1 only once task_8 of the same task group in the previous DAG Run has finished.
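
To make the rule concrete, here is a minimal plain-Python sketch of the check; it is not Airflow API and does not show how to wire it into a DAG. The function name group_may_start, the TERMINAL_STATES set and the dict of task states are hypothetical names used only for illustration, and treating "failed" as finished is an assumption.

```python
# Plain-Python sketch of the gating rule; this is NOT Airflow API.
from typing import Dict, Optional

# States treated as "finished" (assumption; adjust to your needs).
TERMINAL_STATES = {"success", "skipped", "failed", "upstream_failed"}


def group_may_start(prev_run_states: Optional[Dict[str, Optional[str]]],
                    last_task: str = "task_8") -> bool:
    """Return True if task_1 of the current DAG Run may start.

    prev_run_states maps task name -> state for the SAME task group in the
    previous DAG Run, or is None when there is no previous run.
    """
    if prev_run_states is None:
        return True  # first run: nothing to wait for
    # A missing or unset state for the last task means "not finished yet".
    return prev_run_states.get(last_task) in TERMINAL_STATES


# Scenario from the question, task group B:
run_1_in_progress = {"task_2": "success", "task_3": "running"}
run_1_done = {"task_3": "success", "task_8": "success"}
run_2_skipped = {"task_1": "skipped", "task_8": "skipped"}

print(group_may_start(run_1_in_progress))  # Run 2 has to wait
print(group_may_start(run_1_done))         # Run 2 may start
print(group_may_start(run_2_skipped))      # Run 3 may start
```

With this rule, task_1 in DAG Run 2 cannot start (and therefore cannot be skipped) while DAG Run 1 is still on task_3, so DAG Run 3 is never released early.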

Sorry for the long post; I would be very grateful for any advice!