Get xcom value define in a TriggerDagRunOperator

Hi all,

I want to know how can i get a xcom value defined in a other dag called with TriggerDagRunOperator.

My idea is to know how can we can communicate between a dag an a other called with TriggerDagRunOperator.

Thanks in advance for your help.

Hi @emeid!

Just to confirm, are you asking if you can use an XCom from DAG1 in a TriggerDagRunOperator within DAG2 that triggers DAG3?

Hi,
Let me clarify the topic.
I have Dag1 And dag2.
In dag1 i Call dag2 With TriggerDagRunOperator. In the dag2 i used xcom push there.
My question is. Is it possible to use xcom pull in dag1 after calling Dag2 to get xcom value created in dag2?

Thanks in advance for your help.

:+1: Understood. Yes it is possible to pull XComs from a separate DAG but I’d be cautious and aware of execution_date (more on this below) as well as cross-DAG dependencies between the two DAGs.

The xcom_pull() function does have a dag_id input parameter so as I mentioned you can pull XComs from other DAGs.

def xcom_pull(
        self,
        task_ids: Optional[Union[str, Iterable[str]]] = None,
        dag_id: Optional[str] = None,
        key: str = XCOM_RETURN_KEY,
        include_prior_dates: bool = False,
        session: Session = None,
    )
...

Be mindful that by default, xcom_pull() will retrieve the XCom for the input task_id(s) and key(s) for the execution_date of the TaskInstance. Meaning the default XCom pulled is that of the same execution_date of the task pulling the XCom. You do have the ability to retrieve all XComs for prior execution dates as well using the include_prior_dates parameter if needed.

A side note, the xcom_push() function has an execution_date input parameter so you can specify the execution_date that the pushed XCom will be tied to. The default value is the execution_date of the task pushing the XCom.

def xcom_push(
        self,
        key: str,
        value: Any,
        execution_date: Optional[datetime] = None,
        session: Session = None,
    )
...

Relating these, be aware the execution_date between the pushed and pulled XComs do need to match for the correct value to be pulled by DAG2. This might not be an issue depending on your DAG processing and timing but just something to be aware of.

I hope this helps!

Hi Josh,

Thank you for your response. Now I have a clear idea about that.
I think in my use case it would not be prudent to manage the execution date.

My main Idea is to know what happens in dag2 and decide in dag 1 if the pipeline will stop or not.
I changed my approach and decided to create a fail task inside dag2 thereby in dag1 the triggerDagRunOperator task will fail and stop my pipeline. Unfortunately, the dag2 fails but the triggerDagRunOperator task in Dag1 does not fail. I don’t understand why?
Please find in attach my code

Thanks in advance for your help.

(Attachment test_dag2.py is missing)

(Attachment main_test_dag1.py is missing)

this is the code:
dag2:

import random

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label
from airflow.operators.dagrun_operator import TriggerDagRunOperator

from airflow import AirflowException

def task_to_fail():
raise AirflowException(“Error msj”)

def _pull_xcom_test(**kwargs):
print(kwargs[‘ti’])
xcom_val= kwargs[‘ti’].xcom_pull(key=‘xcom_text2’)
#xcom_val= kwargs[‘ti’].xcom_pull(task_ids=‘run_mydag1’,key=‘xcom_text1’)

if int(xcom_val) < 5:
return “run_xcom_less_than_5”
else:
return “run_xcom_great_than_5”

end if

def _push_xcom_test(**kwargs):
print(kwargs[‘ti’])
kwargs[‘ti’].xcom_push(key=‘xcom_text2’,value=2)

args = {
‘owner’: ‘airflow’,
}

with DAG(
dag_id=‘dag2’,
default_args=args,
start_date=days_ago(1),
schedule_interval="@daily",
tags=[‘example’, ‘example2’],
) as dag:

run_start = DummyOperator(
task_id=‘start’,
)

inf_5 = DummyOperator(
task_id=‘run_xcom_less_than_5’,
)
sup_5 = DummyOperator(
task_id=‘run_xcom_great_than_5’,
)

push_xcom = PythonOperator(
task_id=‘push_xcom’,
python_callable=_push_xcom_test,provide_context=True,
)

branching = BranchPythonOperator(
task_id=‘branching2’,
python_callable=_pull_xcom_test,provide_context=True,
)

run_start >> push_xcom >> branching >> [inf_5, sup_5]
‘’’
join = DummyOperator(
task_id=‘join’,
trigger_rule=‘all_success’,
)
‘’’

stop_ppline = PythonOperator(
task_id=‘stop_ppline’,
python_callable=task_to_fail,provide_context=True,
)

inf_5 >> stop_ppline

Dag1:

import random

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label
from airflow.operators.dagrun_operator import TriggerDagRunOperator

def _pull_xcom_test(**kwargs):
print(kwargs[‘ti’])
xcom_val= kwargs[‘ti’].xcom_pull(key=‘xcom_text2’)
#xcom_val= kwargs[‘ti’].xcom_pull(task_ids=‘run_mydag1’,key=‘xcom_text1’)

if int(xcom_val) < 5:
return “run_xcom_less_than_5”
else:
return “run_xcom_great_than_5”

end if

def _push_xcom_test(**kwargs):
print(kwargs[‘ti’])
kwargs[‘ti’].xcom_push(key=‘xcom_text2’,value=20)

args = {
‘owner’: ‘airflow’,
}

with DAG(
dag_id=‘dag1’,
default_args=args,
start_date=days_ago(1),
schedule_interval="@daily",
tags=[‘example’, ‘example2’],
) as dag:

run_start = DummyOperator(
task_id=‘start’,
)

run_mydag1 = TriggerDagRunOperator(
task_id=“run_mydag2”,

Ensure this equals the dag_id of the DAG to trigger

trigger_dag_id=‘dag2’,
allowed_states=[‘all_success’]
)

#run_start >> run_mydag1 >> branching >> [inf_5, sup_5]

join = DummyOperator(
task_id=‘join’,
trigger_rule=‘all_success’,
)
run_start >> run_mydag1 >> join