Hi all,
I would like to know how I can get an XCom value defined in another DAG that is called with TriggerDagRunOperator.
My goal is to understand how a DAG can communicate with another DAG that it calls with TriggerDagRunOperator.
Thanks in advance for your help.
Hi @emeid!
Just to confirm, are you asking if you can use an XCom from DAG1 in a TriggerDagRunOperator within DAG2 that triggers DAG3?
Hi,
Let me clarify the topic.
I have dag1 and dag2.
In dag1 I call dag2 with TriggerDagRunOperator. In dag2 I use xcom_push.
My question is: is it possible to use xcom_pull in dag1, after calling dag2, to get the XCom value created in dag2?
Thanks in advance for your help.
Understood. Yes, it is possible to pull XComs from a separate DAG, but I'd be cautious and aware of `execution_date` (more on this below) as well as of the cross-DAG dependencies between the two DAGs.
The `xcom_pull()` function does have a `dag_id` input parameter, so, as I mentioned, you can pull XComs from other DAGs.
```python
def xcom_pull(
    self,
    task_ids: Optional[Union[str, Iterable[str]]] = None,
    dag_id: Optional[str] = None,
    key: str = XCOM_RETURN_KEY,
    include_prior_dates: bool = False,
    session: Session = None,
):
    ...
```
Be mindful that, by default, `xcom_pull()` retrieves the XCom for the given `task_ids` and `key` for the `execution_date` of the `TaskInstance`. This means the XCom pulled by default is the one with the same `execution_date` as the task pulling it. If needed, you can also have XComs from prior execution dates considered by using the `include_prior_dates` parameter.
As a side note, the `xcom_push()` function has an `execution_date` input parameter, so you can specify the `execution_date` that the pushed XCom will be tied to. The default value is the `execution_date` of the task pushing the XCom.
```python
def xcom_push(
    self,
    key: str,
    value: Any,
    execution_date: Optional[datetime] = None,
    session: Session = None,
):
    ...
```
Relating these, be aware that the `execution_date` of the pushed and pulled XComs must match for the correct value to be pulled by DAG1. This might not be an issue depending on your DAG processing and timing, but it is something to be aware of.
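One way to make the dates line up is to pass dag1's `execution_date` to `TriggerDagRunOperator` through its templated `execution_date` argument, so the triggered dag2 run gets the same date. This is a sketch under two assumptions: an Airflow 2.0/2.1-style setup where XComs are matched by `execution_date` (as in the signatures above), and dag2 being started only by dag1 (for example with `schedule_interval=None`), so its own scheduled runs cannot collide with the triggered ones. The arguments are collected in a dict only so the snippet runs without Airflow installed.

```python
# Keyword arguments for the TriggerDagRunOperator in dag1 (assumes Airflow 2.0/2.1).
# In the DAG file: run_mydag2 = TriggerDagRunOperator(**TRIGGER_DAG2_KWARGS)
TRIGGER_DAG2_KWARGS = {
    "task_id": "run_mydag2",
    "trigger_dag_id": "dag2",
    # Templated: the triggered dag2 run gets the same execution_date as this dag1 run,
    # so a default-scoped xcom_pull(dag_id="dag2", ...) in dag1 targets that run.
    "execution_date": "{{ execution_date }}",
}
```

dag1 still has to pull only after dag2's `push_xcom` task has run; otherwise the pull returns `None`.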
I hope this helps!
Hi Josh,
Thank you for your response. Now I have a clear idea about that.
I think in my use case it would not be prudent to manage the execution date.
My main idea is to know what happens in dag2 and decide in dag1 whether the pipeline should stop.
I changed my approach and decided to create a failing task inside dag2, so that the TriggerDagRunOperator task in dag1 would fail and stop my pipeline. Unfortunately, dag2 fails but the TriggerDagRunOperator task in dag1 does not. I don't understand why.
Please find my code attached.
Thanks in advance for your help.
This is the code:
dag2:
```python
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.dates import days_ago


def task_to_fail():
    # Raising an exception marks this task (and therefore the dag2 run) as failed.
    raise AirflowException("Error msg")


def _pull_xcom_test(**kwargs):
    # Airflow 2 passes the task context as keyword arguments; "ti" is the TaskInstance.
    print(kwargs["ti"])
    xcom_val = kwargs["ti"].xcom_pull(key="xcom_text2")
    # xcom_val = kwargs["ti"].xcom_pull(task_ids="run_mydag1", key="xcom_text1")
    # BranchPythonOperator follows the task_id returned here.
    if int(xcom_val) < 5:
        return "run_xcom_less_than_5"
    else:
        return "run_xcom_great_than_5"


def _push_xcom_test(**kwargs):
    print(kwargs["ti"])
    kwargs["ti"].xcom_push(key="xcom_text2", value=2)


args = {
    "owner": "airflow",
}

with DAG(
    dag_id="dag2",
    default_args=args,
    start_date=days_ago(1),
    schedule_interval="@daily",
    tags=["example", "example2"],
) as dag:
    run_start = DummyOperator(
        task_id="start",
    )
    inf_5 = DummyOperator(
        task_id="run_xcom_less_than_5",
    )
    sup_5 = DummyOperator(
        task_id="run_xcom_great_than_5",
    )
    # provide_context=True is no longer needed in Airflow 2; the context is passed automatically.
    push_xcom = PythonOperator(
        task_id="push_xcom",
        python_callable=_push_xcom_test,
    )
    branching = BranchPythonOperator(
        task_id="branching2",
        python_callable=_pull_xcom_test,
    )

    run_start >> push_xcom >> branching >> [inf_5, sup_5]

    # join = DummyOperator(
    #     task_id="join",
    #     trigger_rule="all_success",
    # )

    stop_ppline = PythonOperator(
        task_id="stop_ppline",
        python_callable=task_to_fail,
    )

    inf_5 >> stop_ppline
```
dag1:
```python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
# airflow.operators.dagrun_operator is the deprecated Airflow 1.x path.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


def _pull_xcom_test(**kwargs):
    print(kwargs["ti"])
    xcom_val = kwargs["ti"].xcom_pull(key="xcom_text2")
    # xcom_val = kwargs["ti"].xcom_pull(task_ids="run_mydag1", key="xcom_text1")
    if int(xcom_val) < 5:
        return "run_xcom_less_than_5"
    else:
        return "run_xcom_great_than_5"


def _push_xcom_test(**kwargs):
    print(kwargs["ti"])
    kwargs["ti"].xcom_push(key="xcom_text2", value=20)


args = {
    "owner": "airflow",
}

with DAG(
    dag_id="dag1",
    default_args=args,
    start_date=days_ago(1),
    schedule_interval="@daily",
    tags=["example", "example2"],
) as dag:
    run_start = DummyOperator(
        task_id="start",
    )
    # Starts a run of dag2.
    run_mydag1 = TriggerDagRunOperator(
        task_id="run_mydag2",
        trigger_dag_id="dag2",
        allowed_states=["all_success"],
    )
    # run_start >> run_mydag1 >> branching >> [inf_5, sup_5]
    join = DummyOperator(
        task_id="join",
        trigger_rule="all_success",
    )

    run_start >> run_mydag1 >> join
```
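A likely reason `run_mydag2` succeeds even though dag2 fails: by default, `TriggerDagRunOperator` only creates the dag2 run and then finishes; it does not wait for that run's outcome, so `allowed_states` is not used. Assuming an Airflow 2 release whose `TriggerDagRunOperator` accepts `wait_for_completion`, the task can instead wait and fail when dag2's run ends in one of `failed_states`. Note also that `allowed_states` expects DAG run states such as `"success"`, not a trigger rule like `"all_success"`. A sketch of the adjusted arguments, as a plain dict so it runs without Airflow:

```python
# Keyword arguments for run_mydag2 in dag1 (assumes an Airflow 2 release whose
# TriggerDagRunOperator supports wait_for_completion).
# In the DAG file: run_mydag1 = TriggerDagRunOperator(**RUN_DAG2_KWARGS)
RUN_DAG2_KWARGS = {
    "task_id": "run_mydag2",
    "trigger_dag_id": "dag2",
    # Keep this task running until the triggered dag2 run reaches a final state.
    "wait_for_completion": True,
    # Seconds between checks of the dag2 run's state (30 is an arbitrary choice).
    "poke_interval": 30,
    # DAG run states, not trigger rules such as "all_success".
    "allowed_states": ["success"],
    # If the dag2 run ends as failed, this task raises and fails too.
    "failed_states": ["failed"],
}
```

With these arguments, when `stop_ppline` fails the dag2 run should end as failed, `run_mydag2` then fails, and `join` does not run under its `all_success` trigger rule. The trade-off is that the waiting task occupies a worker slot for as long as dag2 runs.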