Pull xcom inside a task group class instance

I’ve created a task group class that contains a common pattern of tasks I need to instantiate several times in my DAG. One task submits a job request to an API and returns the API response. I then need to extract the job id from that response and pass it to a sensor, which polls a separate API until the job has completed. However, I can’t get the XCom pull syntax right.

I believe this is because I need to prefix the task_id I’m pulling from with the group_id, but this group_id will be different in each instance of the class I create. Can anyone help me fix the syntax of my XCom pull so that it passes in the group_id of the current task instance?

class MyTaskGroup(TaskGroup):
    def __init__(self, database_connection):
        super().__init__(group_id=f"register_schemas{database_connection['name']}", tooltip="Registering Schemas")

        @task(task_group=self)
        def refresh_schema_connections(database_connection):
            client = get_api_client()
            return client.submit_job(database_connection)
        
        schema_job = refresh_schema_connections(database_connection['id'])
        
        wait_for_schema_job_completion = MyJobSensor(
            task_id="wait_for_schema_job_completion",
            task_group=self,
            mode="reschedule",
            job_id="{{ task_instance.xcom_pull(task_instance.task_group_id + '.refresh_schema_connections')['output']['id'] }}"
        )
        schema_job >> wait_for_schema_job_completion

Hey @andos

The task_id used to pull the XCom should take the form task_group_id.task_id, as explained in "What is the task_id in Task Groups?".
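One way to apply this inside the class is to build the template string while the constructor runs, since the group's own group_id is already known at that point. The sketch below is only an illustration: xcom_pull_template is a hypothetical helper (not part of Airflow), the example group id is made up, and the ['output']['id'] path is copied from the question without knowing the real shape of the API response. It also assumes job_id is listed in the template_fields of MyJobSensor, otherwise the expression would not be rendered.

```python
def xcom_pull_template(group_id: str, task_id: str, path: str = "") -> str:
    """Build a Jinja expression that pulls the return value of a task in a group.

    Hypothetical helper, not part of Airflow. `path` is an optional subscript
    suffix applied to the pulled value, e.g. "['output']['id']".
    """
    # Tasks inside a TaskGroup are registered as "<group_id>.<task_id>"
    # when the group prefixes its children, which is the default.
    full_task_id = f"{group_id}.{task_id}"
    return "{{ ti.xcom_pull(task_ids='" + full_task_id + "')" + path + " }}"


# Inside MyTaskGroup.__init__, after super().__init__(...), the sensor argument
# could then be written as:
#     job_id=xcom_pull_template(self.group_id, "refresh_schema_connections", "['output']['id']")
print(xcom_pull_template("register_schemas_db1", "refresh_schema_connections", "['output']['id']"))
# → {{ ti.xcom_pull(task_ids='register_schemas_db1.refresh_schema_connections')['output']['id'] }}
```

Because the string is built per instance, each group ends up with its own fully qualified task id in the template, so nothing has to be looked up on the task instance at render time.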

Hope this helps.

Thanks
Manmeet