Managing Snowflake Tasks and Task Trees with Airflow

I'd like to provide some context about Snowflake Tasks: they have their own scheduling system, with the schedule defined directly on the task (typically as a cron expression). They are also asynchronous: when you trigger a Snowflake Task, the call returns immediately and appears successful even though the task has only just started, and the task itself does not return an exit code. Consequently, the only way to determine whether it completed successfully is to query Snowflake's task history and check the outcome. Given this understanding of Snowflake Tasks, I have two questions:

  1. How can we effectively orchestrate Snowflake Tasks using Airflow? To give a specific example: How would one orchestrate a sequence where Task A triggers Task B, with both A and B being Snowflake tasks?
  2. Is it considered a good practice to orchestrate Snowflake tasks using Apache Airflow?

Response from ask.astronomer.io:

Orchestrating Snowflake tasks with Apache Airflow is entirely possible and can be a good practice depending on the specifics of your use case.

Here’s a simple example of how you could orchestrate Snowflake tasks using Airflow:

  1. Use the SnowflakeOperator to start Task A.
  2. Then, use Airflow's sensor mechanism to check Snowflake's task history and determine whether Task A completed successfully. This can be done with the SqlSensor and a suitable SQL query, or by writing a custom Snowflake sensor (a sketch of such a sensor appears further below).
  3. Once Task A has completed successfully, you can then use another SnowflakeOperator to start Task B.

Here’s a basic example of how this might look in code:

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.sensors.sql import SqlSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'snowflake_tasks',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

start_task_A = SnowflakeOperator(
    task_id='start_task_A',
    # Manually trigger the Snowflake task; EXECUTE TASK returns immediately.
    sql='EXECUTE TASK TASK_A;',
    snowflake_conn_id='snowflake_default',
    dag=dag,
)

sensor_task_A = SqlSensor(
    task_id='sensor_task_A',
    conn_id='snowflake_default',
    # Poll the task history until a successful run of TASK_A is recorded.
    sql="SELECT COUNT(*) FROM TABLE(information_schema.task_history()) WHERE NAME = 'TASK_A' AND STATE = 'SUCCEEDED';",
    poke_interval=60,
    dag=dag,
)

start_task_B = SnowflakeOperator(
    task_id='start_task_B',
    sql='EXECUTE TASK TASK_B;',
    snowflake_conn_id='snowflake_default',
    dag=dag,
)

start_task_A >> sensor_task_A >> start_task_B

This example assumes that you have a Snowflake connection set up in your Airflow instance with the connection ID 'snowflake_default'. It also assumes that you have two Snowflake tasks named 'TASK_A' and 'TASK_B', and that the role used by that connection has the privileges required to run EXECUTE TASK on them and to query the task history.

However, keep in mind that this is a simplified example and you may need to adjust it to fit your specific needs, especially the SQL query in the SqlSensor: as written it matches any successful run of TASK_A in the task history, so in practice you would also filter on SCHEDULED_TIME to target the run you just triggered. You might also need to handle cases where a task fails or takes longer than expected to complete, for example with a custom sensor along the lines of the sketch below.
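If you go the custom-sensor route mentioned earlier, here is a minimal sketch of what that could look like. Treat it as an illustration under a few assumptions, not a ready-made operator: it reuses the 'snowflake_default' connection, the SnowflakeTaskSensor class name and its task_name / scheduled_after parameters are hypothetical, and the column names and state values ('SUCCEEDED', 'FAILED', 'CANCELLED') come from Snowflake's TASK_HISTORY table function, so verify them against your Snowflake version.

from airflow.exceptions import AirflowException
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.sensors.base import BaseSensorOperator


class SnowflakeTaskSensor(BaseSensorOperator):
    """Waits for a run of a Snowflake task to reach a terminal state.

    Succeeds when the most recent run of `task_name` (optionally restricted
    to runs scheduled at or after `scheduled_after`) has STATE = 'SUCCEEDED';
    fails the Airflow task if that run FAILED or was CANCELLED.
    """

    def __init__(self, *, task_name, snowflake_conn_id='snowflake_default',
                 scheduled_after=None, **kwargs):
        super().__init__(**kwargs)
        self.task_name = task_name
        self.snowflake_conn_id = snowflake_conn_id
        # Optional lower bound on SCHEDULED_TIME, so an old successful run
        # of the same task is not mistaken for the run we just triggered.
        self.scheduled_after = scheduled_after

    def poke(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        sql = (
            "SELECT STATE, ERROR_MESSAGE "
            "FROM TABLE(information_schema.task_history(TASK_NAME => %(task_name)s)) "
        )
        params = {'task_name': self.task_name}
        if self.scheduled_after is not None:
            sql += "WHERE SCHEDULED_TIME >= %(scheduled_after)s "
            params['scheduled_after'] = self.scheduled_after
        sql += "ORDER BY SCHEDULED_TIME DESC LIMIT 1"

        row = hook.get_first(sql, parameters=params)
        if row is None:
            return False  # no run recorded yet; keep poking
        state, error_message = row
        if state == 'SUCCEEDED':
            return True
        if state in ('FAILED', 'CANCELLED'):
            raise AirflowException(
                f"Snowflake task {self.task_name} ended in state {state}: {error_message}"
            )
        return False  # still SCHEDULED or EXECUTING

You could then swap this in for sensor_task_A, for example SnowflakeTaskSensor(task_id='sensor_task_A', task_name='TASK_A', poke_interval=60, dag=dag), keeping the rest of the DAG unchanged; pass scheduled_after from your run context if you need to restrict the check to the run you just triggered.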

As for whether it’s good practice to orchestrate Snowflake tasks with Airflow, it depends on your specific use case. If you need more advanced scheduling or dependency management than what Snowflake tasks provide, or if you want to integrate your Snowflake tasks into a larger workflow that includes non-Snowflake tasks, then using Airflow can be a good choice. On the other hand, if your use case is simple and entirely contained within Snowflake, using Snowflake’s built-in task scheduling might be simpler and more efficient.