Not sure why it won't render {{ ds_nodash }}

Hello, I have a DAG like this:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator, BigQueryCheckOperator

from netdata_utilities.dest import dest


# Also template `params`, hoping the table name inside it gets rendered.
class MyBigQueryCheckOperator(BigQueryCheckOperator):
    template_fields = ('sql', 'params')


default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 10, 5),
}

dag = DAG(
    dag_id='sendgrid.user360',
    default_args=default_args,
    schedule_interval=timedelta(minutes=60),
    catchup=False,
)

sql_sendgrid_user360 = """
with

sendgrid_user_360 as
(
SELECT
  email,
  count(distinct sg_message_id) as processed,
FROM
  `sendgrid.messages_*`
group by 1
)


select
  *
from
  sendgrid_user_360
;
"""

sql_sendgrid_user360_validate = """
select
  sum(1)=count(distinct email) as check_keys
from
  {{ params.table }}
"""

# The task docs were not included in the post; placeholder so the file parses.
doc_sendgrid_user360 = """Placeholder docs for the sendgrid.user360 task."""

with dag:

    start = DummyOperator(task_id='start')

    op_sendgrid_user360 = BigQueryExecuteQueryOperator(
        task_id='sendgrid.user360',
        destination_dataset_table=dest('sendgrid.user360_{{ ds_nodash }}'),
        write_disposition='WRITE_TRUNCATE',
        sql=sql_sendgrid_user360,
        use_legacy_sql=False,
        gcp_conn_id='google_cloud_default'
    )
    op_sendgrid_user360.doc_md = doc_sendgrid_user360

    op_sendgrid_user360_validate = MyBigQueryCheckOperator(
        task_id='validate__sendgrid.user360',
        sql=sql_sendgrid_user360_validate,
        params={'table': dest('sendgrid.user360_{{ ds_nodash }}')},
        use_legacy_sql=False,
        gcp_conn_id='google_cloud_default'
    )

    end = DummyOperator(task_id='end')

    start >> op_sendgrid_user360 >> op_sendgrid_user360_validate >> end
```

So I extended BigQueryCheckOperator as MyBigQueryCheckOperator to try to tell it to template the params.

Looking here, it seems to get the parameters correct, but the SQL still looks like {{ ds_nodash }} has not been rendered.

Any idea what might be going on here?

P.S. dest() is just a helper function that sends output to the tmp dataset unless we are running with NETDATA_ANALYTICS_BI_ENV=PROD, so I'm trying to have the validation check understand this too.

```python
from airflow.models import Variable


def dest(destination_dataset_table, prefix_dataset='tmp'):
    """If NETDATA_ANALYTICS_BI_ENV != PROD then write results to `prefix_dataset` instead.

    :param destination_dataset_table: destination to write results to.
    :param prefix_dataset: dataset to write to when not running in PROD.
    :return: destination_dataset_table: destination to write results to with prefix added if needed.
    """

    NETDATA_ANALYTICS_BI_ENV = Variable.get('NETDATA_ANALYTICS_BI_ENV', 'UNK')
    if NETDATA_ANALYTICS_BI_ENV == 'PROD':
        return destination_dataset_table
    else:
        # "project:dataset.table" or "project.dataset.table" -> "project.<prefix_dataset>.dataset_table"
        destination_dataset_table_list = destination_dataset_table.replace(':', '.').split('.')
        destination_project = destination_dataset_table_list[0]
        destination_dataset = prefix_dataset
        destination_table = f'{destination_dataset_table_list[1]}_{destination_dataset_table_list[2]}'
        return f'{destination_project}.{destination_dataset}.{destination_table}'
```
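The non-PROD branch of dest() can be tried outside Airflow. This is a minimal sketch that copies that branch into a standalone function; `dest_non_prod` is a hypothetical name and `my-project` is a made-up project id, and the Variable.get() lookup is dropped so it runs on its own:

```python
def dest_non_prod(destination_dataset_table: str, prefix_dataset: str = "tmp") -> str:
    """Same rewrite as the else branch of dest(), without the Variable.get() lookup."""
    # Accept both "project:dataset.table" and "project.dataset.table".
    parts = destination_dataset_table.replace(":", ".").split(".")
    project, dataset, table = parts[0], parts[1], parts[2]
    # Keep the project, swap the dataset for prefix_dataset, and fold the
    # original dataset name into the table name.
    return f"{project}.{prefix_dataset}.{dataset}_{table}"


print(dest_non_prod("my-project.sendgrid.user360_20211005"))
# my-project.tmp.sendgrid_user360_20211005
print(dest_non_prod("my-project:sendgrid.user360"))
# my-project.tmp.sendgrid_user360
```

Because the branch indexes three parts, it expects a project-qualified name; a two-part "dataset.table" string would raise an IndexError here.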

Hi @andrewm4894!

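When you call the dest() function in the BigQueryExecuteQueryOperator arguments, that is considered top-level code: it is executed every time the DAG file is parsed, not when the task runs. You should try to avoid top-level code as much as possible in your DAG files to avoid performance degradations, but it also means that dest() doesn't receive the rendered value of {{ ds_nodash }}; it receives the literal string "{{ ds_nodash }}". For destination_dataset_table that still works out, because that argument is a templated field and is rendered later at run time. Inside params it does not: Jinja inserts the value of {{ params.table }} into the SQL as-is and does not render it a second time, so the literal {{ ds_nodash }} ends up in the query. Adding params to template_fields most likely renders the operator's own copy of params (which is why it looks right), while the SQL is rendered against the params in the template context, which still holds the unrendered string.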
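The non-recursive rendering can be reproduced with plain Jinja2, the templating library Airflow uses. In this sketch the table name and date are made up: params holds what dest() returned at parse time, a string that still contains the raw expression, and ds_nodash is supplied the way Airflow would supply it at run time:

```python
from jinja2 import Template

# What dest() returned at parse time: the Jinja expression is still a literal string.
params = {"table": "my-project.tmp.sendgrid_user360_{{ ds_nodash }}"}

check_sql = Template(
    "select sum(1)=count(distinct email) as check_keys from {{ params.table }}"
)

# ds_nodash is passed in as a template variable (value made up for the example).
rendered = check_sql.render(params=params, ds_nodash="20211005")
print(rendered)
# select sum(1)=count(distinct email) as check_keys from my-project.tmp.sendgrid_user360_{{ ds_nodash }}
```

Jinja evaluates {{ params.table }} once and pastes the resulting string in; the {{ ds_nodash }} inside that string is never evaluated.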

Try checking out user_defined_macros (great info here). You can register dest() as a user-defined macro at the DAG level, and the function can then be called within any Jinja expression, including directly in the SQL query you want to execute. Called that way, it runs at render time and receives the real ds_nodash value.
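A minimal sketch of the macro approach, again with plain Jinja2 so it runs without Airflow. Airflow roughly makes user_defined_macros available by adding them to the globals of the Jinja environment it builds for the DAG; this imitates that with `Environment.globals`. `dest_sketch` is a hypothetical stand-in for dest() that takes the environment name as an argument instead of calling Variable.get(), and `my-project` is a made-up project id:

```python
from jinja2 import Environment


def dest_sketch(destination_dataset_table, prefix_dataset="tmp", env_name="UNK"):
    """Hypothetical stand-in for dest(); env_name replaces Variable.get()."""
    if env_name == "PROD":
        return destination_dataset_table
    parts = destination_dataset_table.replace(":", ".").split(".")
    return f"{parts[0]}.{prefix_dataset}.{parts[1]}_{parts[2]}"


env = Environment()
# Imitates DAG(user_defined_macros={"dest": dest}): the function becomes a template global.
env.globals["dest"] = dest_sketch

# The macro is called inside the template, so it sees the rendered ds_nodash.
check_sql = env.from_string(
    "select sum(1)=count(distinct email) as check_keys\n"
    "from `{{ dest('my-project.sendgrid.user360_' ~ ds_nodash) }}`"
)

rendered = check_sql.render(ds_nodash="20211005")
print(rendered)
# select sum(1)=count(distinct email) as check_keys
# from `my-project.tmp.sendgrid_user360_20211005`
```

In the DAG itself this would mean passing `user_defined_macros={'dest': dest}` to `DAG(...)` and writing the macro call into the templated fields, for example `destination_dataset_table="{{ dest('sendgrid.user360_' ~ ds_nodash) }}"` and the same expression in sql_sendgrid_user360_validate. Then params and the MyBigQueryCheckOperator subclass should no longer be needed, and Variable.get() runs at render time instead of on every parse.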

Hope this helps!
