I’m trying to load a pandas dataframe to Snowflake and would love some help getting it to work. Here’s my class:
import logging
import pandas as pd
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from snowflake.connector.pandas_tools import pd_writer
LOG = logging.getLogger()
log: logging.log = logging.getLogger('airflow')
log.setLevel(logging.DEBUG)
class Snowflake_dataframe:
"""
Reads and writes dataframes from Snowflake
"""
def __init__(self, snowflake_connection_id):
hook = SnowflakeHook(snowflake_conn_id=snowflake_connection_id)
self.connection = hook.get_conn()
def get_dataframe(self, query):
"""Takes a query and returns a dataframe from snowflake
Ex: `SHOW ROLES` or `SELECT * FROM my_table`
Args:
query: The SQL query to run. Can be DCL or DML
Returns:
A dataframe with the query result
"""
df = pd.read_sql(query, self.connection)
return df
def write_dataframe(self, table_name, df, if_exists):
"""Write dataframes to Snowflake can be very slow for large data frames.
This can be written and tested later
"""
df.to_sql(name=table_name, con=self.connection, if_exists=if_exists, index=False, method=pd_writer)
The error I’m getting is:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2056, in execute
cur.execute(*args, **kwargs)
File "/home/astro/.local/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 671, in execute
query = command % processed_params
TypeError: not all arguments converted during string formatting
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 188, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/metadata/snowflake_information_schema.py", line 94, in get_and_write_columns
SD.write_dataframe(table_name=SNOWFLAKE_TABLE, df=df, if_exists='append')
File "/usr/local/airflow/include/snowflake/query.py", line 51, in write_dataframe
df.to_sql(name=table_name, con=self.connection, if_exists=if_exists, index=False)
File "/usr/local/lib/python3.9/site-packages/pandas/core/generic.py", line 2872, in to_sql
sql.to_sql(
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 717, in to_sql
pandas_sql.to_sql(
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2225, in to_sql
table.create()
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 856, in create
if self.exists():
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 840, in exists
return self.pd_sql.has_table(self.name, self.schema)
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2236, in has_table
return len(self.execute(query, [name]).fetchall()) > 0
File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 2068, in execute
raise ex from exc
pandas.io.sql.DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': not all arguments converted during string formatting
I’d love help troubleshooting this OR your code that will efficiently load a pandas dataframe to Snowflake in an Airflow job. I’d like to be able to just do df.to_sql
in a python_callable.
. It’s laborious in all cases to separate the tasks for manipulating and loading the data into separate tasks for some of my use cases.