Set Connection params at Runtime

Let's say I have a Databricks connection set up and saved in Airflow. Is there a way to feed in or override some of that connection's parameters at runtime? I.e., it would be nice to set up a generalized Databricks connection once for a client, then reuse that one saved connection but override its 'host' or 'schema' parameter at DAG runtime, depending on which Databricks environment I want to run in.

Hi @Deslyxia, thanks for reaching out!

You can update a connection by leveraging the @provide_session decorator, which injects a database session into your function.

Below is an example based on a dummy HTTP connection (test_http_conn). After the host is updated, the change is persisted to the metadata database.

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.models import Connection
from airflow.operators.bash import BashOperator
from airflow.utils.db import provide_session

@provide_session
def update_conn(conn_id, session=None):
    # Fetch the stored connection, overwrite its host, and commit the
    # change back to the metadata database.
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).one()
    conn.host = 'updated_host'
    session.add(conn)
    session.commit()


with DAG(dag_id='my_test_dag',
         start_date=pendulum.datetime(2022, 9, 7, tz="UTC"),
         schedule_interval=None,
         ) as dag:

    # Two identical tasks that echo the connection's current host, run
    # before and after the update so the change is visible in the logs.
    original_conn, updated_conn = [BashOperator(
        task_id=tid,
        bash_command='echo {{ conn.test_http_conn.host }}',
    ) for tid in ['original_conn', 'updated_conn']]

    @task
    def change_host():
        update_conn(conn_id='test_http_conn')

    original_conn >> change_host() >> updated_conn
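
If you want the new host to be chosen per run rather than hard-coded, one option is to read it from the run's dag_run.conf inside the task and pass it through to update_conn. Here is a minimal sketch (reusing the imports from the example above, and assuming Airflow 2.x, where @task-decorated functions receive context variables like dag_run as matching parameters); the conf key 'new_host' is just an illustrative name:

@provide_session
def update_conn(conn_id, host, session=None):
    # Same lookup as before, but the new host is supplied by the caller.
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).one()
    conn.host = host
    session.add(conn)
    session.commit()


@task
def change_host(dag_run=None):
    # 'new_host' is a hypothetical conf key; fall back to a default when
    # the DAG is triggered without an override.
    new_host = (dag_run.conf or {}).get('new_host', 'updated_host')
    update_conn(conn_id='test_http_conn', host=new_host)

You could then trigger the DAG with something like airflow dags trigger my_test_dag --conf '{"new_host": "my-other-host"}'. Keep in mind that either variant writes the change back to the metadata database, so it affects every DAG using that connection, not just the triggering run.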