ablog

不器用で落着きのない技術者のメモ

Presto で Parquet にクエリすると、参照するカラムのみ読んでいることを確認した

HDFS の Datanode の Flame Graph

f:id:yohei-a:20191208172548p:plain

  • 一番左のスタックをドリルダウンしたもの。

f:id:yohei-a:20191208172603p:plain

Presto Server の Flame Graph

f:id:yohei-a:20191208173139p:plain

  • 一番左のスタックをドリルダウンしたもの、com.facebook.presto.parquet.reader.BinaryColumnReader:::readValue で Columnar Read していると思われる。

f:id:yohei-a:20191208173319p:plain

確認ポイント

  • Presto で Parquet にクエリする際、参照するカラムのデータのみ読む。

環境

手順

インストール
  • マスターノードにログイン
ssh -i ~/mykeytokyo.pem hadoop@ec2-54-***-**-112.ap-northeast-1.compute.amazonaws.com
  • 各種パッケージのインストール
$ sudo yum -y install htop sysstat dstat iotop ltrace strace perf blktrace gnuplot
  • perf-map-agent のインストール
$ sudo yum -y install cmake git
$ git clone --depth=1 https://github.com/jrudolph/perf-map-agent
$ cd perf-map-agent
$ cmake .
$ make
  • FlameGraph のインストール
$ git clone https://github.com/brendangregg/FlameGraph
$ chmod +x FlameGraph/*.pl
$ vi ~/.bashrc
$ export FLAMEGRAPH_DIR=~/FlameGraph
  • sysdig のインストール
$ sudo su -
# rpm --import https://s3.amazonaws.com/download.draios.com/DRAIOS-GPG-KEY.public
# curl -s -o /etc/yum.repos.d/draios.repo https://s3.amazonaws.com/download.draios.com/stable/rpm/draios.repo
# rpm -i https://mirror.us.leaseweb.net/epel/6/i386/epel-release-6-8.noarch.rpm
# yum -y install kernel-devel-$(uname -r)
# yum -y install sysdig
  • bcc のインストール
sudo yum -y install kernel-headers-$(uname -r | cut -d'.' -f1-5)
sudo yum -y install kernel-devel-$(uname -r | cut -d'.' -f1-5)
sudo yum -y install bcc
  • eBPF ツールをダウンロード
$ git clone https://github.com/iovisor/bcc.git
  • JVM のオプションを設定
$ sudo vi /etc/hadoop/conf/hadoop-env.sh
# Extra Java runtime options.  Empty by default.
export HADOOP_OPTS=-XX:+PreserveFramePointer
  • HDFS の Datanode を再起動
$ sudo initctl list|grep hadoop
hadoop-mapreduce-historyserver start/running, process 19605
hadoop-yarn-timelineserver start/running, process 18124
hadoop-kms start/running, process 9781
hadoop-yarn-resourcemanager start/running, process 18294
hadoop-httpfs start/running, process 10292
hadoop-hdfs-datanode start/running, process 10996
hadoop-yarn-proxyserver start/running, process 17977
hadoop-hdfs-namenode start/running, process 10587
hadoop-yarn-nodemanager start/running, process 19139
$ sudo stop hadoop-hdfs-datanode
hadoop-hdfs-datanode stop/waiting
$ sudo status hadoop-hdfs-datanode
hadoop-hdfs-datanode stop/waiting
$ sudo start hadoop-hdfs-datanode
hadoop-hdfs-datanode start/running, process 27016
  • Presto Server を再起動
$ sudo initctl list|grep presto
presto-server start/running, process 17624
$ sudo stop presto-server
presto-server stop/waiting
$ sudo start presto-server
presto-server start/running, process 29763
データを S3 から HDFS にコピーする
  • hdfs ユーザーにスイッチする。
$ sudo su - hdfs
$ hadoop fs -mkdir /amazon-reviews-pds
  • データを S3 から HDFS にコピーする。
$ nohup s3-dist-cp --src  s3://amazon-reviews-pds/ --dest /amazon-reviews-pds  &
  • コピーされたファイルを確認する。
$ hadoop fs -ls -R -h /amazon-reviews-pds/
外部テーブルを定義する
  • hive シェルを起動する。
$ hive
  • データベースを作成する。
> create database if not exists parquet;
> show databases;
> use parquet;
  • テーブルを作成する。
> CREATE EXTERNAL TABLE parquet.amazon_reviews_parquet(
  marketplace string, 
  customer_id string, 
  review_id string, 
  product_id string, 
  product_parent string, 
  product_title string, 
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine string, 
  verified_purchase string, 
  review_headline string, 
  review_body string, 
  review_date bigint, 
  year int)
PARTITIONED BY (product_category string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///amazon-reviews-pds/parquet';
> MSCK REPAIR TABLE parquet.amazon_reviews_parquet;
  • hive シェルを終了する。
> quit;

検証

  • ページキャッシュをクリアする
$ sudo su -
# free -g 
# echo 3 > /proc/sys/vm/drop_caches
# free -g 
  • Presto で クエリを実行する。
$ presto-cli
presto> use hive.parquet;
presto:parquet> select count(review_body) from amazon_reviews_parquet;
   _col0   
-----------
 160789772 
(1 row)

Query 20191208_024302_00027_6zu9k, FINISHED, 1 node
Splits: 1,134 total, 1,134 done (100.00%)
0:17 [161M rows, 34GB] [9.43M rows/s, 2GB/s]
  • IOを発行しているプロセスを確認すると HDFS であることが分かる。
$ sudo iotop -aoP
Total DISK READ: 0.00 B/s | Total DISK WRITE: 3.75 K/s
  PID  PRIO  USER     DISK READ  DISK WRITE  SWAPIN     IO>    COMMAND                                                                       
10858 be/4 hdfs         22.56 G★     28.00 K  0.00 %  1.47 % java -Dproc_datanode -Xmx4096m -server -~che.hadoop.hdfs.server.datanode.DataNode
 7262 be/4 ganglia     564.00 K      2.03 M  0.00 %  0.02 % rrdcached -p /tmp/ganglia-rrdcached.pid ~ed/journal -z 1800 -w 1800 -f 3600 -B -g
14848 be/4 root          9.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:2]
 1815 be/3 root          0.00 B     16.00 K  0.00 %  0.00 % [jbd2/nvme0n1p1-]
  • sysdig で確認すると、HDFS の datanode プロセスが HDFS のファイルをオープンしていることが分かる。
$ sudo csysdig
# F2押下、File Opens List を選択
# F4押下、"hdfs" でフィルタ

f:id:yohei-a:20191208102847p:plain

  • 発行されているシステコールの種類を確認する。
# echo 3 > /proc/sys/vm/drop_caches # ページキャッシュをクリアして
presto:parquet> select count(review_body) from amazon_reviews_parquet; # クエリを実行中に
$ sudo strace -fc -o strace_hdfs_count.log -p 10858 # strace をかける
$ cat  strace_hdfs_count.log
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 61.25 3555.752632       42765     83146     11425 futex
 13.32  773.389390         689   1123106           epoll_wait
  8.37  485.990166         433   1122904           epoll_ctl
  4.42  256.738351         458    560237           sendfile ★
  4.30  249.436856         441    566137           write
  4.23  245.286061         433    566554           fstat
  2.50  144.961730     3716967        39        36 restart_syscall
  0.59   34.502151      442335        78           accept
  0.49   28.632010    28632010         1           poll
  0.14    8.069533         437     18455           fadvise64
  0.11    6.362389         421     15120      1186 read
  0.08    4.458707         395     11279           lseek
  0.06    3.725950         396      9401         6 stat
  0.04    2.456106         342      7190           close
  0.04    2.378046         391      6076           prctl
  0.04    2.331405         398      5856        12 open
(中略)
------ ----------- ----------- --------- --------- ----------------
100.00 5805.164067               4101929     12668 total
  • sendfile システコールをトレースする。
# echo 3 > /proc/sys/vm/drop_caches # ページキャッシュをクリアして
presto:parquet> select count(review_body) from amazon_reviews_parquet;
   _col0   
-----------
 160789772 
(1 row)

Query 20191208_024302_00027_6zu9k, FINISHED, 1 node
Splits: 1,134 total, 1,134 done (100.00%)
0:17 [161M rows, 34GB] [9.43M rows/s, 2GB/s]

$ sudo strace -fe sendfile -s 200 -o strace_hdfs_review_body.log -p 10858
  • strace のログを確認すると、sendfile システムコールで 64Kbyte(65536 byte) 単位で読んでいる。
$ head strace_hdfs_review_body.log
3546  sendfile(984, 1042, [16796672], 65536★ <unfinished ...>
3438  sendfile(979, 1007, [86431232], 65536 <unfinished ...>
3546  <... sendfile resumed> )          = 65536
3522  sendfile(981, 1020, [36381184], 65536 <unfinished ...>
3438  <... sendfile resumed> )          = 65536
3522  <... sendfile resumed> )          = 65536
3434  sendfile(1003, 993, [68038656], 65536) = 65536
3546  sendfile(984, 1042, [16862208], 65536) = 65536
3438  sendfile(979, 1007, [86496768], 65536) = 65536
3422  sendfile(971, 1032, [101465600], 65536 <unfinished ...>
  • ほぼ、64KB で読んでいる。
$ perl -lane '$F[1]=~/^sendfile/ and ($s)=$F[4]=~/^(\d+)/ and print $s' strace_hdfs_review_body.log|sort|uniq -c|sort -r|head -10
 465521 65536
     24 165
     22 177
     21 279
     20 382
     20 288
     20 24576
     19 240
     18 513
     17 505
  • sendfile システコールをトレースする。
# echo 3 > /proc/sys/vm/drop_caches # ページキャッシュをクリアして
presto:parquet> select count(*) from amazon_reviews_parquet;
   _col0   
-----------
 160796570 
(1 row)

Query 20191208_024332_00028_6zu9k, FINISHED, 1 node
Splits: 1,124 total, 1,124 done (100.00%)
0:07 [161M rows, 0B] [23M rows/s, 0B/s]

$ sudo strace -fe sendfile -s 200 -o strace_hdfs_asterisk.log -p 10858
$ head strace_hdfs_asterisk.log 
14649 sendfile(1047, 1004, [73965568], 74) = 74
14596 sendfile(1001, 1007, [35630080], 37) = 37
14589 sendfile(971, 1025, [84801024], 60) = 60
14676 sendfile(1086, 1027, [93463040], 250) = 250
14637 sendfile(1060, 1028, [26791424], 331) = 331
14660 sendfile(1056, 1004, [8282112], 29) = 29
14651 sendfile(1006, 1039, [2904576], 35) = 35
14687 sendfile(1108, 1044, [114178560], 345) = 345
14631 sendfile(1054, 1039, [65089024], 361) = 361
14682 sendfile(1114, 972, [65166336], 126) = 126
  • HDFS の datanode のプロセスはほとんど読んでいない。
$ sudo iotop -aoP
Total DISK READ: 11.24 K/s | Total DISK WRITE: 3.75 K/s
  PID  PRIO  USER     DISK READ  DISK WRITE  SWAPIN     IO>    COMMAND                                                                       
 7262 be/4 ganglia     640.00 K      2.39 M  0.00 %  0.00 % rrdcached -p /tmp/ganglia-rrdcached.pid ~ed/journal -z 1800 -w 1800 -f 3600 -B -g
26568 be/4 root         18.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:1]
 1815 be/3 root          0.00 B     16.00 K  0.00 %  0.00 % [jbd2/nvme0n1p1-]
 3245 be/4 root         64.00 K      0.00 B  0.00 %  0.00 % [xfsaild/nvme1n1]
12037 be/4 root          9.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:0]
28666 be/4 hadoop      556.00 K     68.00 K  0.00 %  0.00 % java -cp /usr/share/aws/emr/hadoop-state~oop-state-pusher/hadoop-state-pusher.pid
10858 be/4 hdfs        708.00 K★     60.00 K  0.00 %  0.00 % java -Dproc_datanode -Xmx4096m -server -~che.hadoop.hdfs.server.datanode.DataNode
 4439 be/4 root          4.00 K      4.00 K  0.00 %  0.00 % rsyslogd -i /var/run/syslogd.pid -c 5
23
  • IOサイズが小さく、回数も少ない。
$ perl -lane '$F[1]=~/^sendfile/ and ($s)=$F[4]=~/^(\d+)/ and print $s' strace_hdfs_asterisk.log|sort|uniq -c|sort -r|head -10
     23 165
     21 279
     21 177
     19 382
     19 288
     18 513
     18 240
     17 505
     17 29861
     17 29689
  • Flame Graph を取得する
$ ps -fU hdfs,presto
UID        PID  PPID  C STIME TTY          TIME CMD
hdfs     10399     1  0 Dec07 ?        00:02:40 /usr/lib/jvm/java-openjdk/bin/java -Dproc_namenode -Xmx26419m -server -XX:OnOutOfMemoryError=
hdfs     26883     1  5 07:30 ?        00:02:04 /usr/lib/jvm/java-openjdk/bin/java -Dproc_datanode -Xmx4096m -XX:+PreserveFramePointer -serve
presto   29762     1 87 07:37 ?        00:28:04 java -cp /usr/lib/presto/lib/* -verbose:class -server -Xmx214026810294 -XX:+UseG1GC -XX:G1Hea
$ sudo su -
# cd /home/hadoop/perf-map-agent/bin
# ./perf-java-flames 26883 & ./perf-java-flames 29762
SYNOPSIS         top
       #include <sys/sendfile.h>

       ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
DESCRIPTION         top
       sendfile() copies data between one file descriptor and another.
       Because this copying is done within the kernel, sendfile() is more
       efficient than the combination of read(2) and write(2), which would
       require transferring data to and from user space.

       in_fd should be a file descriptor opened for reading and out_fd
       should be a descriptor opened for writing.

       If offset is not NULL, then it points to a variable holding the file
       offset from which sendfile() will start reading data from in_fd.
       When sendfile() returns, this variable will be set to the offset of
       the byte following the last byte that was read.  If offset is not
       NULL, then sendfile() does not modify the file offset of in_fd;
       otherwise the file offset is adjusted to reflect the number of bytes
       read from in_fd.

       If offset is NULL, then data will be read from in_fd starting at the
       file offset, and the file offset will be updated by the call.

       count is the number of bytes to copy between the file descriptors.
transferTo
public abstract long transferTo(long position,
                                long count,
                                WritableByteChannel target)
                         throws IOException
Transfers bytes from this channel's file to the given writable byte channel.
An attempt is made to read up to count bytes starting at the given position in this channel's file and write them to the target channel. An invocation of this method may or may not transfer all of the requested bytes; whether or not it does so depends upon the natures and states of the channels. Fewer than the requested number of bytes are transferred if this channel's file contains fewer than count bytes starting at the given position, or if the target channel is non-blocking and it has fewer than count bytes free in its output buffer.

This method does not modify this channel's position. If the given position is greater than the file's current size then no bytes are transferred. If the target channel has a position then bytes are written starting at that position and then the position is incremented by the number of bytes written.

This method is potentially much more efficient than a simple loop that reads from this channel and writes to the target channel. Many operating systems can transfer bytes directly from the filesystem cache to the target channel without actually copying them.

The transferTo() method transfers data from the file channel to the given writable byte channel. Internally, it depends on the underlying operating system’s support for zero copy; in UNIX and various flavors of Linux, this call is routed to the sendfile() system call, shown in Listing 3, which transfers data from one file descriptor to another:

Efficient data transfer through zero copy – IBM Developer

To Do

  • blktrace + bttgnuplot でグラフ化(* と review_body)
  • Web UI で確認
  • CWメトリクスの確認
  • 旧いPrestoで
  • プロセス間通信の確認
  • atena, S3

Spark Meetup Tokyo #2 (Spark+AI Summit EU 2019) に参加してきた

f:id:yohei-a:20191201004037j:plain
Spark Meetup Tokyo #2 (Spark+AI Summit EU 2019) - connpass に参加してきた。今度は Spark について、どうでもいことに Dive deep して話してみるのも面白そう。

Koalasの開発状況 (Updates)
  • by Takuya Ueshin (Twitter:@ueshin) @ Databricks
  • Koalas の開発状況の紹介。Koalas は Apache Spark 上に pandas DataFrame API を実装し, データサイエンティストのビッグデータに対する生産性向上を目的としたプロジェクト。 pandas (tests, smaller datasets) と Spark (distributed datasets) の両方で動く単一のコードベースとできることを目指している。

Quick Overview of Upcoming Spark 3.0 + α(SAIS Europe 2019で個人的に興味のあった発表紹介)
  • by Takeshi Yamamuro (Twitter:@maropu) @ NTT
  • Spark 3.0 Preview Release の Adaptive Execution in Spark SQL、Dynamic Partition Pruning、New Option in EXPLAIN、Dataframe Cogroup、Join Strategy Hints、PostgreSQL Dialect Support の紹介、Facebook の Hive から Spark への移行や Script Transformation の紹介など。

Project Hydrogenの最新情報
  • Kazuaki Ishizaki (Twitter:@kiszk) @ IBM Research - Tokyo
  • SparkとAIフレームワークを統合するHydrogenプロジェクトの紹介。想定しているユースケース、分散学習、推論 – 実際の使い方、使われ方 – 最近の更新について。

Delta アーキテクチャ

LT1: Koalasのココが良いよね
  • by Harutaka Kawamura (Twitter:harupy) @ ARISE
  • Koalas の特徴や良いところの紹介。

docs.google.com

RDS PostgreSQL の接続・切断ログを CloudWatch Logs にエクスポートする

RDS PostgreSQL で log_connections=1、log_disconnections=1 に設定して、接続・切断ログを記録し、CloudWatch Logs にエクスポートして確認したメモ。

設定する

  • パラメータグループを作成する(パラメータグループファミリー: postgres11)。
    • log_connections=1
    • log_disconnections=1
  • RDS PostgreSQL インスタンスを作成する。
    • 作成したパラメータグループを設定する。
    • ログのエクスポートで Postgresql log にチェックを入れる。

確認する

$ psql "host=postgres-m5xl.************.ap-northeast-1.rds.amazonaws.com user=awsuser dbname=mydb port=5432"
postgres-m5xl awsuser 15:49 => \q
$
  • CloudWatch Logs でロググループを確認する。
    • /aws/rds/instance/postgres-m5xl/postgresqlpostgres-m5xl.0
2019-11-14 15:12:58 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):[unknown]@[unknown]:[9604]:LOG:  00000: connection received: host=ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com port=50692
2019-11-14 15:12:58 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):[unknown]@[unknown]:[9604]:LOCATION:  BackendInitialize, postmaster.c:4254
2019-11-14 15:12:58 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):awsuser@mydb:[9604]:LOG:  00000: connection authorized: user=awsuser database=mydb SSL enabled (protocol=TLSv1.2, cipher=ECDHE-RSA-AES256-GCM-SHA384, bits=256, compression=off)
2019-11-14 15:12:58 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):awsuser@mydb:[9604]:LOCATION:  PerformAuthentication, postinit.c:274
2019-11-14 15:13:22 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):awsuser@mydb:[9604]:LOG:  00000: disconnection: session time: 0:00:24.122 user=awsuser database=mydb host=ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com port=50692
2019-11-14 15:13:22 UTC:ec2-**-***-**-16.ap-northeast-1.compute.amazonaws.com(50692):awsuser@mydb:[9604]:LOCATION:  log_disconnections, postgres.c:4618

S3 で特定のプリフィックス以下のオブジェクトの過去バージョンを削除する

S3 で特定のプリフィックス以下のオブジェクトの過去バージョンを削除する。以下はバケット test-bucket の test/ 以下のオブジェクトの過去バージョンを削除するケース。UTC 0:00 になると削除される。

  • lifecycle.json を作成する
{
    "Rules": [
        {
            "Status": "Enabled",
            "Prefix": "test/", // 対象のプリフィックスを指定する
            "NoncurrentVersionExpiration": {
                "NoncurrentDays": 1 // 1日以上経過した過去バージョンを削除する
            },
            "ID": "Delete all"
        }
    ]
}
  • ライフサイクルポリシーを適用する。
$ aws s3api put-bucket-lifecycle --bucket test-bucket --lifecycle-configuration file://lifecycle.json
  • ライフサイクルポリシーを確認する。
$ aws s3api get-bucket-lifecycle --bucket test-bucket 
{
    "Rules": [
        {
            "Status": "Enabled",
            "Prefix": "test/",
            "NoncurrentVersionExpiration": {
                "NoncurrentDays": 1
            },
            "ID": "Delete all"
        }
    ]
}
  • ライフサイクルポリシーを削除したい場合は、delete-bucket-lifecycle で削除する。
$ aws s3api delete-bucket-lifecycle --bucket test-bucket

参考

Amazon S3 は、ルールに指定された日数をオブジェクトの次の新しいバージョンが作成された時間に加算し、得られた日時を翌日の午前 00:00 (UTC) に丸めることで、時間を算出します。たとえば、バケット内に 2014 年 1 月 1 日の午前 10 時半 (UTC) に作成されたオブジェクトの現行バージョンがあるとします。現行バージョンを置き換えるオブジェクトの新しいバージョンが 2014 年 1 月 15 日の午前 10 時半 (UTC) に作成され、3 日間の移行ルールを指定すると、オブジェクトの移行日は 2014 年 1 月 19 日の午前 0 時 (UTC) となります。

ライフサイクル設定の要素 - Amazon Simple Storage Service

S3 オブジェクトの過去バージョンを一括削除する

バージョニングを有効化している S3 バケットでオブジェクトの過去バージョンを一括削除する手順。

  • 任意の S3 バケットの過去バージョンを確認する。
% aws s3api list-object-versions --bucket test-bucket|jq -r '.Versions[]|select(.IsLatest==false)|@text "\(.Key) \(.VersionId)"'
test.txt SFDNnzR6u1gfaeSV6ObrLubM.BoG5SiM
test.txt kUD7TJofypM6OCKj6kKPLrsWQC3f6gCm
test.txt ypCUBbPprcBd4mEOpjSr3FqFsfSjNetN
test.txt lIL_Lx69anh5NEc54YpAq2wOdoct5KB4
test/test.txt .r_ZZYydapeG8hzufYOFUuUXoHg0vRsl
test/test.txt TY2KZY4Ny_HeY7RU9q4th_buEr5Rrtca
test/test2.txt xGFYyS1BHb.ue.waKQiNfTSwQPrPuEWo
test/test2.txt oDt8wsl0t.ZgU0NJ5K6tkOrTQQ4cfoDm
test/test2.txt t4yV.zYNTP6ZXi0PCftXqhKk1v9kkA73
test2.txt ujot_grq_oL3NrjGpgXC8Y_5eFR9RMr5
test2.txt NFZNfh.vXXivtWekN1LSDly_75Fyh0Yl
test2.txt w2hKNGVg6M6hlUwjw5rVD9ocOGrXlX.v
  • 過去バージョンを一括削除する。
% aws s3api list-object-versions --bucket test-bucket|jq -r '.Versions[]|select(.IsLatest==false)|@text "\(.Key) \(.VersionId)"'|while read KEY VER
do
        aws s3api delete-object --bucket test-bucket --key ${KEY} --version-id ${VER}
done

{
    "VersionId": "SFDNnzR6u1gfaeSV6ObrLubM.BoG5SiM"
}
{
    "VersionId": "kUD7TJofypM6OCKj6kKPLrsWQC3f6gCm"
}
{
    "VersionId": "ypCUBbPprcBd4mEOpjSr3FqFsfSjNetN"
}
{
    "VersionId": "lIL_Lx69anh5NEc54YpAq2wOdoct5KB4"
}
{
    "VersionId": ".r_ZZYydapeG8hzufYOFUuUXoHg0vRsl"
}
{
    "VersionId": "TY2KZY4Ny_HeY7RU9q4th_buEr5Rrtca"
}
{
    "VersionId": "xGFYyS1BHb.ue.waKQiNfTSwQPrPuEWo"
}
{
    "VersionId": "oDt8wsl0t.ZgU0NJ5K6tkOrTQQ4cfoDm"
}
{
    "VersionId": "t4yV.zYNTP6ZXi0PCftXqhKk1v9kkA73"
}
{
    "VersionId": "ujot_grq_oL3NrjGpgXC8Y_5eFR9RMr5"
}
{
    "VersionId": "NFZNfh.vXXivtWekN1LSDly_75Fyh0Yl"
}
{
    "VersionId": "w2hKNGVg6M6hlUwjw5rVD9ocOGrXlX.v"
}
  • 過去バージョンが削除されていることを確認する。
% aws s3api list-object-versions --bucket test-bucket|jq -r '.Versions[]|select(.IsLatest==false)|@text "\(.Key) \(.VersionId)"'

Athena で改行を含む CSV を扱いたい場合は Glue ジョブで Parquet に変換する

データの中身に改行を含む CSV を Athena でクエリすると正しく扱えなかったが、Glue ジョブで CSV を Parquet に変換すると改行を含むデータを扱うことができた。おそらく OpenCSVSerDe は改行に対応していないが、Parquet SerDe は改行に対応しているからではないかと思われる。

  • cr.csv を用意する。
c1,c2,c3_string
1,1,"test string"
2,2,"text string"
3,3,"string
with cr"
4,4,"text string"
  • S3 にアップロードする。
  • Glue のクローラで CSV をカタログに登録する。
  • Athena からCSV を参照すると改行で表示が崩れている。

f:id:yohei-a:20191015001912p:plain

  • Glue ジョブで Parquet に変換する。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

## initialize
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init('csv2parquet')

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "cr_csv", transformation_ctx = "datasource0")

## Convert to standard Spark DataFrame to do trasformation to be continued
df = datasource0.toDF()

## For large data sets, try to cache the data will accelerate later execution.
df.cache()

castedDf = df.withColumn("c1", df.c1.cast("decimal(38,0)")) \
    .withColumn("c2", df.c2.cast("decimal(38,0)")) \
    .withColumn("c3_string", df.c3_string.cast("varchar(30)"))

castedDf.write.partitionBy(["c1"]).mode("overwrite").parquet("s3://az-cr-test/parquet/",compression='snappy')

job.commit()
  • Glue のクローラで Parquet をカタログに登録する。
  • Athena からParquet を参照すると改行のある行が1行として扱われている。

f:id:yohei-a:20191015001834p:plain

  • 結果セットをダウンロードしてみると改行は残っている。
"c2","c3_string","c1"
"3","string
with cr","3"
"4","text string","4"
"1","test string","1"
"2","text string","2"

S3 に 1MB のオブジェクトを1億個作成する手順

S3 に 1MB のオブジェクトを1億個作成する手順(実際にはやらない)。

  • s3-cp-100m.sh
#!/bin/bash
date
dd if=/dev/urandom of=1mb.dat bs=1m count=1
aws s3 cp 1mb.dat s3://az-cp-src/
date
seq -w 1 100000000|xargs -t -P512 -I{} aws s3 cp s3://az-cp-src/1mb.dat s3://az-100m/{}.dat > /dev/null
date
  • 実行する
$ nohup ./s3-cp-100m.sh > s3-cp-100m.log 2>&1 &
  • ログを確認する
$ tail -f s3-cp-100m.log
nohup: ignoring input
Mon Oct 14 01:57:28 UTC 2019
dd: invalid number ‘1m’
Completed 256.0 KiB/1.0 MiB (1.7 MiB/s) with 1 file(s) remaining^MCompleted 512.0 KiB/1.0 MiB (3.3 MiB/s) with 1 file(s) remaining^MCompleted 768.0 KiB/1.0 MiB (4.9 MiB/s) with 1 file(s) remaining^MCompleted 1.0 MiB/1.0 MiB (6.4 MiB/s) with 1 file(s) remaining  ^Mupload: ./1mb.dat to s3://az-cp-src/1mb.dat
Mon Oct 14 01:57:29 UTC 2019
aws s3 cp s3://az-cp-src/1mb.dat s3://az-100m/000000001.dat
aws s3 cp s3://az-cp-src/1mb.dat s3://az-100m/000000002.dat
aws s3 cp s3://az-cp-src/1mb.dat s3://az-100m/000000003.dat
  • htop で実行状況を確認する。
$ sudo yum -y install htop
$ htop