I tried out "Visualize AWS CloudTrail Logs using AWS Glue and Amazon QuickSight" from the Amazon Web Services Blog.
Create the IAM role for Lambda
- Name: CloudTrailWatchLogs
- Inline policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:*" }, { "Action": [ "s3:*" ], "Resource": [ "arn:aws:s3:::cloudtrail-do-not-delete/*", "arn:aws:s3:::cloudtrail-do-not-delete" ], "Effect": "Allow" } ] }
Create the IAM role for Glue
- Name: AWSGlueServiceRole-Default
- Attach the following managed policies:
- AmazonS3FullAccess
- AWSGlueServiceRole
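Likewise for the Glue role; a sketch assuming the standard glue.amazonaws.com trust policy:

```python
import json
import boto3

iam = boto3.client("iam")

# Allow the AWS Glue service to assume the role.
glue_trust = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="AWSGlueServiceRole-Default",
    AssumeRolePolicyDocument=json.dumps(glue_trust),
)

# Attach the two managed policies listed above.
for policy_arn in (
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
):
    iam.attach_role_policy(RoleName="AWSGlueServiceRole-Default", PolicyArn=policy_arn)
```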
Create the Lambda function
- Function name: cloudTrail2FlatfileFunction
- Runtime: Python 2.7
- Execution role: CloudTrailWatchLogs
- Trigger settings (a boto3 sketch of the S3 trigger follows the function code below)
- Function code:
```python
from __future__ import print_function
import json
import urllib
import boto3
import gzip

s3 = boto3.resource('s3')
client = boto3.client('s3')


def convertColumntoLowwerCaps(obj):
    # Lowercase all JSON keys so the schema Glue infers is case-consistent.
    for key in obj.keys():
        new_key = key.lower()
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):
    # The S3 object that triggered the invocation: a gzipped CloudTrail log file.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        # Rewrite each CloudTrail record as one JSON line under flatfiles/ in the same bucket.
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file:
                for record in json.loads(line, object_hook=convertColumntoLowwerCaps)['records']:
                    if i != 0:
                        output.write("\n")
                    output.write(json.dumps(record))
                    i += 1
        client.upload_file('/tmp/out.json.gz', bucket, newKey)
        return "success"
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
```
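The trigger is an S3 ObjectCreated notification on the CloudTrail bucket. A sketch of wiring it up with boto3; the statement ID and the AWSLogs/ prefix filter (which keeps the function from re-triggering on the flatfiles/ objects it writes back into the same bucket) are assumptions:

```python
import boto3

lambda_client = boto3.client("lambda")
s3 = boto3.client("s3")

bucket = "cloudtrail-do-not-delete"
function_arn = lambda_client.get_function(
    FunctionName="cloudTrail2FlatfileFunction"
)["Configuration"]["FunctionArn"]

# Allow S3 to invoke the function.
lambda_client.add_permission(
    FunctionName="cloudTrail2FlatfileFunction",
    StatementId="AllowS3Invoke",  # arbitrary statement ID
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::" + bucket,
)

# Fire only on new CloudTrail objects (AWSLogs/ prefix), not on the flatfiles/ output.
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": function_arn,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "AWSLogs/"}]}},
        }]
    },
)
```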
Create the Glue crawler (for the flat files)
- Name: cloudTrailFlatfiles
- Choose a data store: S3
- Include path: s3://cloudtrail-do-not-delete/flatfiles
- IAM role: AWSGlueServiceRole-Default
- Database: cloudtrail
- Frequency: Hourly
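The same crawler can be defined with boto3; a sketch assuming the "Hourly" schedule maps to the cron expression below:

```python
import boto3

glue = boto3.client("glue")


def create_cloudtrail_crawler(name, s3_path):
    """Create an hourly crawler that writes its tables into the cloudtrail database."""
    glue.create_crawler(
        Name=name,
        Role="AWSGlueServiceRole-Default",
        DatabaseName="cloudtrail",
        Targets={"S3Targets": [{"Path": s3_path}]},
        Schedule="cron(0 * * * ? *)",  # run at the top of every hour
    )


create_cloudtrail_crawler("cloudTrailFlatfiles", "s3://cloudtrail-do-not-delete/flatfiles")
```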
Create the Glue crawler (for the Parquet files)
- Name: cloudtrailParquetFiles
- Choose a data store: S3
- Include path: s3://cloudtrail-do-not-delete/parquettrails
- IAM role: AWSGlueServiceRole-Default
- Database: cloudtrail
- Frequency: Hourly
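This crawler differs only in its name and include path, so the helper sketched above can be reused:

```python
create_cloudtrail_crawler("cloudtrailParquetFiles", "s3://cloudtrail-do-not-delete/parquettrails")
```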
Create the Glue job
- Upload the following script to the S3 bucket "az-src" under the file name "cloudtrailtoparquet.py".
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the flatfiles table created by the cloudTrailFlatfiles crawler,
# resolve ambiguous column types, flatten the nested records, and write Parquet.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudtrail", table_name = "flatfiles", transformation_ctx = "datasource0")
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
datasink = glueContext.write_dynamic_frame.from_options(frame = relationalized1, connection_type = "s3", connection_options = {"path": "s3://cloudtrail-do-not-delete/parquettrails"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
```
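The job itself can also be registered with boto3; a sketch assuming the job name cloudtrailtoparquet and a temporary directory under the same az-src bucket:

```python
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="cloudtrailtoparquet",  # assumed job name
    Role="AWSGlueServiceRole-Default",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://az-src/cloudtrailtoparquet.py",
    },
    # --TempDir is used by the script for relationalize(); the path here is an assumption.
    DefaultArguments={"--TempDir": "s3://az-src/temp/"},
)

# Kick off a run once the flatfiles crawler has populated cloudtrail.flatfiles.
glue.start_job_run(JobName="cloudtrailtoparquet")
```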
Run a query in Athena
- Select cloudtrail as the database
- Run the following query:
```sql
select * from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z'
  and eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc;
```
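The same query can also be submitted programmatically through the Athena API; a sketch assuming an arbitrary result-output location:

```python
import boto3

athena = boto3.client("athena")

query = """
select * from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z'
  and eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc
"""

# The S3 output location for query results is an assumption; any writable bucket works.
response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "cloudtrail"},
    ResultConfiguration={"OutputLocation": "s3://az-src/athena-results/"},
)
print(response["QueryExecutionId"])
```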