CloudTrail ログ(JSON)を年月でパーティション化して Parquet に変換した手順。
- 環境変数をセットする。
$ export S3_PATH_SRC=cloudtrail-awslogs/AWSLogs/<AWS account id>/CloudTrail $ export S3_PATH_DST=cloudtrail-awslogs-analytics
- S3バケットを作成する。
$ aws s3 mv ${S3_PATH_DST}
- リージョン、年、月、日のリストを作成する。
$ aws s3 ls --recursive s3://${S3_PATH_SRC}/ap-northeast-1|\ perl -F/ -lane 'print qq/$F[3] $F[4] $F[5] $F[6]/'|uniq > partition_column_ymd.txt 2>&1 &
- リージョン、年、月のリストを作成する。
$ aws s3 ls --recursive s3://${S3_PATH_SRC}|\ perl -F/ -lane 'print qq/$F[3] $F[4] $F[5]/'|uniq > partition_column_y.txt 2>&1
- 年単位のフォルダ以下にファイルを集約する。
$ cat partition_column_ymd.txt|while read LINE do ARR=(${LINE//// }) echo "${ARR[0]} ${ARR[1]} ${ARR[2]}" aws s3 cp --recursive s3://${S3_PATH_SRC}/${ARR[0]}/${ARR[1]}/${ARR[2]}/${ARR[3]}/ \ s3://${S3_PATH_DST}/json/region=${ARR[0]}/year=${ARR[1]}/month=${ARR[2]}/ done
- Athena で以下の DDL を実行してテーブルを作成する。
> CREATE EXTERNAL TABLE default.cloudtrail_logs_json ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>>>, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING ) PARTITIONED BY ( `region` string, `year` string, `month` string ) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://cloudtrail-awslogs-analytics/json/';
- Athena で以下を実行し、パーティションを認識させる。
> MSCK REPAIR TABLE cloudtrail_logs_json;
- s3://cloudtrail-awslogs-merge/orc_tmp を作成する。
- CTAS で ORC に変換する。
> CREATE TABLE cloudtrail_logs_orc_tmp WITH ( external_location = 's3://cloudtrail-awslogs-analytics/orc_tmp/', format = 'ORC', orc_compression = 'SNAPPY', partitioned_by = ARRAY['region', 'year', 'month'] ) AS SELECT * FROM default.cloudtrail_logs_json;
- ファイルを128MB単位でまとめる。
$ export S3_PATH_DST=cloudtrail-awslogs-analytics $ cat partition_column_ym.txt|while read LINE do ARR=(${LINE//// }) echo "${ARR[0]} ${ARR[1]}" s3-dist-cp --src s3://${S3_PATH_DST}/orc_tmp/region=${ARR[0]}/year=${ARR[1]}/ \ --dest s3://${S3_PATH_DST}/orc/region=${ARR[0]}/year=${ARR[1]}/ \ --targetSize=128 --groupBy=".*region=(${ARR[0]})/year=(${ARR[1]}).*" \ --outputCodec=snappy done
- 最終版のテーブルを作成する。
> CREATE EXTERNAL TABLE `cloudtrail_logs_orc`( `eventversion` string COMMENT '', `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '', `eventtime` string COMMENT '', `eventsource` string COMMENT '', `eventname` string COMMENT '', `awsregion` string COMMENT '', `sourceipaddress` string COMMENT '', `useragent` string COMMENT '', `errorcode` string COMMENT '', `errormessage` string COMMENT '', `requestparameters` string COMMENT '', `responseelements` string COMMENT '', `additionaleventdata` string COMMENT '', `requestid` string COMMENT '', `eventid` string COMMENT '', `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '', `eventtype` string COMMENT '', `apiversion` string COMMENT '', `readonly` string COMMENT '', `recipientaccountid` string COMMENT '', `serviceeventdetails` string COMMENT '', `sharedeventid` string COMMENT '', `vpcendpointid` string COMMENT '') PARTITIONED BY ( `region` string COMMENT '', `year` string COMMENT '', `month` string COMMENT '') ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' LOCATION 's3://cloudtrail-awslogs-analytics/orc' TBLPROPERTIES ( 'has_encrypted_data'='false', 'orc.compress'='SNAPPY')
- Athena で以下を実行し、パーティションを認識させる。
> MSCK REPAIR TABLE cloudtrail_logs_orc;
補足
- CTAS で Parquet に変換しようとすると以下のエラーが発生したため、ORC に変換した。
GENERIC_INTERNAL_ERROR: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead. You may need to manually clean the data at location 's3://...' before retrying. Athena will not delete data in your account.
ボツになった手順
- 年単位でファイルを128MB単位にまとめる。
cat partition_column_y.txt|while read LINE do ARR=(${LINE//// }) echo "${ARR[0]} ${ARR[1]}" s3-dist-cp --src s3://${S3_PATH_DST}/tmp1/region=${ARR[0]}/year=${ARR[1]}/ \ --dest s3://${S3_PATH_DST}/tmp2/region=${ARR[0]}/year=${ARR[1]}/ \ --targetSize=128 --groupBy=".*(CloudTrail_${ARR[0]}_${ARR[1]}).*(.json).*" \ --outputCodec=gz done
参考
- Amazon EMR を使用して Parquet 形式ファイルを連結する
- Amazon EMRでS3DistCpを使用してHDFSとAmazon S3間で効率的にデータを移動するための7つのヒント | Amazon Web Services ブログ
- I'm Sei. — S3 上の大量データを EMR するときは S3DistCp を使うと捗る
- Amazon EMRでS3DistCpを使用してHDFSとAmazon S3間で効率的にデータを移動するための7つのヒント | Amazon Web Services ブログ
- Amazon Athena で CTAS クエリのファイルの数またはサイズを設定する
- presto/OrcInputStream.java at master · prestodb/presto · GitHub