Dynamic Task Mapping Xcom Handling

I just had a quick question on the behavior of Task Mapping and how it handles xcoms. I know it consolidates all the return value xcoms into a single array to be accessed later, but i was wondering if it does that with any xcom that the tasks create? Example, i want each task to set an xcom with a single value representing the id of the item it was working on, and then be able to get that list of ids in a later task.

Hello @Tgoad, thanks for reaching out!

You can push an XCom with a specific key and in the next step consolidate all XComs - they will have the same dag_id, logical_date and task_id but a different key. Please see an example below:

    ROUNDS = ["one", "two", "three"]

    @task()
    def which_round(x, **kwargs):
        ti = kwargs["ti"]
        round_number = f"Round number: {x}"
        ti.xcom_push(key=x, value=round_number)

    @task
    def consolidate_rounds(**kwargs):
        ti = kwargs["ti"]
        result = []
        for item in ROUNDS:
            result.append(ti.xcom_pull(key=item, task_ids="which_round")[0])
        print(result)

    which_round.expand(x=ROUNDS) >> consolidate_rounds()

In the logs of consolidate_rounds you’ll see:

{logging_mixin.py:115} INFO - ['Round number: one', 'Round number: two', 'Round number: three'].

Note that as a value passed from the mapped task is a lazy proxy - if you run print(ti.xcom_pull(key="one", task_ids="which_round")) directly, you would get something like this:

_LazyXComAccess(dag_id='my_dag', run_id='manual__2022-08-11T10:28:30.969038+00:00', task_id='which_round')

You can use normal sequence syntax on this object (e.g. ti.xcom_pull(key="one", task_ids="which_round")[0] ), or iterate through it with a for loop.