I just had a quick question on the behavior of Task Mapping and how it handles xcoms. I know it consolidates all the return value xcoms into a single array to be accessed later, but i was wondering if it does that with any xcom that the tasks create? Example, i want each task to set an xcom with a single value representing the id of the item it was working on, and then be able to get that list of ids in a later task.
Hello @Tgoad, thanks for reaching out!
You can push an XCom with a specific key
and in the next step consolidate all XComs - they will have the same dag_id
, logical_date
and task_id
but a different key
. Please see an example below:
ROUNDS = ["one", "two", "three"]
@task()
def which_round(x, **kwargs):
ti = kwargs["ti"]
round_number = f"Round number: {x}"
ti.xcom_push(key=x, value=round_number)
@task
def consolidate_rounds(**kwargs):
ti = kwargs["ti"]
result = []
for item in ROUNDS:
result.append(ti.xcom_pull(key=item, task_ids="which_round")[0])
print(result)
which_round.expand(x=ROUNDS) >> consolidate_rounds()
In the logs of consolidate_rounds
you’ll see:
{logging_mixin.py:115} INFO - ['Round number: one', 'Round number: two', 'Round number: three']
.
Note that as a value passed from the mapped task is a lazy proxy - if you run print(ti.xcom_pull(key="one", task_ids="which_round"))
directly, you would get something like this:
_LazyXComAccess(dag_id='my_dag', run_id='manual__2022-08-11T10:28:30.969038+00:00', task_id='which_round')
You can use normal sequence syntax on this object (e.g. ti.xcom_pull(key="one", task_ids="which_round")[0]
), or iterate through it with a for
loop.