AWS
[Lambda] Security Group 변경 사항 모니터링 고도화
easthyeok
2023. 5. 25. 21:43
개요
이전에 SecurityGroup및 IAM에 대한 변경사항을 알림받았었다.
하지만 계속 사용하다보니 SecurityGroup에 대한 알람 고도화가 필요하게 되었다.
딥다이브 해보자💪
[Lambda] IAM, SG 변경 사항 모니터링
개요 Security Group에 대한 변경사항이 감지되면 알람을 받고싶다(Slack,gmail등) Cloudtrail,EventBridge,Lambda를 활용해서 바로 해보자💪 [아키텍처] 1. IAM 서비스 및 관련 AWS API 호출은 버지니아 북부 리전
easthyeok.tistory.com
이전글에서 달라진부분만 언급하고 넘어가겠다 ¯\(°_o)/¯
[아키텍처]
- 모든 Account에서 발생하는 SG변경 사항을 MonitoringAccount를 통해 알람이 전송되도록 구성
[Lambda설정]
1. 이전 Lambda코드의 문제점
1. 가독성 떨어짐
2. 하나의 보안그룹에서 정책 여러개 추가 시(ex: ssh,https,dns포트 한번에 추가) 첫번째로 추가된 정책만 알람전송됨
3. account 별칭출력안됨 (어떤 용도로 사용하는 계정인지 모름..)
4. 보안그룹 변경사항에 대한 자세한 설명이 부족함( 포트,프로토콜,Description등 )
열심히 문제점들을 개선해 보자
2. Lambda코드 상세
- 계정에 대한 정보는 account_dict.py를 통해 dict타입으로 따로 정리
import json
import logging
import os
import urllib3
import pytz
import datetime
from account_dict import account_dict
http = urllib3.PoolManager()
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']
# Asia/seoul로 timezone설정
TZ = pytz.timezone('Asia/Seoul')
now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc).astimezone(TZ).strftime('%Y-%m-%d %H:%M:%S')
#INFO레벨 이상의 로그메시지만 출력 (INFO, WARNING, ERROR, CRITICAL)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
#이벤트 메시지 출력
def send_message(message):
data = json.dumps(message).encode('utf-8')
# data = json.dumps(message)
res = http.request(
method='POST',
url=HOOK_URL,
body=data
)
print(res.data, res.status)
def get_account_name(account_id):
return account_dict.get(account_id,"계정정보없음")
- account_dict.py에 정의가 안된 accountID가 존재할 경우 "계정정보없음" 출력
def lambda_handler(event, context):
# 로깅
slack_evnet = json.loads(event['Records'][0]['Sns']['Message'])
logger.info("Event : " + str(event))
if 'ipPermissions' in slack_evnet['detail']['requestParameters']:
aws_sg_number = len(slack_evnet['detail']['requestParameters']['ipPermissions']['items'])
else:
aws_sg_number = 1
aws_sg_number = int(aws_sg_number)
- 보안그룹생성,삭제 이벤트 발생 시 ['detail']['requestParameters']['ipPermissions']['items'] 라는 값은 존재하지 않는다.
- 값이 존재할 시 items의 길이(한번에 추가된 인바운드,아웃바운드규칙 수)만큼 aws_sg_number 정의
없을 시 1
for i in range(aws_sg_number):
changed_resource = "Security Group"
event_id = slack_evnet['detail']['eventID']
aws_account = slack_evnet['detail']['userIdentity']['accountId']
account_name = get_account_name(aws_account)
aws_region = slack_evnet['detail']['awsRegion']
user_name = slack_evnet['detail']['userIdentity']['arn']
source_ip = slack_evnet['detail']['sourceIPAddress']
event_time = now
event_name = slack_evnet['detail']['eventName']
event_request_parameters = slack_evnet['detail'].get('requestParameters', {})
aws_sg = event_request_parameters.get('ipPermissions', {}).get('items', [{}])[i]
aws_sg_depth = aws_sg.get('ipRanges', {}).get('items', [{}])[0]
aws_sg_id = slack_evnet.get('detail', {}).get('requestParameters', {}).get('groupId') or "None"
aws_sg_protocol = aws_sg.get('ipProtocol', 'None')
aws_sg_fromport = aws_sg.get('fromPort', '0')
aws_sg_toport = aws_sg.get('toPort', '0')
aws_sg_ipv4 = aws_sg_depth.get('cidrIp', 'None')
aws_sg_description = aws_sg_depth.get('description', 'None')
# cloudtrail url
ct_url = f"https://{aws_region}.console.aws.amazon.com/cloudtrail/home?region={aws_region}#/events/{event_id}"
print(ct_url)
current_event = ""
for key, value in event_request_parameters.items():
current_event += f"{key} : {value}"
logger.info("SLACK Channel: " + SLACK_CHANNEL)
logger.info("HOOK URL : " + HOOK_URL)
check_list = ["Delete", "Detach", "Revoke"]
if any(x in event_name for x in check_list):
color = "#eb4034"
else:
color = "#0c3f7d"
- 이전에 정의한 aws_sg_number(items길이)만큼 for문 수행
- 값이 없다면 [{}] 빈 딕셔너리 반환
- ipv4,ipv6,description 정보는 [ipRangs][itmes][0]에서 값을 빼와야 하므로 aws_sg_depth으로 변수명을 지정한 뒤 따로 설정
- 나머지는 이전에 설정했던부분과 동일하니 PASS ~
[MultiAccount EventBridge설정]
1. 이벤트규칙 생성
{
"source": ["aws.ec2"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["ec2.amazonaws.com"],
"eventName": ["AuthorizeSecurityGroupIngress", "AuthorizeSecurityGroupEgress", "RevokeSecurityGroupIngress", "RevokeSecurityGroupEgress", "CreateSecurityGroup", "DeleteSecurityGroup"]
}
}
- 모니터링account와 그외 나머지account 모두 이벤트규칙을 생성한다
- SNS 및 Lambda연동은 이전글 참조~
2. 모니터링account로 이벤트 전송
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"events:PutEvents"
],
"Resource": [
"arn:aws:events:ap-northeast-2:ACCOUNTID:event-bus/default"
]
}
]
}
- 모니터링Account에 생성한 이벤트버스로 전송
3. 모니터링 Account 설정
- 그외나머지Account가 모니터링account의 이벤트버스에 접근할 수 있도록 설정
- ㅁ = 나머지AccountID ㅁ = 모니터링 AccountID
[SG 변경사항 알림 테스트]
AS-IS
TO-BE
- 나름 깔끔하게 변경된 것 같다💪