Load Pandas dataframe to Snowflake

I’m trying to load a pandas dataframe to Snowflake and would love some help getting it to work. Here’s my class:

import logging
import pandas as pd
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from snowflake.connector.pandas_tools import pd_writer

LOG = logging.getLogger()
log: logging.Logger = logging.getLogger('airflow')
log.setLevel(logging.DEBUG)


class Snowflake_dataframe:

    """
    Reads and writes dataframes from Snowflake
    """

    def __init__(self, snowflake_connection_id):
        hook = SnowflakeHook(snowflake_conn_id=snowflake_connection_id)
        self.connection = hook.get_conn()

    def get_dataframe(self, query):
        """Takes a query and returns a dataframe from snowflake
        Ex: `SHOW ROLES` or `SELECT * FROM my_table`

        Args:
            query: The SQL query to run. Can be DCL or DML

        Returns:
            A dataframe with the query result
        """

        df = pd.read_sql(query, self.connection)
        return df

    def write_dataframe(self, table_name, df, if_exists):
        """Writes a dataframe to Snowflake. This can be very slow for large dataframes.
        This can be written and tested later.
        """

        df.to_sql(name=table_name, con=self.connection, if_exists=if_exists, index=False, method=pd_writer)

The error I’m getting is:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2056, in execute
    cur.execute(*args, **kwargs)
  File "/home/astro/.local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 671, in execute
    query = command % processed_params
TypeError: not all arguments converted during string formatting

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 188, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/metadata/snowflake_information_schema.py", line 94, in get_and_write_columns
    SD.write_dataframe(table_name=SNOWFLAKE_TABLE, df=df, if_exists='append')
  File "/usr/local/airflow/include/snowflake/query.py", line 51, in write_dataframe
    df.to_sql(name=table_name, con=self.connection, if_exists=if_exists, index=False)
  File "/usr/local/lib/python3.9/site-packages/pandas/core/generic.py", line 2872, in to_sql
    sql.to_sql(
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 717, in to_sql
    pandas_sql.to_sql(
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2225, in to_sql
    table.create()
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 856, in create
    if self.exists():
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 840, in exists
    return self.pd_sql.has_table(self.name, self.schema)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2236, in has_table
    return len(self.execute(query, [name]).fetchall()) > 0
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2068, in execute
    raise ex from exc
pandas.io.sql.DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': not all arguments converted during string formatting

I’d love help troubleshooting this OR your code that will efficiently load a pandas dataframe to Snowflake in an Airflow job. I’d like to be able to just call df.to_sql in a python_callable. For some of my use cases, it’s laborious to split manipulating and loading the data into separate tasks.

Hi @jaybythebay, thanks for reaching out!

You can manipulate data and load it into Snowflake with the Astro SDK. Please take a look at the dataframe operator, which lets you run Python transformations in Airflow (documentation). To write the dataframe that results from your manipulations to a table, pass the output_table parameter.

In that documentation, case 1 shows how to run Python transformations, and case 2 shows how to load the resulting dataframe into Snowflake.

Also, here you can see an example DAG that loads data from S3 into Snowflake, transforms it, and saves the result back to Snowflake, all with the Astro SDK.
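
On the traceback itself: the failing query against sqlite_master suggests pandas fell back to its SQLite code path, which is what to_sql does when con is a raw DBAPI connection rather than a SQLAlchemy engine or connection. One way around that, sketched below under some assumptions, is to skip to_sql and call write_pandas from snowflake.connector.pandas_tools on the connection returned by hook.get_conn(). The helper name load_dataframe and its writer parameter are made up for illustration (the parameter only exists so the function can be exercised without a live connection). The sketch also assumes the target table already exists, was created without quoted identifiers, and that the dataframe's column names match the table's columns exactly, including case.

```python
from typing import Any, Callable, Optional


def load_dataframe(
    connection: Any,
    df: Any,
    table_name: str,
    writer: Optional[Callable[..., tuple]] = None,
) -> int:
    """Append df to an existing Snowflake table and return the number of rows written.

    load_dataframe and the writer parameter are hypothetical names for this sketch.
    """
    if writer is None:
        # Imported lazily so this module loads even without the connector installed.
        from snowflake.connector.pandas_tools import write_pandas

        writer = write_pandas

    # write_pandas quotes identifiers by default, so the name has to match the
    # stored case. Tables created without quotes are stored in upper case.
    success, _num_chunks, num_rows, _output = writer(
        connection, df, table_name.upper()
    )
    if not success:
        raise RuntimeError(f"Loading into {table_name} failed")
    return num_rows
```

Inside the class from the question, write_dataframe could then call load_dataframe(self.connection, df, table_name) in place of df.to_sql. Note that this only appends, so it does not cover the other if_exists modes.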
