반응형

0. 개요

AWS 장기 자격 증명으로 액세스키를 사용할 수 있습니다. 액세스키를 사용하는것을 권장하지 않으나, 사용자 환경에 따라 불가피하게 사용이 필요한 경우가 있을 것으로 생각됩니다.

 

자격증명을 위하여 코드 단에 액세스키를 하드코딩 하지 않고, Secrets Manager에 액세스키를 저장하여 API 호출로 자격증명을 할 뿐 아니라 일정 기간이 지나면 액세스키를 교체하는 방안을 소개하겠습니다.

 

1. 액세스 키 생성

액세스 키는 생성되어 있으시다고 가정하겠습니다. 액세스 키 생성 관련해서는 다음 문서를 참고해주세요.

IAM 사용자의 액세스 키 관리 -  https://docs.aws.amazon.com/ko_kr/IAM/latest/UserGuide/id_credentials_access-keys.html?icmpid=docs_iam_console

 

2. Secret 생성

액세스 키를 Secrets Manager에 저장할 수 있도록 Secret을 생성합니다. Secret 생성 방안은 다음 문서를 참고 바랍니다.

AWS Secrets Manager 보안 암호 생성 - https://docs.aws.amazon.com/ko_kr/secretsmanager/latest/userguide/create_secret.html

 

3. Lambda Function 생성

일정 기간 경과 시 액세스 키를 자동으로 교체할 수 있도록 Lambda 함수를 생성합니다.

import boto3
import json
import logging
import os
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    # Setup the client
    secretsmanager_client = boto3.client('secretsmanager')
    # Make sure the version is staged correctly
    metadata = secretsmanager_client.describe_secret(SecretId=arn)
    logging.info(repr(metadata))
    versions = metadata['VersionIdsToStages']
    if token not in versions:
        logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
        raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
    if "AWSCURRENT" in versions[token]:
        logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
        return
    elif "AWSPENDING" not in versions[token]:
        logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
        raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
    if step == "createSecret":
        logging.debug("createSecret %s" % arn)
        logging.info("for IAM user access keys secret creation is handled by IAM ")
    elif step == "setSecret":
        logging.debug("setSecret %s" % arn)
        current_dict = get_secret_dict(secretsmanager_client, arn, "AWSCURRENT", required_fields=['username'])
        username = current_dict['username']
        master_dict = get_secret_dict(secretsmanager_client, current_dict['masterarn'], "AWSCURRENT")
        master_iam_client = boto3.client('iam', aws_access_key_id=master_dict['accesskey'], aws_secret_access_key=master_dict['secretkey'])
        # load any pre-existing access keys. sorted by created descending. if the count is 2+ remove the oldest key
        existing_access_keys = sorted(master_iam_client.list_access_keys(UserName=username)['AccessKeyMetadata'], key=lambda x: x['CreateDate'])
        if len(existing_access_keys) >= 2:
            logger.info("at least 2 access keys already exist. deleting the oldest version: %s" % existing_access_keys[0]['AccessKeyId'])
            master_iam_client.delete_access_key(UserName=username, AccessKeyId=existing_access_keys[0]['AccessKeyId'])
        # request new access key and gather the response
        new_access_key = master_iam_client.create_access_key(UserName=username)
        current_dict['accesskey'] = new_access_key['AccessKey']['AccessKeyId']
        current_dict['secretkey'] = new_access_key['AccessKey']['SecretAccessKey']
        logging.info('applying new secret value to AWSPENDING')
        # save the new access key to the pending secret
        secretsmanager_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING'])
    elif step == "testSecret":
        logging.debug("testSecret %s" % arn)
        # load the pending secret for testing
        pending_dict = get_secret_dict(secretsmanager_client, arn, "AWSPENDING", required_fields=['username'], token = token)
        # attempt to call an iam service using the credentials
        test_client = boto3.client('iam', aws_access_key_id=pending_dict['accesskey'], aws_secret_access_key=pending_dict['secretkey'])
        try:
            test_client.get_account_authorization_details()
        except test_client.exceptions.ClientError as e:
            # the test fails if and only if Authentication fails. Authorization failures are acceptable.
            if e.response['Error']['Code'] == 'AuthFailure':
                logging.error("Pending IAM secret %s in rotation %s failed the test to authenticate. exception: %s" % (arn, pending_dict['username'], repr(e)))
                raise ValueError("Pending IAM secret %s in rotation %s failed the test to authenticate. exception: %s" % (arn, pending_dict['username'], repr(e)))
    elif step == "finishSecret":
        logging.debug("finishSecret %s" % arn)
        # finalize the rotation process by marking the secret version passed in as the AWSCURRENT secret.
        metadata = secretsmanager_client.describe_secret(SecretId=arn)
        current_version = None
        for version in metadata["VersionIdsToStages"]:
            if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
                if version == token:
                    # The correct version is already marked as current, return
                    logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
                    return
                current_version = version
                break
        # finalize by staging the secret version current
        secretsmanager_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
        logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))
    else:
        raise ValueError("Invalid step parameter")


def get_secret_dict(secretsmanager_client, arn, stage, required_fields=[], token=None):
    # Only do VersionId validation against the stage if a token is passed in
    if token:
        secret = secretsmanager_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
    else:
        secret = secretsmanager_client.get_secret_value(SecretId=arn, VersionStage=stage)
    plaintext = secret['SecretString']
    secret_dict = json.loads(plaintext)
    # Run validations against the secret
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    # Parse and return the secret JSON string
    return secret_dict

 

4. Secret Automation rotaion 설정

 

위에서 생성한 "alice_accesskey" lambda 함수를 사용하여 secret이 3개월마다 자동으로 변경되도록 설정합니다.

 

5. Secret 즉시 교체

정상적으로 구현이 되었는지 확인하기 위해 Secret을 즉시 교체 해보도록 하겠습니다.

1에서 볼 수 있는거와 같이 Alice의 액세스 키는 ~6V입니다. Secret 즉시 교체 후 액세스 키가 ~SS로 변경된 것을 볼 수 있습니다.

 

그렇다면, IAM에서도 정상적으로 변경되었는지 확인해보겠습니다.

이제, 코드 단에 액세스 키를 하드코딩 하지 않고, Secrets를 API로 불러와 사용하실 수 있습니다.

반응형