AWS Athena 로 VPC Flow Log 분석하기 – 1

AWS Athena 는 로그 분석 서비스로 Hive 와 같다. 가장 많이 쓰이는 부분이 VPC Flow Log 를 분석하는데에 Athena 를 이용하는 방법이다. 이 글에서는 어떻게 VPC Flow Log 를 Athena 를 통해서 분석하는 알아 본다.

VPC Flow Log 설정

VPC Flow Log 설정은 간단하다. VPC 에서 Flow logs 탭에서 설정하면 그만인데, 다음과 같은 파라메터를 필요로 한다.

  • Destination Type: S3
  • Destination Name: S3 로 지정했을 시에 S3 Bucket 이름.
  • Log record format: AWS default format
  • Log file format: Text (default)
  • Partition logs by time: Every 24 hours (default)

여기서 중요한 것은 밑에서 3가지 정도다. Log record format 을 바꿀 경우에 Athena 테이블 생성시에 맞춰야 한다. Partition logs by time 을 24 시간으로 하면 S3 버킷 안에서 2022/08/29 형식으로 폴더가 생성되면서 VPC 로그가 전송 된다. 하루에 한번 폴더를 생성하면서 로그가 쌓인다는 뜻이다. 만일 Every 1 hours (60 minutes) 으로 할 경우에 2022/08/29/09 폴더가 생성되면서 로그가 쌓인다. 이 폴더의 구조는 나중에 Athena 에서 파티션 프포젝션(Partition Projection) 을 설정할때에 참고하게 된다.

또, S3 버킷으로 전송할 경우에 S3 의 암호화를 SSE_S3 로 할 것을 권장한다. CMK 로 할 경우에 로그가 쌓이지 않을 가능성이 있다. 또, 향후에 권한지정에서 CMK 권한을 함께 줘야하는 복잡함이 있을 수 있다.

S3 버킷 확인

필자의 VPC Log 설정으로 인해서 S3 에는 다음과 같은 형태로 S3 에 로그가 쌓이고 있다.

버킷 이름만 지정해주면 그 안에 AWSLogs/계정ID/vpclfowlogs/ap-northeast-2/ 가 자동으로 생성되며 그 안으로 year/month/day 순으로 생성된다.

앞에서 VPC Flow Logs 설정할때에 Partition logs by time 에서 Every 24 hours (default) 로 지정했기 때문에 날짜별로 생성된다.

Athena 작업

작업그룹(Workgroups) 생성

먼저 Athena 에서 필요한 것이 작업그룹(Workgroups) 이다. 기본적으로 Primary 가 기본 생성되어 있지만 하나 생성한다. 생성할때에 필요한 것은 다음과 같다.

쿼리 결과를 받을 S3 를 지정해야 한다. 만일 암호화가 필요하다면 SSE_S3 를 권장한다. CMK 도 가능하지만 Role 설정을 잘 해줘야 한다.

데이터 사용량을 적절하게 조절해 주길 권장한다. 덮어놓고 좋다고 No limit 로 하는 순간 돈이 술술 나갈 것이다. 이 데이터 사용량은 얼마든지 설정을 변경할 수 있다.

Database 생성

Query Editor 로 이동해 화면 오른쪽 상단에서 앞에서 생성한 작업그룹(Workgroup) 으로 변경해 준다.

그러면 Workgroup 이 변경 된다. 이제 다음과 같이 Database 를 생성해 준다.

CREATE DATABASE vpcflowlogs

위 쿼리문은 화면안에 쿼리입력창에 입력하고 ‘Run’ 을 클릭해주면 된다.

이렇게 하면 데이터베이스가 생성이 된다. 그리고 왼쪽에 Database 부분에서 새로 생성한 데이터베이스를 선택해 준다.

위와같은 상태가 된다면 이제 테이블을 생성해야 한다.

Table 생성

이제 Table 을 생성해야 한다. 테이블을 생성할때에 중요한 것이 VPC Flow Log 의 S3 저장소와 데이터 컬럼들이다. 다음과 같다.

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs(
    version int,
    account_id string,
    interface_id string,
    srcaddr string,
    dstaddr string,
    srcport int,
    dstport int,
    protocol bigint,
    packets bigint,
    bytes bigint,
    start bigint,
    end bigint,
    action string,
    log_status string
)
PARTITIONED BY (date date)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://flow-log-buckets/AWSLogs/1111122222233333/vpcflowlogs/ap-northeast-2/'
TBLPROPERTIES("skip.header.line.count"="1")

테이블을 생성할때에 컬럼을 지정해 줘야 하는데, VPC Flow Log 설정할때에 record format 을 AWS Default Format 을 지정했는데, 그 포맷은 위와 같다. 이 컬럼들은 S3 에 저장된 파일을 열었을 때에 맨 처음 나오는 행에 컬럼헤더 값들이다. 정확히는 ‘_’ 가 ‘-‘ 로 보면 정확하다. Athena 테이블은 ‘-‘ 를 ‘_’ 로 변환된다.

중요한 것은 Partitioned BY 부분에 date 부분이다. 테이블을 파티셔닝을 하기 위한 기준이 되는 컬럼을 추가하는 것이다. 이 파티션 컬럼은 S3 에 저장되는 VPC Flow Log 에는 없는데 파티셔닝 테이블을 생성할때에 이름처럼 생성된다.

위 쿼리문을 실행해 테이블을 생성한다.

파티션 생성

만일 파티션이 없다면 쿼리 시간이 길어진다. VPC Flow Log 의 경우 날짜로별로(yyyy/MM/dd) 쌓이는 것에 창안해 파티션을 날짜별로 생성하도록 할 것이다. 다음과 같이 파티션을 생성해준다.

ALTER TABLE vpc_flow_logs
ADD PARTITION (date='2022-08-29')
LOCATION 's3://flow-log-buckets/AWSLogs/1111122222233333/vpcflowlogs/ap-northeast-2/2022/08/29';

이렇게 하면 2022/08/29 에 해당하는 버킷에 내용이 파티션으로 입력 된다.

파티션을 이용하면 데이터 조회시에 그 범위가 줄어든다. 정확하게는 데이터 스캔(Data Scan) 범위를 줄일 수 있어서 쿼리 속도를 높여줄 뿐만 아니라 스캔 범위가 줄어들기 때문에 데이터를 긁어오는 양도 줄게되어서 비용을 아낄 수 있다.

앞에서 만든 테이블의 경우 date 컬럼은 date 테이터 타입임으로 다음과 같은 쿼리가 가능하다.

SELECT * FROM vpc_flow_logs WHERE 'date' > DATE('2022-08-28') - interval 1 day and 'date' < DATE('2022-08-28') ORDER BY start DESC LIMIT 10

date 타입이기 때문에 문자열을 date 타입으로 해줘야 한다.

파티션 생성 문제

파티션을 생성하면 문제가 하나 있다. 날짜별로 하나하나 다 생성해 줘야 한다. 시간은 흐르고 날짜는 변경될 것이다. 내일이 되면 다른 날짜로 S3 에 폴더가 만들어지고 거기에 데이터가 쌓일 것이다. 그러면 Athena 에서 그 날짜에 맞는 S3 저장소의 파티션을 생성해줘야 한다.

매일매일 하루에 한번 이것을 해야 한다고 생각하면 힘들다.

이것을 자동으로 하는 방법은 존재한다. Lambda 를 이용하는 것이다.

Lambda 작성하기

Lambda 실행을 위한 Role 생성

Lambda 를 작성해 자동으로 매일매일 하루에 한번 파티션을 생성하도록 해보자. Lambda 를 실행하기 위해서는 먼저 Lambda 실행을 위한 Role 이 필요하다.

## Role Name: ServiceRoleForVPCFlowLogsAthenaIntegrated
# Trust Relationships
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
# Inline Policy: VPCFlowLogsAthenaIntegrationLambdaExecutorPolicy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucketMultipartUploads",
                "s3:PutBucketPublicAccessBlock",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:GetBucketLocation",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": [
                "arn:aws:s3:::athena-query-result/*",
                "arn:aws:s3:::athena-query-result"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::flow-log-buckets/*",
                "arn:aws:s3:::flow-log-buckets"
            ]
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults"
            ],
            "Resource": [
                "arn:aws:athena:ap-northeast-2:111111222222333333:workgroup/*"
            ]
        },
        {
            "Sid": "VisualEditor3",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "glue:GetPartition",
                "s3:ListAllMyBuckets",
                "s3:ListBucket",
                "glue:CreatePartition",
                "s3:GetBucketLocation",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "glue:UpdatePartition",
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:BatchCreatePartition"
            ],
            "Resource": "*"
        }
    ]
}

Lambda 실행을 위한 Role 을 위와같이 생성해 준다.

Lambda 생성

Lambda 는 여러가지 언어로 작성될 수 있는데, 여기서는 Python 을 이용했다.

# Lambda Name: aws-athena-vpc-flow-log-Partitioner
import boto3
import datetime
import logging
import time
import re

# setup simple logging for INFO
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# setup variables
athena_workgroup = "athena-vpc-flow-log"
catalog_database = "vpcflowlogs"
flowlog_bucket="flow-log-buckets"
account_id = "111112222233333"
flowlog_table = "vpc_flow_logs"

# define the connection
athena = boto3.client('athena','ap-northeast-2')

def lambda_handler(event, context):

    today = datetime.datetime.utcnow()
    #today += datetime.timedelta(hours=10) # Convert KST
    logger.info('Today: {}'.format(today))

    year = today.strftime("%Y")
    month = today.strftime("%m")
    day  = today.strftime("%d")

    ############################################################################
    #  VPC Flow Log Catalog 테이블 파티션 갱신
    ############################################################################
    query = """
                ALTER TABLE {}
                ADD PARTITION (date='{}-{}-{}')
                LOCATION 's3://{}/AWSLogs/{}/vpcflowlogs/ap-northeast-2/{}/{}/{}'
            """.format(flowlog_table, year, month, day, flowlog_bucket, account_id, year, month, day)
    logger.info("Query: {}".format(query))

    response = athena.start_query_execution( QueryString=query,
                                             QueryExecutionContext={
                                                 'Catalog': 'AwsDataCatalog',
                                                 'Database': catalog_database
                                             },
                                             WorkGroup=athena_workgroup,
                                             ResultConfiguration={
                                                 'OutputLocation': 's3://aws-athena-query-result/',
                                                 'EncryptionConfiguration': {
                                                     'EncryptionOption': 'SSE_S3'
                                                 }

                                             }
                                        )
    query_execution_id = response['QueryExecutionId']
    logger.info('CodeIssue PRD Execution ID: {}'.format(query_execution_id))

    time.sleep(10)

    state = 'RUNNING'
    max_execution = 5
    while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
        max_execution = max_execution -1

        results = athena.get_query_execution(QueryExecutionId = query_execution_id)
        if 'QueryExecution' in results and \
                'Status' in results['QueryExecution'] and \
                'State' in results['QueryExecution']['Status']:
            state = results['QueryExecution']['Status']['State']
            if state == 'FAILED':
                logger.info("Query Result Failed: {}".format(results))
            elif state == 'SUCCEEDED':
                s3_path = results['QueryExecution']['ResultConfiguration']['OutputLocation']
                filename = re.findall('.*\/(.*)', s3_path)[0]
                logger.info("Query Result S3: {}".format(filename))
                break

        time.sleep(1)

Lambda 를 실행하는 Roles 는 앞에서 작성한 Roles 를 지정해 준다.

EventBridge Rule 생성

EventBridge 에서 Rule 를 생성해 매일 자정 0시 0분에 람다를 실행하도록 설정해 준다.

KST 를 위한 Athena 테이블 View 생성

Athena 에 vpc_flow_logs 테이블에 start, end 컬럼은 unixtime 이지만 UTC 기반이다. 이것을 KST 기반으로 보기 위해서는 연산이 필요한데, 그것을 아예 View 만들어 놓으면 좋다.

CREATE VIEW vpc_flow_logs_kst AS
SELECT
  "version"
, "account_id"
, "interface_id"
, "srcaddr"
, "dstaddr"
, "srcport"
, "dstport"
, "protocol"
, "packets"
, "bytes"
, ("from_unixtime"("start") + INTERVAL  '9' HOUR) "start"
, ("from_unixtime"("end") + INTERVAL  '9' HOUR) "end"
, "action"
, "log_status"
, "date"
FROM
  vpc_flow_logs

이제는 vpc_flow_logs_kst 뷰(view) 에 질의를 하면 start, end 컬럼의 데이터가 KST 시간으로 표신된다.

결론

지금까지 VPC Flow Log 생성에서부터 Athena 를 이용하는 방법, 더 나가 자동으로 파티션을 생성하도록 Lambda 까지 작성해봤다.

하지만, Lambda 작성도 필요없는 방법이 있다. 파티션 프로젝션(Partition Projection) 이라고 불리는 방법인데, 이것은 처음 테이블을 생성할때에 S3 의 날짜 폴더 구조를 기반으로 자동으로 파티션을 인식시키는 방법이다. 이렇게 하면 Lambda 를 이용해 파티션을 수동으로 생성해줄 필요가 없게된다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다