On_failure_callback and on_success_callback not getting triggered as required at DAG level and TASK level

Below is my code

from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.decorators import task, dag
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import airflow
import sending_mail

Callback for triggering the events based on task completion:

def on_trigger(context):
    ti = context.get("task_instance")
    task_id = ti.task_id
    state = ti.state
    SUBJECT = ""
    MESSAGE = """
    STATE: {state}

    DAG: {dag}

    RUN ID: {run_id}

    TASK: {task}

    START TIME: {start_date}

    END TIME: {comp_date}

    LOG URL: {log_url}

    """.format(
        state=state,
        task=task_id,
        dag=ti.dag_id,
        run_id=ti.run_id,
        start_date=ti.start_date,
        comp_date=ti.end_date,
        log_url=ti.log_url,
    )
    if state == "success":
        SUBJECT = "Airflow success alert for " + task_id + " TASK"
    elif state == "failed":
        SUBJECT = "Airflow failed alert for " + task_id + " TASK"
    sending_mail.sending(MESSAGE, SUBJECT)

on_success_dag definition for the overall DAG, which is called after all tasks have completed, and only when all of them succeeded:

def on_success_dag(context):
    dag_id = context.get("task_instance").dag_id
    SUBJECT = "Airflow success alert for DAG - " + dag_id
    MESSAGE = """
    SUCCESSFULLY COMPLETED THE DAG: {dag}

    DURATION: {time}

    RUN ID: {run_id}

    """.format(
        dag=dag_id,
        time=context.get("task_instance").duration,
        run_id=context["dag_run"].run_id,
    )
    sending_mail.sending(MESSAGE, SUBJECT)

on_failure_dag definition for the overall DAG, which is called after all tasks have completed, whenever any one of the tasks failed:

def on_failure_dag(context):
    dag_id = context.get("task_instance").dag_id
    SUBJECT = "Airflow failure alert for DAG - " + dag_id

    failed_task_ids = []
    dag_run = context.get("dag_run")
    for task in dag_run.get_task_instances(state="failed"):
        failed_task_ids.append(task.task_id)

    MESSAGE = """
        <b>FAILURE OF THE DAG:</b> {dag} <BR>
        <b>DURATION:</b> {time} <BR>
        <b>FOLLOWING TASK/s FAILED IN THE DAG:</b> {failed_task_ids} <BR>
        <b>EXECUTION DATE:</b> {execution_date} <BR>
        <b>LOG URL:</b> {log_url} <BR>
    """.format(
        dag=dag_id,
        time=context.get("task_instance").duration,
        failed_task_ids=failed_task_ids,
        execution_date=context["execution_date"],
        log_url=context.get("task_instance").log_url,
    )
    sending_mail.sending(MESSAGE, SUBJECT)

DAG definition and its default arguments:

with DAG(
    dag_id="dag_id_name",
    schedule_interval="0 * * * *",
    max_active_runs=1,
    catchup=False,
    default_args={
        "start_date": datetime(2023, 6, 24),
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "on_success_callback": on_trigger,
        "on_failure_callback": on_trigger,
    },
    on_success_callback=on_success_dag,
    on_failure_callback=on_failure_dag,
    render_template_as_native_obj=True,
    tags=["production"],
) as dag:

    # Executes the container
    task1 = SSHOperator(
        task_id="oprt1",
        ssh_conn_id="connection1",
        command="sh -x script.sh ",
        dag=dag,
    )

    task1

I am on Airflow version 2.5.3, running on Ubuntu on AWS.

The DAG-level callbacks are defined at the DAG level and the task-level callbacks at the task level, and the required functions are all in place.

However, sometimes two emails are triggered for the success of the DAG instead of one at the task level. There should be a different email for the task level and the DAG level, right? But that is not happening, especially when there is a failure.

Please guide.

Hey @rozai.dsouza

Couple of pointers to help identify and fix the problem:

  • Can you remove the conditional checks in the task-level callback and verify that on_trigger is getting called?
  • You can also check the task logs to see whether a call is made to the callback method.
  • For DAG-level callbacks, I suggest checking the scheduler log for the DAG callbacks at $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log and seeing whether a double call is indeed made to the DAG callback.
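As a minimal sketch of the first pointer: strip the callback down to an unconditional log line so you can tell whether Airflow invokes it at all, independent of the email logic. (The `FakeTI` class and `calls` list below are stand-ins I made up so the sketch runs outside Airflow; in your DAG the context dict is supplied by the scheduler.)

```python
# Stripped-down task-level callback for debugging: no conditionals,
# just record that the callback fired and with which state.
# If nothing like this shows up in the task log, the callback wiring
# is the problem, not the email-building logic.
calls = []  # stand-in for the task log, so the sketch is self-contained

def on_trigger(context):
    ti = context.get("task_instance")
    calls.append((ti.task_id, ti.state))  # unconditional: fires for any state
    print(f"on_trigger called for task={ti.task_id} state={ti.state}")

# Simulate the context dict Airflow passes to callbacks
class FakeTI:
    task_id = "oprt1"
    state = "failed"

on_trigger({"task_instance": FakeTI()})
```

Once you see the line in the log for both success and failure runs, you can add the SUBJECT conditionals back one at a time.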

Check out this blog for details on callbacks.

Thanks

Thank you, removing the conditionals has helped. I tried calling the trigger at the task level, and for now it is working fine. Thank you for the valuable inputs, they were helpful.

Regards
Rozai Dsouza