ablog

不器用で落着きのない技術者のメモ

「AWS Cloudtrail Logs を AWS Glue と Amazon Quicksight 使って可視化する」をやってみた

AWS Cloudtrail Logs を AWS Glue と Amazon Quicksight 使って可視化する | Amazon Web Services ブログ を試してみた。

Lambda用ロールの作成
  • 名前: CloudTrailWatchLogs
  • インラインポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::cloudtrail-do-not-delete/*",
                "arn:aws:s3:::cloudtrail-do-not-delete"
            ],
            "Effect": "Allow"
        }
    ]
}
Glue用ロールの作成
  • 名前: AWSGlueServiceRole-Default
  • 以下のポリシーをアタッチ
    • AmazonS3FullAccess
    • AWSGlueServiceRole
Lambda関数の作成
  • 関数名: cloudTrail2FlatfileFunction
  • ランタイム : Python 2.7
  • 実行ロール: CloudTrailWatchLogs
  • トリガーの設定
  • 関数コード
from __future__ import print_function
import json
import urllib
import boto3
import gzip

s3 = boto3.resource('s3')
client = boto3.client('s3')

def convertColumntoLowwerCaps(obj):
    for key in obj.keys():
        new_key = key.lower()
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file: 
                for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['records']:
            		if i != 0:
            		    output.write("\n")
            		output.write(json.dumps(record))
            		i += 1
        client.upload_file('/tmp/out.json.gz', bucket,newKey)
        return "success"
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
Glue クローラの作成(Flatfile用)
  • 名前: cloudTrailFlatfiles
  • Choose a data store: S3
  • インクルードパス: s3://cloudtrail-do-not-delete/flatfiles
  • IAM ロール: AWSGlueServiceRole-Default
  • データベース: cloudtrail
  • 頻度: 毎時
Glue クローラの作成(Parquet用)
  • 名前: cloudtrailParquetFiles
  • Choose a data store: S3
  • インクルードパス: s3://cloudtrail-do-not-delete/parquettrails 
  • IAM ロール: AWSGlueServiceRole-Default
  • データベース: cloudtrail
  • 頻度: 毎時
Glueジョブの作成
  • 以下のコードをファイル名 "cloudtrailtoparquet.py" で S3バケット "az-src" にアップロードする。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudtrail", table_name = "flatfiles", transformation_ctx = "datasource0")
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
datasink = glueContext.write_dynamic_frame.from_options(frame = relationalized1, connection_type = "s3", connection_options = {"path": "s3://cloudtrail-do-not-delete/parquettrails"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
  • 以下のジョブを作成する
    • 名前: cloudTrailToParquet
    • スクリプトパス: s3://az-src/cloudtrailtoparquet.py
    • IAM ロール: AWSGlueServiceRole-Default
    • 一時ディレクトリ: デフォルトのまま
Athena でクエリ実行
  • データベースで cloudtrail を選択する
  • 以下のクエリを実行する
select *
from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z' AND eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc;