XCOM & Sub-DAG (TriggerDagRunOperator) problem

Hi there!
I’m having a rather hard time figuring out an issue with Airflow at my day job.
Here’s the thing:

  • I’ve got a main DAG with 3 tasks: Setup_1 → SubDAG_Caller_1 → Read_XCOM_1
  • I’ve got a SubDAG with 2 tasks: SubDAG_Write_XCOM_1 → SubDAG_Read_XCOM_1

The goal is to trigger the SubDAG first, write a simple value to XCOM, check that the value can be read within the SubDAG itself, then check that the same value can also be read once control returns to the main DAG.

Test results:

  • first launch:
[Sub-DAG] 2022-07-26 18:06:14.839546 - Writing 15 to XCOM
[Main DAG] 2022-07-26 18:06:16.415813 - Read None from XCOM
[Sub-DAG] 2022-07-26 18:06:18.405713 - Read 15 from XCOM
  • second launch (no log removal; the value 15 was still present in the database):
[Sub-DAG] 2022-07-26 18:11:34.640587 - Writing 15 to XCOM
[Main DAG] 2022-07-26 18:11:46.811229 - Read 15 from XCOM
[Sub-DAG] 2022-07-26 18:11:56.130983 - Read 15 from XCOM

Note: before getting these results with Airflow 2.3.3, I also tested with versions 2.0 and 2.2.3. Between launches 1 and 2 I tried removing the Airflow logs and relaunching, and I got a different result than launch 1 (the first two lines were swapped).

So can someone explain what I’m doing wrong, or what I’ve misunderstood about Airflow?

Here’s the source:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

AIRFLOW_LOGGER = logging.getLogger("airflow_task")

LOGGER = logging.getLogger('my_logger')
LOGGER.setLevel(logging.DEBUG)
FILE_HANDLER = logging.FileHandler('loop.log')
FILE_HANDLER.setLevel(logging.DEBUG)
LOGGER.addHandler(FILE_HANDLER)

DEFAULT_ARGS = {"owner": "airflow", "start_date": days_ago(1)}

# ******************************** DAGS ****************************************

dag = DAG(
    "DAG_Calling_SubDAG_XCOM",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    catchup=False
)

sub_dag = DAG(
    "SubDAG_XCOM",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    catchup=False
)

# ******************************** TASKS ***************************************

def task_setup(**kwargs):
    LOGGER.info("*************************************************************")

def task_subdag_write_xcom(**kwargs):
    num = 15
    LOGGER.info(f"[Sub-DAG] {datetime.now()} - Writing {num} to XCOM")
    AIRFLOW_LOGGER.info(f"[Sub-DAG] Writing {num} to XCOM")
    task_instance = kwargs["ti"]
    task_instance.xcom_push(key="MyValue", value=num)

def task_subdag_read_xcom(**kwargs):
    task_instance = kwargs["ti"]
    num = task_instance.xcom_pull(
        key="MyValue",
        task_ids="SubDAG_Write_XCOM_1",
        dag_id="SubDAG_XCOM",
        include_prior_dates=True
    )
    LOGGER.info(f"[Sub-DAG] {datetime.now()} - Read {num} from XCOM")
    AIRFLOW_LOGGER.info(f"[Sub-DAG] Read {num} from XCOM")

def task_read_xcom(**kwargs):
    task_instance = kwargs["ti"]
    num = task_instance.xcom_pull(
        key="MyValue",
        task_ids="SubDAG_Write_XCOM_1",
        dag_id="SubDAG_XCOM",
        include_prior_dates=True
    )
    LOGGER.info(f"[Main DAG] {datetime.now()} - Read {num} from XCOM")
    AIRFLOW_LOGGER.info(f"[Main DAG] Read {num} from XCOM")

# ****************************** OPERATORS *************************************

setup_op = PythonOperator(
    task_id="Setup_1",
    python_callable=task_setup,
    dag=dag
)

call_sub_dag_op = TriggerDagRunOperator(
    task_id="SubDAG_Caller_1",
    trigger_dag_id="SubDAG_XCOM",
    conf={},
    dag=dag
)

read_xcom_op = PythonOperator(
    task_id="Read_XCOM_1",
    python_callable=task_read_xcom,
    dag=dag
)

setup_op >> call_sub_dag_op >> read_xcom_op

subdag_write_xcom_op = PythonOperator(
    task_id="SubDAG_Write_XCOM_1",
    python_callable=task_subdag_write_xcom,
    dag=sub_dag
)

subdag_read_xcom_op = PythonOperator(
    task_id="SubDAG_Read_XCOM_1",
    python_callable=task_subdag_read_xcom,
    dag=sub_dag
)

subdag_write_xcom_op >> subdag_read_xcom_op

Okay, from what I understand: once the TriggerDagRunOperator triggers the second DAG, its own state is set to success right away, without waiting for the triggered DAG to complete, and the next task of the main DAG starts immediately.

This explains why we don’t get the desired result, and it seems that to avoid it we must use the wait_for_completion parameter of TriggerDagRunOperator, as sketched below.
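
For reference, a minimal sketch of that change: the same operator as in the source above, with only the two relevant parameters added (poke_interval is the number of seconds between checks of the triggered run’s state):

call_sub_dag_op = TriggerDagRunOperator(
    task_id="SubDAG_Caller_1",
    trigger_dag_id="SubDAG_XCOM",
    wait_for_completion=True,  # block until SubDAG_XCOM finishes
    poke_interval=10,          # re-check the triggered run every 10 seconds
    dag=dag
)
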
Edit: new problem encountered here: with this parameter, the TriggerDagRunOperator waits endlessly for the other DAG to finish, while the first task of that DAG is stuck in the “queued” state and never updated, so it waits forever!

The problem comes from using the default SequentialExecutor in airflow.cfg (with the default SQLite database): SequentialExecutor runs only one task at a time, so while the trigger task occupies that single slot waiting, the sub-DAG’s first task can never leave the queue; a classic deadlock. After switching to LocalExecutor and moving the Airflow metadata database to MySQL, the infinite wait no longer appears!
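
For reference, the relevant airflow.cfg changes look roughly like this (the MySQL connection string is a placeholder to adapt to your setup; note that in Airflow 2.3+ sql_alchemy_conn lives under [database], while earlier 2.x versions keep it under [core]):

[core]
executor = LocalExecutor

[database]
# placeholder credentials/host -- adapt to your MySQL installation
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost:3306/airflow_db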

That still leaves the XCOM problem, where the main DAG’s task reads the None value…

Found it!
The XCOM problem no longer appears if I add the following parameters to the TriggerDagRunOperator:

  • execution_date="{{ds}}",
  • reset_dag_run=True

As far as I can tell, here is why this works: by default the triggered run is stamped with the current time, which is later than the main run’s logical date. Since xcom_pull(..., include_prior_dates=True) only sees XComs dated at or before the pulling run’s own date, the value the sub-DAG just wrote is invisible to the main DAG on the first launch, while the stale value from a previous launch, being older, is found on the second. With execution_date="{{ds}}" the triggered run is dated on the main run’s day instead, so the pull finds the fresh value; see the sketch below. And reset_dag_run=True matters more than I first guessed: "{{ds}}" renders only the date, so a second launch on the same day would otherwise fail because a sub-DAG run with that execution date already exists.
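
To make that date logic concrete, here is a minimal sketch with made-up timestamps (not taken from my runs), showing the comparison that the pull effectively boils down to:

from datetime import datetime

# Logical date of the main DAG run (stamped when it was triggered):
main_run_date = datetime(2022, 7, 26, 18, 6, 10)

# Default behaviour: the triggered run is stamped with "now", i.e. slightly
# *after* the main run's date:
sub_run_default = datetime(2022, 7, 26, 18, 6, 14)

# With execution_date="{{ds}}": the triggered run gets the main run's date
# truncated to midnight, i.e. *before* the main run's date:
sub_run_templated = datetime(2022, 7, 26)

# include_prior_dates=True only matches XComs dated at or before the pulling
# run's own date:
print(sub_run_default <= main_run_date)    # False -> value not visible (None)
print(sub_run_templated <= main_run_date)  # True  -> value visible (15)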

So in the end:

  1. Don’t use the default SequentialExecutor + SQLite combination; use at least LocalExecutor with another database backend
  2. Check that your operator has the right parameters. In my case, the following works:
call_sub_dag_op = TriggerDagRunOperator(
    task_id="SubDAG_Caller_1",
    trigger_dag_id="SubDAG_XCOM",
    execution_date="{{ds}}",
    reset_dag_run=True,
    wait_for_completion=True,
    poke_interval=30,
    dag=dag
)