How can I set my on_failure and on_success callbacks to have the task context?

If you want to perform some action in response to a DAG's final state, failure or success, then the on_failure_callback or on_success_callback arguments handle their respective situations. The same applies at the task level using on_failure_callback or on_success_callback on the operator.

Create a function that accepts a single argument, which is the context dictionary that will be passed in.
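For instance, a minimal sketch (the function name here is just illustrative; dag_run, task_instance, and exception are standard keys in the context dict Airflow passes in, with exception only populated for failure callbacks):

def report_failure(context):
    # Airflow supplies a dict of runtime information as the single argument.
    print(context.get('dag_run'))        # the DagRun object
    print(context.get('task_instance'))  # the TaskInstance, for task-level callbacks
    print(context.get('exception'))      # the exception, for failure callbacks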

For DAG callbacks, since the code runs in the scheduler, you will need to check the scheduler logs or have the code interact with some other system that you can monitor.
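As one sketch of that approach, a DAG-level callback could append to a file on the scheduler host so you can confirm it fired; the path /tmp/dag_failures.log is a hypothetical placeholder for whatever system you actually monitor:

def dag_failure_alert(context):
    # This runs on the scheduler, not in a task, so write somewhere inspectable.
    dag_run = context.get('dag_run')
    with open('/tmp/dag_failures.log', 'a') as f:
        f.write('DAG run failed: {}\n'.format(dag_run))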

For task callbacks, the outputs are available in the logs of the task instance that ran.

from datetime import datetime

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def on_failure_callback(context):
    # The context dict gives access to the failed run and its task instances.
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)


def failure_func():
    # Fail deliberately so the failure callbacks fire.
    raise AirflowException()


dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    # DAG-level callback: runs in the scheduler when the DAG run fails.
    on_failure_callback=on_failure_callback,
    catchup=False,
    max_active_runs=1,
    default_args={
        # Passed to every operator in this DAG as a keyword argument.
        'on_failure_callback': on_failure_callback,
    }
)

# Picks up its failure callback from default_args.
example_task = BashOperator(
    task_id='my_task',
    bash_command='exit 1',
    dag=dag,
)

# Sets its failure callback explicitly on the operator.
example_python_task = PythonOperator(
    task_id='my_python_task',
    python_callable=failure_func,
    on_failure_callback=on_failure_callback,
    dag=dag,
)

As seen in the example, there are three ways to set the respective callbacks:

  1. Through default_args, which are passed to ALL operators as keyword arguments
  2. Through the individual operators, all of which subclass BaseOperator
  3. (DAGs only) Through the DAG initialization
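The question also asks about on_success_callback; it takes the same one-argument function and can be set in any of the three places above. A small sketch building on the example (the task and function names here are just illustrative):

def on_success_callback(context):
    # Same signature as the failure callback; fires when the run succeeds.
    dag_run = context.get('dag_run')
    print('DAG run succeeded: {}'.format(dag_run))


succeeding_task = BashOperator(
    task_id='my_succeeding_task',
    bash_command='exit 0',
    on_success_callback=on_success_callback,
    dag=dag,
)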

You should not pass provide_context=True to your PythonOperator. That flag provides the context to the actual callable used by the PythonOperator, and it will error out since your callable failure_func does not accept any arguments. You can try this locally with:

airflow test dag_with_templated_dir my_python_task 20121212

Meanwhile, your on_failure_callback function will have the context provided to it by Airflow when it is invoked.
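If you do want the context inside the python_callable itself, the usual pattern in the Airflow 1.x API used in this thread is to pass provide_context=True and have the callable accept keyword arguments; the names below are just illustrative:

def failure_func_with_context(**context):
    # With provide_context=True, Airflow passes the context as keyword arguments.
    print('Failing inside task instance: {}'.format(context['task_instance']))
    raise AirflowException('deliberate failure')


example_python_task_with_context = PythonOperator(
    task_id='my_python_task_with_context',
    python_callable=failure_func_with_context,
    provide_context=True,
    on_failure_callback=on_failure_callback,
    dag=dag,
)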

I would also recommend against adding provide_context=True to default_args unless every operator requires it, which is rarely the case in DAGs that mix operator types.


Thank you @dsynkov! I have corrected the example and provided more details.

And yes, I definitely do not recommend putting provide_context in default_args (as far as I know it only applies to the PythonOperator), since default_args are passed to ALL operators, which could have unintended consequences.


Hi, I tried to implement on_failure_callback at the DAG level, but when the DAG fails I am unable to see the DAG failure status in the logs. I implemented the same thing at the task level and was able to see the task failure status in the logs. Could you please help me out with this?

I am facing the same thing: I am unable to find the logs of callbacks at the DAG level, though I can see the logs of callbacks at the task level.

@venkat @bluejeans59 The logs for the callbacks set at the DAG level are generated at a different location, as indicated here: airflow/dag.py at main · apache/airflow · GitHub

We need to have access to this at the UI level. I think a request needs to be made for this enhancement.