I have a SQL file and I need to create an Airflow DAG that runs it. I am new to Airflow and am using it for the first time. Does anyone know how to do this? Thanks in advance. My SQL file contains these queries:
-- Script variables. In a BigQuery script every DECLARE must come before any
-- other statement.

-- Loop counters and row counts used while building the dynamic CASE expressions.
DECLARE idx, col_cnt, row_cnt, idx_row INT64;
-- Field currently being processed (col_name) and its *_is_invalid_flag column (col_flag).
DECLARE col_name, col_flag STRING;
-- SQL fragments: one generated expression (cmp_cond), the accumulated list of
-- flag expressions (lookup_query) and the WHEN branches of a multi-rule field
-- (lookup_query_row).
DECLARE cmp_cond, lookup_query, lookup_query_row STRING;
-- NOTE(review): col_list is not referenced anywhere in the visible script and its
-- element type was missing (a bare ARRAY is not a valid type). STRING is assumed
-- here -- confirm the intended type or drop the variable.
DECLARE col_list ARRAY<STRING>;
DECLARE is_required BOOLEAN;
DECLARE event_names_len, valid_values_len INT64;
-- This variable is used to hard-code the rules that are not in the lookup table.
DECLARE logic_based_fields STRING;
-- Yesterday's date rendered as a quoted SQL literal (e.g. '2024-01-31') so it can
-- be substituted through %s into the dynamic statements.
DECLARE current_event_date STRING DEFAULT CONCAT("'", CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) AS STRING), "'");
-- Re-create temp table to get invalid flag fields from base and lookup tables.
-- Each *_is_invalid* column of the base table is matched to its rule rows in the
-- valid-values lookup table; the result has one row per (flag column, rule).
--   __row_number : sequential row id used by the script's main loop
--   field_rank   : position of the rule within its field (1..n)
CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
AS
SELECT
  base.column_name,
  base.column_flag,
  base.required_field_flag,
  base.event_names,
  base.valid_values,
  base.field_name,
  base.__row_number,
  DENSE_RANK() OVER (PARTITION BY base.column_name ORDER BY base.__row_number) AS field_rank
FROM
(
  SELECT
    bc.column_name,
    vlk.field_name,
    bc.column_flag,
    vlk.required_field_flag,
    vlk.event_names,
    vlk.valid_values,
    -- Number the rows in column_name order so that all rules of one field get
    -- consecutive numbers. An empty OVER() gives no ordering guarantee, and an
    -- ORDER BY on the subquery does not control window numbering.
    ROW_NUMBER() OVER (ORDER BY bc.column_name) AS __row_number
  FROM
  (
    SELECT
      column_name AS column_flag,
      -- Strip the "_is_invalid..." suffix: keep everything up to the character
      -- before the underscore that precedes "is_invalid".
      SUBSTR(column_name, 1, INSTR(column_name, 'is_invalid') - 2) AS column_name
    FROM
      `st-vix-ott-dev`.st_vix_ott_dev_us_data_dq_quality_checks.INFORMATION_SCHEMA.COLUMNS
    WHERE
      table_name = 'st_vix_ott_dev_dq_monitoring_base_test'
      AND column_name LIKE "%is_invalid%"
  ) AS bc
  INNER JOIN `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_valid_values_lookup_test` AS vlk
    ON bc.column_name = vlk.field_name
) AS base;
-- Set control variables:
--   col_cnt      : number of rows in the fields temp table (upper bound of the main loop)
--   idx          : current row number, starting at the first row
--   lookup_query : accumulator for the generated flag expressions, starts empty
SET col_cnt = (
  SELECT COUNT(*)
  FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
);
SET idx = 1;
SET lookup_query = '';
-- Build CASE expressions dynamically based on the valid values in the lookup table.
-- The loop walks the fields temp table by __row_number and appends one
-- ",<expression> AS <flag column>" fragment per field to lookup_query.
WHILE idx <= col_cnt DO
  -- Fetch the current row. idx_row temporarily receives the row's rank within
  -- its field: only the first row of a field (rank 1) generates SQL, the
  -- remaining rows of a multi-rule field are handled by the inner loop below.
  -- This does not rely on rows of one field having consecutive row numbers.
  SET (col_flag, col_name, idx_row) = (
    SELECT AS STRUCT column_flag, column_name, field_rank
    FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
    WHERE __row_number = idx
  );

  IF idx_row = 1 THEN
    -- Number of lookup rules defined for this field.
    SET row_cnt = (
      SELECT COUNT(*)
      FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
      WHERE IFNULL(field_name, '') = col_name
    );

    IF row_cnt = 1 THEN
      -- Check if event_names & valid_values arrays are not empty
      SET (event_names_len, valid_values_len, is_required) = (
        SELECT AS STRUCT ARRAY_LENGTH(event_names), ARRAY_LENGTH(valid_values), required_field_flag
        FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
        WHERE field_name = col_name
      );

      -- Check event_name + field_name is required + field name has invalid values
      IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
        SET cmp_cond = (
          SELECT CONCAT(",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(", LOWER(ARRAY_TO_STRING(event_names, "|")),
            ")') AND (IFNULL(", col_name, ",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING(valid_values, "', '")),
            "')) THEN true ELSE false END AS ", col_flag)
          FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
          WHERE field_name = col_name
        );
      -- Check event_name + field_name is required
      ELSEIF (is_required AND event_names_len > 0 AND valid_values_len = 0) THEN
        SET cmp_cond = (
          SELECT CONCAT(",CASE WHEN REGEXP_CONTAINS(LOWER(event_name), '^(", LOWER(ARRAY_TO_STRING(event_names, "|")), ")') AND IFNULL(",
            col_name, ",'') = '' THEN true ELSE false END AS ", col_flag)
          FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
          WHERE field_name = col_name
        );
      -- Check field_name is required
      ELSEIF (is_required AND event_names_len = 0 AND valid_values_len = 0) THEN
        SET cmp_cond = (
          SELECT CONCAT(",CASE WHEN IFNULL(", col_name, ",'') = '' THEN true ELSE false END AS ", col_flag)
          FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
          WHERE field_name = col_name
        );
      ELSE
        -- No rule shape applies (e.g. the field is not required). Still emit
        -- the flag column so the generated table always contains it; without
        -- this the column would be missing from the temp table.
        -- NOTE(review): "false" mirrors the ELSE of the generated CASEs -- confirm.
        SET cmp_cond = CONCAT(",false AS ", col_flag);
      END IF;
      SET lookup_query = CONCAT(lookup_query, cmp_cond);

    -- field_name with multiple rows
    ELSEIF row_cnt > 1 THEN
      SET idx_row = 1;
      SET lookup_query_row = '';
      -- Collect one WHEN branch per qualifying rule of this field.
      WHILE idx_row <= row_cnt DO
        -- Check if event_names & valid_values arrays are not empty
        SET (event_names_len, valid_values_len, is_required) = (
          SELECT AS STRUCT ARRAY_LENGTH(event_names), ARRAY_LENGTH(valid_values), required_field_flag
          FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
          WHERE column_name = col_name AND field_rank = idx_row
        );
        -- Check event_name + field_name is required + field name has invalid values
        IF (is_required AND event_names_len > 0 AND valid_values_len > 0) THEN
          SET cmp_cond = (
            SELECT CONCAT("WHEN REGEXP_CONTAINS(LOWER(event_name), '^(", LOWER(ARRAY_TO_STRING(event_names, "|")),
              ")') AND (IFNULL(", col_name, ",'') = '' OR LOWER(CAST(", LOWER(col_name), " AS STRING)) NOT IN ('", LOWER(ARRAY_TO_STRING(valid_values, "', '")),
              "')) THEN true ")
            FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`
            WHERE column_name = col_name AND field_rank = idx_row
          );
          SET lookup_query_row = CONCAT(lookup_query_row, cmp_cond);
        END IF;
        SET idx_row = idx_row + 1;
      END WHILE;

      IF lookup_query_row = '' THEN
        -- No rule produced a WHEN branch; "CASE ELSE false END" is not valid
        -- SQL, so emit the constant the CASE would have evaluated to.
        SET lookup_query = CONCAT(lookup_query, ",false AS ", col_flag);
      ELSE
        SET lookup_query = CONCAT(lookup_query, ",CASE ", lookup_query_row, "ELSE false END AS ", col_flag);
      END IF;

    ELSE
      -- No rule rows found for the field. The NULL is typed explicitly because
      -- an untyped NULL column is created as INT64 by CREATE TABLE ... AS SELECT,
      -- which would not behave as a boolean flag.
      SET lookup_query = CONCAT(lookup_query, ",CAST(NULL AS BOOL) AS ", col_flag);
    END IF;
  END IF;

  SET idx = idx + 1; -- counter main while loop
END WHILE;
-- This is a workaround due to BQ's dynamic SQL limitations with nested CASE statements.
-- These fields aren't in the valid values lookup table, so their flag
-- expressions are hard-coded here and substituted into the dynamic
-- CREATE TABLE statement as-is.
SET logic_based_fields = """
  ,CASE
    WHEN LOWER(event_name) LIKE '%video%' AND IFNULL(video_id_channel_id_sports_event_id,'') = '' THEN true
    ELSE false END AS video_id_channel_id_sports_event_id_is_invalid_flag
  ,CASE
    WHEN LOWER(event_name) LIKE '%video%'
      AND ((IFNULL(navigation_section,'') = '' AND is_epg IS NOT NULL)
        OR (is_epg IS NULL AND IFNULL(navigation_section,'') <> '')
        OR (is_epg = TRUE AND IFNULL(epg_category,'') = '')) THEN true
    ELSE false END AS client_path_sensitive_properties_is_invalid_flag
  ,CASE
    WHEN LOWER(event_name) = 'video content playing'
      AND (video_heartbeat_value IS NULL OR video_heartbeat_value > 60) THEN TRUE
    ELSE FALSE END AS video_heartbeat_value_is_invalid_flag
  ,CASE WHEN LOWER(event_name) LIKE '%video%' THEN 1 ELSE 0 END AS video_event_flag
""";
-- Dynamic SQL to create the temp table that will be used to insert into the
-- base table and the invalid values table.
-- FORMAT placeholders, in order:
--   1st %s : logic_based_fields  (hard-coded flag expressions)
--   2nd %s : lookup_query        (flag expressions generated from the lookup table)
--   3rd %s : current_event_date  (filter on the partition date)
--   4th %s : current_event_date  (filter on the original event timestamp)
-- The FORMAT template itself must not contain a literal percent sign.
EXECUTE IMMEDIATE FORMAT("""
  CREATE OR REPLACE TABLE `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
  AS
  SELECT
    event_date
    ,anonymous_id
    ,sl.context_segment_source AS platform_name
    ,os_version
    ,event_id
    ,event_name
    ,event_type
    ,stream_type
    ,session_id
    ,stream_id
    ,ip
    ,navigation_section
    ,is_epg
    ,epg_category
    ,screen_id
    ,screen_title
    ,screen_type
    ,video_content_vertical
    ,video_genres_first
    ,video_id_channel_id_sports_event_id
    ,video_id
    ,channel_id
    ,sports_event_id
    ,video_is_kids
    ,video_player_mode
    ,video_title
    ,video_type
    ,video_heartbeat_value
    ,CASE WHEN event_name = 'Video Content Started' THEN true ELSE false END AS event_is_video_start_flag
    %s
    %s
  FROM (
    SELECT
      context_protocols_source_id,
      DATE(original_timestamp) AS event_date,
      id AS event_id,
      original_event_name AS event_name,
      original_event_type AS event_type,
      context_ip AS ip,
      anonymous_id,
      user_id,
      COALESCE(session_id,
        context_screen_properties_session_id) AS session_id,
      screen_id,
      screen_title,
      screen_type,
      stream_id,
      stream_type,
      video_id,
      video_type,
      video_title,
      video_genres_first,
      video_content_vertical,
      video_is_kids,
      video_player_mode,
      video_heartbeat_value,
      channel_id,
      sports_event_id,
      COALESCE(COALESCE(channel_id,video_id),sports_event_id) AS video_id_channel_id_sports_event_id,
      is_epg,
      epg_category_id,
      epg_category,
      navigation_section,
      context_os_version AS os_version,
      -- Rank copies of the same event id, most recently loaded first;
      -- only rank 1 is kept by the outer WHERE.
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY loaded_at DESC) AS __row_number
    FROM
      `st-vix-ott-dev.vix_collapsed_events_dev.master_event`
    WHERE
      DATE(_PARTITIONTIME) = %s
      AND DATE(original_timestamp) = %s
  ) AS mev
  LEFT JOIN
    `st-vix-ott-dev.st_vix_ott_dev_us_data_master_dataset.st_vix_ott_dev_data_segment_lookup_table` AS sl
  ON
    mev.context_protocols_source_id = sl.context_protocols_source_id
  WHERE mev.__row_number = 1
""",
  logic_based_fields,
  lookup_query,
  current_event_date,
  current_event_date
);
-- Insert into the base and invalid values tables.
-- Nothing is deleted or inserted when the freshly built temp table is empty.
IF EXISTS (
  SELECT 1
  FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
) THEN
  -- Delete current event date data to handle multiple runs in the same day
  EXECUTE IMMEDIATE FORMAT("""
    DELETE
    FROM
      `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
    WHERE event_date = %s;
  """,
    current_event_date
  );
  EXECUTE IMMEDIATE FORMAT("""
    DELETE
    FROM
      `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
    WHERE event_date = %s;
  """,
    current_event_date
  );

  -- Insert into base table
  INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_test`
  (
    event_date,
    anonymous_id,
    platform_name,
    os_version,
    event_id,
    event_name,
    event_type,
    stream_type,
    session_id,
    stream_id,
    ip,
    navigation_section,
    is_epg,
    epg_category,
    screen_id,
    screen_title,
    screen_type,
    video_content_vertical,
    video_genres_first,
    video_id_channel_id_sports_event_id,
    video_id,
    channel_id,
    sports_event_id,
    video_is_kids,
    video_player_mode,
    video_type,
    anonymous_id_is_invalid_flag,
    client_path_sensitive_properties_is_invalid_flag,
    event_is_video_start_flag,
    ip_is_invalid_flag,
    screen_id_is_invalid_flag,
    screen_title_is_invalid_flag,
    screen_type_is_invalid_flag,
    session_id_is_invalid_flag,
    stream_id_is_invalid_flag,
    stream_type_is_invalid_flag,
    video_heartbeat_value,
    video_content_vertical_is_invalid_flag,
    video_genres_first_is_invalid_flag,
    video_heartbeat_value_is_invalid_flag,
    video_id_channel_id_sports_event_id_is_invalid_flag,
    video_is_kids_is_invalid_flag,
    video_player_mode_is_invalid_flag,
    video_type_is_invalid_flag,
    video_event_flag,
    created_datetime
  )
  SELECT
    event_date,
    anonymous_id,
    platform_name,
    os_version,
    event_id,
    event_name,
    event_type,
    stream_type,
    session_id,
    stream_id,
    ip,
    navigation_section,
    is_epg,
    epg_category,
    screen_id,
    screen_title,
    screen_type,
    video_content_vertical,
    video_genres_first,
    video_id_channel_id_sports_event_id,
    video_id,
    channel_id,
    sports_event_id,
    video_is_kids,
    video_player_mode,
    video_type,
    anonymous_id_is_invalid_flag,
    client_path_sensitive_properties_is_invalid_flag,
    event_is_video_start_flag,
    ip_is_invalid_flag,
    screen_id_is_invalid_flag,
    screen_title_is_invalid_flag,
    screen_type_is_invalid_flag,
    session_id_is_invalid_flag,
    stream_id_is_invalid_flag,
    stream_type_is_invalid_flag,
    video_heartbeat_value,
    video_content_vertical_is_invalid_flag,
    video_genres_first_is_invalid_flag,
    video_heartbeat_value_is_invalid_flag,
    video_id_channel_id_sports_event_id_is_invalid_flag,
    video_is_kids_is_invalid_flag,
    video_player_mode_is_invalid_flag,
    video_type_is_invalid_flag,
    video_event_flag,
    CURRENT_DATETIME()
  FROM `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
  ;

  -- Insert into invalid values table
  INSERT INTO `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_invalid_values_test`
  (
    event_date,
    anonymous_id,
    platform_name,
    os_version,
    event_name,
    event_type,
    event_invalid_values,
    created_datetime
  )
  WITH
  -- Events with at least one invalid flag. Each checked field keeps its actual
  -- value when flagged and is replaced by the marker 'valid' otherwise.
  cte_invalid AS
  (
    SELECT
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_id,
      event_name,
      event_type,
      CASE WHEN video_id_channel_id_sports_event_id_is_invalid_flag THEN video_id_channel_id_sports_event_id ELSE 'valid' END AS video_id_channel_id_sports_event_id,
      CASE WHEN video_heartbeat_value_is_invalid_flag THEN CAST(video_heartbeat_value AS STRING) ELSE 'valid' END AS video_heartbeat_value,
      CASE WHEN ip_is_invalid_flag THEN ip ELSE 'valid' END AS ip,
      CASE WHEN screen_id_is_invalid_flag THEN screen_id ELSE 'valid' END AS screen_id,
      CASE WHEN screen_title_is_invalid_flag THEN screen_title ELSE 'valid' END AS screen_title,
      CASE WHEN screen_type_is_invalid_flag THEN screen_type ELSE 'valid' END AS screen_type,
      CASE WHEN session_id_is_invalid_flag THEN session_id ELSE 'valid' END AS session_id,
      CASE WHEN stream_id_is_invalid_flag THEN stream_id ELSE 'valid' END AS stream_id,
      CASE WHEN stream_type_is_invalid_flag THEN stream_type ELSE 'valid' END AS stream_type,
      CASE WHEN video_content_vertical_is_invalid_flag THEN video_content_vertical ELSE 'valid' END AS video_content_vertical,
      CASE WHEN video_genres_first_is_invalid_flag THEN video_genres_first ELSE 'valid' END AS video_genres_first,
      CASE WHEN video_is_kids_is_invalid_flag THEN video_is_kids ELSE 'valid' END AS video_is_kids,
      CASE WHEN video_player_mode_is_invalid_flag THEN video_player_mode ELSE 'valid' END AS video_player_mode,
      CASE WHEN video_type_is_invalid_flag THEN video_type ELSE 'valid' END AS video_type,
      CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN navigation_section ELSE 'valid' END AS navigation_section,
      CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN CAST(is_epg AS STRING) ELSE 'valid' END AS is_epg,
      CASE WHEN client_path_sensitive_properties_is_invalid_flag THEN epg_category ELSE 'valid' END AS epg_category
    FROM
      `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`
    WHERE
      video_id_channel_id_sports_event_id_is_invalid_flag
      OR client_path_sensitive_properties_is_invalid_flag
      OR video_heartbeat_value_is_invalid_flag
      OR anonymous_id_is_invalid_flag
      OR ip_is_invalid_flag
      OR screen_id_is_invalid_flag
      OR screen_title_is_invalid_flag
      OR screen_type_is_invalid_flag
      OR session_id_is_invalid_flag
      OR stream_id_is_invalid_flag
      OR stream_type_is_invalid_flag
      OR video_content_vertical_is_invalid_flag
      OR video_genres_first_is_invalid_flag
      OR video_is_kids_is_invalid_flag
      OR video_player_mode_is_invalid_flag
      OR video_type_is_invalid_flag
  ),
  -- One row per event: the checked columns are turned into
  -- (field_name, invalid_field_value) pairs, the 'valid' markers are dropped and
  -- the remaining pairs are collected into an array.
  cte_invalid_agg AS
  (
    SELECT
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_name,
      event_type,
      event_id,
      ARRAY_AGG(STRUCT(field_name, invalid_field_value)) AS invalid_field_value
    FROM
    (
      SELECT
        *
      FROM
        cte_invalid
    ) sl
    -- INCLUDE NULLS keeps flagged fields whose actual value is NULL.
    UNPIVOT INCLUDE NULLS
    (
      invalid_field_value FOR field_name IN (
        video_id_channel_id_sports_event_id,
        video_heartbeat_value,
        ip,
        screen_id,
        screen_title,
        screen_type,
        session_id,
        stream_id,
        stream_type,
        video_content_vertical,
        video_genres_first,
        video_is_kids,
        video_player_mode,
        video_type,
        navigation_section,
        is_epg,
        epg_category
      )
    )
    WHERE IFNULL(invalid_field_value, '') <> 'valid'
    GROUP BY
      event_date,
      anonymous_id,
      platform_name,
      os_version,
      event_name,
      event_type,
      event_id
  )
  -- Final shape: one row per user / platform / event name and type, carrying
  -- an array of (event_id, invalid fields) entries.
  SELECT
    event_date,
    anonymous_id,
    platform_name,
    os_version,
    event_name,
    event_type,
    ARRAY_AGG(STRUCT(event_id, invalid_field_value)) AS event_invalid_values,
    CURRENT_DATETIME()
  FROM cte_invalid_agg
  GROUP BY
    event_date,
    anonymous_id,
    platform_name,
    os_version,
    event_name,
    event_type;
END IF;

-- Drop temp tables. Done outside the IF so the scratch tables are also removed
-- when there was nothing to load; IF EXISTS keeps the cleanup safe to re-run.
DROP TABLE IF EXISTS `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_temp`;
DROP TABLE IF EXISTS `st-vix-ott-dev.st_vix_ott_dev_us_data_dq_quality_checks.st_vix_ott_dev_dq_monitoring_base_fields_temp`;