Create a custom sensor based on the Airflow metadata database? Is it good practice?

I’m developing a data pipeline that contains two DAGs, one for each filetype. These DAGs are triggered via API call. There is a cloud function that controls a batch_id (stored in a Cloud SQL instance). Each batch can contain the two files (filetype_1 and filetype_2). There is a main DAG that the API call is made to. This main DAG triggers DAG 1 or DAG 2 (or both) based on the filetype information that is passed via conf in the API call. In the case that the batch contains both filetypes, one API call is made for each filetype (one dag_run per filetype).

At some point, I will need to check whether the DAG for filetype_2 was triggered before continuing the DAG for filetype_1. But since these are two different dag_run_ids, I was wondering whether it is good practice to modify the dag_run_id pattern and then create a SQL sensor on the Airflow metadata database that checks whether a dag_run_id was created for the filetype_2 DAG and, if it exists, waits for it to succeed. To illustrate this scenario, imagine that the batch_id is

  • batch_id: tenant1_20221120123323
  • The default dag_run_id: manual__2022-11-19T00:00:00+00:00
  • The new dag_run_ids: manual__tenant1_20221120123323_filetype1__2022-11-19T00:00:00+00:00 and manual__tenant1_20221120123323_filetype2__2022-11-19T00:00:00+00:00

Then I could check in the Airflow metadata database whether a dag_run_id containing the batch_id (tenant1_20221120123323) and filetype_2 was created, to identify whether filetype_2 was present in the batch. If so, DAG_1 would wait for the filetype_2 dag_run to succeed.
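For reference, here is a minimal sketch of what that sensor could look like, assuming the custom run_id pattern above is set when triggering (the REST API's POST /dags/{dag_id}/dagRuns accepts a dag_run_id field), an Airflow connection named airflow_metadata_db pointing at the metadata database, and a downstream DAG called dag_filetype_2 (all of these names are illustrative, not a recommendation):

```python
# Rough sketch only: queries the Airflow metadata database directly.
# "airflow_metadata_db" and "dag_filetype_2" are placeholder names.
from airflow.providers.common.sql.sensors.sql import SqlSensor
# (on older Airflow versions: from airflow.sensors.sql import SqlSensor)

wait_for_filetype_2 = SqlSensor(
    task_id="wait_for_filetype_2_run",
    conn_id="airflow_metadata_db",  # connection pointing at the metadata DB (assumed to exist)
    sql="""
        SELECT COUNT(*)
        FROM dag_run
        WHERE dag_id = 'dag_filetype_2'
          AND run_id LIKE '%{{ dag_run.conf["batch_id"] }}_filetype2%'
          AND state = 'success'
    """,
    poke_interval=60,
    timeout=60 * 60,
    mode="reschedule",
)
```

The sensor succeeds as soon as the COUNT(*) is non-zero, i.e. a successful run whose run_id contains the batch_id and the filetype_2 marker exists.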

I would say modifying the metadata database or the run identifiers is not good practice, but I’m sure there’s some way to accomplish what you need.

So, is it the main DAG that needs to wait for the others to complete, or a separate DAG? If it is the main DAG, you already have all the identifiers you need from the API call, so you can just query the API and wait until the DAG it triggered is complete.
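As an example, a minimal polling sketch against the stable REST API, assuming basic auth is enabled and that the dag_id and dag_run_id returned when the run was triggered are still at hand (URL and credentials below are placeholders):

```python
import time
import requests

AIRFLOW_URL = "http://localhost:8080"  # assumed webserver address
AUTH = ("airflow", "airflow")          # assumed basic-auth credentials

def wait_for_dag_run(dag_id: str, dag_run_id: str, poll_seconds: int = 60) -> str:
    """Poll GET /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} until a terminal state."""
    url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
    while True:
        state = requests.get(url, auth=AUTH).json()["state"]
        if state in ("success", "failed"):
            return state
        time.sleep(poll_seconds)
```

Alternatively, if the waiting happens inside the main DAG itself, TriggerDagRunOperator has a wait_for_completion flag that blocks until the triggered run finishes, without touching the API or the metadata database.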

Hi Tgoad!
Thanks for answering.
I ended up modifying the main DAG mechanism. Instead of making one API call for each file_type, I now make just one API call with a payload that contains the list of job_types. A single BranchPythonOperator task is then responsible for identifying the job types present and triggering the respective tasks (TriggerDagRunOperator).
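Roughly, the main DAG now looks like the sketch below (DAG ids, task names and the job_types/batch_id conf keys are simplified for illustration):

```python
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum

def choose_branches(**context):
    # Return the trigger task(s) matching the job types sent in the API payload.
    job_types = context["dag_run"].conf.get("job_types", [])
    return [f"trigger_{jt}" for jt in job_types]

with DAG(
    dag_id="main_dag",
    start_date=pendulum.datetime(2022, 11, 1, tz="UTC"),
    schedule=None,  # only triggered via the API (Airflow 2.4+ style; use schedule_interval=None on older versions)
    catchup=False,
) as dag:
    branch = BranchPythonOperator(
        task_id="branch_on_job_types",
        python_callable=choose_branches,
    )

    for job_type in ("filetype_1", "filetype_2"):
        branch >> TriggerDagRunOperator(
            task_id=f"trigger_{job_type}",
            trigger_dag_id=f"dag_{job_type}",
            conf={"batch_id": "{{ dag_run.conf['batch_id'] }}"},
        )
```

The branch callable returns a list of task ids, so when the payload contains both job types both TriggerDagRunOperator tasks run; the ones not returned are simply skipped.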