Hi everyone,
I have my DAG on Airflow 2.6.2 as follows:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    'my_pyspark_job_dag',
    default_args=default_args,
    schedule_interval='@daily',  # Set your desired schedule here
    catchup=False,  # Set to False if you don't want to backfill past runs
)

# Define the SparkSubmitOperator to run the PySpark script
spark_submit_task = SparkSubmitOperator(
    task_id='run_pyspark_job',
    application='/home/test/test_dag_pyspark.py',  # path to the PySpark script
    conn_id='spark_conn',  # connection ID to the Spark cluster defined in the Airflow UI
    dag=dag,
)
spark_conn: I created this connection in the Airflow web UI; it points to my Spark server (which has PySpark installed), which is not the same server as Airflow.
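For context, this is roughly what that connection looks like, expressed with Airflow's Connection model; the host and port below are placeholders, not my real values:

```python
# Rough sketch of the 'spark_conn' connection created in the Airflow UI.
# Host and port are placeholders for illustration only.
from airflow.models import Connection

spark_conn = Connection(
    conn_id='spark_conn',
    conn_type='spark',
    host='spark://my-spark-master',  # placeholder for my Spark master URL
    port=7077,                       # placeholder port
)
```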
I tested /home/test/test_dag_pyspark.py directly on the Spark server and it runs fine. Now I want to run it from Airflow, but when I trigger the DAG it fails with:
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
But when I check inside my airflow-webserver container with:
pip freeze | grep apache-airflow-providers-apache-spark
it shows:
apache-airflow-providers-apache-spark==4.1.0
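Assuming the DAG file is parsed by the scheduler/worker rather than the webserver, one extra check I could run with the same Python interpreter those components use would be something like:

```python
# Sanity check: run this with the Python interpreter used by the
# scheduler/worker that parses the DAGs, to see whether the Spark
# provider package is importable in that environment.
import importlib.util

spec = importlib.util.find_spec("airflow.providers.apache.spark")
print("spark provider importable:", spec is not None)
```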
I don't understand what the problem is. Can anyone help me solve it? Thank you in advance.