I am currently experiencing a technical issue with Airflow on Astronomer.
I'm trying to run a function that triggers an AWS Lambda, which runs for around 15 minutes, inside a PythonVirtualenvOperator (I have also tried the PythonOperator):
# Daily DAG that runs `execute_lambda` inside an isolated virtualenv to invoke a
# long-running AWS Lambda function.
# NOTE(review): DAG, PythonVirtualenvOperator, timedelta, default_args and
# execute_lambda are assumed to be imported/defined elsewhere in this file;
# execute_lambda must be defined before this block executes.
dag_name: str = "long"

with DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    max_active_runs=3,
) as dag:
    LAMBDA: str = "long-running"
    PAYLOAD = {}

    init = PythonVirtualenvOperator(
        task_id="long-running-lambda",
        # Pinned boto3/botocore and their transitive dependencies, installed
        # into a fresh virtualenv (system site-packages are excluded below).
        requirements=[
            "boto3==1.12.20",
            "botocore==1.15.20",
            "docutils==0.15.2",
            "jmespath==0.9.5",
            "python-dateutil==2.8.1",
            "s3transfer==0.3.3",
            "six==1.14.0",
            "urllib3==1.25.8",
        ],
        # Passed as a string rather than a float: str(3.10) == "3.1", so float
        # versions are fragile once formatted into an interpreter name.
        python_version="3.7",
        system_site_packages=False,
        python_callable=execute_lambda,
        op_kwargs={"function_name": LAMBDA, "payload": PAYLOAD},
    )
The callable (`execute_lambda`) looks like this:
def execute_lambda(function_name: str, payload, region_name: str = "eu-west-1"):
    """Synchronously invoke an AWS Lambda function and fail if the function errors.

    Every import lives inside the body because the callable is meant to run
    under PythonVirtualenvOperator, which executes it in a separate
    interpreter/virtualenv where the DAG file's module-level names are not
    available.

    Args:
        function_name: Name (or ARN) of the Lambda function to invoke.
        payload: JSON-serialisable object sent as the invocation payload.
        region_name: AWS region of the function (defaults to the previously
            hard-coded "eu-west-1").

    Returns:
        The function's response payload, decoded as UTF-8 text.

    Raises:
        RuntimeError: if the invocation response reports a ``FunctionError``
            (the Lambda itself failed), so the Airflow task fails too.
    """
    import base64
    import json

    import boto3
    import botocore
    from botocore.config import Config

    print(boto3.__version__)
    print(botocore.__version__)

    config = Config(
        read_timeout=900,
        connect_timeout=900,
        # No retries: a retry would start the long-running function again.
        retries={"max_attempts": 0},
    )
    # Credentials are not hard-coded; boto3's default provider chain
    # (environment variables, shared config/credentials files, attached role)
    # supplies them.
    lambda_client = boto3.client("lambda", region_name=region_name, config=config)

    # NOTE(review): the reported ConnectionClosedError after ~5 minutes only
    # occurs in the containerised deployment. It is presumably an idle TCP
    # connection being dropped by the network path (e.g. a NAT/load balancer
    # idle timeout) while no bytes flow during the synchronous call; confirm.
    # Possible mitigations: OS-level TCP keepalive, a newer botocore that
    # accepts Config(tcp_keepalive=True) (the pinned 1.15.20 does not —
    # verify), or InvocationType="Event" plus polling for completion.
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        LogType="Tail",
        Payload=json.dumps(payload),
    )

    # LogType="Tail" returns the end of the execution log base64-encoded.
    log_tail = response.get("LogResult")
    if log_tail:
        print(base64.b64decode(log_tail).decode("utf-8", errors="replace"))

    result = response["Payload"].read().decode("utf-8")
    function_error = response.get("FunctionError")
    if function_error:
        raise RuntimeError(
            f"Lambda {function_name!r} failed ({function_error}): {result}"
        )

    print("Finished job")
    return result
After about 5 minutes, the task fails with this error: `botocore.exceptions.ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL: "https://lambda.eu-west-1.amazonaws.com/2015-03-31/functions/long-running/invocations".`
When I run exactly the same DAG on a local Airflow installation (without Docker or Astronomer), the operator finishes successfully.
Best
Andres Namm