Hello - I have a DAG like this:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator, BigQueryCheckOperator
from netdata_utilities.dest import dest
class MyBigQueryCheckOperator(BigQueryCheckOperator):
    template_fields = ('sql', 'params')
default_args = {
    "owner": "Airflow",
    "start_date": datetime(2021, 10, 5),
}
dag = DAG(
    dag_id='sendgrid.user360',
    default_args=default_args,
    schedule_interval=timedelta(minutes=60),
    catchup=False,
)
sql_sendgrid_user360 = """
with
sendgrid_user_360 as
(
SELECT
email,
count(distinct sg_message_id) as processed
FROM
`sendgrid.messages_*`
group by 1
)
select
*
from
sendgrid_user_360
;
"""
sql_sendgrid_user360_validate = """
select
sum(1)=count(distinct email) as check_keys
from
{{ params.table }}
"""
with dag:
    start = DummyOperator(task_id='start')
    op_sendgrid_user360 = BigQueryExecuteQueryOperator(
        task_id='sendgrid.user360',
        destination_dataset_table=dest('sendgrid.user360_{{ ds_nodash }}'),
        write_disposition='WRITE_TRUNCATE',
        sql=sql_sendgrid_user360,
        use_legacy_sql=False,
        gcp_conn_id='google_cloud_default'
    )
    op_sendgrid_user360.doc_md = doc_sendgrid_user360
    op_sendgrid_user360_validate = MyBigQueryCheckOperator(
        task_id='validate__sendgrid.user360',
        sql=sql_sendgrid_user360_validate,
        params={'table': dest('sendgrid.user360_{{ ds_nodash }}')},
        use_legacy_sql=False,
        gcp_conn_id='google_cloud_default'
    )
    end = DummyOperator(task_id='end')
    start >> op_sendgrid_user360 >> op_sendgrid_user360_validate >> end
So I have extended BigQueryCheckOperator to create MyBigQueryCheckOperator, to try to tell Airflow to template the params as well.
Looking at the rendered template here, the params seem to come out correct, but the sql still looks like it has not had {{ ds_nodash }} rendered.
Any idea what might be going on here?
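For what it's worth, here is a minimal standalone sketch of the behaviour I'm seeing, using plain Jinja2 rather than Airflow's actual rendering path (the table name and date value are made-up examples):
from jinja2 import Template

sql = "select * from {{ params.table }}"

# The value passed for params.table itself contains a template string,
# mimicking what dest('sendgrid.user360_{{ ds_nodash }}') returns at parse time.
rendered = Template(sql).render(
    params={"table": "tmp.user360_{{ ds_nodash }}"},
    ds_nodash="20211005",
)

print(rendered)
# select * from tmp.user360_{{ ds_nodash }}
# Jinja does a single substitution pass, so the nested {{ ds_nodash }}
# inside the substituted value is left as-is.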
P.S. dest() is just a helper function that redirects output to the tmp dataset unless we are running with NETDATA_ANALYTICS_BI_ENV=PROD, so I'm trying to have the validation check understand this too.
from airflow.models import Variable


def dest(destination_dataset_table, prefix_dataset='tmp'):
    """If NETDATA_ANALYTICS_BI_ENV != PROD then write results to `prefix_dataset` instead.

    :param destination_dataset_table: destination to write results to.
    :param prefix_dataset: dataset to redirect results to when not running in PROD.
    :return: destination_dataset_table: destination to write results to with prefix added if needed.
    """
    NETDATA_ANALYTICS_BI_ENV = Variable.get('NETDATA_ANALYTICS_BI_ENV', 'UNK')
    if NETDATA_ANALYTICS_BI_ENV == 'PROD':
        return destination_dataset_table
    else:
        destination_dataset_table_list = destination_dataset_table.replace(':', '.').split('.')
        destination_project = destination_dataset_table_list[0]
        destination_dataset = prefix_dataset
        destination_table = f'{destination_dataset_table_list[1]}_{destination_dataset_table_list[2]}'
        return f'{destination_project}.{destination_dataset}.{destination_table}'
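For example, with a placeholder project id (not a real one) and assuming NETDATA_ANALYTICS_BI_ENV is not set to PROD, it behaves like this:
# 'my-project' is just an illustrative project id.
dest('my-project.sendgrid.user360_20211005')
# -> 'my-project.tmp.user360_20211005'  (redirected into the tmp dataset)

# With NETDATA_ANALYTICS_BI_ENV=PROD the destination comes back unchanged:
dest('my-project.sendgrid.user360_20211005')
# -> 'my-project.sendgrid.user360_20211005'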