I just configured airflow.cfg to work with LocalExecutor:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
sql_engine_encoding = utf-8
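For context, these are the related [core] options that also bound concurrency (values shown are the Airflow 2.x defaults; dag_concurrency was later renamed max_active_tasks_per_dag):

parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16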
After initializing the DB, I ran the following DAG:
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='a_example_parallel_v', schedule_interval=None, start_date=days_ago(2)) as dag:
    def task1_func(ti):
        print(f"pid: {os.getpid()} task1: print from task 1")

    def task2_func(ti):
        print(f"pid: {os.getpid()} task2: print from task 2")

    def task3_func(ti):
        print(f"pid: {os.getpid()} task3: print from task 3")

    def task4_func(ti):
        print(f"pid: {os.getpid()} task4: print from task 4")

    task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func, provide_context=True)
    task4 = PythonOperator(task_id='task4', python_callable=task4_func, provide_context=True)

    # task2 and task3 both depend only on task1, so they should be able to run concurrently
    task1 >> task2
    task1 >> task3
    task2 >> task4
    task3 >> task4
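(For reference, the same fan-out/fan-in can be written more compactly; this line is equivalent to the four dependency lines above:)

task1 >> [task2, task3] >> task4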
- I checked the logs of task 2 and task 3, and they printed the same PID.
- If task 2 and task 3 have the same PID, it means they are not running in parallel.
I also tried changing execute_tasks_new_python_interpreter to True (and got the same results).
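That is, in airflow.cfg (this option lives under [core] in Airflow 2.x):

[core]
execute_tasks_new_python_interpreter = True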
How can I configure Airflow so that task 2 and task 3 run in parallel?