Here is the requirement:
As part of my DAG, I am using S3KeySensor to check for a specific key in an S3 path. When I specify bucket_name and bucket_key for one path, it works fine. But we have a requirement to check for multiple S3 keys before starting the task. Here is the example:
- Check for the existence of keys in multiple paths, e.g. s3://bucket-1/data/table-1/_SUCCESS and s3://bucket-1/data/table-2/_SUCCESS.
- The requirement is to start the next task only if the _SUCCESS files are available in the above-mentioned paths. I created the DAG as below:
success_flg_check = S3KeySensor(
    task_id='success_flg_check',
    bucket_name=None,
    bucket_key=[
        's3://bucket-1/data/table-1/_SUCCESS',
        's3://bucket-1/data/table-2/_SUCCESS',
    ],
    aws_conn_id='aws_default',
    mode='poke',
    poke_interval=5,
    timeout=30,
    soft_fail=False
)
It is failing with this error:
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/sensors/s3_key.py", line 88, in poke
    parsed_url = urlparse(self.bucket_key)
  File "/usr/lib64/python3.7/urllib/parse.py", line 367, in urlparse
    url, scheme, _coerce_result = _coerce_args(url, scheme)
  File "/usr/lib64/python3.7/urllib/parse.py", line 123, in _coerce_args
    return _decode_args(args) + (_encode_result,)
  File "/usr/lib64/python3.7/urllib/parse.py", line 107, in _decode_args
    return tuple(x.decode(encoding, errors) if x else '' for x in args)
  File "/usr/lib64/python3.7/urllib/parse.py", line 107, in <genexpr>
    return tuple(x.decode(encoding, errors) if x else '' for x in args)
AttributeError: 'list' object has no attribute 'decode'
I need help defining the DAG so that it checks for multiple S3 keys with S3KeySensor before starting the next task.
Hi @vishnusarat.t, thanks for reaching out!
I wasn’t able to reproduce your error - I used the code snippet you shared and changed the paths:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
...
success_flg_check = S3KeySensor(
    task_id='success_flg_check',
    bucket_name=None,
    bucket_key=[
        's3://astro-onboarding/iris/processed-input-data/test.csv',
        's3://airflow-success/jira/sol/issues.csv',
    ],
    aws_conn_id='aws_default',
    mode='poke',
    poke_interval=5,
    timeout=30,
    soft_fail=False
)
In the logs I can see:
[2022-08-18, 08:35:38 UTC] {s3.py:104} INFO - Poking for key : s3://astro-onboarding/iris/processed-input-data/test.csv
[2022-08-18, 08:35:38 UTC] {base.py:68} INFO - Using connection ID 'aws_default' for task execution.
[2022-08-18, 08:35:38 UTC] {base_aws.py:210} INFO - Credentials retrieved from extra_config
[2022-08-18, 08:35:38 UTC] {base_aws.py:100} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-08-18, 08:35:39 UTC] {s3.py:104} INFO - Poking for key : s3://airflow-success/jira/sol/issues.csv
[2022-08-18, 08:35:40 UTC] {base.py:301} INFO - Success criteria met. Exiting.
My guess is there’s something wrong with the paths you passed to bucket_key - assuming the ones provided above are dummies.
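If the paths check out but the amazon provider version in your environment only accepts a single string for bucket_key (the urlparse(self.bucket_key) call in your traceback suggests the whole list is being parsed as one URL), one workaround is to create one sensor per key and have the downstream task wait for all of them. Here is a minimal sketch - the dag_id and next_task are placeholders, and on older providers the sensor may need to be imported from airflow.providers.amazon.aws.sensors.s3_key instead:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # stand-in for your real downstream task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

success_keys = [
    's3://bucket-1/data/table-1/_SUCCESS',
    's3://bucket-1/data/table-2/_SUCCESS',
]

with DAG(
    dag_id='success_flg_check_example',
    start_date=datetime(2022, 8, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # One sensor per key, each poking for a single _SUCCESS file
    success_flg_checks = [
        S3KeySensor(
            task_id=f'success_flg_check_{i}',
            bucket_name=None,
            bucket_key=key,
            aws_conn_id='aws_default',
            mode='poke',
            poke_interval=5,
            timeout=30,
            soft_fail=False,
        )
        for i, key in enumerate(success_keys)
    ]

    next_task = DummyOperator(task_id='next_task')  # replace with your actual task

    # next_task starts only after every sensor has succeeded
    success_flg_checks >> next_task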
@magdagultekin Thanks for looking into this and providing your inputs.
Yes, the paths provided are dummies. In my actual case, the paths exist and the files are there as well.
Also, I am using Airflow version 2.2 (as part of AWS managed Airflow). Here is the import I am using: from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor. But I see your import statement is different. Can you please suggest which one to use?
@vishnusarat.t, a great place to check import statements (and much more!) is the Astronomer Registry.
Please use the import I provided above, as the one you used has been deprecated (see the Airflow docs).
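For reference, here is your original sensor with the non-deprecated import and the dummy paths from your first post - a sketch that assumes your environment's amazon provider is recent enough for bucket_key to accept a list:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

success_flg_check = S3KeySensor(
    task_id='success_flg_check',
    bucket_name=None,
    bucket_key=[
        's3://bucket-1/data/table-1/_SUCCESS',
        's3://bucket-1/data/table-2/_SUCCESS',
    ],
    aws_conn_id='aws_default',
    mode='poke',
    poke_interval=5,
    timeout=30,
    soft_fail=False,
)

If the provider shipped with your MWAA environment still rejects the list, the one-sensor-per-key workaround sketched earlier in the thread is an alternative.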