0. 개요
AWS 장기 자격 증명으로 액세스키를 사용할 수 있습니다. 액세스키를 사용하는것을 권장하지 않으나, 사용자 환경에 따라 불가피하게 사용이 필요한 경우가 있을 것으로 생각됩니다.
자격증명을 위하여 코드 단에 액세스키를 하드코딩 하지 않고, Secrets Manager에 액세스키를 저장하여 API 호출로 자격증명을 할 뿐 아니라 일정 기간이 지나면 액세스키를 교체하는 방안을 소개하겠습니다.
1. 액세스 키 생성
액세스 키는 생성되어 있으시다고 가정하겠습니다. 액세스 키 생성 관련해서는 다음 문서를 참고해주세요.
IAM 사용자의 액세스 키 관리 - https://docs.aws.amazon.com/ko_kr/IAM/latest/UserGuide/id_credentials_access-keys.html?icmpid=docs_iam_console
2. Secret 생성
액세스 키를 Secrets Manager에 저장할 수 있도록 Secret을 생성합니다. Secret 생성 방안은 다음 문서를 참고 바랍니다.
AWS Secrets Manager 보안 암호 생성 - https://docs.aws.amazon.com/ko_kr/secretsmanager/latest/userguide/create_secret.html
3. Lambda Function 생성
일정 기간 경과 시 액세스 키를 자동으로 교체할 수 있도록 Lambda 함수를 생성합니다.
import boto3
import json
import logging
import os
import time
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
# Setup the client
secretsmanager_client = boto3.client('secretsmanager')
# Make sure the version is staged correctly
metadata = secretsmanager_client.describe_secret(SecretId=arn)
logging.info(repr(metadata))
versions = metadata['VersionIdsToStages']
if token not in versions:
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
if "AWSCURRENT" in versions[token]:
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
return
elif "AWSPENDING" not in versions[token]:
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
if step == "createSecret":
logging.debug("createSecret %s" % arn)
logging.info("for IAM user access keys secret creation is handled by IAM ")
elif step == "setSecret":
logging.debug("setSecret %s" % arn)
current_dict = get_secret_dict(secretsmanager_client, arn, "AWSCURRENT", required_fields=['username'])
username = current_dict['username']
master_dict = get_secret_dict(secretsmanager_client, current_dict['masterarn'], "AWSCURRENT")
master_iam_client = boto3.client('iam', aws_access_key_id=master_dict['accesskey'], aws_secret_access_key=master_dict['secretkey'])
# load any pre-existing access keys. sorted by created descending. if the count is 2+ remove the oldest key
existing_access_keys = sorted(master_iam_client.list_access_keys(UserName=username)['AccessKeyMetadata'], key=lambda x: x['CreateDate'])
if len(existing_access_keys) >= 2:
logger.info("at least 2 access keys already exist. deleting the oldest version: %s" % existing_access_keys[0]['AccessKeyId'])
master_iam_client.delete_access_key(UserName=username, AccessKeyId=existing_access_keys[0]['AccessKeyId'])
# request new access key and gather the response
new_access_key = master_iam_client.create_access_key(UserName=username)
current_dict['accesskey'] = new_access_key['AccessKey']['AccessKeyId']
current_dict['secretkey'] = new_access_key['AccessKey']['SecretAccessKey']
logging.info('applying new secret value to AWSPENDING')
# save the new access key to the pending secret
secretsmanager_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING'])
elif step == "testSecret":
logging.debug("testSecret %s" % arn)
# load the pending secret for testing
pending_dict = get_secret_dict(secretsmanager_client, arn, "AWSPENDING", required_fields=['username'], token = token)
# attempt to call an iam service using the credentials
test_client = boto3.client('iam', aws_access_key_id=pending_dict['accesskey'], aws_secret_access_key=pending_dict['secretkey'])
try:
test_client.get_account_authorization_details()
except test_client.exceptions.ClientError as e:
# the test fails if and only if Authentication fails. Authorization failures are acceptable.
if e.response['Error']['Code'] == 'AuthFailure':
logging.error("Pending IAM secret %s in rotation %s failed the test to authenticate. exception: %s" % (arn, pending_dict['username'], repr(e)))
raise ValueError("Pending IAM secret %s in rotation %s failed the test to authenticate. exception: %s" % (arn, pending_dict['username'], repr(e)))
elif step == "finishSecret":
logging.debug("finishSecret %s" % arn)
# finalize the rotation process by marking the secret version passed in as the AWSCURRENT secret.
metadata = secretsmanager_client.describe_secret(SecretId=arn)
current_version = None
for version in metadata["VersionIdsToStages"]:
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
if version == token:
# The correct version is already marked as current, return
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
return
current_version = version
break
# finalize by staging the secret version current
secretsmanager_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))
else:
raise ValueError("Invalid step parameter")
def get_secret_dict(secretsmanager_client, arn, stage, required_fields=[], token=None):
# Only do VersionId validation against the stage if a token is passed in
if token:
secret = secretsmanager_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
else:
secret = secretsmanager_client.get_secret_value(SecretId=arn, VersionStage=stage)
plaintext = secret['SecretString']
secret_dict = json.loads(plaintext)
# Run validations against the secret
for field in required_fields:
if field not in secret_dict:
raise KeyError("%s key is missing from secret JSON" % field)
# Parse and return the secret JSON string
return secret_dict
4. Secret Automation rotaion 설정
위에서 생성한 "alice_accesskey" lambda 함수를 사용하여 secret이 3개월마다 자동으로 변경되도록 설정합니다.
5. Secret 즉시 교체
정상적으로 구현이 되었는지 확인하기 위해 Secret을 즉시 교체 해보도록 하겠습니다.
1에서 볼 수 있는거와 같이 Alice의 액세스 키는 ~6V입니다. Secret 즉시 교체 후 액세스 키가 ~SS로 변경된 것을 볼 수 있습니다.
그렇다면, IAM에서도 정상적으로 변경되었는지 확인해보겠습니다.
이제, 코드 단에 액세스 키를 하드코딩 하지 않고, Secrets를 API로 불러와 사용하실 수 있습니다.
'AWS' 카테고리의 다른 글
[AWS] KMS symmetric key import (0) | 2023.06.27 |
---|---|
[AWS] Springboot에 Secrets Manager 연동하기 (0) | 2023.06.21 |
[AWS] AWS SSO SAML with Azure (AWS IAM Identity Center) (0) | 2023.05.30 |
[AWS] Secrets Manager 강제 삭제 (0) | 2023.02.01 |
[AWS] EC2 인스턴스 Tag-based 관리 방법 - 2 (0) | 2023.01.29 |