I have an Airflow DAG which pulls data from an external HR system that holds employee data for companies. In our ETL we need to pull data for all companies.
Our DAG looks like this today:
- Get List of Companies
- Get list of Employee for each company
- Get information about each employee
- Combine the information and push to s3
Is there a way to run the task of getting information about each employee in parallel instead of sequentially? Some way to generate the parallel tasks based on the result of the previous task?
You can definitely generate tasks within a DAG dynamically based on input data. See https://www.astronomer.io/guides/managing-dependencies/.
What type of operator are you using to get the list of companies? I’m still an Airflow beginner, but it seems like you could either:
- Pass the list of companies to the operator for task #2 via XCom (assuming it’s a small amount of data)
- Persist the list of companies to a shared data store and then read the list of companies in task #2.
Once task #2 has the list of companies, it’s easy to create a task for each company.
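If the list is small, the XCom option could look roughly like this. This is only a minimal sketch: the task names are made up and a hardcoded list stands in for the real API call.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def get_companies(**context):
    # Placeholder for the real API call; returning the list pushes it to XCom.
    return ["acme", "globex", "initech"]

def get_employees(**context):
    # Pull the company list that the upstream task pushed to XCom.
    companies = context["ti"].xcom_pull(task_ids="get_companies")
    for company in companies:
        print(f"fetching employees for {company}")

with DAG("hr_sync_example", start_date=datetime(2020, 1, 1), schedule_interval="@daily") as dag:
    get_companies_task = PythonOperator(
        task_id="get_companies", python_callable=get_companies, provide_context=True
    )
    get_employees_task = PythonOperator(
        task_id="get_employees", python_callable=get_employees, provide_context=True
    )
    get_companies_task >> get_employees_task
```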
To get the companies we use a custom operator which internally calls an API to get the list of companies.
So would this process still work? What would be the shared storage we use?
Hello,
It sounds like you are trying to fan out your tasks. So for example, Task A runs, and based on its results you create n Task Bs to run in parallel; when they all complete, Task C runs. For each DAG run, the number of Task Bs is different and is based on the result of Task A for that particular DAG run. If this is correct, unfortunately this is not currently supported by Airflow: the DAG structure cannot dynamically change within a DAG run. I believe this is a feature Airflow is looking to include in a future release.
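For illustration, the fan-out shape being described looks like this when the number of branches is fixed up front (placeholder operators stand in for the real work; the count cannot change within a single DAG run):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("fanout_shape", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    task_a = DummyOperator(task_id="task_a")
    task_c = DummyOperator(task_id="task_c")

    # n is fixed when the scheduler parses this file; it cannot change mid-run.
    for i in range(3):
        task_a >> DummyOperator(task_id=f"task_b_{i}") >> task_c
```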
Hi, @AndrewHarmon. What would happen if DAG run #1 had 3 companies and thus 3 tasks for step 2, and then on a subsequent DAG run there were 4 companies and thus 4 tasks for step 2?
Just trying to get a better handle on how the components interact.
You could make your DAG file dynamic. So if you had a config file, env var, or Airflow Variable with the value 3 in it, you could use that in a loop in your DAG file to create 3 similar tasks, 1 for each company. Then, if sometime between DAG run 1 and 2 you edited that value to 4, your DAG would instantly reflect that and have 4 similar tasks when DAG run 2 starts. And this pattern is supported.
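A rough sketch of that pattern, assuming the count lives in an Airflow Variable named company_count (the Variable name and task names are made up):

```python
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def fetch_company_data(company_index, **context):
    print(f"fetching data for company #{company_index}")

# Read when the scheduler parses the DAG file; editing the Variable between
# runs changes how many tasks the next DAG run gets.
company_count = int(Variable.get("company_count", default_var=3))

with DAG("per_company_fanout", start_date=datetime(2020, 1, 1), schedule_interval="@daily") as dag:
    for i in range(company_count):
        PythonOperator(
            task_id=f"fetch_company_{i}",
            python_callable=fetch_company_data,
            op_kwargs={"company_index": i},
        )
```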
Thanks, @AndrewHarmon. What’s the major difference between what @twylabs-ext is trying to do and that pattern? Is it simply that the variable is defined in a “static” location (e.g. a config file, env var, or Airflow variable) as opposed to a database table?
I think @twylabs-ext wants to change the number of tasks during the DAG run. So while the DAG is actually running, edit the number of tasks in the DAG. That can't be done. But you can edit the number of tasks between DAG runs.
@AndrewHarmon
Can I not do something like this:
- Fetch a list of companies from the API in Task 1
- Depending on the number above, say 100, we generate 100 identical tasks in parallel, each fetching some data for an individual company and then storing it in S3
Check the attached diagram.
@twylabs-ext Yes, that should work as long as (as @AndrewHarmon mentioned) the number of companies returned in task #1 won't change within that DAG run. It sounds like there should be no problem if the number of companies changes between runs.
So going back to your use case, your custom operator would call the API, get the list of companies, and the list of companies would be used by a dependent operator. As for what type of persistent storage - that might be better answered by someone else as I am still learning Airflow myself. If you're able, could you create a temporary table in S3 tied to an ID for that specific DAG run and then read the list from that table? Or maybe have a persistent working table with a column that indicates the DAG ID, which the next task can use in a query?
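As an illustration of the first idea, here is a sketch using the S3Hook with a made-up bucket name: the upstream task writes the company list under a key derived from the run_id, and the downstream task reads it back.

```python
import json
from airflow.hooks.S3_hook import S3Hook

BUCKET = "my-hr-etl-bucket"  # made-up bucket name

def write_companies(**context):
    companies = ["acme", "globex", "initech"]  # placeholder for the API call
    key = f"company_lists/{context['run_id']}.json"
    S3Hook(aws_conn_id="aws_default").load_string(
        json.dumps(companies), key=key, bucket_name=BUCKET, replace=True
    )

def read_companies(**context):
    key = f"company_lists/{context['run_id']}.json"
    raw = S3Hook(aws_conn_id="aws_default").read_key(key=key, bucket_name=BUCKET)
    companies = json.loads(raw)
    print(f"loaded {len(companies)} companies for this run")
```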
Yeah, I will try that and will post back with whatever works for me.