If you want to perform an action in response to a DAG's final state, failure or success, set the on_failure_callback or on_success_callback parameter on the DAG. The same parameters can be set on individual tasks.
Create a function that accepts a single argument: the context dictionary that Airflow passes to the callback.
For DAG callbacks, since the code is run in the scheduler, you will need to check the scheduler logs or have the callback interact with some other system that you can monitor.
For task callbacks, the outputs are available in the logs of the task instance that ran.
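For example, a minimal success callback might just log details pulled from the context. This is a sketch; the 'dag_run' key it reads is the same one used in the failure example below:

def on_success_callback(context):
    # 'dag_run' is one of the keys Airflow places in the context dictionary.
    dag_run = context.get('dag_run')
    print(f"DAG run {dag_run} succeeded")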
from datetime import datetime

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def on_failure_callback(context):
    # The context includes the DagRun, from which the task
    # instances of the failed run can be inspected.
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)


def failure_func():
    # Raise an exception so the task (and therefore the DAG run) fails.
    raise AirflowException()


dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback,  # DAG-level callback
    catchup=False,
    max_active_runs=1,
    default_args={
        # Passed to every operator in the DAG as a keyword argument.
        'on_failure_callback': on_failure_callback,
    },
)

example_task = BashOperator(
    task_id='my_task',
    bash_command='exit 1',  # non-zero exit status fails the task
    dag=dag,
)

example_python_task = PythonOperator(
    task_id='my_python_task',
    python_callable=failure_func,
    on_failure_callback=on_failure_callback,  # task-level callback
    dag=dag,
)
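When this DAG runs, both tasks fail: my_task exits with a non-zero status and my_python_task raises an AirflowException. Each failure triggers the task-level callback (inherited from default_args for my_task, set explicitly for my_python_task), and the DAG-level callback fires in the scheduler once the run is marked failed.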
As seen in the example, there are three ways to set the respective callbacks (an additional sketch follows this list):
- Through default_args, which is passed to every operator in the DAG as keyword arguments
- Through the individual operators, which are all subclasses of BaseOperator
- (DAGs only) Through the DAG initialization
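To illustrate the first two approaches together, the sketch below sets a success callback through default_args and overrides it on one operator. The function names and task ids are illustrative; it relies on the fact that an argument passed directly to an operator takes precedence over the default_args entry:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator


def notify_success(context):
    print(f"Task {context['task_instance'].task_id} succeeded")


def notify_success_verbose(context):
    ti = context['task_instance']
    print(f"Task {ti.task_id} finished at {ti.end_date}")


dag = DAG(
    dag_id='dag_with_success_callbacks',
    start_date=datetime(2020, 1, 1),
    catchup=False,
    default_args={
        # Applied to every operator that does not set its own callback.
        'on_success_callback': notify_success,
    },
)

first_task = BashOperator(
    task_id='first_task',
    bash_command='echo 1',  # succeeds, so notify_success runs
    dag=dag,
)

second_task = BashOperator(
    task_id='second_task',
    bash_command='echo 2',
    # An explicit argument overrides the default_args entry.
    on_success_callback=notify_success_verbose,
    dag=dag,
)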