Backfill with REST API

I checked the REST API documentation looking for a solution to my problem. Does Airflow v2 provide a REST API endpoint for backfilling a selected date range?

Hi @vanduong0504, thanks for reaching out!

Currently, there is no direct backfill API endpoint.

A workaround is to trigger a separate backfilling_dag via the API and use dag_run.conf to pass in parameters such as date_start, date_end, and dag_id (the DAG that needs the backfill, e.g. dag_to_be_backfilled).

API endpoint to use: https://airflow.apache.org/api/v1/dags/{dag_id}/dagRuns

Please see below an example DAG (note that the commented trigger_backfill task shows how you would hard-code these parameters to perform a backfill without using the API).

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="backfilling_dag",
         start_date=pendulum.datetime(2022, 10, 1, tz="UTC"),
         schedule_interval=None,
         ) as dag:

    # Method 1: Trigger a backfill using hard-coded dates & dag_id:
    # trigger_backfill = BashOperator(
    #     task_id="trigger_backfill",
    #     bash_command="airflow dags backfill --reset-dagruns -y -s 20220901 -e 20220915 forum_dag_to_be_backfilled",
    # )

    # Method 2: Use the REST API to trigger a backfill, passing in start/end dates and dag_id etc:
    trigger_backfill = BashOperator(
        task_id="trigger_backfill",
        bash_command="airflow dags backfill --reset-dagruns -y -s {{ dag_run.conf['date_start'] }} -e {{ dag_run.conf['date_end'] }} {{ dag_run.conf['dag_id'] }}"
    )
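
Because Method 2 interpolates dag_run.conf values straight into a shell command, it may be worth checking them before they reach the BashOperator. Below is a minimal sketch of such a check in plain Python. The function name validate_backfill_conf is hypothetical, the accepted date formats (YYYYMMDD and YYYY-MM-DD) are the two used in this thread, and the dag_id pattern assumes IDs made of letters, digits, dashes, dots, and underscores.

```python
import re
from datetime import datetime

# Assumption: DAG ids contain only letters, digits, dashes, dots and underscores.
DAG_ID_PATTERN = re.compile(r"[A-Za-z0-9_.\-]+")


def validate_backfill_conf(conf):
    """Return (dag_id, date_start, date_end) as strings, or raise ValueError.

    Hypothetical helper: the keys mirror the conf used by backfilling_dag.
    A missing key raises KeyError.
    """
    dag_id = str(conf["dag_id"])
    if not DAG_ID_PATTERN.fullmatch(dag_id):
        raise ValueError(f"unexpected dag_id: {dag_id!r}")

    parsed = []
    for key in ("date_start", "date_end"):
        raw = str(conf[key])  # JSON numbers such as 20221001 become "20221001"
        for fmt in ("%Y%m%d", "%Y-%m-%d"):
            try:
                parsed.append(datetime.strptime(raw, fmt).date())
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"{key} is not a recognised date: {raw!r}")

    if parsed[0] > parsed[1]:
        raise ValueError("date_start is after date_end")
    return dag_id, parsed[0].isoformat(), parsed[1].isoformat()
```

One way to use it would be to call it from a PythonOperator placed upstream of trigger_backfill, so the run fails early on a malformed request.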

To trigger backfilling_dag, see this example using Python and Requests against localhost:

import json

import requests

dag_to_be_triggered = "backfilling_dag"
dag_to_backfill = "dag_to_be_backfilled"
url = f"http://localhost:8080/api/v1/dags/{dag_to_be_triggered}/dagRuns"

payload = json.dumps({
  "conf": {
    "dag_id": dag_to_backfill,
    "date_start": 20221001,
    "date_end": 20221005
  }
})
headers = {
  'Content-Type': 'application/json',
  'Authorization': 'Basic YWRtaW46YWRtaW4='
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)
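
The Authorization value above is just the Base64 encoding of "admin:admin", so for other credentials it has to be rebuilt. Here is a small sketch of how the same request could be assembled from its parts. The helper names basic_auth_header and build_backfill_request are hypothetical, and it assumes basic authentication is enabled for the API on your Airflow instance.

```python
import base64
import json


def basic_auth_header(username, password):
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8"))
    return f"Basic {token.decode('ascii')}"


def build_backfill_request(base_url, trigger_dag_id, dag_to_backfill,
                           date_start, date_end, username, password):
    """Return (url, headers, body) for the dagRuns POST request.

    Hypothetical helper: the conf keys match those read by backfilling_dag.
    """
    url = f"{base_url.rstrip('/')}/api/v1/dags/{trigger_dag_id}/dagRuns"
    headers = {
        "Content-Type": "application/json",
        "Authorization": basic_auth_header(username, password),
    }
    body = json.dumps({
        "conf": {
            "dag_id": dag_to_backfill,
            "date_start": date_start,
            "date_end": date_end,
        }
    })
    return url, headers, body


url, headers, body = build_backfill_request(
    "http://localhost:8080", "backfilling_dag", "dag_to_be_backfilled",
    "2022-10-01", "2022-10-05", "admin", "admin",
)
print(url)
print(headers["Authorization"])
```

The three values can then be passed to requests.post(url, headers=headers, data=body), exactly as in the example above.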

Hope this helps!

Thanks for your help @magdagultekin. I tried your solution in a sample DAG, but it produced an error:
No run dates were found for the given dates and dag interval.
Here is my bash command: airflow dags backfill --reset-dagruns -y -s 2022-10-01 -e 2022-10-05 sample_dag
I checked this problem in the GitHub issues. Sorry if my question was misleading: can we trigger a DAG for a specific date range?

Hi @vanduong0504, could you please share some details: what are the schedule_interval, start_date, and dag_id of both of your DAGs, the one that should be backfilled and the one that should trigger the backfill?

Thanks for your quick reply @magdagultekin, here is my DAG information:

  • schedule_interval of both DAGs: None
  • dag_id:
    • DAG I want to run over the date range: sample_dag
    • DAG that triggers the backfill: backfilling_dag
  • start_date:
    • sample_dag: 2019-01-01
    • backfilling_dag: 2022-10-01
  • command line in backfilling_dag: airflow dags backfill --reset-dagruns -y -s 2022-10-01 -e 2022-10-05 sample_dag

Here is the log output from the trigger_backfill task of backfilling_dag:

[2022-10-19, 17:47:57 +07] {subprocess.py:92} INFO - [2022-10-19, 17:47:57 +07] {executor_loader.py:105} INFO - Loaded executor: CeleryExecutor
[2022-10-19, 17:47:57 +07] {subprocess.py:92} INFO - [2022-10-19, 17:47:57 +07] {backfill_job.py:791} INFO - No run dates were found for the given dates and dag interval.
[2022-10-19, 17:47:58 +07] {subprocess.py:96} INFO - Command exited with return code 0
[2022-10-19, 17:48:00 +07] {taskinstance.py:1395} INFO - Marking task as SUCCESS. dag_id=backfilling_dag, task_id=trigger_backfill, execution_date=20221019T104505, start_date=20221019T104518, end_date=20221019T104800

@vanduong0504, in this case - when your sample_dag has schedule_interval=None - it’s hard to say how many times that DAG should run between 2022-10-01 and 2022-10-05 (as per your bash command).
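
To illustrate why the backfill finds nothing to do, here is a simplified model in plain Python. It is not Airflow's actual scheduling code, and the function name run_dates is made up for this sketch. It only shows that a fixed interval produces a list of run dates inside a range, whereas no schedule produces none.

```python
from datetime import date, timedelta


def run_dates(start, end, interval):
    """List the dates a fixed-interval schedule yields in [start, end].

    Simplified illustration only: interval=None stands in for
    schedule_interval=None, i.e. a DAG with no schedule at all.
    """
    if interval is None:
        return []  # no schedule, so no run dates can be derived
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


start, end = date(2022, 10, 1), date(2022, 10, 5)
print(len(run_dates(start, end, timedelta(days=1))))  # daily interval
print(len(run_dates(start, end, None)))               # no schedule
```

In this model the daily interval gives five dates for the range in your command, while the unscheduled case gives an empty list, which corresponds to the "No run dates were found" message in your log.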

I’d suggest changing your sample_dag as follows:

...
import pendulum

with DAG(dag_id='sample_dag',
         start_date=pendulum.datetime(2022, 10, 1, tz='UTC'),  # or any day you think is best
         schedule_interval='@daily',  # or any other preset/CRON expression
         catchup=True,
         ...
         ) as dag:
...

This way your sample_dag will be scheduled daily starting on 2022-10-01, so there will be DAG runs in that date range that you can later backfill using backfilling_dag.