Hey Astronomers.
Quick question here.
So I have a dag that will execute 3 times daily, let’s call it 6a, 12p, 5p.
Based on the execution_date(time), I want to set some other variables (that will feed a docker container env var). For example, “meal”. If it’s 6a, meal would = ‘breakfast’.
I originally began by looking at the hour of the current time and set the other variables appropriately.
That works unless a task executes after the hour changes: the current-time value is dynamic, so those tasks fall into the 7a hour, and there is no meal associated with 7a.
So it occurred to me I should be using a static date like execution_date of the dag run. That value would not change if tasks fell into the next hour, since that is not dynamic.
I can grab it using templates at the top of the main DAG file (outside any task):
EXEC_DATE = "{{ execution_date.in_timezone('EST').strftime('%H') }}"
If I print out EXEC_DATE it shows the right hour.
But I cannot interrogate it in order to set the “meal” variable.
I know I am missing something here.
Anyone have any ideas?
Many thanks,
Jim
Using the dag_run.logical_date or execution_date seems like the right direction. That tells you when the task was intended to run.
You could have a PythonOperator, or a similar task, that takes the logical_date and maps it to the output you’d like with a dictionary or a series of if/else statements. You can then use that task’s XCom in your env var downstream more easily. You may even be able to do this directly in a Jinja template on the downstream task, mapping the logical_date to a value in a dictionary.
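A minimal sketch of the dictionary-mapping idea, written as plain Python so it can be tried without Airflow. Only the 6a = 'breakfast' pairing comes from the question; the 'lunch' and 'dinner' values, the names `MEAL_BY_HOUR`, `meal_for`, and `pick_meal`, and the fixed UTC-5 offset standing in for 'EST' are assumptions for illustration. Which timestamp to pass in (logical_date, data_interval_end, or something else) depends on the schedule semantics discussed further down the thread; the callable here reads `data_interval_end` from the context as one option.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC-5 offset stands in for the 'EST' used in the question
# (no daylight-saving handling).
EST = timezone(timedelta(hours=-5), "EST")

# Hypothetical mapping; only 6 -> 'breakfast' appears in the thread.
MEAL_BY_HOUR = {6: "breakfast", 12: "lunch", 17: "dinner"}


def meal_for(run_time: datetime) -> str:
    """Map a timezone-aware run timestamp to a meal name."""
    hour = run_time.astimezone(EST).hour
    try:
        return MEAL_BY_HOUR[hour]
    except KeyError:
        # Fail loudly rather than pass an empty env var to the container.
        raise ValueError(f"no meal configured for hour {hour}") from None


def pick_meal(**context) -> str:
    """Candidate python_callable for a PythonOperator.

    The return value of a PythonOperator callable is pushed to XCom by default,
    so a downstream task can read it from a templated field.
    """
    return meal_for(context["data_interval_end"])
```

A downstream task could then pull the value in a templated field with something like `{{ ti.xcom_pull(task_ids='pick_meal') }}`, assuming the task is given the task_id `pick_meal`.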
Thanks, fritz-astronomer!
I looked at both of those options and they both gave me the datetime of the previous dag run, not the currently running one.
I found an elegant solution I think you’ll appreciate.
I inserted a “start” DummyOperator task; then, in subsequent tasks in the currently running DAG run, I can refer back to that start task like this:
kwargs["dag_run"].get_task_instance('start').start_date
I really appreciate your reply. Thank you.
Jim
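A rough sketch of how the start-task lookup above might be wrapped in a task callable. The `dag_run.get_task_instance('start').start_date` call is the one from the post; everything else (the names `run_start` and `meal_from_start`, the 'lunch' and 'dinner' values, the fixed UTC-5 offset for 'EST', and the nearest-slot rule) is an assumption for illustration. The nearest-slot rule is there because the start task records wall-clock time, so a run that begins a little late could otherwise land in an hour with no meal. A stand-in object replaces the real DAG run so the example runs without Airflow.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace

# Assumption: fixed UTC-5 offset standing in for 'EST'.
EST = timezone(timedelta(hours=-5), "EST")

# Hypothetical schedule slots; only 6 -> 'breakfast' appears in the thread.
SLOTS = {6: "breakfast", 12: "lunch", 17: "dinner"}


def run_start(dag_run, anchor_task_id: str = "start") -> datetime:
    """Return the start time recorded on the run's anchor task."""
    ti = dag_run.get_task_instance(anchor_task_id)
    if ti is None or ti.start_date is None:
        raise RuntimeError(f"task {anchor_task_id!r} has not started in this run")
    return ti.start_date


def meal_from_start(**kwargs) -> str:
    """Pick the meal whose slot is closest to when this run actually began."""
    started = run_start(kwargs["dag_run"]).astimezone(EST)
    hour_of_day = started.hour + started.minute / 60
    nearest = min(SLOTS, key=lambda slot: abs(slot - hour_of_day))
    return SLOTS[nearest]


# Stand-in for a real DagRun: the 'start' task began at 07:03 EST (12:03 UTC).
fake_run = SimpleNamespace(
    get_task_instance=lambda task_id: SimpleNamespace(
        start_date=datetime(2023, 1, 2, 12, 3, tzinfo=timezone.utc)
    )
)
demo_meal = meal_from_start(dag_run=fake_run)
```

Because every task in the run reads the same stored `start_date`, they all resolve to the same meal regardless of when each task itself executes.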
Make sure to remember - DAG Runs in Airflow start at the end of their schedule window.
If you have something scheduled @hourly, and it’s supposed to run at 12:00, you’ll see the 12:00 run begin at 12:59:59.
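The same arithmetic applied to the three-times-daily schedule from the question, as a plain-Python illustration rather than Airflow's own code. It assumes a cron-style schedule firing at 6:00, 12:00, and 17:00, and the scheduled-run behaviour in recent Airflow 2 releases where the logical date equals the start of the data interval and the run fires at its end. The names `SCHEDULE_HOURS`, `previous_slot`, and `data_interval` are hypothetical.

```python
from datetime import datetime, timedelta

# Assumed schedule: runs fire at 06:00, 12:00 and 17:00 each day.
SCHEDULE_HOURS = (6, 12, 17)


def previous_slot(moment: datetime, hours=SCHEDULE_HOURS) -> datetime:
    """Return the latest scheduled slot strictly before `moment`."""
    day = moment.replace(hour=0, minute=0, second=0, microsecond=0)
    candidates = [
        d.replace(hour=h)
        for d in (day - timedelta(days=1), day)
        for h in hours
    ]
    return max(c for c in candidates if c < moment)


def data_interval(fires_at: datetime) -> tuple[datetime, datetime]:
    """Return (interval start, interval end) for the run firing at `fires_at`.

    Under the assumptions above, the first element is what the run reports as
    its logical date and the second is the moment the run actually begins.
    """
    return previous_slot(fires_at), fires_at


start, end = data_interval(datetime(2023, 1, 2, 6, 0))
print(start, end)
```

Under these assumptions, the run that fires at 6:00 carries the previous day's 17:00 slot as its logical date, which would explain seeing what looks like the previous run's datetime.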
Exactly.
What I needed to get was the start time of the currently running DAG run.
In a perfect world, I could have just used the current datetime.
But the issue there was that it changed with every ingest scan, so tasks at the end were referencing a different datetime value than tasks at the beginning. With the start task method, each task reflects the actual start time of this DAG run.
You’re spot on in your answer. I was just looking for the actual start time of this dag. I could have probably used the {{ data_interval_end }} to align with your point.
Jim