We had a customer email us with a question that we thought was a common use case and worth a forum post.
We have an intraday DAG which runs periodically during the day (say every hour between 10am and 5pm), and then at eod (say 6pm) another DAG that does post-processing on the combined data set.
Now I note that if the scheduler was down for some reason, both DAGs can be scheduled to run at the same time when it starts back up. I’d expect this behaviour because the scheduler knows of no dependency between the DAGs. What we really want is for the intraday DAG to run and for the eod DAG to be delayed until it has completed.
There are a few ways you could accomplish this, but here’s a suggestion. Add a task using the DummyOperator to the end of your intraday DAG, downstream of everything else, so you know when that DAG is complete. Then use an ExternalTaskSensor at the beginning of your eod DAG and point it at the dummy task in your intraday DAG. Set the eod DAG’s schedule to daily with a start date at 5pm, so it aligns with the last run of the intraday DAG. Now at 5pm the last iteration of the intraday DAG will start, as will the eod DAG. However, the sensor in the eod DAG will wait for the dummy task in the intraday DAG to complete before letting the downstream tasks in the eod DAG run.
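Here’s a minimal sketch of that setup, assuming Airflow 1.10-style import paths; the DAG ids, task ids, start dates and schedules below are placeholders, not something from the original question.

```python
# Sketch only: an intraday DAG ending in a dummy task, and an eod DAG that
# waits on it with an ExternalTaskSensor.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Intraday DAG: runs every hour between 10am and 5pm.
intraday_dag = DAG(
    dag_id="intraday",
    start_date=datetime(2019, 1, 1, 10, 0),
    schedule_interval="0 10-17 * * *",
)

# ... the real intraday tasks would go here, all upstream of intraday_done ...

# Dummy task at the very end of the intraday DAG, so there is one well-known
# task that signals "this run is complete".
intraday_done = DummyOperator(task_id="intraday_done", dag=intraday_dag)

# EOD DAG: daily at 5pm so its execution date lines up with the last intraday
# run of the day. By default the ExternalTaskSensor looks for the external
# task at the *same* execution date; execution_delta is available if your
# schedules don't line up exactly.
eod_dag = DAG(
    dag_id="eod",
    start_date=datetime(2019, 1, 1, 17, 0),
    schedule_interval="0 17 * * *",
)

wait_for_intraday = ExternalTaskSensor(
    task_id="wait_for_intraday",
    external_dag_id="intraday",
    external_task_id="intraday_done",
    dag=eod_dag,
)

# Placeholder for the real post-processing work in the eod DAG.
post_processing = DummyOperator(task_id="post_processing", dag=eod_dag)

wait_for_intraday >> post_processing
```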
An extension of this is that we have seen cases where the scheduler was down for some reason, and when we bring it back up, every DAG starts at the same time. We need a mechanism to stagger the start of the jobs so the system isn’t swamped.
There are a few options here as well to limit the number of DAG runs, either across your whole deployment or per DAG. See the table below for the global settings you can set in Airflow to limit concurrency. You may want to set max_active_runs_per_dag to 2 so that no more than 2 DAG runs of the same DAG will be started.
Additionally, you can set max_active_runs on an individual DAG if you want to limit the number of concurrent DAG runs for that specific DAG. This is done when declaring your DAG in your Python file. See here.
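For illustration, a sketch of a DAG declaration capped at two concurrent runs; the dag_id, start date and schedule are placeholders:

```python
# Illustrative only: cap this one DAG at 2 concurrent DAG runs.
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="intraday",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@hourly",
    max_active_runs=2,  # at most 2 runs of this DAG can be active at once
)
```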
| airflow.cfg name | Environment Variable | Default Value |
| --- | --- | --- |
| parallelism | AIRFLOW__CORE__PARALLELISM | 32 |
| dag_concurrency | AIRFLOW__CORE__DAG_CONCURRENCY | 16 |
| worker_concurrency | AIRFLOW__CELERY__WORKER_CONCURRENCY | 16 |
| max_active_runs_per_dag | AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG | 16 |
| max_threads | AIRFLOW__SCHEDULER__MAX_THREADS | 2 |
parallelism is the maximum number of task instances that can run concurrently across your entire Airflow deployment. This means that across all running DAGs, no more than 32 tasks will run at one time with the default setting.
dag_concurrency is the number of task instances allowed to run concurrently within a specific DAG. In other words, you could have 2 DAGs running 16 tasks each in parallel, but a single DAG with 50 tasks would also only run 16 tasks at a time, not 32.
worker_concurrency is related, but it determines how many tasks a single Celery worker can process. So, if you have 4 workers running at a worker concurrency of 16, you could process up to 16 x 4 = 64 tasks at once. Configured with the defaults above, however, only 32 would actually run in parallel (and only 16 if all of those tasks are in the same DAG).
max_active_runs_per_dag is the maximum number of active DAG runs per DAG. So if you start a DAG with catchup=True, it will schedule lots of DAG runs concurrently, up to this limit.
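To make that concrete, here is a hedged sketch of the catchup scenario; the dag_id, dates and schedule are invented for illustration:

```python
# Illustrative only: a daily DAG with a start_date in the past and
# catchup=True. When it is first enabled, the scheduler creates a DAG run
# for every missed interval, but only max_active_runs_per_dag of them
# (16 by default, or max_active_runs if set lower on the DAG) will be
# active at the same time.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

backfill_dag = DAG(
    dag_id="backfill_example",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=2,  # keeps the backlog of runs from swamping the system
)

DummyOperator(task_id="noop", dag=backfill_dag)
```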
Thanks for the great question! Would love to hear anyone else’s feedback or thoughts.