ablog

不器用で落着きのない技術者のメモ

CloudTrail ログをパーティション分割して ORC に変換・マージして検索を高速化してみた

CloudTrail ログ(JSON)をリージョン、年、月でパーティション分割して ORC に変換・マージしてみた。

使用感

  • 東京リージョンの全ログ(2017〜2020年)で S3 の API 発行回数をランキング表示。
    • スキャンサイズは約62MB、実行時間は約4秒。

f:id:yohei-a:20200108185928p:plain

f:id:yohei-a:20200108190312p:plain

  • 総サイズは 4.8 GiB(東京リージョンのみ)。
$ export AWS_ACCOUNT_ID=<AWS account id>
$ aws s3 ls --recursive --human-readable --summarize cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics/orc/
2020-01-08 05:49:31   19.2 MiB orc/region=ap-northeast-1/year=2017/month=04/000001_0
2020-01-08 05:49:34    3.5 MiB orc/region=ap-northeast-1/year=2017/month=05/000013_0
2020-01-08 05:49:32    3.2 MiB orc/region=ap-northeast-1/year=2017/month=06/000000_0
2020-01-08 05:49:31    2.3 MiB orc/region=ap-northeast-1/year=2017/month=07/000000_0
2020-01-08 05:49:31  872.9 KiB orc/region=ap-northeast-1/year=2017/month=08/000000_0
2020-01-08 05:49:32   23.9 MiB orc/region=ap-northeast-1/year=2017/month=09/000006_0
2020-01-08 05:49:33  178.9 MiB orc/region=ap-northeast-1/year=2017/month=10/000002_0
2020-01-08 05:49:34  330.3 MiB orc/region=ap-northeast-1/year=2017/month=11/000019_0
2020-01-08 05:49:31  234.4 MiB orc/region=ap-northeast-1/year=2017/month=12/000020_0
2020-01-08 05:49:32  191.6 MiB orc/region=ap-northeast-1/year=2018/month=01/000003_0
2020-01-08 05:49:32  152.2 MiB orc/region=ap-northeast-1/year=2018/month=02/000003_0
2020-01-08 05:49:32  140.3 MiB orc/region=ap-northeast-1/year=2018/month=03/000012_0
2020-01-08 05:49:31  113.0 MiB orc/region=ap-northeast-1/year=2018/month=04/000006_0
2020-01-08 05:49:34  146.4 MiB orc/region=ap-northeast-1/year=2018/month=05/000002_0
2020-01-08 05:49:31  125.6 MiB orc/region=ap-northeast-1/year=2018/month=06/000004_0
2020-01-08 05:49:32  377.7 MiB orc/region=ap-northeast-1/year=2018/month=07/000017_0
2020-01-08 05:49:32  553.8 MiB orc/region=ap-northeast-1/year=2018/month=08/000019_0
2020-01-08 05:49:34  512.8 MiB orc/region=ap-northeast-1/year=2018/month=09/000008_0
2020-01-08 05:49:34  380.1 MiB orc/region=ap-northeast-1/year=2018/month=10/000002_0
2020-01-08 05:49:34   60.6 MiB orc/region=ap-northeast-1/year=2018/month=11/000011_0
2020-01-08 05:49:32    9.1 MiB orc/region=ap-northeast-1/year=2018/month=12/000002_0
2020-01-08 05:49:34    8.3 MiB orc/region=ap-northeast-1/year=2019/month=01/000017_0
2020-01-08 05:49:34   10.5 MiB orc/region=ap-northeast-1/year=2019/month=02/000012_0
2020-01-08 05:49:34  167.1 MiB orc/region=ap-northeast-1/year=2019/month=03/000006_0
2020-01-08 05:49:32  117.9 MiB orc/region=ap-northeast-1/year=2019/month=04/000015_0
2020-01-08 05:49:32   68.8 MiB orc/region=ap-northeast-1/year=2019/month=05/000017_0
2020-01-08 05:49:34   70.2 MiB orc/region=ap-northeast-1/year=2019/month=06/000008_0
2020-01-08 05:49:34   70.9 MiB orc/region=ap-northeast-1/year=2019/month=07/000018_0
2020-01-08 05:49:31   86.1 MiB orc/region=ap-northeast-1/year=2019/month=08/000014_0
2020-01-08 05:49:34   95.9 MiB orc/region=ap-northeast-1/year=2019/month=09/000007_0
2020-01-08 05:49:32  260.9 MiB orc/region=ap-northeast-1/year=2019/month=10/000016_0
2020-01-08 05:49:31  102.4 MiB orc/region=ap-northeast-1/year=2019/month=11/000003_0
2020-01-08 05:49:31  275.0 MiB orc/region=ap-northeast-1/year=2019/month=12/000017_0
2020-01-08 05:49:31   33.7 MiB orc/region=ap-northeast-1/year=2020/month=01/000012_0

Total Objects: 34
   Total Size: 4.8 GiB

手順

$ export AWS_ACCOUNT_ID=<AWS account id>
$ export S3_PATH_SRC=cloudtrail-awslogs/AWSLogs/${AWS_ACCOUNT_ID}/CloudTrail
$ export S3_PATH_DST=cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics
$ aws s3 mb s3://${S3_PATH_DST}
  • リージョン、年、月、日のリストを作成する。
$ nohup aws s3 ls --recursive s3://${S3_PATH_SRC}/|\
perl -F/ -lane 'print qq/$F[3] $F[4] $F[5] $F[6]/'|uniq > partition_column_ymd.txt 2>&1 &
  • リージョン、年、月のリストを作成する。
$ nohup aws s3 ls --recursive s3://${S3_PATH_SRC}|\
perl -F/ -lane 'print qq/$F[3] $F[4]  $F[5]/'|uniq > partition_column_ym.txt 2>&1
  • 年単位のフォルダ以下にファイルを集約する。
$ cat partition_column_ymd.txt|while read LINE
do
    ARR=(${LINE//// })
    echo "${ARR[0]} ${ARR[1]} ${ARR[2]}"
    aws s3 cp --recursive s3://${S3_PATH_SRC}/${ARR[0]}/${ARR[1]}/${ARR[2]}/${ARR[3]}/ \
        s3://${S3_PATH_DST}/json/region=${ARR[0]}/year=${ARR[1]}/month=${ARR[2]}/
done
  • Athena で以下の DDL を実行してテーブルを作成する。
> CREATE EXTERNAL TABLE default.cloudtrail_logs_json (
eventversion STRING,
useridentity STRUCT<
               type:STRING,
               principalid:STRING,
               arn:STRING,
               accountid:STRING,
               invokedby:STRING,
               accesskeyid:STRING,
               userName:STRING,
sessioncontext:STRUCT<
attributes:STRUCT<
               mfaauthenticated:STRING,
               creationdate:STRING>,
sessionissuer:STRUCT<  
               type:STRING,
               principalId:STRING,
               arn:STRING, 
               accountId:STRING,
               userName:STRING>>>,
eventtime STRING,
eventsource STRING,
eventname STRING,
awsregion STRING,
sourceipaddress STRING,
useragent STRING,
errorcode STRING,
errormessage STRING,
requestparameters STRING,
responseelements STRING,
additionaleventdata STRING,
requestid STRING,
eventid STRING,
resources ARRAY<STRUCT<
               ARN:STRING,
               accountId:STRING,
               type:STRING>>,
eventtype STRING,
apiversion STRING,
readonly STRING,
recipientaccountid STRING,
serviceeventdetails STRING,
sharedeventid STRING,
vpcendpointid STRING
)
PARTITIONED BY ( 
  `region` string, 
  `year` string,
  `month` string
)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://cloudtrail-awslogs-<AWS account id>-analytics/json/';
> MSCK REPAIR TABLE cloudtrail_logs_json;
  • Athena で CTAS で ORC に変換する。
> CREATE TABLE cloudtrail_logs_orc_tmp
WITH (
        external_location = 's3://cloudtrail-awslogs-<AWS account id>-analytics/orc_tmp/',
        format = 'ORC',
        orc_compression = 'SNAPPY',
        partitioned_by = ARRAY['region', 'year', 'month']
  )
AS SELECT * FROM default.cloudtrail_logs_json;
  • EMR の HDFS に ORC をコピーする。
$ hadoop fs -mkdir /cloudtrail-${AWS_ACCOUNT_ID}-logs/
$ s3-dist-cp --src s3://${S3_PATH_DST}/orc_tmp --dest /cloudtrail-${AWS_ACCOUNT_ID}-logs/orc_tmp/
  • hive を起動する。
$ hive
  • テーブルを作成する。
CREATE EXTERNAL TABLE default.cloudtrail_logs_orc_tmp_hdfs (
  `eventversion` string COMMENT '', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '', 
  `eventtime` string COMMENT '', 
  `eventsource` string COMMENT '', 
  `eventname` string COMMENT '', 
  `awsregion` string COMMENT '', 
  `sourceipaddress` string COMMENT '', 
  `useragent` string COMMENT '', 
  `errorcode` string COMMENT '', 
  `errormessage` string COMMENT '', 
  `requestparameters` string COMMENT '', 
  `responseelements` string COMMENT '', 
  `additionaleventdata` string COMMENT '', 
  `requestid` string COMMENT '', 
  `eventid` string COMMENT '', 
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '', 
  `eventtype` string COMMENT '', 
  `apiversion` string COMMENT '', 
  `readonly` string COMMENT '', 
  `recipientaccountid` string COMMENT '', 
  `serviceeventdetails` string COMMENT '', 
  `sharedeventid` string COMMENT '', 
  `vpcendpointid` string COMMENT '')
PARTITIONED BY ( 
  `region` string COMMENT '', 
  `year` string COMMENT '',
  `month` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  '/cloudtrail-<AWS account id>-logs/orc_tmp/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
    'orc.compress'='SNAPPY');
> MSCK REPAIR TABLE cloudtrail_logs_orc_tmp_hdfs;
  • hive でファイルをマージする。
> set hive.merge.mapfiles=true;
> set hive.merge.mapredfiles=true;
> set hive.merge.size.per.task=256000000;
> set hive.merge.smallfiles.avgsize=32000000;
> set hive.exec.dynamic.partition.mode=nonstrict;
> CREATE TABLE cloudtrail_logs_orc_hdfs LIKE cloudtrail_logs_orc_tmp_hdfs;
> INSERT OVERWRITE TABLE cloudtrail_logs_orc_hdfs PARTITION (region, year, month)
SELECT * FROM cloudtrail_logs_orc_tmp_hdfs;
  • リージョン、年、月でパーティショニングしてマージしたファイルを S3 にアップロードする。
$ s3-dist-cp --src /user/hive/warehouse/cloudtrail_logs_orc_hdfs/ --dest s3://cloudtrail-awslogs-${AWS_ACCOUNT_ID}-analytics/orc/
  • テーブルを作成する。
CREATE EXTERNAL TABLE default.cloudtrail_logs_orc (
  `eventversion` string COMMENT '', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT '', 
  `eventtime` string COMMENT '', 
  `eventsource` string COMMENT '', 
  `eventname` string COMMENT '', 
  `awsregion` string COMMENT '', 
  `sourceipaddress` string COMMENT '', 
  `useragent` string COMMENT '', 
  `errorcode` string COMMENT '', 
  `errormessage` string COMMENT '', 
  `requestparameters` string COMMENT '', 
  `responseelements` string COMMENT '', 
  `additionaleventdata` string COMMENT '', 
  `requestid` string COMMENT '', 
  `eventid` string COMMENT '', 
  `resources` array<struct<arn:string,accountid:string,type:string>> COMMENT '', 
  `eventtype` string COMMENT '', 
  `apiversion` string COMMENT '', 
  `readonly` string COMMENT '', 
  `recipientaccountid` string COMMENT '', 
  `serviceeventdetails` string COMMENT '', 
  `sharedeventid` string COMMENT '', 
  `vpcendpointid` string COMMENT '')
PARTITIONED BY ( 
  `region` string COMMENT '', 
  `year` string COMMENT '',
  `month` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  '/cloudtrail-<AWS account id>-logs/orc/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
    'orc.compress'='SNAPPY');
> MSCK REPAIR TABLE cloudtrail_logs_orc;

補足

  • CTAS で Parquet に変換しようとすると、
CREATE TABLE cloudtrail_logs_parquet
WITH (
        external_location = 's3://cloudtrail-awslogs-<AWS account id>-analytics/parquet/',
        format = 'parquet',
        orc_compression = 'SNAPPY',
        partitioned_by = ARRAY['region', 'year', 'month']
  )
AS SELECT * FROM default.cloudtrail_logs_json;

以下のエラーが発生したため、ORC に変換した。

GENERIC_INTERNAL_ERROR: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead. You may need to manually clean the data at location 's3://...' before retrying. Athena will not delete data in your account.

原因は以下の通り。

This is a known issue and the root cause boils down to the Hive Parquet writers, which Presto uses to write Parquet data (see HIVE-11625).

unable to insert empty map data type into parquet format · Issue #5934 · prestodb/presto · GitHub

The problematic method is DataWritableWriter.writeMap(). Although the key value entry is not null, either key or value can be null. And null keys are not properly handled.

According to parquet-format spec, keys of a Parquet MAP must not be null. Then I think the problem here is that, whether should we silently ignore null keys when writing a map to a Parquet table like what Hive 0.14.0 does, or throw an exception (probably a more descriptive one instead of the one mentioned in the ticket description) like Hive 1.2.1.

[HIVE-11625] Map instances with null keys are not properly handled for Parquet tables - ASF JIRA