AWSアカウントやAWS IAM周りの管理って大変だ
AWSアカウントやAWS IAM周りの管理って、AWS Organizationsのようなサービスを使用していようがいなかろうが結構大変ですよね。特に何が大変かって、長期間ログインしていないIAMユーザーの棚卸から始まって、MFA認証の有無やらパスワードポリシーの管理やら何やら。
弊社では、マスターのAWSアカウント上でのみIAMユーザーを作成し、各プロジェクト用のAWSアカウントに対してはスイッチロールのみでアクセスできるようにしているのですけれども、やはりみんながみんな恒常的にAWSマネージメントコンソールにアクセスするとは限らず、プロジェクトが安定運用に入ってくると、アクセスするIAMユーザーの数も限られてきます。そうなると、やはり一定期間ごとに、そのAWSアカウントにアクセス可能なIAMユーザーの棚卸をすることがセキュリティ的にも必要になってくるわけです。
これを手作業でいちいち調べていると当然結構な労力になってくるわけで、その労力を軽減するためにも、とりあえず長期間スイッチロールそのものを行っていないIAMユーザーを探し出すための仕組みを考えて実装してみました。図でそのワークフローを書くとこんな感じになります。
ワークフロー
ここでは2本のワークフローを組みます。
- IAMユーザーがスイッチロールしたイベントはAWS CloudTrailで自動的に検知されるため、証跡ログがAmazon S3バケットに格納された時点で、PUTイベントをトリガーとしてAWS Lambda関数を呼び出して、Amazon DynamoDBにその履歴を格納するワークフロー
- Amazon EventBridgeのcron式をトリガーとしてAWS Lambda関数を呼び出して長期間スイッチロールしていないIAMユーザーを割り出し、Slack Incoming Webhookの仕組みを使用してSlackの特定のチャンネルにIAMユーザーのリストを通知するワークフロー
DynamoDBに作成するテーブルはこんな感じです。
aws dynamodb create-table \
--table-name SwitchRoleHistory \
--attribute-definitions \
AttributeName=eventID, AttributeType=S \
AttributeName=eventTime, AttributeType=S \
--key-schema AttributeName=eventID, KeyType=HASH AttributeName=eventTime, KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=1, WriteCapacityUnits=1
少なくともeventTime属性はソートキーとして機能できるようにしておきます。eventID属性はあくまでもプライマリーキーとしてのみ機能するという感じです。ここではRCUとWCUの値を最小値にしていますが、スイッチロールが発生する頻度によって調整をかけた方がいいかもしれません。
1番目のワークフローのLambda関数
1番目のワークフローのLambda関数はこんな感じで作成してみました。ちょっと一部だけ、実際に動作しているものからは改変しています。
# --------------------------------------------------
# function name:
# saving-record-to-dynamodb-from-cloudtrail
# usecase:
# detect to SwitchRole user from Amazon CloudTrail logs and
# put into Amazon DynamoDB Table.
import json
from operator import truediv
import urllib.parse
import boto3
import zlib
import logging
import traceback
import os
import re
AWS_ACCOUNT = os.environ.get('AWS_ACCOUNT')
DYNAMODB_TABLE = os.environ.get('DYNAMODB_TABLE')
EVENT_NAME = os.environ.get('EVENT_NAME')
CROSS_ACCOUNT_ROLE_ARN = 'arn:aws:sts::' + AWS_ACCOUNT + ':assumed-role/'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3Bucket = boto3.resource('s3')
db = boto3.resource('dynamodb')
tableName = db.Table(DYNAMODB_TABLE)
def parseS3CloudTrail(bucket_name, key):
s3Object = s3Bucket.Object(bucket_name, key)
# Decomplessing Objects in S3 Bucket
payload_gz = s3Object.get()['Body'].read()
payload = zlib.decompress(payload_gz, 16 + zlib.MAX_WBITS)
payload_json = json.loads(payload)
# To get AssumedRole information and put into DynamoDB
for record in payload_json['Records']:
logger.info("[STEP2] eventName is: {}".format(record['eventName']))
if EVENT_NAME == str(record['eventName']):
logger.info("[STEP3-1] record is: {}".format(record))
logger.info("[STEP3-2] arn: {}".format(record['userIdentity']['arn']))
saveRecord(record)
def saveRecord(record):
logger.info("[STEP4] Saving record to DynamoDB")
arnFlg = False
# Search Arn
tempArn = record['userIdentity']['arn']
# for master AWS Account
findStr = r'_role/(.*)'
responseArn = re.search(findStr, tempArn)
arnFlg = True
logger.info("[STEP4-1] detected responseArn is: {}".format(responseArn))
# for master AWS Account
if responseArn is None:
findStr = r'user/(.*)'
responseArn = re.search(findStr, tempArn)
arnFlg = False
logger.info("[STEP4-2] detected responseArn is: {}".format(responseArn))
logger.info("[STEP5] responseArn: {}".format(responseArn))
if responseArn is not None:
responseArn = responseArn.group(1)
if arnFlg == True:
userName = responseArn
else:
userName = record['userIdentity']['userName']
# Put into DynamoDB Table
result = tableName.put_item(
Item={
'arn': responseArn,
'userName': userName,
'eventTime': record['eventTime'],
'eventID': record['eventID'],
'record': json.dumps(record)
}
)
logger.info("[STEP6] Saved record to DynamoDB")
def lambda_handler(event, context):
logger.info("[STEP1] Loading function")
try:
# Get current S3 Bucket Name and Key
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = urllib.parse.unquote(record['s3']['object']['key'])
logger.info("[STEP2] BucketName: {}".format(bucket))
logger.info("[STEP2] Bucket Key: {}".format(key))
# Get information and put into DynamoDB Table
parseS3CloudTrail(bucket, key)
logger.info("[STEP7] Finished Function")
return {
'StatusCode': 200,
'message': 'Saving record to DynamoDB from CloudTrail is finished nomaly.'
}
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
return {
'StatusCode': 500,
'message': 'An error occured at Saving record to DynamoDB from CloudTrail.'
}
なんか、logger.infoがやたら多くない? というツッコミはさておき、AWS CloudTrailからAmazon S3バケットへのPUTイベントが走った時点で、eventオブジェクトの中から、PUTされてきた圧縮済み証跡ファイルの情報を取得した後、それをまずは解凍してJSONからarnの情報を取得して、同じJSONの中にある他の情報と一緒に、最初に作成したDynamoDBのテーブルに突っ込む、という流れです。
arnの情報からのマッチングに、正規表現を用いています。マスターのAWSアカウント側では実はわざわざこんなことをする必要はないのですが、要件によって、スイッチロールされた側のAWSアカウント上でも同じ仕組みを入れたい、となった場合にも最低限のコード修正で対応できるように、わざわざarnの情報から正規表現を用いてマッチングをかけてIAMユーザー名を取得しています。
2番目のワークフローのLambda関数
2番目のワークフローのLambda関数はこんな感じで作成してみました。
# --------------------------------------------------
# function name:
# check-long-time-switch-user-from-dynamodb
# usecase:
# detect to long time SwitchRole user from Amazon DynamoDB and
# send to Slack via Slack Incoming Webhook.
import json
import boto3
import logging
import traceback
import os
import operator
import datetime
import dateutil.parser
import urllib.request
DYNAMODB_TABLE = os.environ.get('DYNAMODB_TABLE')
TIME_DELTA = os.environ.get('TIME_DELTA')
SLACK_WEB_HOOK_URL = os.environ.get('SLACK_WEB_HOOK_URL')
SLACK_USER_NAME = os.environ.get('SLACK_USER_NAME')
MSG_OLD_ARN_IS_NONE = os.environ.get('MSG_OLD_ARN_IS_NONE')
MSG_OLD_ARN_IS_EXIST = os.environ.get('MSG_OLD_ARN_IS_EXIST')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
db = boto3.resource('dynamodb')
table = db.Table(DYNAMODB_TABLE)
pjExp = "#id, eventTime, arn"
exAttName = {"#id": "eventId"}
def getValueFromKey(value, key):
try:
return str(value[key])
except KeyError as e:
return "None"
def sendSlack(sendText):
try:
# Generate Slack messages
sendMessage = {
"text": sendText,
"username": SLACK_USER_NAME,
"icon_emoji": ":dog:"
}
sendMessage = json.dumps(sendMessage)
# Send message to Slack channel via Incoming Webhook
request = urllib.request.Request(
SLACK_WEB_HOOK_URL,
data=sendMessage.encode('utf-8'),
method="POST"
)
with urllib.request.urlopen(request) as response:
responseBody = response.read().decode('utf-8')
logger.info("[STEP8] send to Slack finished.")
return responseBody
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
return {
'StatusCode': 500,
'message': 'An error occured at Check long time switch user from DynamoDB.'
}
def lambda_handler(event, context):
logger.info("[STEP1] Loading Function")
try:
count = 0
# Get items from DynamoDB Table
result = table.scan(
ProjectionExpression=pjExp,
ExpressionAttributeNames=exAttName
)
jsonDict = [""] * len(result["Items"])
# Get arn and eventTime into JSON dictionary
for item in result["Items"]:
jsonDict[count] = {}
jsonDict[count]["arn"] = getValueFromKey(item, "arn")
jsonDict[count]["eventTime"] = getValueFromKey(item, "eventTime")
count = count + 1
logger.info("[STEP2] jsonDict: {}".format(jsonDict))
# Sorting JSON dictionary Order By eventTime Desc
sortDict = sorted(jsonDict, key=operator.itemgetter('eventTime'), reverse=True)
logger.info("[STEP3] sort is completed. sortDict: {}".format(sortDict))
# Srote in each array as arnList and lastLoginTimeList
arnList = [arn.get('arn') for arn in sortDict]
lastLoginTimeList = [lastLoginTime.get('eventTime') for lastLoginTime in sortDict]
oldArnList = []
evalFlg = 0
if len(arnList) == len(lastLoginTimeList):
# Generate old switchrole users into new array
for tmpArn, tmpLastLoginTime in zip(arnList, lastLoginTimeList):
timeDelta = datetime.timedelta(days=int(TIME_DELTA))
evalDate = datetime.datetime.now() - timeDelta
evalDate = evalDate.astimezone()
logger.info("[STEP4-1] timeDelta is: {}".format(evalDate))
tmpLastLoginTime = dateutil.parser.parse(tmpLastLoginTime)
logger.info("[STEP4-2] tmpLastLoginTime is: {}".format(tmpLastLoginTime))
if tmpLastLoginTime > evalDate:
evalFlg = 1
continue
else:
evalFlg = 0
logger.info("[STEP5-1] Old Switched User is: {}".format(tmpArn))
for tmpArn2nd, tmpLastLoginTime2nd in zip(arnList, lastLoginTimeList):
tmpLastLoginTime2nd = dateutil.parser.parse(tmpLastLoginTime2nd)
if tmpArn != tmpArn2nd:
continue
elif tmpArn == tmpArn2nd and tmpLastLoginTime < tmpLastLoginTime2nd:
evalFlg = 1
continue
else:
logger.info("[STEP5-1] evalFlg is: {}".format(evalFlg))
if evalFlg == 0:
oldArnList.append(tmpArn)
logger.info("[STEP5-2] appended list of tmpArn is: {}".format(tmpArn))
oldArnList = list(set(oldArnList))
logger.info("[STEP6] old IAM Users are this list: {}".format(oldArnList))
# Generate Slack messages and old switch role user list
if oldArnList == []:
sendText = TIME_DELTA + MSG_OLD_ARN_IS_NONE
logger.info("[STEP7-1] oldArnList is NOT Exist.")
else:
sendText = TIME_DELTA + MSG_OLD_ARN_IS_EXIST + ' ' + str(oldArnList)
logger.info("[STEP7-2] oldArnList is Exist.")
sendSlack(sendText)
logger.info("[STEP9] Finished Function")
return {
'StatusCode': 200,
'message': 'Check long time switch user from DynamoDB is finished nomaly.'
}
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
return {
'StatusCode': 500,
'message': 'An error occured at Check long time switch user from DynamoDB.'
}
ここでまずは言い訳させてください。本当はSQL文で言うところの、SELECT DISTINCT文を使いたかったのですが、当然のことながら、Amazon DynamoDBにはそんな機能は兼ね備えていません。かといって、get_item()関数ではひとつのレコードしか返すことができないので、ちょっとコストはかかりますがscan()関数を実行してeventTimeとarnを引っ張り出しているという苦しいことをしております。Amazon DynamoDBって、リレーショナルにするまでもない情報を格納するにはお手軽だしコストもかからないので楽なのですが、こういうケースでちょっと痒いところに手が届きにくいというのがちょっと辛いですね。
なので、後半で、eventTimeでソートをかけた後に、リストの中身を二重にぐるぐる回して、最終のスイッチロール実行から一定期間が経過したIAMユーザーを探し出すという面倒臭いことをしています。この辺りのコードはもう少しエレガントになるように改良したいところ。
SlackのIncoming Webhookを使用してPostする関数は、教科書通りです。
実行結果
この2つのワークフローを回していくことによって、Amazon DynamoDBの中には少しずつスイッチロールしたIAMユーザーの履歴が溜め込まれていって、Slackには以下のようなイメージでメッセージが定期的に通知されてきます。
なかなかこの手の作業を自動化するのって面倒ではあるのですが、まずはこの実装をきっかけに、管理できていないところを可視化するという営みまでは実現することができました。この先の計画としては、可視化されたIAMユーザーから、該当するスイッチロール権限を自動的に削除する、というところまで行き着くところなのですが、この辺は運用も絡む世界なので、さらに整理しながら拡張を続けていきたいなと思っています。