How can I set my on_failure and on_success callbacks to have the task context?


If you're setting up Slack alerts, or otherwise passing an `on_failure_callback` function that needs to, say, access a file on a worker when a task fails, you can retrieve the task context inside the callback:

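  1. Define the callback so that it accepts the context as its only argument:

```python
def on_failure_callback(context):
    # Airflow calls this with the context dict of the failed task/DAG run
    ...
```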

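  2. Make sure you're also passing `provide_context=True`, either as an operator keyword argument or in `default_args`. In the example below, notice how `provide_context` is passed through `default_args`. (In Airflow 1.x, this flag controls whether `PythonOperator` passes the context to its `python_callable`; the callbacks themselves always receive the context. In Airflow 2 the flag is deprecated and no longer required.)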

```python
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.x import path (Airflow 2: airflow.operators.python)
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=30),
    # Airflow 1.x: PythonOperator passes the context to python_callable
    'provide_context': True
}

# With provide_context=True the context arrives as keyword arguments,
# so the callable must accept them.
def my_task_py(**context):
    print('Hello World')

# DAG-level failure callback: receives the context of the failed DAG run
def on_dag_failure(context):
    print_task_instances(context)

def print_task_instances(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)

##########################################

dag = DAG('example_dag',
          concurrency=1,
          catchup=False,
          schedule_interval='@hourly',
          default_args=default_args,
          on_failure_callback=on_dag_failure)

example_task = PythonOperator(
    task_id='my_task',
    python_callable=my_task_py,
    dag=dag)
```
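The example above attaches the callback to the DAG. Since the question also asks about task-level `on_failure_callback` and `on_success_callback`, here is a minimal sketch of both. It assumes the context carries a `task_instance` with `dag_id` and `task_id` attributes and, for failures, an `exception` entry. The function names and message format are hypothetical, and the `SimpleNamespace` stand-in exists only so the callbacks can be exercised without a running Airflow.

```python
from types import SimpleNamespace  # used only to fake a context for the demo


def build_alert_message(context, status):
    """Format a short alert string from an Airflow callback context.

    Assumes context['task_instance'] has dag_id and task_id attributes and
    that failure callbacks may also receive context['exception'].
    """
    ti = context.get("task_instance")
    message = f"{status}: {ti.dag_id}.{ti.task_id}"
    error = context.get("exception")
    if error is not None:
        message += f" ({error!r})"
    return message


def on_task_failure(context):
    # Called by Airflow with the task context when the task fails.
    message = build_alert_message(context, "FAILED")
    print(message)  # replace with a Slack or e-mail notification
    return message  # Airflow ignores this; returned only for testing


def on_task_success(context):
    # Called by Airflow with the task context when the task succeeds.
    message = build_alert_message(context, "SUCCEEDED")
    print(message)
    return message


if __name__ == "__main__":
    # Hypothetical stand-in for the TaskInstance Airflow would supply.
    fake_ti = SimpleNamespace(dag_id="example_dag", task_id="my_task")
    on_task_failure({"task_instance": fake_ti, "exception": ValueError("boom")})
    on_task_success({"task_instance": fake_ti})
```

To wire them up, pass `on_failure_callback=on_task_failure` and `on_success_callback=on_task_success` to an operator, or put them in `default_args` so every task in the DAG gets them.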

I wrote a file to disk - why can't a downstream task find it?
I’m writing files to the workers that I sometimes need to retrieve. How can I do that?