Retrieving dag run time config values

I want to retrieve dag run time config passed while triggering dag. I want to use this to branch based on the load type.

How do I pass the run time config value to the python function in my dag. Please help.
My code is given below:

load_type = BranchPythonOperator(

          task_id='load_type',

          python_callable=load_type,

          #load_type = '{{ dag_run.conf }}'

          load_type = "{{ dag_run.conf['load_type'] }}"

)

def load_type(**kwargs):

load_type = ''

dag_run = kwargs.get('dag_run')

load_type = kwargs.get('load_type')

#load_type = kwargs['dag_run'].conf['load_type']

for key, value in kwargs.items():

    print(key, '-', value)

print (dag_run)

if dag_run is not None:

    print("inside if stmt")

    load_type = kwargs['dag_run'].conf['load_type']

    #load_type = kwargs['dag_run'].conf.get('load_type')



print("received load_type: ", load_type)

if load_type == 'Incremental':

    next_task_id = 'IncrementalBranch'

elif load_type == 'History':

    next_task_id = 'HistoryBranch' 

else:

    next_task_id = 'IncrementalBranch'

return next_task_id

Hi @suga!

Passing Jinja expressions as args to operators can only be done for template_fields of an operator. For the BranchPythonOperator these fields are op_args and op_kwargs (since this operator inherits from the PythonOperator). You can try passing in the “load_type” value into op_kwargs like this:

load_type = BranchPythonOperator(
    task_id='load_type',
    python_callable=load_type,
    op_kwargs={"load_type": "{{ dag_run.conf['load_type'] }}"},
)

Then you can change the load_type() function signature to handle the new argument explicitly if you wish:

def load_type(load_type, **kwargs):
...
1 Like

Thank you Josh for the response. I have implemented in a similar way…

def load_type(ds, **kwargs):
if ‘load_type’ in kwargs[‘dag_run’].conf:
load_type = kwargs[‘dag_run’].conf[‘load_type’]
else:
load_type = ‘Incremental’
#print("received load_type: ", load_type)

if load_type == 'Incremental':
    next_task_id = 'IncrementalBranch'
elif load_type == 'History':
    next_task_id = 'HistoryBranch' 
else:
    next_task_id = 'IncrementalBranch'
return next_task_id

#using provide context
load_type = BranchPythonOperator(
    task_id='load_type',
    provide_context=True,       
    python_callable=load_type,
    dag=dag,
)