I am new to Airflow and have a small requirement for which I am looking for help.
To put my requirement in its simplest form:
I have code in which I use a variable like
Input_Bucket_name = "Test-abcdefghi"
and the code creates the bucket using the S3CreateBucketOperator.
What I want is to take this bucket name parameter for S3CreateBucketOperator from a file/object in S3.
I know there are other ways, but what I am looking for is: can I take this parameter from a JSON file stored in S3 (with key "Input_bucket_name" and the value mentioned above)?
I have many other hard-coded values in my existing code that I want to remove.
One idea could be to create a Python function that reads the object from S3 and creates an Airflow Variable with the bucket name using the Variable.set() method. Then, add a task to your DAG that uses the PythonOperator to call this function:
import json

from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def read_bucket_name_from_s3():
    # Placeholder key/bucket names: point these at your JSON config file in S3
    s3_hook = S3Hook()
    config = json.loads(s3_hook.read_key(key="config.json", bucket_name="my-config-bucket"))
    Variable.set(key="bucket_name_variable", value=config["Input_bucket_name"])

get_bucket_name = PythonOperator(
    task_id="get_bucket_name",
    python_callable=read_bucket_name_from_s3,
    dag=dag,
)
The Variable created for the bucket name can then be referenced in the S3CreateBucketOperator task via Jinja templating, since bucket_name is a templated field. See the Airflow documentation for more on accessing Variables via Jinja templating.
A second idea - since you mentioned XComs - is to still create the function mentioned above, but return the bucket name from the function instead of calling Variable.set(), and then access it as an XCom via Jinja as "{{ ti.xcom_pull(task_ids='get_bucket_name') }}" (note the quotes around the task id inside the template). This would eliminate the need to persist an Airflow Variable.