I partitioned CloudTrail logs (JSON) by region, year, and month, converted them to ORC, and merged the files.
Results
- Ranked S3 API calls by invocation count across all Tokyo-region logs (2017–2020); a sketch of that query appears after the listing below.
- The query scanned roughly 62 MB and ran in about 4 seconds.
- The S3 folder (prefix) layout looks like this:
- Total size is 4.8 GiB (Tokyo region only).
$ export AWS_ACCOUNT_ID=<AWS account id>
$ aws s3 ls --recursive --human-readable --summarize cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics/orc/
2020-01-08 05:49:31   19.2 MiB orc/region=ap-northeast-1/year=2017/month=04/000001_0
2020-01-08 05:49:34    3.5 MiB orc/region=ap-northeast-1/year=2017/month=05/000013_0
2020-01-08 05:49:32    3.2 MiB orc/region=ap-northeast-1/year=2017/month=06/000000_0
2020-01-08 05:49:31    2.3 MiB orc/region=ap-northeast-1/year=2017/month=07/000000_0
2020-01-08 05:49:31  872.9 KiB orc/region=ap-northeast-1/year=2017/month=08/000000_0
2020-01-08 05:49:32   23.9 MiB orc/region=ap-northeast-1/year=2017/month=09/000006_0
2020-01-08 05:49:33  178.9 MiB orc/region=ap-northeast-1/year=2017/month=10/000002_0
2020-01-08 05:49:34  330.3 MiB orc/region=ap-northeast-1/year=2017/month=11/000019_0
2020-01-08 05:49:31  234.4 MiB orc/region=ap-northeast-1/year=2017/month=12/000020_0
2020-01-08 05:49:32  191.6 MiB orc/region=ap-northeast-1/year=2018/month=01/000003_0
2020-01-08 05:49:32  152.2 MiB orc/region=ap-northeast-1/year=2018/month=02/000003_0
2020-01-08 05:49:32  140.3 MiB orc/region=ap-northeast-1/year=2018/month=03/000012_0
2020-01-08 05:49:31  113.0 MiB orc/region=ap-northeast-1/year=2018/month=04/000006_0
2020-01-08 05:49:34  146.4 MiB orc/region=ap-northeast-1/year=2018/month=05/000002_0
2020-01-08 05:49:31  125.6 MiB orc/region=ap-northeast-1/year=2018/month=06/000004_0
2020-01-08 05:49:32  377.7 MiB orc/region=ap-northeast-1/year=2018/month=07/000017_0
2020-01-08 05:49:32  553.8 MiB orc/region=ap-northeast-1/year=2018/month=08/000019_0
2020-01-08 05:49:34  512.8 MiB orc/region=ap-northeast-1/year=2018/month=09/000008_0
2020-01-08 05:49:34  380.1 MiB orc/region=ap-northeast-1/year=2018/month=10/000002_0
2020-01-08 05:49:34   60.6 MiB orc/region=ap-northeast-1/year=2018/month=11/000011_0
2020-01-08 05:49:32    9.1 MiB orc/region=ap-northeast-1/year=2018/month=12/000002_0
2020-01-08 05:49:34    8.3 MiB orc/region=ap-northeast-1/year=2019/month=01/000017_0
2020-01-08 05:49:34   10.5 MiB orc/region=ap-northeast-1/year=2019/month=02/000012_0
2020-01-08 05:49:34  167.1 MiB orc/region=ap-northeast-1/year=2019/month=03/000006_0
2020-01-08 05:49:32  117.9 MiB orc/region=ap-northeast-1/year=2019/month=04/000015_0
2020-01-08 05:49:32   68.8 MiB orc/region=ap-northeast-1/year=2019/month=05/000017_0
2020-01-08 05:49:34   70.2 MiB orc/region=ap-northeast-1/year=2019/month=06/000008_0
2020-01-08 05:49:34   70.9 MiB orc/region=ap-northeast-1/year=2019/month=07/000018_0
2020-01-08 05:49:31   86.1 MiB orc/region=ap-northeast-1/year=2019/month=08/000014_0
2020-01-08 05:49:34   95.9 MiB orc/region=ap-northeast-1/year=2019/month=09/000007_0
2020-01-08 05:49:32  260.9 MiB orc/region=ap-northeast-1/year=2019/month=10/000016_0
2020-01-08 05:49:31  102.4 MiB orc/region=ap-northeast-1/year=2019/month=11/000003_0
2020-01-08 05:49:31  275.0 MiB orc/region=ap-northeast-1/year=2019/month=12/000017_0
2020-01-08 05:49:31   33.7 MiB orc/region=ap-northeast-1/year=2020/month=01/000012_0

Total Objects: 34
   Total Size: 4.8 GiB
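The ranking mentioned above came from a query along the following lines. This is only a sketch, not the exact statement I ran; it assumes the final default.cloudtrail_logs_orc table created in the steps below.

> SELECT eventname, count(*) AS api_calls
  FROM default.cloudtrail_logs_orc
  WHERE region = 'ap-northeast-1'
    AND eventsource = 's3.amazonaws.com'
  GROUP BY eventname
  ORDER BY api_calls DESC;

Because region, year, and month are partition columns, the WHERE clause prunes partitions and keeps the scanned data small.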
Steps
- Set environment variables.
$ export AWS_ACCOUNT_ID=<AWS account id>
$ export S3_PATH_SRC=cloudtrail-awslogs/AWSLogs/${AWS_ACCOUNT_ID}/CloudTrail
$ export S3_PATH_DST=cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics
- Create the destination S3 bucket.
$ aws s3 mb s3://${S3_PATH_DST}
- Build a list of region/year/month/day combinations.
$ nohup aws s3 ls --recursive s3://${S3_PATH_SRC}/|\
  perl -F/ -lane 'print qq/$F[3] $F[4] $F[5] $F[6]/'|uniq > partition_column_ymd.txt 2>&1 &
- Build a list of region/year/month combinations.
$ nohup aws s3 ls --recursive s3://${S3_PATH_SRC}|\
  perl -F/ -lane 'print qq/$F[3] $F[4] $F[5]/'|uniq > partition_column_ym.txt 2>&1
- Consolidate the day-level log files under region/year/month folders (Hive-style partition prefixes).
$ cat partition_column_ymd.txt|while read LINE
  do
    ARR=(${LINE//// })
    echo "${ARR[0]} ${ARR[1]} ${ARR[2]}"
    aws s3 cp --recursive s3://${S3_PATH_SRC}/${ARR[0]}/${ARR[1]}/${ARR[2]}/${ARR[3]}/ \
      s3://${S3_PATH_DST}/json/region=${ARR[0]}/year=${ARR[1]}/month=${ARR[2]}/
  done
- In Athena, run the following DDL to create a table over the JSON logs.
> CREATE EXTERNAL TABLE default.cloudtrail_logs_json (
    eventversion STRING,
    useridentity STRUCT<
      type:STRING,
      principalid:STRING,
      arn:STRING,
      accountid:STRING,
      invokedby:STRING,
      accesskeyid:STRING,
      userName:STRING,
      sessioncontext:STRUCT<
        attributes:STRUCT<
          mfaauthenticated:STRING,
          creationdate:STRING>,
        sessionissuer:STRUCT<
          type:STRING,
          principalId:STRING,
          arn:STRING,
          accountId:STRING,
          userName:STRING>>>,
    eventtime STRING,
    eventsource STRING,
    eventname STRING,
    awsregion STRING,
    sourceipaddress STRING,
    useragent STRING,
    errorcode STRING,
    errormessage STRING,
    requestparameters STRING,
    responseelements STRING,
    additionaleventdata STRING,
    requestid STRING,
    eventid STRING,
    resources ARRAY<STRUCT<
      ARN:STRING,
      accountId:STRING,
      type:STRING>>,
    eventtype STRING,
    apiversion STRING,
    readonly STRING,
    recipientaccountid STRING,
    serviceeventdetails STRING,
    sharedeventid STRING,
    vpcendpointid STRING
  )
  PARTITIONED BY (
    `region` string,
    `year` string,
    `month` string
  )
  ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
  STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
  LOCATION 's3://cloudtrail-awslogs-<AWS account id>-analytics/json/';
- Run the following in Athena so that the partitions are recognized.
> MSCK REPAIR TABLE cloudtrail_logs_json;
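Optionally, the recognized partitions can be listed as a sanity check (not part of the original steps):

> SHOW PARTITIONS default.cloudtrail_logs_json;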
- Convert to ORC with an Athena CTAS query.
> CREATE TABLE cloudtrail_logs_orc_tmp
  WITH (
    external_location = 's3://cloudtrail-awslogs-<AWS account id>-analytics/orc_tmp/',
    format = 'ORC',
    orc_compression = 'SNAPPY',
    partitioned_by = ARRAY['region', 'year', 'month']
  ) AS
  SELECT * FROM default.cloudtrail_logs_json;
- Copy the ORC files to HDFS on EMR.
$ hadoop fs -mkdir /cloudtrail-${AWS_ACCOUNT_ID}-logs/
$ s3-dist-cp --src s3://${S3_PATH_DST}/orc_tmp --dest /cloudtrail-${AWS_ACCOUNT_ID}-logs/orc_tmp/
- Start Hive.
$ hive
- Create a table over the ORC files in HDFS.
CREATE EXTERNAL TABLE default.cloudtrail_logs_orc_tmp_hdfs (
  `eventversion` string COMMENT '',
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '',
  `eventtime` string COMMENT '',
  `eventsource` string COMMENT '',
  `eventname` string COMMENT '',
  `awsregion` string COMMENT '',
  `sourceipaddress` string COMMENT '',
  `useragent` string COMMENT '',
  `errorcode` string COMMENT '',
  `errormessage` string COMMENT '',
  `requestparameters` string COMMENT '',
  `responseelements` string COMMENT '',
  `additionaleventdata` string COMMENT '',
  `requestid` string COMMENT '',
  `eventid` string COMMENT '',
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '',
  `eventtype` string COMMENT '',
  `apiversion` string COMMENT '',
  `readonly` string COMMENT '',
  `recipientaccountid` string COMMENT '',
  `serviceeventdetails` string COMMENT '',
  `sharedeventid` string COMMENT '',
  `vpcendpointid` string COMMENT '')
PARTITIONED BY (
  `region` string COMMENT '',
  `year` string COMMENT '',
  `month` string COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION '/cloudtrail-<AWS account id>-logs/orc_tmp/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'orc.compress'='SNAPPY');
- Make Hive recognize the partitions.
> MSCK REPAIR TABLE cloudtrail_logs_orc_tmp_hdfs;
- Merge the files in Hive.
> set hive.merge.mapfiles=true;
> set hive.merge.mapredfiles=true;
> set hive.merge.size.per.task=256000000;
> set hive.merge.smallfiles.avgsize=32000000;
> set hive.exec.dynamic.partition.mode=nonstrict;
> CREATE TABLE cloudtrail_logs_orc_hdfs LIKE cloudtrail_logs_orc_tmp_hdfs;
> INSERT OVERWRITE TABLE cloudtrail_logs_orc_hdfs PARTITION (region, year, month)
  SELECT * FROM cloudtrail_logs_orc_tmp_hdfs;
- Upload the merged files, still partitioned by region, year, and month, to S3.
$ s3-dist-cp --src /user/hive/warehouse/cloudtrail_logs_orc_hdfs/ --dest s3://cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics/orc/
- Create the final table in Athena over the merged ORC files in S3.
CREATE EXTERNAL TABLE default.cloudtrail_logs_orc (
  `eventversion` string COMMENT '',
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '',
  `eventtime` string COMMENT '',
  `eventsource` string COMMENT '',
  `eventname` string COMMENT '',
  `awsregion` string COMMENT '',
  `sourceipaddress` string COMMENT '',
  `useragent` string COMMENT '',
  `errorcode` string COMMENT '',
  `errormessage` string COMMENT '',
  `requestparameters` string COMMENT '',
  `responseelements` string COMMENT '',
  `additionaleventdata` string COMMENT '',
  `requestid` string COMMENT '',
  `eventid` string COMMENT '',
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '',
  `eventtype` string COMMENT '',
  `apiversion` string COMMENT '',
  `readonly` string COMMENT '',
  `recipientaccountid` string COMMENT '',
  `serviceeventdetails` string COMMENT '',
  `sharedeventid` string COMMENT '',
  `vpcendpointid` string COMMENT '')
PARTITIONED BY (
  `region` string COMMENT '',
  `year` string COMMENT '',
  `month` string COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION 's3://cloudtrail-awslogs-<AWS account id>-analytics/orc/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'orc.compress'='SNAPPY');
- Run the following in Athena so that the partitions are recognized.
> MSCK REPAIR TABLE cloudtrail_logs_orc;
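At this point the merged table can be queried with partition pruning. As a final check (a sketch, not part of the original steps), counting events per month for the Tokyo region only touches the matching partitions:

> SELECT year, month, count(*) AS events
  FROM default.cloudtrail_logs_orc
  WHERE region = 'ap-northeast-1'
  GROUP BY year, month
  ORDER BY year, month;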
Notes
- When I tried converting to Parquet with CTAS,
CREATE TABLE cloudtrail_logs_parquet
WITH (
  external_location = 's3://cloudtrail-awslogs-<AWS account id>-analytics/parquet/',
  format = 'parquet',
  orc_compression = 'SNAPPY',
  partitioned_by = ARRAY['region', 'year', 'month']
) AS
SELECT * FROM default.cloudtrail_logs_json;
the following error occurred, so I converted to ORC instead:
GENERIC_INTERNAL_ERROR: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead. You may need to manually clean the data at location 's3://...' before retrying. Athena will not delete data in your account.
The cause is as follows:
This is a known issue and the root cause boils down to the Hive Parquet writers, which Presto uses to write Parquet data (see HIVE-11625).
unable to insert empty map data type into parquet format · Issue #5934 · prestodb/presto · GitHub
The problematic method is DataWritableWriter.writeMap(). Although the key value entry is not null, either key or value can be null. And null keys are not properly handled.
According to parquet-format spec, keys of a Parquet MAP must not be null. Then I think the problem here is that, whether should we silently ignore null keys when writing a map to a Parquet table like what Hive 0.14.0 does, or throw an exception (probably a more descriptive one instead of the one mentioned in the ticket description) like Hive 1.2.1.
[HIVE-11625] Map instances with null keys are not properly handled for Parquet tables - ASF JIRA
References
- Amazon EMR を使用して Parquet 形式ファイルを連結する
- Amazon EMRでS3DistCpを使用してHDFSとAmazon S3間で効率的にデータを移動するための7つのヒント | Amazon Web Services ブログ
- I'm Sei. — S3 上の大量データを EMR するときは S3DistCp を使うと捗る
- Amazon Athena で CTAS クエリのファイルの数またはサイズを設定する
- presto/OrcInputStream.java at master · prestodb/presto · GitHub