How can I pass SQL as a file w/ Airflow's Postgres Operator?

To pass SQL as a file to the Postgres Operator, just provide a file name ending in .sql and make sure the file is included in your image.

You could consider:

  • Creating a subfolder named sql inside the dags folder and putting all of your .sql files in there
  • Using Jinja templates and macros in your .sql files (for example, to pass in execution_date to filter your SQL; see the sample query after the directory tree below)

Example

DAG Code

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 25),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('example_dag',
            max_active_runs=3,
            schedule_interval='@daily',
            default_args=default_args) as dag:

    t1 = PostgresOperator(
        task_id='my_task',
        # path is resolved relative to the folder containing this DAG file;
        # uses the postgres_default connection unless postgres_conn_id is set
        sql='sql/my_query.sql'
    )

Astro-Airflow Directory


β”œβ”€β”€ dags
β”‚   β”œβ”€β”€ example-dag.py
β”‚   └── sql
β”‚       └── my_query.sql
β”œβ”€β”€ include
β”œβ”€β”€ packages.txt
β”œβ”€β”€ plugins
β”‚   └── example-plugin.py
└── requirements.txt
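
For reference, a templated my_query.sql could look something like this, using the {{ ds }} macro to filter on the execution date (the table and column names here are just placeholders):

-- dags/sql/my_query.sql
-- {{ ds }} renders as the execution date in YYYY-MM-DD format
SELECT *
FROM my_table
WHERE created_at::date = '{{ ds }}';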

Another option, if you want your sql folder to live outside of the dags folder, is to pass the path in the arguments for your DAG:

with DAG('example_dag',
         max_active_runs=3,
         schedule_interval='@daily',
         template_searchpath='/usr/local/airflow/sql',
         default_args=default_args) as dag:

Then you can just reference the file name:

    t1 = PostgresOperator(
        task_id='my_task',
        sql='my_query.sql'
    )

Directory Tree:

β”œβ”€β”€ dags
β”‚   └── example-dag.py
β”œβ”€β”€ sql
β”‚   └── my_query.sql
β”œβ”€β”€ include
β”œβ”€β”€ packages.txt
β”œβ”€β”€ plugins
β”‚   └── example-plugin.py
└── requirements.txt

It also might be a good idea to use an Airflow Variable to store the path (remember to import Variable from airflow.models) and just do:

template_searchpath=Variable.get('sql_path')
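
A minimal, self-contained sketch of that approach, assuming you have already created an Airflow Variable named sql_path (the variable name and fallback path are just examples):

from airflow import DAG
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

# falls back to a default path if the sql_path Variable hasn't been set
sql_path = Variable.get('sql_path', default_var='/usr/local/airflow/sql')

with DAG('example_dag',
         start_date=datetime(2019, 8, 25),
         schedule_interval='@daily',
         template_searchpath=sql_path) as dag:

    t1 = PostgresOperator(
        task_id='my_task',
        sql='my_query.sql'
    )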
