External DAG dependency for a downstream DAG that runs 6 times per day

Hi Team,

We have the following requirement:

- Dag1 runs on the 1st of every month.
- Dag2 depends on Dag1 and runs 6 times per day (3:30am, 7:30am, 11:30am, 3:30pm, 7:30pm, 11:30pm). Each Dag2 run also depends on its own previous run.

  • The self-dependency works fine using execution_delta=timedelta(minutes=240), but I can't get the dependency on Dag1 to work.
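For reference, ExternalTaskSensor finds the run it waits for by subtracting execution_delta from the current run's execution_date. The plain-Python sketch below reproduces that arithmetic for the 240-minute self-dependency; the helper name `sensed_execution_date` is hypothetical and not part of Airflow.

```python
from datetime import datetime, timedelta

# Hypothetical helper mirroring how ExternalTaskSensor applies execution_delta:
# the sensed run's execution_date is the current execution_date minus the delta.
SELF_DELTA = timedelta(minutes=240)


def sensed_execution_date(execution_date: datetime, delta: timedelta = SELF_DELTA) -> datetime:
    """Return the execution_date of the run the sensor waits for."""
    return execution_date - delta


# 07:30 waits for 03:30 on the same day; 03:30 waits for 23:30 the day before.
print(sensed_execution_date(datetime(2020, 8, 1, 7, 30)))  # 2020-08-01 03:30:00
print(sensed_execution_date(datetime(2020, 8, 2, 3, 30)))  # 2020-08-01 23:30:00
```

Because the delta is fixed, this only works while the previous run is always exactly 4 hours earlier. With a schedule limited to days 1-15, the run with execution_date 03:30 on the 1st would look for 23:30 on the last day of the previous month, which is never scheduled.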

Can someone please help me with this? Below is my example DAG.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2020, 7, 26),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=30),
    'provide_context': True,
    'run_as_user': 'dev',
}


def on_dag_failure(context):
    # Print the task instances of the failed DAG run
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)

##########################################

dag = DAG(
    'dag_2',
    concurrency=1,
    catchup=False,
    # 03:30, 07:30, 11:30, 15:30, 19:30 and 23:30 on days 1-15 of each month
    schedule_interval='30 03,07,11,15,19,23 1-15 * *',
    description='monthly 1st-15th',
    default_args=default_args,
    on_failure_callback=on_dag_failure,
)

task_1 = BashOperator(
    task_id='task_1',
    bash_command='cp /home/test.txt /user/test.txt ',
    on_failure_callback=on_dag_failure,
    execution_timeout=None,
    dag=dag,
)

# Wait for the monthly dag_1. Without execution_delta or execution_date_fn,
# the sensor looks for a dag_1 run with exactly the same execution_date as this run.
ext_dag_1 = ExternalTaskSensor(
    task_id='ext_dag_1',
    external_task_id=None,  # wait for the whole DAG run, not a single task
    external_dag_id='dag_1',
    allowed_states=['success'],
    # execution_delta=timedelta(minutes=5),
    # execution_delta=timedelta(days=1),
    dag=dag,
)

# Self-dependency: wait for the previous dag_2 run, 240 minutes (4 hours) earlier
ext_dag_2 = ExternalTaskSensor(
    task_id='ext_dag_2',
    external_task_id=None,
    external_dag_id='dag_2',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=240),
    # execution_delta=timedelta(days=1),
    dag=dag,
)

task_1.set_upstream(ext_dag_1)
task_1.set_upstream(ext_dag_2)
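A fixed execution_delta cannot express "the dag_1 run on the 1st of the month", because the distance back to the 1st is different for every dag_2 run. ExternalTaskSensor also accepts an execution_date_fn callable, which receives the current execution_date and returns the date to look for (it cannot be combined with execution_delta). Below is a minimal sketch of such a function, assuming (hypothetically) that dag_1 is scheduled as '0 0 1 * *', so its runs have an execution_date of midnight on the 1st. Since Airflow sets execution_date to the start of the schedule interval, the dag_1 run that actually executes on the 1st may carry the previous month's date; the hypothetical months_back parameter allows that shift. Compare the execution dates shown in the UI for both DAGs before relying on it.

```python
from datetime import datetime


def dag_1_execution_date(execution_date: datetime, months_back: int = 0) -> datetime:
    """Map a dag_2 execution_date to the execution_date of the dag_1 run to sense.

    Assumes (hypothetically) that dag_1 runs at 00:00 on the 1st ('0 0 1 * *').
    months_back moves the target to an earlier month, e.g. 1 to account for
    execution_date being the start of dag_1's monthly interval.
    """
    # Count months from year 0 so subtracting across a year boundary works.
    month_index = execution_date.year * 12 + (execution_date.month - 1) - months_back
    year, month = divmod(month_index, 12)
    # replace() keeps any tzinfo, so the result stays comparable with
    # timezone-aware execution dates.
    return execution_date.replace(year=year, month=month + 1, day=1,
                                  hour=0, minute=0, second=0, microsecond=0)


# Wiring it into the DAG (Airflow, not executed here):
# ext_dag_1 = ExternalTaskSensor(
#     task_id='ext_dag_1',
#     external_dag_id='dag_1',
#     external_task_id=None,
#     allowed_states=['success'],
#     execution_date_fn=lambda dt: dag_1_execution_date(dt, months_back=1),
#     dag=dag,
# )

print(dag_1_execution_date(datetime(2020, 8, 7, 11, 30)))                 # 2020-08-01 00:00:00
print(dag_1_execution_date(datetime(2021, 1, 15, 23, 30), months_back=1))  # 2020-12-01 00:00:00
```

Runs near the month boundary (for example the first dag_2 run of the month, whose execution_date falls in the previous month) may need special handling depending on your schedules.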

Hi @kumar.8ds

You have two DAGs. Let’s call them by their dag_id, dag_1 and dag_2.

dag_1 runs on a schedule on the 1st of every month.
You want dag_2 to run once every 4 hours, every day, starting at 03:30. This can be achieved with the following cron string:

30 3-23/4 * * *
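The hour step selects every 4th hour starting at 3 (written 3-23/4 in standard cron syntax, where a step applies to a range). As a quick check, Python's range gives the same hours; note that range excludes its stop value, hence 23 + 1:

```python
# Cron hour field "3-23/4": every 4th hour from 3 through 23 inclusive.
hours = list(range(3, 23 + 1, 4))
print(hours)  # [3, 7, 11, 15, 19, 23]

# Combined with minute 30, these are the six daily run times from the question.
print([f"{h:02d}:30" for h in hours])
# ['03:30', '07:30', '11:30', '15:30', '19:30', '23:30']
```

These are the same hours as the explicit list 03,07,11,15,19,23 in the original schedule_interval.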

What I don’t understand is the relationship between dag_1 and dag_2. Do you want dag_2 to run only after dag_1 has run, i.e. only on the 1st and not for the rest of the month?