Hi Team,
actually we have the requirement like below.
-Dag1 runs on 1st of every month
-Dag2 depends on Dag1 and runs 6 times (3.30am ,7.30am,11.30am,3.30pm,7.30pm,11.30pm) per day with self dependency.
- Self dependency is working fine by using execution_delta= (minutes=240)
Can some please help me to get out of this issue, Below is my example dag requirement.
import os
import sys
from airflow.exceptions import AirflowException
from airflow.models import TaskInstance, DagBag, DagModel, DagRun, Variable
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
‘owner’: ‘airflow’,
‘depends_on_past’: True,
‘start_date’: datetime(2020, 7, 26),
‘email_on_failure’: True,
‘email_on_retry’: False,
‘retries’: 0,
‘retry_delay’: timedelta(minutes=30),
'provide_context': True,
'run_as_user': 'dev'
}
def on_dag_failure(context):
dag_run = context.get(‘dag_run’)
task_instances = dag_run.get_task_instances()
print(task_instances)
##########################################
dag = DAG(‘dag_2’,
concurrency=1,
catchup=False,
schedule_interval='30 03,07,11,15,19,23 1-15 * *',
description = 'monthly 1st-15th ',
default_args=default_args,
on_failure_callback=on_dag_failure
)
task_1 = BashOperator(
task_id=‘task_1’,
bash_command='cp /home/test.txt /user/test.txt ',
on_failure_callback = on_dag_failure,
execution_timeout=None,
dag=dag
)
ext_dag_1= ExternalTaskSensor(
task_id=‘ext_dag_1’,
external_task_id=None,
external_dag_id=‘dag_1’,
allowed_states=[‘success’],
# execution_delta=timedelta(minutes=5),
# execution_delta=timedelta(days=1),
dag=dag
)
ext_dag_2 = ExternalTaskSensor(
task_id=‘ext_dag_2’,
external_task_id=None,
external_dag_id=‘dag_2’,
allowed_states=[‘success’],
execution_delta=timedelta(minutes=240),
# execution_delta=timedelta(days=1),
dag=dag
)
task1.set_upstream(ext_dag_1)
task_1.set_upstream(ext_dag_2)