suga
September 20, 2021, 3:43pm
1
I want to retrieve dag run time config passed while triggering dag. I want to use this to branch based on the load type.
How do I pass the run time config value to the python function in my dag. Please help.
My code is given below:
load_type = BranchPythonOperator(
task_id='load_type',
python_callable=load_type,
#load_type = '{{ dag_run.conf }}'
load_type = "{{ dag_run.conf['load_type'] }}"
)
def load_type(**kwargs):
load_type = ''
dag_run = kwargs.get('dag_run')
load_type = kwargs.get('load_type')
#load_type = kwargs['dag_run'].conf['load_type']
for key, value in kwargs.items():
print(key, '-', value)
print (dag_run)
if dag_run is not None:
print("inside if stmt")
load_type = kwargs['dag_run'].conf['load_type']
#load_type = kwargs['dag_run'].conf.get('load_type')
print("received load_type: ", load_type)
if load_type == 'Incremental':
next_task_id = 'IncrementalBranch'
elif load_type == 'History':
next_task_id = 'HistoryBranch'
else:
next_task_id = 'IncrementalBranch'
return next_task_id
Hi @suga !
Passing Jinja expressions as args to operators can only be done for template_fields
of an operator. For the BranchPythonOperator
these fields are op_args
and op_kwargs
(since this operator inherits from the PythonOperator
). You can try passing in the “load_type” value into op_kwargs
like this:
load_type = BranchPythonOperator(
task_id='load_type',
python_callable=load_type,
op_kwargs={"load_type": "{{ dag_run.conf['load_type'] }}"},
)
Then you can change the load_type()
function signature to handle the new argument explicitly if you wish:
def load_type(load_type, **kwargs):
...
1 Like
suga
September 20, 2021, 5:57pm
3
Thank you Josh for the response. I have implemented in a similar way…
def load_type(ds, **kwargs):
if ‘load_type’ in kwargs[‘dag_run’].conf:
load_type = kwargs[‘dag_run’].conf[‘load_type’]
else:
load_type = ‘Incremental’
#print ("received load_type: ", load_type)
if load_type == 'Incremental':
next_task_id = 'IncrementalBranch'
elif load_type == 'History':
next_task_id = 'HistoryBranch'
else:
next_task_id = 'IncrementalBranch'
return next_task_id
#using provide context
load_type = BranchPythonOperator(
task_id='load_type',
provide_context=True,
python_callable=load_type,
dag=dag,
)