Yes!
Although Airflow doesn’t currently have an email_on_success task parameter like it does for email_on_failure or email_on_retry, we can use the on_success_callback parameter together with some of Airflow’s email utility functions to send success notifications.
Airflow SMTP Environment Variables
Ensure your SMTP environment variables have been configured per the instructions here: https://www.astronomer.io/docs/airflow-alerts/
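Before relying on the callback, it can help to confirm that the SMTP settings are actually visible to the Airflow process. The sketch below assumes the settings are supplied through Airflow's AIRFLOW__SMTP__* environment variables. The helper name missing_smtp_settings and the choice of which variables count as required are assumptions made for illustration, so adjust the list to match your deployment.

```python
import os

# Assumption: SMTP is configured through Airflow's AIRFLOW__SMTP__* environment
# variables. Which ones are required depends on your mail provider.
REQUIRED_SMTP_VARS = (
    "AIRFLOW__SMTP__SMTP_HOST",
    "AIRFLOW__SMTP__SMTP_PORT",
    "AIRFLOW__SMTP__SMTP_MAIL_FROM",
)


def missing_smtp_settings(environ=None):
    """Return the names of required SMTP variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_SMTP_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_smtp_settings()
    if missing:
        print("Missing SMTP settings: " + ", ".join(missing))
    else:
        print("All required SMTP settings are present.")
```

Running this in the same environment as the scheduler or worker gives a quick indication of whether the callback has any chance of sending mail.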
Update DAG definition
This method for sending emails on task success depends on defining a function in your top-level DAG code and passing it to the on_success_callback parameter in default_args.
We’ll use the send_email_smtp function in airflow.utils.email to do the heavy lifting. I’ve wrapped it in a secondary function because the only object Airflow passes to the function called by on_success_callback is the context variable, and I’d like to be able to pass other parameters as well (like the destination email address here). Simply add this code to your top-level DAG code, as I’ve added it to example_dag.py, and you’ll be set. The only relevant value in the default_args parameter below is on_success_callback.
from datetime import datetime, timedelta

from airflow.utils.email import send_email_smtp


def task_success_callback(context):
    # Airflow passes only the context, so the extra arguments are supplied here.
    outer_task_success_callback(context, email='destination@email.com')


def outer_task_success_callback(context, email):
    # Read the IDs from the task instance rather than splitting
    # task_instance_key_str, which breaks if an ID contains '__'.
    ti = context['task_instance']
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(ti.dag_id, ti.task_id)
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(ti.dag_id, ti.task_id, datetime.now())
    send_email_smtp(email, subject, html_content)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    # Runs after each task that finishes successfully.
    'on_success_callback': task_success_callback,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
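If several DAGs need different recipients, the two-function wrapper can also be written as a small callback factory: a function that takes the extra parameters and returns a one-argument callable for Airflow to invoke with the context. This is a sketch of that variation, not the method used above. make_success_callback, fake_send, and the send parameter are hypothetical names, and the sender is injected so the example runs without an Airflow installation. In a real DAG you would pass send_email_smtp from airflow.utils.email as send. The sketch also assumes the context exposes a task_instance with dag_id and task_id attributes.

```python
from datetime import datetime
from types import SimpleNamespace


def make_success_callback(email, send):
    """Return a callback that takes only `context`, with `email` and `send` bound."""

    def task_success_callback(context):
        ti = context["task_instance"]
        subject = "[Airflow] DAG {0} - Task {1}: Success".format(ti.dag_id, ti.task_id)
        html_content = "DAG: {0}<br>Task: {1}<br>Succeeded on: {2}".format(
            ti.dag_id, ti.task_id, datetime.now()
        )
        send(email, subject, html_content)

    return task_success_callback


# Stand-in sender that records calls instead of sending mail (illustration only).
sent = []


def fake_send(to, subject, html_content):
    sent.append((to, subject, html_content))


# In a DAG file this would be:
#   make_success_callback("destination@email.com", send_email_smtp)
callback = make_success_callback("destination@email.com", fake_send)

# Simulate the call Airflow makes after a task succeeds.
fake_context = {"task_instance": SimpleNamespace(dag_id="example_dag", task_id="t1")}
callback(fake_context)
print(sent[0][1])  # prints "[Airflow] DAG example_dag - Task t1: Success"
```

The returned function can be assigned to on_success_callback in default_args exactly like task_success_callback, and keeping the sender injectable makes the message formatting easy to check without an SMTP server.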