Hello!
I am trying to create a DAG whose run is triggered by a value in MongoDB. To do this, I would like to use a sensor as my first task, so that when the sensor finds a certain record, the rest of the DAG starts.
I am using Astro, and my MongoDB is running in docker-compose. My sensor can't connect, and if I try the same configuration inside a task, the same error happens.
My dag:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mongo.sensors.mongo import MongoSensor
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from tasks.kaggle.main import download_csv
# Daily DAG: wait for a matching MongoDB record, then download the Kaggle CSV.
with DAG(
    dag_id="get_files_from_kaggle",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 0 * * *",
    catchup=False,
) as dag:
    # Credentials are passed as Jinja templates rather than Variable.get()
    # calls: top-level Variable.get() hits the metadata database every time
    # the scheduler parses this file, whereas templates inside op_args are
    # rendered only when the task actually runs.
    kaggle_credentials = {
        "kaggle_user": "{{ var.value.kaggle_user }}",
        "kaggle_password": "{{ var.value.kaggle_password }}",
    }
    # NOTE(review): the database name is exposed under "db_name"; verify that
    # the callable receiving this dict reads the same key.
    mongodb_credentials = {
        "host": "{{ var.value.mongo_host }}",
        "user": "{{ var.value.mongo_user }}",
        "password": "{{ var.value.mongo_password }}",
        "port": "{{ var.value.mongo_port }}",
        "db_name": "local",
    }

    # Pokes the collection every 10 s, for at most 600 s, until a document
    # matching the query exists. The sensor connects through the Airflow
    # connection "mongo_connection", not through the variables above.
    # NOTE(review): the query matches the *string* 'false'; if the documents
    # store a boolean, this will never match -- confirm the stored type.
    mongo_sensor = MongoSensor(
        task_id='mogo_sensor_task',
        mongo_conn_id='mongo_connection',
        collection='initialize_airflow',
        query={'running': 'false'},
        poke_interval=10,
        timeout=600,
        # No explicit dag=...: tasks created inside "with DAG" attach to it.
    )

    task_initialize_dag = BashOperator(
        task_id="initializing_dag",
        bash_command="echo Initializing Dag!",
    )

    task_download_csv = PythonOperator(
        task_id="getting_csv",
        python_callable=download_csv,
        # op_args is a templated field, so the "{{ var.value.* }}" strings
        # nested in these dicts are resolved before download_csv is called.
        op_args=[kaggle_credentials, mongodb_credentials],
    )

    mongo_sensor >> task_initialize_dag >> task_download_csv
I put the Mongo connection in download_csv too, and that connection also returns an error:
def download_csv(kaggle_credentials, mongo_credentials):
    """Authenticate with Kaggle, then print the ``initialize_airflow`` documents.

    Args:
        kaggle_credentials: Mapping with the keys ``kaggle_user`` and
            ``kaggle_password``.
        mongo_credentials: Mapping with the keys ``user``, ``password``,
            ``host`` and ``port``, plus the database name under ``db_name``
            (``schema`` is accepted as a fallback key).

    Raises:
        Exception: If Kaggle authentication or any MongoDB step fails. The
            underlying error is chained as ``__cause__``.
    """
    from urllib.parse import quote_plus

    # Exported before the Kaggle import below; presumably the Kaggle client
    # picks its credentials up from the environment -- TODO confirm.
    os.environ["KAGGLE_USERNAME"] = kaggle_credentials["kaggle_user"]
    os.environ["KAGGLE_KEY"] = kaggle_credentials["kaggle_password"]

    from kaggle.api.kaggle_api_extended import KaggleApi

    try:
        api = KaggleApi()
        api.authenticate()
    except Exception as e:
        raise Exception(f"An error: {e}") from e

    try:
        # Reserved characters (":", "@", "/", ...) in the user name or
        # password would corrupt the URI unless they are percent-escaped.
        user = quote_plus(str(mongo_credentials["user"]))
        password = quote_plus(str(mongo_credentials["password"]))
        connection_config = (
            f"mongodb://{user}:{password}"
            f"@{mongo_credentials['host']}:{mongo_credentials['port']}/"
        )

        # Prefer "db_name"; fall back to "schema" so that either key works.
        if "db_name" in mongo_credentials:
            db_name = mongo_credentials["db_name"]
        else:
            db_name = mongo_credentials["schema"]

        # The context manager closes the client's sockets even on failure.
        with MongoClient(connection_config) as client:
            db_connection = client[db_name]
            collection = db_connection["initialize_airflow"]
            for document in collection.find({}):
                print(f"Value: {document}")
            print(f"It works: {db_connection}")
    except Exception as e:
        raise Exception(f"An error: {e}") from e
I have to make a connection between services running in different docker-compose projects. How can I set up a network between these services?
Does anyone have an idea what I am doing wrong here? I am trying to learn about Airflow sensors.
Thank you so much!