Dynamic taskGroup mapping

Hi,
I'm facing an issue with a DAG based on dynamic task group mapping.
My task group contains 4 tasks:

  • 3 tasks can run independently; each one returns a filename;
  • the last task depends on the 3 previous tasks and takes 4 arguments: the filename returned by each of the 3 tasks, plus a dictionary that describes a computing order.
Inside the task group, the code is basically this:

```python
@task_group
def taskgroup(compute_order: dict, experience: dict):

    @task
    def task1(compute_order: dict, experience: dict):
        ...
        return filename1

    @task
    def task2(compute_order: dict, experience: dict):
        ...
        return filename2

    @task
    def task3(compute_order: dict, experience: dict):
        ...
        return filename3

    @task
    def compute_task(compute_order: dict, filename1: str, filename2: str, filename3: str):
        # computes and writes data in filename3, using filename1 and filename2
        ...

    fname1 = task1(...)
    fname2 = task2(...)
    fname3 = task3(...)
    taskcomputing = compute_task(my_computing_order, fname1, fname2, fname3)

    [fname1, fname2, fname3] >> taskcomputing


taskgroup.partial(experience="XXX").expand(compute_order=get_orders())
```
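If I understand the TaskFlow API correctly, passing fname1, fname2 and fname3 into compute_task already registers task1, task2 and task3 as upstream through their XComArgs, so I assume the explicit list dependency is redundant, i.e. this should be equivalent:

```python
# My assumption: the XComArgs passed as arguments already create the
# task1/task2/task3 -> compute_task dependencies inside the task group.
taskcomputing = compute_task(my_computing_order, fname1, fname2, fname3)
# [fname1, fname2, fname3] >> taskcomputing  # probably redundant if the above is true
```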

get_orders is a task that returns a list of 8000 dictionaries.
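For reference, here is a simplified sketch of what get_orders looks like (the real dictionaries are more complex; the key shown here is just a placeholder):

```python
@task
def get_orders() -> list[dict]:
    # In the real DAG this builds ~8000 dictionaries, each describing a computing order.
    return [{"order_id": i} for i in range(8000)]
```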
When I look at the UI, the 3 tasks task1, task2 and task3 run in parallel, i.e. 8000 x 3 task instances.
My problem is that the scheduler seems to spend all its time evaluating taskcomputing, which depends on the success of task1, task2 and task3. It is very, very slow because it keeps looping over the 8000 task instances that are waiting to run.

```
[2024-10-29T13:48:47.781+0000] {taskinstance.py:2066} DEBUG - Dependencies not met for <TaskInstance: reference_level_1.1.taskgroup_ref.reference_compute_indicator manual__2024-10-29T13:42:55.443887+00:00 map_index=1632 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'taskgroup_ref.create_time_series_filename_11', 'get_list_orders', 'taskgroup_ref.create_reference_filename_11'}
[2024-10-29T13:48:47.781+0000] {taskinstance.py:2088} DEBUG - <TaskInstance: reference_level_1.1.taskgroup_ref.reference_compute_indicator manual__2024-10-29T13:42:55.443887+00:00 map_index=1632 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-10-29T13:48:47.782+0000] {taskinstance.py:2088} DEBUG - <TaskInstance: reference_level_1.1.taskgroup_ref.reference_compute_indicator manual__2024-10-29T13:42:55.443887+00:00 map_index=1633 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-10-29T13:48:47.786+0000] {taskinstance.py:2088} DEBUG - <TaskInstance: reference_level_1.1.taskgroup_ref.reference_compute_indicator manual__2024-10-29T13:42:55.443887+00:00 map_index=1633 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'taskgroup_ref.create_time_series_filename_11', 'get_list_orders', 'taskgroup_ref.create_reference_filename_11'}
```

Sorry, the task names are not exactly the same, but the logs always look like this.

I don't think I'm declaring the task dependencies inside the task group correctly, but at this point I'm lost.
Is it an Airflow configuration problem, or a problem with the dependencies between the last task and the first 3?

Thanks