I am writing unit tests for my ETLs and, as part of that process, I want to test all DAGs to make sure that they do not contain cycles. After reading Data Pipelines with Apache Airflow by Bas Harenslak and Julian de Ruiter, I see that they use DAG.test_cycle(),
where DAG is imported from the module airflow.models.dag,
but when I run the code I get the error AttributeError: 'DAG' object has no attribute 'test_cycle'.
Here is my code snippet:
import glob
import importlib
import os
import pytest
from airflow.models.dag import DAG
# Glob pattern matching every Python file under the ``dags`` directory, which
# is resolved two directory levels above the folder containing this test file.
# NOTE(review): the two ".." segments assume this layout -- adjust the count
# if the test module lives at a different depth.
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**/*.py")

# Paths of all DAG files found at collection time; ``recursive=True`` is what
# makes the ``**`` segment descend into nested sub-directories.
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    """Import one DAG file and verify that every DAG it defines is acyclic.

    Args:
        dag_file: Path to a Python file matched by the ``DAG_PATH`` glob.

    Raises:
        AssertionError: If the module defines no ``DAG`` object at module level.
        airflow.exceptions.AirflowDagCycleException: If a DAG contains a cycle.
    """
    # A bare ``import importlib`` does not guarantee that the ``util``
    # submodule is loaded, so import it explicitly before using it.
    import importlib.util

    # ``DAG.test_cycle()`` only exists in Airflow 1.10.x. In Airflow 2 the
    # check is a module-level function: ``check_cycle`` (2.1+), which was
    # named ``test_cycle`` in 2.0.x. It is imported locally under an alias so
    # pytest never collects it as a test.
    try:
        from airflow.utils.dag_cycle_tester import check_cycle
    except ImportError:
        from airflow.utils.dag_cycle_tester import test_cycle as check_cycle

    module_name, _ = os.path.splitext(dag_file)

    # ``dag_file`` is already a usable path returned by ``glob``; joining it
    # onto the glob *pattern* in ``DAG_PATH`` would yield a bogus location
    # whenever the path is relative.
    mod_spec = importlib.util.spec_from_file_location(module_name, dag_file)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects, f"No DAG objects found in {dag_file}"

    for dag in dag_objects:
        check_cycle(dag)