I have checked the REST API documentation looking for a solution to my problem. Does Airflow v2 support a REST API endpoint for running a backfill over a selected date range?
Hi @vanduong0504, thanks for reaching out!
Currently, there is no direct backfill API endpoint.
A workaround is to trigger a backfilling_dag via the API and use dag_run.conf to pass in the parameters for the required backfill, such as date_start, date_end and the dag_id of the DAG to backfill (dag_to_be_backfilled).
API endpoint to use (a POST request): https://airflow.apache.org/api/v1/dags/{dag_id}/dagRuns
Below is an example DAG (note that the commented-out trigger_backfill task shows how you would hard-code these parameters to run a backfill without using the API).
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="backfilling_dag",
    start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),
    schedule_interval=None,
) as dag:
    # Method 1: Trigger a backfill using hard-coded dates & dag_id:
    # trigger_backfill = BashOperator(
    #     task_id="trigger_backfill",
    #     bash_command="airflow dags backfill --reset-dagruns -y -s 20220901 -e 20220915 forum_dag_to_be_backfilled",
    # )

    # Method 2: Use the REST API to trigger a backfill, passing in start/end dates and dag_id etc:
    trigger_backfill = BashOperator(
        task_id="trigger_backfill",
        bash_command="airflow dags backfill --reset-dagruns -y -s {{ dag_run.conf['date_start'] }} -e {{ dag_run.conf['date_end'] }} {{ dag_run.conf['dag_id'] }}",
    )
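The bash_command in Method 2 is a Jinja template: when the task runs, Airflow substitutes the values from dag_run.conf. The plain-Python sketch below uses a hypothetical helper, build_backfill_command (not an Airflow API), to mimic that rendering so you can see the command that actually gets executed. It also wraps each value in shlex.quote, an extra precaution the template above does not take, since conf values come from whoever calls the API.

```python
import shlex


def build_backfill_command(conf: dict) -> str:
    """Build the command the templated bash_command would produce.

    Hypothetical helper for illustration only: Airflow itself renders the
    Jinja template. Values are shell-quoted because dag_run.conf is supplied
    by the API caller.
    """
    # The same keys the template reads from dag_run.conf
    required = ("date_start", "date_end", "dag_id")
    missing = [key for key in required if key not in conf]
    if missing:
        raise KeyError(f"dag_run.conf is missing: {', '.join(missing)}")

    return " ".join([
        "airflow", "dags", "backfill", "--reset-dagruns", "-y",
        "-s", shlex.quote(str(conf["date_start"])),
        "-e", shlex.quote(str(conf["date_end"])),
        shlex.quote(str(conf["dag_id"])),
    ])


print(build_backfill_command(
    {"dag_id": "dag_to_be_backfilled", "date_start": 20221001, "date_end": 20221005}
))
# prints: airflow dags backfill --reset-dagruns -y -s 20221001 -e 20221005 dag_to_be_backfilled
```

Plain values such as dates and DAG ids pass through unchanged; only values containing shell metacharacters get quoted.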
To trigger the backfilling_dag, here is an example using Python and Requests against a local webserver (localhost):
import json
import requests

dag_to_be_triggered = "backfilling_dag"
dag_to_backfill = "dag_to_be_backfilled"

url = f"http://localhost:8080/api/v1/dags/{dag_to_be_triggered}/dagRuns"

payload = json.dumps({
    "conf": {
        "dag_id": dag_to_backfill,
        "date_start": 20221001,
        "date_end": 20221005,
    }
})
headers = {
    "Content-Type": "application/json",
    "Authorization": "Basic YWRtaW46YWRtaW4=",
}

response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
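The Authorization value above is HTTP Basic auth: "YWRtaW46YWRtaW4=" is simply the base64 encoding of admin:admin. The stdlib-only sketch below builds the same POST request but computes that header from a username and password instead of hard-coding it. The function name build_trigger_request is hypothetical, the admin/admin credentials are only an assumed local example, and the request is prepared but not sent. It also assumes the Basic auth API backend is enabled on your webserver.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_to_trigger, dag_to_backfill,
                          date_start, date_end, username, password):
    """Prepare (but do not send) a POST to /api/v1/dags/{dag_id}/dagRuns."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_to_trigger}/dagRuns"
    # These keys must match what backfilling_dag reads from dag_run.conf
    body = {"conf": {"dag_id": dag_to_backfill,
                     "date_start": date_start,
                     "date_end": date_end}}
    # HTTP Basic auth: base64 of "username:password"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_trigger_request("http://localhost:8080", "backfilling_dag",
                            "dag_to_be_backfilled", "2022-10-01", "2022-10-05",
                            "admin", "admin")
print(req.get_method(), req.full_url)
print(req.get_header("Authorization"))  # Basic YWRtaW46YWRtaW4=
# To actually send it: urllib.request.urlopen(req)
```

Passing the dates as "YYYY-MM-DD" strings keeps them in the same format used with the airflow dags backfill command later in this thread.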
Hope this helps!
Thanks for your help @magdagultekin. I tried your solution with a sample DAG, but it produced an error:
No run dates were found for the given dates and dag interval.
Here is my bash command: airflow dags backfill --reset-dagruns -y -s 2022-10-01 -e 2022-10-05 sample_dag
I searched for this problem in the GitHub issues. Sorry if my question was misleading: can we trigger a DAG for a specific date range?
Hi @vanduong0504, could you please share some details: what are the schedule_interval, start_date and dag_id of both of your DAGs, i.e. the one that should be backfilled and the one that should trigger the backfill?
Thanks for your quick reply @magdagultekin, here is my DAG information:
- schedule_interval (both DAGs): None
- dag_id:
  - DAG I want to run for a date range: sample_dag
  - DAG that triggers the backfill: backfilling_dag
- start_date:
  - sample_dag: 2019-01-01
  - backfilling_dag: 2022-10-01
- command in backfilling_dag: airflow dags backfill --reset-dagruns -y -s 2022-10-01 -e 2022-10-05 sample_dag
Here is the log from the trigger_backfill task of backfilling_dag:
[2022-10-19, 17:47:57 +07] {subprocess.py:92} INFO - [2022-10-19, 17:47:57 +07] {executor_loader.py:105} INFO - Loaded executor: CeleryExecutor
[2022-10-19, 17:47:57 +07] {subprocess.py:92} INFO - [2022-10-19, 17:47:57 +07] {backfill_job.py:791} INFO - No run dates were found for the given dates and dag interval.
[2022-10-19, 17:47:58 +07] {subprocess.py:96} INFO - Command exited with return code 0
[2022-10-19, 17:48:00 +07] {taskinstance.py:1395} INFO - Marking task as SUCCESS. dag_id=backfilling_dag, task_id=trigger_backfill, execution_date=20221019T104505, start_date=20221019T104518, end_date=20221019T104800
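@vanduong0504, in this case your sample_dag has schedule_interval=None, so Airflow has no schedule from which to derive run dates and cannot tell how many times that DAG should run between 2022-10-01 and 2022-10-05 (as per your bash command). That is why the backfill reports "No run dates were found for the given dates and dag interval."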
I’d suggest changing your sample_dag as follows:
...
import pendulum

with DAG(
    dag_id="sample_dag",
    start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),  # or any day you think is best
    schedule_interval="@daily",  # or any other preset/CRON expression
    catchup=True,
    ...
) as dag:
    ...
This way your sample_dag will run starting on 2022-10-01 (its start_date), and as a result there will be DAG runs, one per day, that you can later backfill using backfilling_dag.
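To see why the schedule matters, here is a heavily simplified model of what a backfill does: it walks the DAG's schedule between the start and end dates and creates one run per scheduled date. The function run_dates is hypothetical and is not Airflow's timetable code; it assumes a fixed interval and treats both ends of the range as inclusive, ignoring cron presets, data intervals and time zones.

```python
from datetime import date, timedelta
from typing import List, Optional


def run_dates(start: date, end: date, interval: Optional[timedelta]) -> List[date]:
    """Hypothetical, simplified model of how a backfill enumerates run dates.

    Real Airflow uses the DAG's timetable; this sketch only handles a fixed
    interval and treats both ends of the range as inclusive.
    """
    if interval is None:
        # schedule_interval=None: no schedule, so there is nothing to enumerate
        return []
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


start, end = date(2022, 10, 1), date(2022, 10, 5)
print(len(run_dates(start, end, None)))               # 0, i.e. "No run dates were found"
print(len(run_dates(start, end, timedelta(days=1))))  # 5 in this model, one per day
```

With no schedule the range yields zero runs, which matches the log above; with a daily interval the same range yields one run per day.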