A crash course on the scheduler (Celery Executor)?

In my scheduler’s log, I noticed that it keeps logging “Sending XXX to executor with priority X and queue XXX” for the same task.

This is understandable for the sensors (with mode="reschedule"). We keep sending the rescheduled sensor tasks to the queue for the workers to pick up and process, and the interval between these “Sending” logs is (almost) the same as the preset poking interval. For operators other than sensors, for example a daily task, the scheduler sends only once a day. This is also understandable.

However, I noticed that if the workers fail to pick up the tasks for some reason, the scheduler keeps sending them at a much higher frequency (in my case, every ~24 seconds). Is this part of the “heartbeat” mechanism? Or is this more of a Celery (and its queue) question? Either way, I’d appreciate some pointers explaining the high volume of “Sending” logs during a worker outage.

I think the log you are referencing comes from scheduler_job.py, which dictates what the scheduler does. Heartbeating is part of the scheduler job.

The scheduler essentially does the following…

# The actual scheduler loop. The main steps in the loop are:
    #. Harvest DAG parsing results through DagFileProcessorAgent
    #. Find and queue executable tasks
        #. Change task instance state in DB
        #. Queue tasks in executor
    #. Heartbeat executor
        #. Execute queued tasks in executor asynchronously
        #. Sync on the states of running tasks
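As a rough mental model, the loop in that docstring can be sketched in plain Python. This is a minimal, hypothetical sketch: the class and method names are invented for illustration and are not Airflow’s internals; each method only records which step of the docstring ran.

```python
from typing import List


class ToyScheduler:
    """Hypothetical stand-in for the scheduler loop; not Airflow code."""

    def __init__(self) -> None:
        self.steps: List[str] = []  # records which step ran, in order

    def harvest_dag_parsing_results(self) -> None:
        self.steps.append("harvest")

    def find_and_queue_executable_tasks(self) -> None:
        # Sub-steps: change task instance state in the DB, then queue in the executor.
        self.steps.append("change_state_in_db")
        self.steps.append("queue_in_executor")

    def heartbeat_executor(self) -> None:
        # Sub-steps: execute queued tasks asynchronously, then sync running states.
        self.steps.append("execute_queued_async")
        self.steps.append("sync_states")

    def run_loop_once(self) -> None:
        self.harvest_dag_parsing_results()
        self.find_and_queue_executable_tasks()
        self.heartbeat_executor()

    def run(self, num_loops: int) -> None:
        # The real scheduler loops until stopped; this toy is bounded for illustration.
        for _ in range(num_loops):
            self.run_loop_once()
```

Each call to `run_loop_once` walks the same steps in the same order, so anything still eligible at the start of an iteration is a candidate to be queued again.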

Digging into the source code, I found that in each loop iteration the scheduler sends tasks to the executor for execution. If tasks are left over in the executor, which with the Celery executor happens when the workers are busy, those tasks are handed back to the scheduler with their state changed to SCHEDULED.

To answer your question more directly: it’s not because of the heartbeat but simply the execution cycle. Tasks get dequeued and queued over and over, depending on the availability of the executor. Every time a task is sent to the executor, the scheduler outputs the log:

Sending %s to executor with priority %s and queue %s
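To make the cycle concrete, here is a toy simulation of one task that the scheduler keeps sending while nothing can pick it up. Everything here is hypothetical and not Airflow code: `simulate` and `worker_slots` are invented names, and `worker_slots` is only an assumed stand-in for whatever actually keeps the task from leaving the executor.

```python
from typing import List, Tuple

# (dag_id, task_id, execution_date, try_number), mirroring the fields in the log key
Key = Tuple[str, str, str, int]


def simulate(key: Key, loops: int, worker_slots: int) -> List[str]:
    """Toy model of send -> left over -> reset to SCHEDULED, repeated each loop."""
    logs: List[str] = []
    state = "SCHEDULED"
    for _ in range(loops):
        if state != "SCHEDULED":
            break  # the task was picked up; nothing left to re-send
        # "Find and queue executable tasks": hand the task to the executor.
        state = "QUEUED"
        logs.append(f"Sending {key} to executor")
        # "Heartbeat executor": the task only leaves if some capacity is free.
        if worker_slots > 0:
            state = "RUNNING"
        else:
            # Left over in the executor: reset so it is eligible again next loop.
            state = "SCHEDULED"
            logs.append(f"Set the following tasks to scheduled state: {key}")
    return logs
```

With `worker_slots=0`, every loop emits one “Sending” line plus one reset line for the same key; with any free capacity, the task leaves the cycle after the first send.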

What confuses me is why you are seeing so many entries for the same task. The first parameter is the task’s key. Are they all the same in your log? Since the try number is included in the key, I’m curious whether they are for the same try too.
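One way to answer that across many lines is to pull the key out of each “Sending” message and compare. A minimal sketch, assuming the key is the four-field tuple (dag_id, task_id, execution_date, try_number) as it is printed in the log lines later in this thread; `LoggedKey` and `parse_key` are hypothetical helpers, and the execution date is kept as raw text rather than rebuilt into a datetime.

```python
import re
from typing import NamedTuple, Optional


class LoggedKey(NamedTuple):
    """Hypothetical container for the key fields printed in a 'Sending' line."""
    dag_id: str
    task_id: str
    execution_date: str  # raw "datetime.datetime(...)" text, not parsed
    try_number: int


# Matches: Sending ('dag', 'task', datetime.datetime(...), 1) to executor
SENDING_RE = re.compile(
    r"Sending \('(?P<dag_id>[^']*)', '(?P<task_id>[^']*)', "
    r"(?P<execution_date>datetime\.datetime\(.*?\)), (?P<try_number>\d+)\) to executor"
)


def parse_key(line: str) -> Optional[LoggedKey]:
    """Return the key from a 'Sending ... to executor' line, or None otherwise."""
    match = SENDING_RE.search(line)
    if match is None:
        return None
    return LoggedKey(
        dag_id=match["dag_id"],
        task_id=match["task_id"],
        execution_date=match["execution_date"],
        try_number=int(match["try_number"]),
    )
```

If every parsed key compares equal, the lines refer to the same task instance and the same try; a retry would show up as a different `try_number`.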

Thank you for the reply! To answer your question: yes, it’s the same task and the same try number being sent over and over again, every ~20 seconds. I’m attaching the logs here.
This task is a sensor, so the log entries should be 10 minutes (my poking interval) apart.

@timestamp,@message
2020-08-20 00:00:01.105,"[2020-08-20 00:00:01,105] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:03:15.425,"[2020-08-20 00:03:15,425] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:03:37.479,"[2020-08-20 00:03:37,478] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:03:59.462,"[2020-08-20 00:03:59,462] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:04:21.488,"[2020-08-20 00:04:21,488] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:04:43.540,"[2020-08-20 00:04:43,539] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:05:07.538,"[2020-08-20 00:05:07,538] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:05:29.555,"[2020-08-20 00:05:29,554] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:05:53.660,"[2020-08-20 00:05:53,660] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:06:15.620,"[2020-08-20 00:06:15,620] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:06:39.618,"[2020-08-20 00:06:39,618] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:07:01.648,"[2020-08-20 00:07:01,648] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:07:23.668,"[2020-08-20 00:07:23,668] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:07:45.675,"[2020-08-20 00:07:45,675] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:08:07.701,"[2020-08-20 00:08:07,701] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:08:29.754,"[2020-08-20 00:08:29,753] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:08:53.737,"[2020-08-20 00:08:53,737] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:09:15.767,"[2020-08-20 00:09:15,767] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:09:37.824,"[2020-08-20 00:09:37,823] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:09:59.807,"[2020-08-20 00:09:59,807] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:10:21.851,"[2020-08-20 00:10:21,851] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:10:43.871,"[2020-08-20 00:10:43,871] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:11:09.893,"[2020-08-20 00:11:09,892] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:11:31.885,"[2020-08-20 00:11:31,885] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:11:53.909,"[2020-08-20 00:11:53,909] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:12:15.945,"[2020-08-20 00:12:15,945] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:12:39.956,"[2020-08-20 00:12:39,956] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:13:01.983,"[2020-08-20 00:13:01,983] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:13:24.014,"[2020-08-20 00:13:24,013] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:13:46.008,"[2020-08-20 00:13:46,008] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:14:08.032,"[2020-08-20 00:14:08,032] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:14:30.095,"[2020-08-20 00:14:30,095] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:14:52.102,"[2020-08-20 00:14:52,102] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:15:16.118,"[2020-08-20 00:15:16,118] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:15:38.132,"[2020-08-20 00:15:38,131] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:16:02.205,"[2020-08-20 00:16:02,205] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:16:26.159,"[2020-08-20 00:16:26,158] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:16:48.177,"[2020-08-20 00:16:48,177] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:17:10.203,"[2020-08-20 00:17:10,203] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:17:32.241,"[2020-08-20 00:17:32,241] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:17:54.261,"[2020-08-20 00:17:54,261] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:18:16.347,"[2020-08-20 00:18:16,347] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:18:40.285,"[2020-08-20 00:18:40,285] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:19:02.322,"[2020-08-20 00:19:02,322] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:19:24.356,"[2020-08-20 00:19:24,356] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:19:46.368,"[2020-08-20 00:19:46,368] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:20:08.382,"[2020-08-20 00:20:08,382] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:20:30.422,"[2020-08-20 00:20:30,422] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:20:56.439,"[2020-08-20 00:20:56,439] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:21:18.443,"[2020-08-20 00:21:18,443] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:21:40.465,"[2020-08-20 00:21:40,465] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:22:04.513,"[2020-08-20 00:22:04,513] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:22:26.515,"[2020-08-20 00:22:26,515] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:22:48.531,"[2020-08-20 00:22:48,530] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:23:10.579,"[2020-08-20 00:23:10,579] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:23:34.586,"[2020-08-20 00:23:34,586] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:23:56.607,"[2020-08-20 00:23:56,607] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:24:18.772,"[2020-08-20 00:24:18,772] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:24:40.793,"[2020-08-20 00:24:40,793] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:25:02.802,"[2020-08-20 00:25:02,802] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:25:26.853,"[2020-08-20 00:25:26,852] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:25:50.874,"[2020-08-20 00:25:50,872] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:26:14.880,"[2020-08-20 00:26:14,879] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:26:36.892,"[2020-08-20 00:26:36,892] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:26:58.938,"[2020-08-20 00:26:58,938] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:27:21.021,"[2020-08-20 00:27:21,020] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:27:42.963,"[2020-08-20 00:27:42,963] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:28:05.006,"[2020-08-20 00:28:05,006] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:28:27.009,"[2020-08-20 00:28:27,009] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:28:51.007,"[2020-08-20 00:28:51,006] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:29:13.030,"[2020-08-20 00:29:13,030] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:29:35.073,"[2020-08-20 00:29:35,073] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:29:57.070,"[2020-08-20 00:29:57,070] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:30:19.154,"[2020-08-20 00:30:19,154] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:30:41.138,"[2020-08-20 00:30:41,138] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:31:07.175,"[2020-08-20 00:31:07,175] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:31:29.169,"[2020-08-20 00:31:29,169] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:31:51.208,"[2020-08-20 00:31:51,207] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:32:13.227,"[2020-08-20 00:32:13,227] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:32:37.229,"[2020-08-20 00:32:37,229] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:32:59.251,"[2020-08-20 00:32:59,251] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:33:21.307,"[2020-08-20 00:33:21,307] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:33:43.305,"[2020-08-20 00:33:43,305] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:34:05.396,"[2020-08-20 00:34:05,396] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:34:29.438,"[2020-08-20 00:34:29,437] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:34:51.380,"[2020-08-20 00:34:51,379] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:35:13.400,"[2020-08-20 00:35:13,400] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:35:35.466,"[2020-08-20 00:35:35,466] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:36:01.460,"[2020-08-20 00:36:01,459] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
2020-08-20 00:36:23.451,"[2020-08-20 00:36:23,451] {{scheduler_job.py:1175}} INFO - Sending ('my_dag_id', 'my_task_id', datetime.datetime(2020, 8, 19, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default"
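To check the spacing without eyeballing timestamps, a small script can read the exported CSV, compute the gaps between consecutive “Sending” rows, and collect the distinct keys. A sketch assuming the two-column `@timestamp,@message` export shown above; the function names are hypothetical.

```python
import csv
import io
from datetime import datetime
from typing import Dict, List, Set

TIMESTAMP_FMT = "%Y-%m-%d %H:%M:%S.%f"


def _sending_rows(csv_text: str) -> List[Dict[str, str]]:
    # Keep only rows whose message is a "Sending ... to executor" line.
    reader = csv.DictReader(io.StringIO(csv_text))
    return [row for row in reader if " INFO - Sending " in row["@message"]]


def sending_gaps(csv_text: str) -> List[float]:
    """Seconds between consecutive 'Sending' rows, in file order."""
    stamps = [datetime.strptime(r["@timestamp"], TIMESTAMP_FMT) for r in _sending_rows(csv_text)]
    return [(later - earlier).total_seconds() for earlier, later in zip(stamps, stamps[1:])]


def distinct_keys(csv_text: str) -> Set[str]:
    """Raw key text between 'Sending ' and ' to executor' for each row."""
    keys: Set[str] = set()
    for row in _sending_rows(csv_text):
        message = row["@message"]
        start = message.index("Sending ") + len("Sending ")
        end = message.index(" to executor", start)
        keys.add(message[start:end])
    return keys
```

On the rows above, the gaps after the first one (a little over three minutes) fall roughly between 22 and 26 seconds, and there is a single distinct key, which matches the “same task, same try” observation.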

This tells me that the task is always ready for execution, meaning the task is, for some reason, always in the SCHEDULED state.

If you have permission, I would delete that particular task instance and see if the problem persists. You can also clear the Celery queue, depending on how you have your Airflow set up.

The Celery app name should be the default unless you modified it. The queue is “default”, as seen in the log.

I’m pretty much looking for ways to reset the task to see if this is reproducible.

My Airflow workers, scheduler, and webserver are all containers running on a few EC2 instances.
I can “reproduce” this by letting the EC2 instances reach 100% CPU usage, at which point the workers stop picking up tasks.
I guess this is why the task is always in the SCHEDULED state?

That sounds plausible. The scheduler determines the task is ready to send to the executor and outputs the message, but the workers can’t pick up the work, so the task gets dequeued.

Though we should then also see a message like

Set the following tasks to scheduled state: ...

Do you see something similar in your logs? It should come from scheduler_job.py too.

You are right: in the same time frame as my previous log (~36 minutes, 2020-08-20 00:00:01.105 to 2020-08-20 00:36:23.451), I have 559 records of

Set the following tasks to scheduled state: ...
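For scale, a quick back-of-the-envelope calculation of how often those records appeared over that window. This is only arithmetic on the numbers quoted in this thread (559 reset records, and the 90 “Sending” lines in the excerpt above); `seconds_per_record` is a hypothetical helper, and the count alone does not say which tasks each reset record lists.

```python
from datetime import datetime

TIMESTAMP_FMT = "%Y-%m-%d %H:%M:%S.%f"


def seconds_per_record(start: str, end: str, count: int) -> float:
    """Average spacing, in seconds, of `count` records spread over [start, end]."""
    window = datetime.strptime(end, TIMESTAMP_FMT) - datetime.strptime(start, TIMESTAMP_FMT)
    return window.total_seconds() / count


window_start = "2020-08-20 00:00:01.105"
window_end = "2020-08-20 00:36:23.451"  # window of 2182.346 s

reset_spacing = seconds_per_record(window_start, window_end, 559)   # about 3.9 s
sending_spacing = seconds_per_record(window_start, window_end, 90)  # about 24.2 s
```

The reset records arrive several times more often than this task’s “Sending” lines, so it is worth checking the task keys inside those messages before reading more into the ratio.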

Then I think we have adequate evidence that the root cause of the scheduler queueing and dequeueing the same task over and over is the resource limitation. That is not necessarily a defect; it just means you are bound by your resources. You can remedy this with more resources or workers.

You should be able to see worker usage, and further confirm this explanation, with Flower if you have it enabled.

Yes, on Flower I saw many failed tasks.
I guess I was confused because I knew little about the dequeuing mechanism.
Now I understand, thank you Alan!