Hide AWS credentials from logs

Is there a way to hide AWS credentials (in particular the secret access key) from the logs for the RedshiftToS3Transfer and S3ToRedshiftTransfer operators?

Hi @borismo! I synced up with our team today to chat about this. In short - we agree that we don’t want that happening. Direct response from one of our team’s engineers:

I do see logging way down in the DbApiHook.run() function.

That’s an airflow issue we should all talk about. For now, I did just build an IAM-based operator (below). If you make a Redshift-S3 IAM role, you can use that role ARN here. You can drop it into your plugins, then you’ll be able to use the operator in your DAG. It accepts the same parameters as the S3toRedshiftOperator, except instead you can pass it either iam_role_arn or iam_role_name . You can store those in your environment if you like, or if you want, in an airflow variable. It’s brand new, iam_role_name is untested. From the docs, it may derive temporary creds. Otherwise, stick with the role ARN.


s3_to_redshift_transfer

from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class IAMS3ToRedshiftTransfer(BaseOperator):
    """
    Executes an COPY command to load files from s3 to Redshift

    :param role_name: reference to IAM role name
    :type role_name: str
    :param schema: reference to a specific schema in redshift database
    :type schema: str
    :param table: reference to a specific table in redshift database
    :type table: str
    :param s3_bucket: reference to a specific S3 bucket
    :type s3_bucket: str
    :param s3_key: reference to a specific S3 key
    :type s3_key: str
    :param redshift_conn_id: reference to a specific redshift database
    :type redshift_conn_id: str
    :param aws_conn_id: reference to a specific S3 connection
    :type aws_conn_id: str
    :param verify: Whether or not to verify SSL certificates for S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
                 (unless use_ssl is False), but SSL certificates will not be
                 verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses.
                 You can specify this argument if you want to use a different
                 CA cert bundle than the one used by botocore.
    :type verify: bool or str
    :param copy_options: reference to a list of COPY options
    :type copy_options: list
    """

    template_fields=('s3_key', )

    template_ext = ()

    ui_color = '#ededed'


    @apply_defaults
    def __init__(
            self,
            schema,
            table,
            s3_bucket,
            s3_key,
            redshift_conn_id='redshift_default',
            aws_conn_id='aws_default',
            iam_role_name=None,
            iam_role_arn=None,
            verify=None,
            copy_options=tuple(),
            autocommit=False,
            parameters=None,
            *args, **kwargs):
        assert iam_role_arn or iam_role_name
        super(IAMS3ToRedshiftTransfer, self).__init__(*args, **kwargs)
        self.iam_role_name = iam_role_name
        self.iam_role_arn = iam_role_arn
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.copy_options = copy_options
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
        copy_options = '\n\t\t\t'.join(self.copy_options)


        if self.iam_role_arn:
            iam_role = self.iam_role_arn
        else:
            iam_role = self.s3\
                .get_client_type('iam')\
                .get_role(RoleName=self.iam_role_name)['Role']['Arn']

        cred_string = 'aws_iam_role={iam_role}'.format(iam_role=iam_role)

        copy_query = """
            COPY {schema}.{table}
            FROM 's3://{s3_bucket}/{s3_key}/'
            with credentials '{cred_string}'
            {copy_options};
        """.format(schema=self.schema,
                   table=self.table,
                   s3_bucket=self.s3_bucket,
                   s3_key=self.s3_key,
                   cred_string=cred_string,
                   copy_options=copy_options)

        self.log.info('Executing COPY command...')
        self.hook.run(copy_query, self.autocommit)
        self.log.info("COPY command complete...")

Hi @paola! Thank you for looking into this!

We already use a custom operator for S3 to Redshift transfers so we need to take the time to merge the operator you just shared with ours. Also, that would change a few things on the IAM side that we need to be careful about.

Are you guys planning to propose an improvement to Airflow?

Cheers,

Boris

Hi @borismo! Yes, absolutely a fix we want to build and contribute back to Airflow. We have an internal GitHub issue on our side logged for this but we’ll post an update in here with the link to the Airflow JIRA ticket as soon as we create it and of course the PR from there.

Let us know if you have any other questions in the meantime - appreciate it!

1 Like