I am trying to trigger a dag and provide “{{ ds }}” (and “{{ next_ds }}”) macro via the dag_run payload
for some reason it’s interpreted as a literal string “{{ ds }}”
i used the example from Airflow
controller dag:
import pprint
import airflow
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
from datetime import timedelta
pp = pprint.PrettyPrinter(indent=4)def conditionally_trigger(context, dag_run_obj):
“”“This function decides whether or not to Trigger the remote DAG”“”
c_p = context[‘params’][‘condition_param’]
print(“Controller DAG : conditionally_trigger = {}”.format(c_p))
if context[‘params’][‘condition_param’]:
dag_run_obj.payload = {‘message’: context[‘params’][‘message’]}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
return None#current_dt = datetime(2020,2,1).strftime(“%m/%d/%Y”)
#current_dt = print_execution_date()
#current_dt = current_dt + timedelta(days=1)
#current_dt.strftime(“%m/%d/%Y”)
#next_dt = ‘{{ next_ds }}’
#print(current_dt)args = {
‘owner’: ‘airflow’,
‘start_date’: datetime(2020, 2, 1),
‘retries’: 1,
‘email_on_failure’: False,
}dag = DAG(‘example_trigger_controller_dag’, description=‘ETL for all web tables’,
schedule_interval=‘0 5 * * *’,
default_args=args,
max_active_runs=1
)current_dt = “{{ ds }}”
#next_dt = ‘{{ next_ds }}’trigger = TriggerDagRunOperator(
task_id=‘test_trigger_dagrun’,
trigger_dag_id=“example_trigger_target_dag”,
python_callable=conditionally_trigger,
params={‘condition_param’: True, ‘message’: current_dt},
dag=dag
)
target dag
import pprint
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperatorpp = pprint.PrettyPrinter(indent=4)
args = {
‘start_date’: airflow.utils.dates.days_ago(2),
‘owner’: ‘airflow’,
}dag = DAG(
dag_id=‘example_trigger_target_dag’,
default_args=args,
schedule_interval=None,
)def run_this_func(**kwargs):
“”"
Print the payload “message” passed to the DagRun conf attribute.
:param dict kwargs: Context
“”"
print(“Remotely received value of {} for key=message”.
format(kwargs[‘dag_run’].conf[‘message’]))run_this = PythonOperator(
task_id=‘run_this’,
provide_context=True,
python_callable=run_this_func,
dag=dag,
)You can also access the DagRun object in templates
bash_task = BashOperator(
task_id=“bash_task”,
bash_command='echo “Here is the message: ’
'{{ dag_run.conf[“message”] if dag_run else “” }}” ',
dag=dag,
)
the log shows:
[2020-02-12 07:46:58,305] {logging_mixin.py:112} INFO - Remotely received value of **{{ ds }}** for key=message
[2020-02-12 07:46:58,305] {python_operator.py:114} INFO - Done. Returned value was: None
[2020-02-12 07:47:08,170] {logging_mixin.py:112} INFO - [2020-02-12 07:47:08,170] {local_task_job.py:103} INFO - Task exited with return code 0
any thoughts how can i achieve this?