The right way to configure dbt-core with Cosmos without Airflow connections

I’m currently working on a project that integrates dbt with Airflow using the Cosmos package, but I’ve hit a challenge: I don’t want to create a connection in Airflow. Instead, I want Cosmos to use the profile I already have in my profiles.yml. The task group shows up in the Airflow UI, but it has no subtasks, and the dbt tasks are never triggered:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from cosmos.task_group import DbtTaskGroup
import requests

def check_connection_sync_status():
    # Airbyte API endpoint to retrieve connection status
    airbyte_api_url = 'http://localhost:8000/api/v1/connections'

    try:
        response = requests.get(airbyte_api_url)
        response.raise_for_status()
        connections = response.json().get('connections', [])

        # Per-connection handling (alerting, retries, and so on) could be
        # added here; for now only the overall result matters.

        # Return True only if every connection has successfully synced
        return all(connection.get('status') == 'synced' for connection in connections)
    
    except requests.exceptions.RequestException as e:
        # Handle API request exception
        print(f"Error: {e}")
        return False


default_args = {
    'start_date': datetime(2023, 6, 13),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG('dbt_execution_dag', default_args=default_args, schedule_interval='@daily') as dag:
    
    # NOTE: PythonOperator records this task as a success no matter what the
    # callable returns; if the dbt run should be skipped when the check
    # returns False, ShortCircuitOperator is the better fit.
    connection_status_check_task = PythonOperator(
        task_id='check_connection_sync_status',
        python_callable=check_connection_sync_status,
    )


    dbt_run = DbtTaskGroup(
        dbt_root_path="/app/dbt",
        dbt_project_name="dbt_project",
        conn_id="",  # left blank on purpose: I want Cosmos to read profiles.yml
        profile_args={
            "schema": "public",
        },
        # profile_name_override="dbt_project",
    )

    connection_status_check_task >> dbt_run
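
For reference, the profiles.yml I want Cosmos to pick up is a standard dbt Postgres profile along these lines (host, credentials, and database names are illustrative):

dbt_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: dbt_user
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: analytics
      schema: public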

Hey @saqqaf, great question. We’re releasing a new version of Cosmos (1.0) tomorrow that’ll let you do this! Here’s the PR: Simplify interfaces everywhere to use new Config objects (astronomer/astronomer-cosmos#385).

If you star/watch the repository, you should get notified when the release happens.
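
To sketch what that unlocks: the new Config objects let Cosmos read an existing profiles.yml directly instead of building a profile from an Airflow connection. A minimal example using the paths from the DAG above, dropped into the same with DAG(...) block (argument names come from the PR and could shift slightly before release):

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig

profile_config = ProfileConfig(
    profile_name="dbt_project",   # the profile key in profiles.yml
    target_name="dev",            # the target to run against
    profiles_yml_filepath="/app/dbt/profiles.yml",  # reuse the existing file
)

# Drop-in replacement for the DbtTaskGroup in the DAG above
dbt_run = DbtTaskGroup(
    group_id="dbt_run",
    project_config=ProjectConfig("/app/dbt/dbt_project"),
    profile_config=profile_config,
)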

Great news! Looking forward to this update.
I have another issue and would appreciate your help: I’m now trying to run DbtTaskGroup with an Airflow connection, but the tasks never run in Airflow.

I’m not sure whether something is wrong with my DAG code or whether it’s down to the way I installed Cosmos. I built an Airflow image with Docker and installed the astronomer-cosmos packages on it. Here is my DAG code:

    dbt_run = DbtTaskGroup(
        dbt_root_path="../dbt",
        dbt_project_name="dbt_project",
        conn_id="postgres_conn",
        profile_args={
            "schema": "public",
        },
    )
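
(For completeness: postgres_conn has to exist in the Airflow deployment the scheduler and workers actually see, or nothing downstream will run. One way to ship it without the UI is an environment variable, which Airflow resolves into a connection; the credentials here are placeholders:)

    AIRFLOW_CONN_POSTGRES_CONN='postgres://dbt_user:dbt_password@postgres:5432/analytics'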

And this is the requirements.txt file that installs the Cosmos packages into the Airflow image:

apache-airflow
dbt-core
dbt-postgres
dbt-clickhouse
astronomer-cosmos
astronomer-cosmos[dbt-all]
astronomer-cosmos[dbt-postgres]
astronomer-cosmos[docker]
astronomer-cosmos[kubernetes]
astronomer-cosmos[dbt-clickhouse]
apache-airflow-providers-airbyte
apache-airflow-providers-postgres
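
Several of those lines overlap: the bare astronomer-cosmos is subsumed by the extras, and astronomer-cosmos[dbt-all] should already pull in every adapter extra Cosmos publishes (I’m not sure dbt-clickhouse is a published Cosmos extra, so installing that adapter directly may be safer). A slimmer equivalent, assuming Postgres and ClickHouse are the only warehouses in play, might be:

apache-airflow
astronomer-cosmos[dbt-postgres,docker,kubernetes]
dbt-clickhouse
apache-airflow-providers-airbyte
apache-airflow-providers-postgres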