Connection Error using Mongo Sensor

Hello!

I am trying to create a DAG that initializes from a value in MongoDB. The idea is to make a sensor my first task, so that when the sensor finds a certain record, the DAG starts.
I am using Astro, and my MongoDB is running in a separate docker-compose. I can't connect my sensor, and if I try the same config in a task, the same error happens.

My dag:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mongo.sensors.mongo import MongoSensor
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from tasks.kaggle.main import download_csv

with DAG(
    dag_id="get_files_from_kaggle",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 0 * * *",
    catchup=False,
) as dag:

    # getting variables
    kaggle_credentials = {
        "kaggle_user": Variable.get("kaggle_user"),
        "kaggle_password": Variable.get("kaggle_password"),
    }
    mongodb_credentials = {
        "host": Variable.get("mongo_host"),
        "user": Variable.get("mongo_user"),
        "password": Variable.get("mongo_password"),
        "port": Variable.get("mongo_port"),
        "db_name": "local"
    }

    mongo_sensor = MongoSensor(
        task_id='mogo_sensor_task',
        mongo_conn_id='mongo_connection',
        collection='initialize_airflow',
        query={'running': 'false'},
        poke_interval=10,
        timeout=600,
    )

    task_initialize_dag = BashOperator(
        task_id="initializing_dag", bash_command="echo Initializing Dag!"
    )



    task_download_csv = PythonOperator(
        task_id="getting_csv",
        python_callable=download_csv,
        op_args=[kaggle_credentials, mongodb_credentials],
    )

    mongo_sensor >> task_initialize_dag >> task_download_csv
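
For completeness: the sensor expects an Airflow connection named mongo_connection to already exist. A minimal sketch of registering it programmatically, with placeholder values (the UI or an AIRFLOW_CONN_* environment variable works just as well):

from airflow import settings
from airflow.models import Connection

# Sketch only: placeholder host and credentials, not my real ones.
conn = Connection(
    conn_id="mongo_connection",
    conn_type="mongo",
    host="mongodb",    # hostname the Mongo server is reachable at
    login="mongo",
    password="senha",
    port=27017,
    schema="local",    # for Mongo connections, "schema" holds the database name
)
session = settings.Session()
session.add(conn)
session.commit()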

I also put the Mongo connection in download_csv, and that connection returns an error too:

import os

from pymongo import MongoClient


def download_csv(kaggle_credentials, mongo_credentials):
    # Kaggle reads its credentials from environment variables, so they must
    # be set before the kaggle import below.
    os.environ["KAGGLE_USERNAME"] = kaggle_credentials["kaggle_user"]
    os.environ["KAGGLE_KEY"] = kaggle_credentials["kaggle_password"]
    from kaggle.api.kaggle_api_extended import KaggleApi

    try:
        api = KaggleApi()
        api.authenticate()
    except Exception as e:
        raise Exception(f"An error: {e}")

    try:
        connection_config = (
            f"mongodb://{mongo_credentials['user']}:{mongo_credentials['password']}"
            f"@{mongo_credentials['host']}:{mongo_credentials['port']}/"
        )
        client = MongoClient(connection_config)
        # The credentials dict built in the DAG uses the key "db_name", not "schema".
        db_connection = client[mongo_credentials["db_name"]]
        collection = db_connection["initialize_airflow"]
        for i in collection.find({}):
            print(f"Value: {i}")
        print(f"It works: {db_connection}")
    except Exception as e:
        raise Exception(f"An error: {e}")
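
To separate connectivity issues from the Kaggle logic, a bare pymongo probe like this reproduces the failure on its own (the URI values are placeholders):

from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

# Standalone probe, independent of Airflow and Kaggle.
client = MongoClient(
    "mongodb://mongo:senha@localhost:27017/",  # placeholder credentials/host
    serverSelectionTimeoutMS=5000,
)
try:
    print(client.admin.command("ping"))  # {'ok': 1.0} when the server is reachable
except ServerSelectionTimeoutError as e:
    print(f"Cannot reach MongoDB: {e}")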

Does anyone have an idea what I am doing wrong here? I am trying to learn about Airflow sensors.
I also need to connect services running in different docker-compose projects. How can I create a network between them? Something like the sketch below is what I have in mind.
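From what I have read, a shared external Docker network might be the way; this is only a sketch of what I mean, not my actual files, and shared_net is a placeholder name:

# Sketch: create the network once with `docker network create shared_net`,
# then reference it from each project's docker-compose.yml so containers in
# both projects can resolve each other by service name.
services:
  mongodb:
    image: mongo
    networks:
      - shared_net

networks:
  shared_net:
    external: true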

Thank you so much!

The traceback / error message:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 657d329f8c3d9505a13b303c, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused')>]>
[2023-12-16, 05:16:45 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=get_files_from_kaggle, task_id=mogo_sensor_task, execution_date=20231216T023430, start_date=20231216T051615, end_date=20231216T051645
[2023-12-16, 05:16:45 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 71 for task mogo_sensor_task (localhost:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 657d329f8c3d9505a13b303c, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused')>]>; 201)
[2023-12-16, 05:16:45 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-12-16, 05:16:45 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

Hey @visus

If MongoDB is running on your local machine in a Docker container, then you have to use host.docker.internal as the hostname, not localhost. For reference, see this link.

Also, I noticed that you are saving your credentials in Airflow Variables, which is not recommended. You should always use Airflow Connections for credentials, as they are secure and encrypted. I hope you have created mongo_connection in your Airflow Connections as you are using it in MongoSensor.
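
For example, once mongo_connection exists, a task can get its client through the provider's hook instead of reading credentials from Variables. A sketch (recent provider versions take mongo_conn_id; older ones use conn_id):

from airflow.providers.mongo.hooks.mongo import MongoHook

def query_initialize_collection():
    # The hook builds the MongoClient from the Airflow connection,
    # so no credentials live in Variables or in the DAG file.
    hook = MongoHook(mongo_conn_id="mongo_connection")
    client = hook.get_conn()
    for doc in client["local"]["initialize_airflow"].find({}):
        print(doc)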

To see a detailed reference on how to use MongoDB with Airflow, refer to these docs:

Thanks
Manmeet

Thank you @manmeet!
I noticed I was facing a Docker config issue.
For now, I am using the Mongo env config in the same docker-compose as Airflow, on my local machine:

mongodb:
    image: mongo
    ports:
      - "27017:27017"
    volumes:
      - mongodata:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: mongo
      MONGO_INITDB_ROOT_PASSWORD: senha

And I set up the connection config as follows:

In my dag.py:

task_mongo_sensor = MongoSensor(
    task_id='mogo_sensor_task',
    mongo_conn_id='mongo_connection',
    collection='initialize_airflow',
    query={"downloaded": False},
    poke_interval=10,
    timeout=600,
    dag=dag,
)

These are my containers:

CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS                        PORTS                                           NAMES
d84f8d2698e8   study_airflow_airflow-triggerer   "/usr/bin/dumb-init …"   23 minutes ago   Up About a minute (healthy)   8080/tcp                                        study_airflow-airflow-triggerer-1
70eaeddbb881   study_airflow_airflow-webserver   "/usr/bin/dumb-init …"   23 minutes ago   Up About a minute (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp       study_airflow-airflow-webserver-1
fe16b0847714   study_airflow_airflow-worker      "/usr/bin/dumb-init …"   23 minutes ago   Up About a minute (healthy)   8080/tcp                                        study_airflow-airflow-worker-1
45db1d3c2f42   study_airflow_airflow-scheduler   "/usr/bin/dumb-init …"   23 minutes ago   Up About a minute (healthy)   8080/tcp                                        study_airflow-airflow-scheduler-1
3b3c7fb077d9   postgres:14.5                     "docker-entrypoint.s…"   23 minutes ago   Up About a minute             0.0.0.0:5432->5432/tcp, :::5432->5432/tcp       study_airflow-my_postgres_db-1
e78e682da117   mongo                             "docker-entrypoint.s…"   23 minutes ago   Up About a minute             0.0.0.0:27017->27017/tcp, :::27017->27017/tcp   study_airflow-mongodb-1
07dda01fe36c   postgres:13                       "docker-entrypoint.s…"   23 minutes ago   Up About a minute (healthy)   5432/tcp                                        study_airflow-postgres-1
0dbf51fcd025   redis:latest                      "docker-entrypoint.s…"   23 minutes ago   Up About a minute (healthy)   6379/tcp                                        study_airflow-redis-1

And I am facing the following error:

pymongo.errors.OperationFailure: Authentication failed., full error: {'ok': 0.0, 'errmsg': 'Authentication failed.', 'code': 18, 'codeName': 'AuthenticationFailed'}
[2023-12-20, 21:56:18 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=get_files_from_kaggle, task_id=mogo_sensor_task, execution_date=20231220T213000, start_date=20231220T215618, end_date=20231220T215618
[2023-12-20, 21:56:18 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 18 for task mogo_sensor_task (Authentication failed., full error: {'ok': 0.0, 'errmsg': 'Authentication failed.', 'code': 18, 'codeName': 'AuthenticationFailed'}; 367)
[2023-12-20, 21:56:18 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-12-20, 21:56:18 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

The Mongo connection itself is working; I can connect, post, and get using NoSQLBooster without a problem.

Am I passing the credentials correctly? Thank you so much, have a nice day.
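
Edit: one guess I still have to verify: since the root user is created through MONGO_INITDB_ROOT_USERNAME, it should live in Mongo's admin database, so maybe the client has to be told to authenticate against it explicitly. A sketch of what I mean, with the same placeholder credentials as my compose file:

from pymongo import MongoClient

# Hypothetical fix to try: users created via MONGO_INITDB_ROOT_USERNAME are
# stored in the "admin" database, so point authentication at it explicitly.
client = MongoClient("mongodb://mongo:senha@mongodb:27017/?authSource=admin")
print(client.admin.command("ping"))  # {'ok': 1.0} on success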

I changed mongodb to mongo in the connection config, and I still get an error:

pymongo.errors.ServerSelectionTimeoutError: mongo:27017: [Errno -3] Temporary failure in name resolution, Timeout: 30s, Topology Description: <TopologyDescription id: 6583642b43e3f986b4123eaf, topology_type: Unknown, servers: [<ServerDescription ('mongo', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('mongo:27017: [Errno -3] Temporary failure in name resolution')>]>
[2023-12-20, 22:01:46 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=get_files_from_kaggle, task_id=mogo_sensor_task, execution_date=20231220T213000, start_date=20231220T220115, end_date=20231220T220146
[2023-12-20, 22:01:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 19 for task mogo_sensor_task (mongo:27017: [Errno -3] Temporary failure in name resolution, Timeout: 30s, Topology Description: <TopologyDescription id: 6583642b43e3f986b4123eaf, topology_type: Unknown, servers: [<ServerDescription ('mongo', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('mongo:27017: [Errno -3] Temporary failure in name resolution')>]>; 445)
[2023-12-20, 22:01:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-12-20, 22:01:46 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

Hey @visus

In the Airflow connection you created, you need to put host.docker.internal as the host, because you are running the Mongo server on your local machine. If you look at the error, it is trying to reach the server "mongo" at port 27017; "mongo" is just the name of your image, not your host.

Could you try pointing the connection to your docker host?
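
The resulting URI would look roughly like this; the credentials are taken from your compose file, and the authSource=admin option is my assumption based on the root user being created via MONGO_INITDB_ROOT_USERNAME. Note that on a Linux host, host.docker.internal may also need an extra_hosts: "host.docker.internal:host-gateway" entry on the Airflow services:

# Sketch of the target URI; host.docker.internal resolves to the Docker host
# from inside a container (credentials below are from the compose file above).
uri = "mongodb://mongo:senha@host.docker.internal:27017/?authSource=admin"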

Thanks
Manmeet