Below is my code:
from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.decorators import task, dag
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import airflow
import sending_mail
# Task-level callback: sends a mail whenever an individual task finishes (success or failure).
def on_trigger(context):
    task_id = context.get("task_instance").task_id
    state = context.get("task_instance").state
    SUBJECT = ""
    MESSAGE = """
    STATE: {state}
    DAG: {dag}
    RUN ID: {run_id}
    TASK: {task}
    START TIME: {start_date}
    END TIME: {comp_date}
    LOG URL: {log_url}
    """.format(
        state=context.get("task_instance").state,
        task=context.get("task_instance").task_id,
        dag=context.get("task_instance").dag_id,
        run_id=context.get("task_instance").run_id,
        start_date=context.get("task_instance").start_date,
        comp_date=context.get("task_instance").end_date,
        log_url=context.get("task_instance").log_url,
    )
    if state == "success":
        SUBJECT = "Airflow success alert for " + task_id + " TASK"
    elif state == "failed":
        SUBJECT = "Airflow failed alert for " + task_id + " TASK"
    sending_mail.sending(MESSAGE, SUBJECT)
# DAG-level success callback: called once per run, only after all tasks have completed successfully.
def on_success_dag(context):
    dag = context.get("task_instance").dag_id
    SUBJECT = "Airflow success alert for DAG - " + dag
    MESSAGE = """
    SUCCESSFULLY COMPLETED THE DAG: {dag}
    DURATION: {time}
    RUN ID: {run_id}
    """.format(
        dag=context.get("task_instance").dag_id,
        time=context.get("task_instance").duration,
        run_id=context["dag_run"].run_id,
    )
    sending_mail.sending(MESSAGE, SUBJECT)
# DAG-level failure callback: called once per run as soon as any task in the DAG fails.
def on_failure_dag(context):
    dag = context.get("task_instance").dag_id
    SUBJECT = "Airflow failure alert for DAG - " + dag
    failed_task_ids = []
    dag_run = context.get("dag_run")
    for task in dag_run.get_task_instances(state="failed"):
        failed_task_ids.append(task.task_id)
    MESSAGE = """
    <b>FAILURE OF THE DAG:</b> {dag} <BR>
    <b>DURATION:</b> {time} <BR>
    <b>FOLLOWING TASK/s FAILED IN THE DAG:</b> {failed_task_ids} <BR>
    <b>EXECUTION DATE:</b> {execution_date} <BR>
    <b>LOG URL:</b> {log_url} <BR>
    """.format(
        dag=dag,
        time=context.get("task_instance").duration,
        failed_task_ids=failed_task_ids,
        execution_date=context["execution_date"],
        log_url=context.get("task_instance").log_url,
    )
    sending_mail.sending(MESSAGE, SUBJECT)
# DAG definition and its default arguments.
with DAG(
    dag_id="dag_id_name",
    schedule_interval="0 * * * *",
    max_active_runs=1,
    catchup=False,  # catchup is a DAG-level argument, not a task default
    default_args={
        "start_date": datetime(2023, 6, 24),
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "on_success_callback": on_trigger,
        "on_failure_callback": on_trigger,
    },
    on_success_callback=on_success_dag,
    on_failure_callback=on_failure_dag,
    render_template_as_native_obj=True,
    tags=["production"],
) as dag:
    # Runs the container start script on the remote host.
    task1 = SSHOperator(
        task_id="oprt1",
        ssh_conn_id="connection1",
        command="sh -x script.sh ",
        dag=dag,
    )

    task1
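For context, sending_mail is a small helper module of mine that is not shown above. Roughly, it is something like the sketch below (the SMTP host, sender, and recipient addresses here are placeholders, not the real values):

# sending_mail.py -- illustrative sketch only
import smtplib
from email.mime.text import MIMEText

SMTP_HOST = "localhost"            # placeholder
SENDER = "airflow@example.com"     # placeholder
RECIPIENTS = ["team@example.com"]  # placeholder

def sending(message, subject):
    """Send an HTML mail with the given body and subject."""
    msg = MIMEText(message, "html")
    msg["Subject"] = subject
    msg["From"] = SENDER
    msg["To"] = ", ".join(RECIPIENTS)
    with smtplib.SMTP(SMTP_HOST) as smtp:
        smtp.sendmail(SENDER, RECIPIENTS, msg.as_string())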
I am running Airflow 2.5.3 on Ubuntu on AWS.
The DAG-level callbacks are attached to the DAG itself, the task-level callbacks are set in default_args, and the corresponding callback functions are defined above.
However, sometimes two mails are sent for the DAG outcome instead of one DAG-level mail plus the task-level mails. There should be a different mail for the task level and the DAG level, right? But that is not what happens, especially when there is a failure.
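To make the expected behaviour concrete, here is a stripped-down sketch (the dag/task ids are illustrative and it reuses the callbacks defined above). Per successful run I would expect one task-level mail from on_trigger for each task, plus exactly one DAG-level mail from on_success_dag:

# Stripped-down illustration of the expected callback behaviour (ids are examples).
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="callback_demo",
    schedule_interval=None,
    start_date=datetime(2023, 6, 24),
    default_args={
        "on_success_callback": on_trigger,   # fires once per task instance
        "on_failure_callback": on_trigger,
    },
    on_success_callback=on_success_dag,      # fires once per DAG run when all tasks succeed
    on_failure_callback=on_failure_dag,      # fires once per DAG run when any task fails
) as demo_dag:
    EmptyOperator(task_id="t1") >> EmptyOperator(task_id="t2")
# Expected per successful run: two task-level mails (t1, t2) + one DAG-level mail.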
Please guide.