ablog

Notes from a clumsy, restless engineer

Partitioning CloudTrail Logs (JSON) by Year/Month and Converting Them to Parquet

The steps I followed to partition CloudTrail logs (JSON) by year and month and convert them to Parquet (ultimately to ORC; see Notes at the end).

$ export S3_PATH_SRC=cloudtrail-awslogs/AWSLogs/<AWS account id>/CloudTrail
$ export S3_PATH_DST=cloudtrail-awslogs-analytics
$ aws s3 mb s3://${S3_PATH_DST}
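CloudTrail delivers its log files under a fixed S3 key layout, which everything below relies on:

AWSLogs/<AWS account id>/CloudTrail/<region>/<year>/<month>/<day>/<AWS account id>_CloudTrail_<region>_<timestamp>_<unique id>.json.gz

Because the split is on "/", the $F[3]..$F[6] fields in the perl one-liners below pick out <region>/<year>/<month>/<day> from these keys ($F[0] absorbs the date/size columns of the aws s3 ls output together with the leading AWSLogs segment).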
  • Create a list of region/year/month/day combinations.
$ aws s3 ls --recursive s3://${S3_PATH_SRC}/ | \
perl -F/ -lane 'print qq/$F[3] $F[4] $F[5] $F[6]/' | uniq > partition_column_ymd.txt 2>&1 &
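For reference, each line of partition_column_ymd.txt is a space-separated region/year/month/day tuple, along these lines (dates are illustrative):

ap-northeast-1 2019 01 01
ap-northeast-1 2019 01 02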
  • Create a list of region/year combinations (the s3-dist-cp loops later operate per region and year).
$ aws s3 ls --recursive s3://${S3_PATH_SRC}/ | \
perl -F/ -lane 'print qq/$F[3] $F[4]/' | uniq > partition_column_y.txt 2>&1
  • Consolidate the daily files into month-level folders, renaming them to Hive-style partition keys (region=/year=/month=).
$ cat partition_column_ymd.txt | while read LINE
do
    ARR=(${LINE})
    echo "${ARR[0]} ${ARR[1]} ${ARR[2]} ${ARR[3]}"
    aws s3 cp --recursive s3://${S3_PATH_SRC}/${ARR[0]}/${ARR[1]}/${ARR[2]}/${ARR[3]}/ \
        s3://${S3_PATH_DST}/json/region=${ARR[0]}/year=${ARR[1]}/month=${ARR[2]}/
done
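Once the copy finishes, it is worth spot-checking the destination layout, e.g. (region and date here are just examples):

$ aws s3 ls s3://${S3_PATH_DST}/json/region=ap-northeast-1/year=2019/month=01/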
  • Run the following DDL in Athena to create a table over the JSON logs.
> CREATE EXTERNAL TABLE default.cloudtrail_logs_json (
eventversion STRING,
useridentity STRUCT<
               type:STRING,
               principalid:STRING,
               arn:STRING,
               accountid:STRING,
               invokedby:STRING,
               accesskeyid:STRING,
               userName:STRING,
               sessioncontext:STRUCT<
                 attributes:STRUCT<
                   mfaauthenticated:STRING,
                   creationdate:STRING>,
                 sessionissuer:STRUCT<
                   type:STRING,
                   principalId:STRING,
                   arn:STRING,
                   accountId:STRING,
                   userName:STRING>>>,
eventtime STRING,
eventsource STRING,
eventname STRING,
awsregion STRING,
sourceipaddress STRING,
useragent STRING,
errorcode STRING,
errormessage STRING,
requestparameters STRING,
responseelements STRING,
additionaleventdata STRING,
requestid STRING,
eventid STRING,
resources ARRAY<STRUCT<
               ARN:STRING,
               accountId:STRING,
               type:STRING>>,
eventtype STRING,
apiversion STRING,
readonly STRING,
recipientaccountid STRING,
serviceeventdetails STRING,
sharedeventid STRING,
vpcendpointid STRING
)
PARTITIONED BY ( 
  `region` string, 
  `year` string,
  `month` string
)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://cloudtrail-awslogs-analytics/json/';
> MSCK REPAIR TABLE cloudtrail_logs_json;
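A quick query against a single partition confirms that the SerDe and the partition keys line up (the region/year/month values are examples):

> SELECT eventtime, eventsource, eventname
  FROM default.cloudtrail_logs_json
  WHERE region = 'ap-northeast-1' AND year = '2019' AND month = '01'
  LIMIT 10;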
  • Create s3://cloudtrail-awslogs-analytics/orc_tmp/ (the external_location of a CTAS must be empty).
  • Convert to ORC with CTAS.
> CREATE TABLE cloudtrail_logs_orc_tmp
WITH (
        external_location = 's3://cloudtrail-awslogs-analytics/orc_tmp/',
        format = 'ORC',
        orc_compression = 'SNAPPY',
        partitioned_by = ARRAY['region', 'year', 'month']
  )
AS SELECT * FROM default.cloudtrail_logs_json;
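Note that Athena CTAS requires the partition columns to come last in the SELECT list; SELECT * works here only because region/year/month are the trailing columns of cloudtrail_logs_json. Also, a single CTAS writes at most 100 partitions, so with many region/month combinations the statement may have to be split up. Comparing row counts is a cheap check that the conversion was lossless:

> SELECT COUNT(*) FROM default.cloudtrail_logs_json;
> SELECT COUNT(*) FROM default.cloudtrail_logs_orc_tmp;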
  • Combine the files into roughly 128 MB chunks per region/year with s3-dist-cp.
$ export S3_PATH_DST=cloudtrail-awslogs-analytics
$ cat partition_column_y.txt | while read LINE
do
    ARR=(${LINE})
    echo "${ARR[0]} ${ARR[1]}"
    s3-dist-cp --src s3://${S3_PATH_DST}/orc_tmp/region=${ARR[0]}/year=${ARR[1]}/ \
        --dest s3://${S3_PATH_DST}/orc/region=${ARR[0]}/year=${ARR[1]}/ \
        --targetSize=128 --groupBy=".*region=(${ARR[0]})/year=(${ARR[1]}).*" \
        --outputCodec=snappy
done
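s3-dist-cp ships with EMR rather than with the AWS CLI, so this loop is run on an EMR cluster (e.g. on the master node over SSH). The merged output can then be spot-checked from anywhere:

$ aws s3 ls --recursive s3://${S3_PATH_DST}/orc/region=ap-northeast-1/year=2019/ | head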
  • Create the final table.
> CREATE EXTERNAL TABLE `cloudtrail_logs_orc`(
  `eventversion` string COMMENT '', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '', 
  `eventtime` string COMMENT '', 
  `eventsource` string COMMENT '', 
  `eventname` string COMMENT '', 
  `awsregion` string COMMENT '', 
  `sourceipaddress` string COMMENT '', 
  `useragent` string COMMENT '', 
  `errorcode` string COMMENT '', 
  `errormessage` string COMMENT '', 
  `requestparameters` string COMMENT '', 
  `responseelements` string COMMENT '', 
  `additionaleventdata` string COMMENT '', 
  `requestid` string COMMENT '', 
  `eventid` string COMMENT '', 
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '', 
  `eventtype` string COMMENT '', 
  `apiversion` string COMMENT '', 
  `readonly` string COMMENT '', 
  `recipientaccountid` string COMMENT '', 
  `serviceeventdetails` string COMMENT '', 
  `sharedeventid` string COMMENT '', 
  `vpcendpointid` string COMMENT '')
PARTITIONED BY ( 
  `region` string COMMENT '', 
  `year` string COMMENT '',
  `month` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  's3://cloudtrail-awslogs-analytics/orc'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'orc.compress'='SNAPPY');
> MSCK REPAIR TABLE cloudtrail_logs_orc;
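With the ORC table in place, typical CloudTrail questions become cheap to answer, for example the top API errors in a month (the filter values are illustrative):

> SELECT useridentity.arn, eventname, errorcode, COUNT(*) AS cnt
  FROM default.cloudtrail_logs_orc
  WHERE region = 'ap-northeast-1' AND year = '2019' AND month = '01'
    AND errorcode IS NOT NULL
  GROUP BY useridentity.arn, eventname, errorcode
  ORDER BY cnt DESC
  LIMIT 20;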

Notes

  • Converting to Parquet with CTAS failed with the error below, so I converted to ORC instead.
GENERIC_INTERNAL_ERROR: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead. You may need to manually clean the data at location 's3://...' before retrying. Athena will not delete data in your account.
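For reference, the Parquet attempt was the same CTAS with the format switched, roughly as follows (parquet_compression is Athena's Parquet counterpart to orc_compression; the table name here is hypothetical):

> CREATE TABLE cloudtrail_logs_parquet_tmp
WITH (
        external_location = 's3://cloudtrail-awslogs-analytics/parquet_tmp/',
        format = 'PARQUET',
        parquet_compression = 'SNAPPY',
        partitioned_by = ARRAY['region', 'year', 'month']
)
AS SELECT * FROM default.cloudtrail_logs_json;

The error itself comes from the Parquet writer, which rejects empty group fields (typically an empty array such as resources, or an all-NULL struct) that the ORC writer tolerates.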

Rejected procedure

  • Combine the files into 128 MB chunks per year (at the JSON stage, compressing with gzip).
$ cat partition_column_y.txt | while read LINE
do
    ARR=(${LINE})
    echo "${ARR[0]} ${ARR[1]}"
    s3-dist-cp --src s3://${S3_PATH_DST}/tmp1/region=${ARR[0]}/year=${ARR[1]}/ \
        --dest s3://${S3_PATH_DST}/tmp2/region=${ARR[0]}/year=${ARR[1]}/ \
        --targetSize=128 --groupBy=".*(CloudTrail_${ARR[0]}_${ARR[1]}).*(.json).*" \
        --outputCodec=gz
done