Hi there!
I’m having a rather hard time figuring out some issue from Airflow for my regular job.
Here’s the thing:
- I’ve got a main DAG with 3 tasks: Setup_1 → SubDAG_Caller_1 → Read_XCOM_1
- I’ve got a SubDAG with 2 tasks: SubDAG_Write_XCOM_1 → SubDAG_Read_XCOM_1
The point is to call the SubDAG first, write a simple value to XCOM, check that the value can be read in the same DAG, then check that the same value can also be read when getting back to the main DAG.
Test results:
- first launch:
[Sub-DAG] 2022-07-26 18:06:14.839546 - Writing 15 to XCOM
[Main DAG] 2022-07-26 18:06:16.415813 - Read None from XCOM
[Sub-DAG] 2022-07-26 18:06:18.405713 - Read 15 from XCOM
- second launch (no log removal, the 15 value was still present in database):
[Sub-DAG] 2022-07-26 18:11:34.640587 - Writing 15 to XCOM
[Main DAG] 2022-07-26 18:11:46.811229 - Read 15 from XCOM
[Sub-DAG] 2022-07-26 18:11:56.130983 - Read 15 from XCOM
Note: before getting this result with airflow 2.3.3, I tested with version 2.0 and 2.2.3 and between launches 1 and 2, I tried to remove the airflow logs and relaunch it, and I got a different result than launch 1 (the two first lines were inverted).
So can someone explain to me what I’m doing wrong or maybe, what I badly understood from Airflow ?
Here’s the source:
import logging
from datetime import datetime
from airflow.utils.dates import days_ago
from airflow import settings
from airflow.models import SkipMixin, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone
from airflow import DAG
AIRFLOW_LOGGER = logging.getLogger("airflow_task")
LOGGER = logging.getLogger('my_logger')
LOGGER.setLevel(logging.DEBUG)
FILE_HANDLER = logging.FileHandler('loop.log')
FILE_HANDLER.setLevel(logging.DEBUG)
LOGGER.addHandler(FILE_HANDLER)
DEFAULT_ARGS = {"owner": "airflow", "start_date": days_ago(1)}
# ******************************** DAGS ****************************************
dag = DAG(
"DAG_Calling_SubDAG_XCOM",
schedule_interval=None,
default_args=DEFAULT_ARGS,
catchup=False
)
sub_dag = DAG(
"SubDAG_XCOM",
schedule_interval=None,
default_args=DEFAULT_ARGS,
catchup=False
)
# ******************************** TASKS ***************************************
def task_setup(**kwargs):
LOGGER.info("*************************************************************")
def task_subdag_write_xcom(**kwargs):
num=15
LOGGER.info(f"[Sub-DAG] {datetime.now()} - Writing {num} to XCOM")
AIRFLOW_LOGGER.info(f"[Sub-DAG] Writing {num} to XCOM")
task_instance = kwargs["ti"]
task_instance.xcom_push(key="MyValue",value=num)
def task_subdag_read_xcom(**kwargs):
task_instance = kwargs["ti"]
num = task_instance.xcom_pull(
key="MyValue",
task_ids="SubDAG_Write_XCOM_1",
dag_id="SubDAG_XCOM",
include_prior_dates=True
)
LOGGER.info(f"[Sub-DAG] {datetime.now()} - Read {num} from XCOM")
AIRFLOW_LOGGER.info(f"[Sub-DAG] Read {num} from XCOM")
def task_read_xcom(**kwargs):
task_instance = kwargs["ti"]
num = task_instance.xcom_pull(
key="MyValue",
task_ids="SubDAG_Write_XCOM_1",
dag_id="SubDAG_XCOM",
include_prior_dates=True
)
LOGGER.info(f"[Main DAG] {datetime.now()} - Read {num} from XCOM")
AIRFLOW_LOGGER.info(f"[Main DAG] Read {num} from XCOM")
# ****************************** OPERATORS *************************************
setup_op = PythonOperator(
task_id="Setup_1",
python_callable=task_setup,
dag=dag
)
attributes = {
}
call_sub_dag_op = TriggerDagRunOperator(
task_id="SubDAG_Caller_1",
trigger_dag_id="SubDAG_XCOM",
conf=attributes,
dag=dag
)
read_xcom_op = PythonOperator(
task_id="Read_XCOM_1",
python_callable=task_read_xcom,
dag=dag
)
setup_op >> call_sub_dag_op >> read_xcom_op
subdag_write_xcom_op = PythonOperator(
task_id="SubDAG_Write_XCOM_1",
python_callable=task_subdag_write_xcom,
dag=sub_dag
)
subdag_read_xcom_op = PythonOperator(
task_id="SubDAG_Read_XCOM_1",
python_callable=task_subdag_read_xcom,
dag=sub_dag
)
subdag_write_xcom_op >> subdag_read_xcom_op