Setting Python variables based on execution_date

Hey Astronomers.

Quick question here.

So I have a DAG that will execute 3 times daily, let’s call it 6a, 12p, 5p.
Based on the execution_date (time), I want to set some other variables (that will feed a Docker container env var). For example, “meal”: if it’s 6a, meal would be ‘breakfast’.

I originally began by looking at the hour of the current time and setting the other variables accordingly.
That works unless any tasks execute after the hour changes. The current_time variable is dynamic, so those tasks would fall into the 7a hour, and there is no meal associated with 7a.

So it occurred to me I should be using a static date like execution_date of the dag run. That value would not change if tasks fell into the next hour, since that is not dynamic.
I can grab it using templates at the top of the main DAG file (outside any task):
EXEC_DATE = "{{ execution_date.in_timezone('EST').strftime('%H') }}"

If I print out EXEC_DATE it shows the right hour.

But I cannot interrogate it in order to set the “meal” variable.

I know I am missing something here.

Anyone have any ideas?

Many thanks,
Jim

Using the dag_run.logical_date or execution_date seems like the right direction. That tells you when the task was intended to run.

You could have a PythonOperator (or a similar task) that takes the logical_date and maps it to the output you’d like, using a dictionary or a series of if/else. Then you can more easily use that task’s XCom in your env var downstream. You may even be able to do the mapping directly in a Jinja template on the downstream task, looking the logical_date up in a dictionary.
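A minimal sketch of the dictionary approach described above, assuming a recent Airflow 2.x where the task context exposes logical_date (older versions expose execution_date instead). The names MEAL_BY_HOUR, meal_for and pick_meal are hypothetical, and only the 6a/breakfast pairing comes from the question; the other two meals are placeholders. The mapping logic is plain Python so it can be checked without Airflow installed.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-5 offset standing in for the 'EST' zone used in the question.
EST = timezone(timedelta(hours=-5), "EST")

# Hypothetical mapping: only 6 -> 'breakfast' is given in the thread.
MEAL_BY_HOUR = {6: "breakfast", 12: "lunch", 17: "dinner"}


def meal_for(logical_date: datetime) -> str:
    """Map a timezone-aware logical date to a meal name."""
    hour = logical_date.astimezone(EST).hour
    if hour not in MEAL_BY_HOUR:
        raise ValueError(f"no meal configured for hour {hour}")
    return MEAL_BY_HOUR[hour]


def pick_meal(**context) -> str:
    """Intended as a PythonOperator python_callable; the return value goes to XCom."""
    # Fall back to execution_date on Airflow versions that predate logical_date.
    logical_date = context.get("logical_date") or context["execution_date"]
    return meal_for(logical_date)
```

If pick_meal runs as a task with the (hypothetical) task id pick_meal, a downstream templated field could read the result with something like {{ ti.xcom_pull(task_ids='pick_meal') }}. Because the value is derived from the run's logical date rather than the wall clock, every task in the run sees the same meal.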

Thanks, fritz-astronomer!

I looked at both of those options and they both gave me the datetime of the previous dag run, not the currently running one.

I found an elegant solution I think you’ll appreciate.

I inserted a “start” DummyOperator task. Then, in subsequent tasks in the currently running DAG run, I can refer back to that start task like this:

kwargs["dag_run"].get_task_instance('start').start_date
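As a rough sketch of how that lookup might be wrapped for reuse, assuming the DAG has a task with the id start as described above: the helper name run_start_time is hypothetical, and the None check is an added precaution, since get_task_instance can return nothing if the task id does not exist in the run.

```python
from datetime import datetime
from typing import Any, Optional


def run_start_time(dag_run: Any, task_id: str = "start") -> Optional[datetime]:
    """Return the start_date of the named task in this DAG run, or None."""
    ti = dag_run.get_task_instance(task_id)
    return ti.start_date if ti is not None else None


def my_task(**kwargs) -> int:
    """Hypothetical python_callable that reads the run's start hour."""
    started = run_start_time(kwargs["dag_run"])
    if started is None:
        raise RuntimeError("start task has no recorded start_date")
    # Airflow stores datetimes in UTC; convert before comparing to local hours.
    return started.hour
```

Every task that calls run_start_time gets the same timestamp, however late in the run it executes.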

I really appreciate your reply. Thank you.
Jim

Make sure to remember - DAG Runs in Airflow start at the end of their schedule window.

If you have something scheduled @hourly, the run stamped 12:00 covers the 12:00 to 13:00 window, so you’ll see it begin once that window closes, at about 13:00.
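To make the schedule-window point concrete for the 6a/12p/5p schedule from the question, here is a small plain-Python sketch of the arithmetic. It does not call Airflow; the function name data_interval_end simply mirrors the template variable of the same name, and the code assumes naive local datetimes and ignores DST.

```python
from datetime import datetime, timedelta

SCHEDULE_HOURS = (6, 12, 17)  # the 6a, 12p, 5p schedule from the question


def data_interval_end(logical_date: datetime) -> datetime:
    """Return the next scheduled tick after logical_date.

    For a cron-style schedule this is the end of the run's window,
    which is roughly when the run actually starts.
    """
    for hour in SCHEDULE_HOURS:
        candidate = logical_date.replace(hour=hour, minute=0, second=0, microsecond=0)
        if candidate > logical_date:
            return candidate
    # Past the last tick of the day: roll over to the first tick tomorrow.
    tomorrow = logical_date + timedelta(days=1)
    return tomorrow.replace(hour=SCHEDULE_HOURS[0], minute=0, second=0, microsecond=0)
```

Under these assumptions, the run whose logical date is 6:00 does not start until about 12:00, which is why the logical date can look like it belongs to the previous run.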

Exactly.

What I needed to get was the start time of the currently running DAG run.

In a perfect world, I could have just used the current datetime.

But the issue there was that it changed with every ingest scan, so tasks at the end were referencing a different datetime value than tasks at the beginning. With the start task method, each task reflects the actual start time of this DAG run.

You’re spot on in your answer. I was just looking for the actual start time of this DAG run. I probably could have used {{ data_interval_end }} to align with your point.

Jim