I’m currently working on a project that involves integrating dbt with Airflow using the cosmos
package. However, I’m facing a challenge: I don’t want to create a connection in Airflow. Instead, I want to use the profile that I already have in my profiles.yml.
The task group shows up in Airflow, but without its subtasks, and the dbt task is never triggered:
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.python import ShortCircuitOperator
# NOTE: airflow.operators.python_operator is the deprecated module path;
# kept for backward compatibility with the original code.
from airflow.operators.python_operator import PythonOperator
from cosmos.task_group import DbtTaskGroup
def check_connection_sync_status() -> bool:
    """Poll the Airbyte API and report whether every connection has synced.

    Returns:
        bool: True when every connection returned by the API reports
        status ``'synced'`` (vacuously True when the API returns no
        connections — NOTE(review): if an empty list should block the
        pipeline, return False here instead; confirm intended semantics).
        False on any request/HTTP error.
    """
    # Airbyte API endpoint to retrieve all configured connections.
    airbyte_api_url = 'http://localhost:8000/api/v1/connections'
    try:
        # A timeout is essential: without one a hung Airbyte instance
        # would block the Airflow worker slot indefinitely.
        response = requests.get(airbyte_api_url, timeout=30)
        response.raise_for_status()
        connections = response.json().get('connections', [])
        # NOTE(review): Airbyte connection objects usually report
        # 'active'/'inactive'/'deprecated'; 'synced' may never match —
        # TODO confirm the expected status value against the API response.
        return all(conn.get('status') == 'synced' for conn in connections)
    except requests.exceptions.RequestException as e:
        # Any network/HTTP failure is treated as "not synced".
        print(f"Error: {e}")
        return False
# Shared task-level defaults for the DAG: fixed start date plus a
# modest retry policy (2 retries, 5 minutes apart).
default_args = dict(
    start_date=datetime(2023, 6, 13),
    retries=2,
    retry_delay=timedelta(minutes=5),
)
with DAG(
    'dbt_execution_dag',
    default_args=default_args,
    schedule_interval='@daily',
) as dag:
    # ShortCircuitOperator skips all downstream tasks when the callable
    # returns a falsy value. The original PythonOperator silently ignored
    # the boolean returned by check_connection_sync_status, so the dbt
    # tasks would have run even when the sync check failed.
    connection_status_check_task = ShortCircuitOperator(
        task_id='check_connection_sync_status',
        python_callable=check_connection_sync_status,
    )

    # NOTE(review): an empty conn_id is the likely reason the task group
    # renders without subtasks — cosmos needs either a valid Airflow
    # connection id here, or (cosmos >= 1.0) a ProfileConfig with
    # profiles_yml_filepath pointing at your existing profiles.yml.
    # TODO: confirm the installed cosmos version and pick one approach.
    dbt_run = DbtTaskGroup(
        dbt_root_path="/app/dbt",
        dbt_project_name="dbt_project",
        conn_id="",  # TODO: set a real connection id or migrate to ProfileConfig
        profile_args={
            "schema": "public",
        },
        # profile_name_override="dbt_project"
    )

    # dbt only runs after the Airbyte sync check succeeds.
    connection_status_check_task >> dbt_run