
Confirming that Presto reads only the referenced columns when querying Parquet

Flame Graph of the HDFS DataNode

f:id:yohei-a:20191208172548p:plain

  • A drill-down of the leftmost stack.

f:id:yohei-a:20191208172603p:plain

Flame Graph of the Presto Server

f:id:yohei-a:20191208173139p:plain

  • A drill-down of the leftmost stack; com.facebook.presto.parquet.reader.BinaryColumnReader:::readValue appears to be doing the columnar read (see the sketch after the figure).

f:id:yohei-a:20191208173319p:plain
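
Not Presto's actual implementation, but as a mental model for the readValue frames above: in Parquet's plain encoding, a BYTE_ARRAY page stores each value as a 4-byte little-endian length followed by the value bytes, so a binary column reader only ever touches the buffer of the one column it was asked to decode. A minimal, self-contained Java sketch (class and method names are mine):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of a plain-encoded Parquet
// BYTE_ARRAY column page: each value is a 4-byte little-endian length
// followed by that many bytes.
public class PlainBinaryColumnSketch {
    static List<String> readAll(ByteBuffer page) {
        page.order(ByteOrder.LITTLE_ENDIAN);  // Parquet plain encoding is little-endian
        List<String> values = new ArrayList<>();
        while (page.remaining() >= 4) {
            int len = page.getInt();          // 4-byte value length
            byte[] bytes = new byte[len];
            page.get(bytes);                  // the value payload
            values.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return values;
    }

    public static void main(String[] args) {
        // Build a tiny fake page with two values: "foo", "hello"
        ByteBuffer page = ByteBuffer.allocate(4 + 3 + 4 + 5).order(ByteOrder.LITTLE_ENDIAN);
        page.putInt(3).put("foo".getBytes(StandardCharsets.UTF_8));
        page.putInt(5).put("hello".getBytes(StandardCharsets.UTF_8));
        page.flip();
        System.out.println(readAll(page));    // prints [foo, hello]
    }
}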

What to verify

  • When Presto queries Parquet, only the data for the referenced columns is read (see the sketch below).
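
Why column pruning is possible at all: the Parquet footer records the byte range (offset and size) of every column chunk in every row group, so a reader can seek directly to the chunks of the projected columns and skip the rest. A minimal sketch using parquet-mr (my addition, not part of the original procedure; API as in parquet-mr 1.x) that dumps those byte ranges:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ColumnChunkRanges {
    public static void main(String[] args) throws Exception {
        // Print the byte range of every column chunk recorded in the footer.
        // A column-pruning reader only needs to fetch the ranges of the
        // columns the query references (e.g. review_body).
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path(args[0]), new Configuration()))) {
            for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
                for (ColumnChunkMetaData col : rowGroup.getColumns()) {
                    System.out.printf("%s: offset=%d size=%d%n",
                            col.getPath(), col.getStartingPos(), col.getTotalSize());
                }
            }
        }
    }
}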

Environment

Steps

Installation
  • Log in to the master node
ssh -i ~/mykeytokyo.pem hadoop@ec2-54-***-**-112.ap-northeast-1.compute.amazonaws.com
  • Install various packages
$ sudo yum -y install htop sysstat dstat iotop ltrace strace perf blktrace gnuplot
  • Install perf-map-agent
$ sudo yum -y install cmake git
$ git clone --depth=1 https://github.com/jrudolph/perf-map-agent
$ cd perf-map-agent
$ cmake .
$ make
  • Comment out the following line in /usr/bin/bno_plot.py, so the temporary gnuplot data files under /tmp are kept:
# os.system('/bin/rm -rf ' + tmpdir)
  • Install FlameGraph
$ git clone https://github.com/brendangregg/FlameGraph
$ chmod +x FlameGraph/*.pl
$ vi ~/.bashrc
# add the following line:
export FLAMEGRAPH_DIR=~/FlameGraph
  • Install sysdig
$ sudo su -
# rpm --import https://s3.amazonaws.com/download.draios.com/DRAIOS-GPG-KEY.public
# curl -s -o /etc/yum.repos.d/draios.repo https://s3.amazonaws.com/download.draios.com/stable/rpm/draios.repo
# rpm -i https://mirror.us.leaseweb.net/epel/6/i386/epel-release-6-8.noarch.rpm
# yum -y install kernel-devel-$(uname -r)
# yum -y install sysdig
  • Install bcc
sudo yum -y install kernel-headers-$(uname -r | cut -d'.' -f1-5)
sudo yum -y install kernel-devel-$(uname -r | cut -d'.' -f1-5)
sudo yum -y install bcc
  • Download the eBPF tools
$ git clone https://github.com/iovisor/bcc.git
  • Set the JVM option (-XX:+PreserveFramePointer keeps the frame pointer so that perf can unwind the Java stacks for the flame graphs)
$ sudo vi /etc/hadoop/conf/hadoop-env.sh
# Extra Java runtime options.  Empty by default.
export HADOOP_OPTS=-XX:+PreserveFramePointer
  • Restart the HDFS DataNode
$ sudo initctl list|grep hadoop
hadoop-mapreduce-historyserver start/running, process 19605
hadoop-yarn-timelineserver start/running, process 18124
hadoop-kms start/running, process 9781
hadoop-yarn-resourcemanager start/running, process 18294
hadoop-httpfs start/running, process 10292
hadoop-hdfs-datanode start/running, process 10996
hadoop-yarn-proxyserver start/running, process 17977
hadoop-hdfs-namenode start/running, process 10587
hadoop-yarn-nodemanager start/running, process 19139
$ sudo stop hadoop-hdfs-datanode
hadoop-hdfs-datanode stop/waiting
$ sudo status hadoop-hdfs-datanode
hadoop-hdfs-datanode stop/waiting
$ sudo start hadoop-hdfs-datanode
hadoop-hdfs-datanode start/running, process 27016
  • Restart the Presto Server
$ sudo initctl list|grep presto
presto-server start/running, process 17624
$ sudo stop presto-server
presto-server stop/waiting
$ sudo start presto-server
presto-server start/running, process 29763
Copy the data from S3 to HDFS
  • Switch to the hdfs user.
$ sudo su - hdfs
$ hadoop fs -mkdir /amazon-reviews-pds
  • Copy the data from S3 to HDFS.
$ nohup s3-dist-cp --src  s3://amazon-reviews-pds/ --dest /amazon-reviews-pds  &
# If s3-dist-cp is not available, use hadoop distcp instead (the AWS CLI cannot write to hdfs:// paths)
$ nohup hadoop distcp s3://amazon-reviews-pds/ hdfs:///amazon-reviews-pds &
  • Verify the copied files.
$ hadoop fs -ls -R -h /amazon-reviews-pds/
Define an external table
  • Start the hive shell.
$ hive
  • Create a database.
> create database if not exists parquet;
> show databases;
> use parquet;
  • Create the table.
> CREATE EXTERNAL TABLE parquet.amazon_reviews_parquet(
  marketplace string, 
  customer_id string, 
  review_id string, 
  product_id string, 
  product_parent string, 
  product_title string, 
  star_rating int, 
  helpful_votes int, 
  total_votes int, 
  vine string, 
  verified_purchase string, 
  review_headline string, 
  review_body string, 
  review_date bigint, 
  year int)
PARTITIONED BY (product_category string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///amazon-reviews-pds/parquet';
> MSCK REPAIR TABLE parquet.amazon_reviews_parquet;
  • Exit the hive shell.
> quit;

Verification

  • Clear the page cache
$ sudo su -
# free -g 
# echo 3 > /proc/sys/vm/drop_caches
# free -g 
  • Run a query with Presto.
$ presto-cli
presto> use hive.parquet;
presto:parquet> select count(review_body) from amazon_reviews_parquet;
   _col0   
-----------
 160789772 
(1 row)

Query 20191208_024302_00027_6zu9k, FINISHED, 1 node
Splits: 1,134 total, 1,134 done (100.00%)
0:17 [161M rows, 34GB] [9.43M rows/s, 2GB/s]
  • Checking which process is issuing the I/O shows that it is the HDFS DataNode.
$ sudo iotop -aoP
Total DISK READ: 0.00 B/s | Total DISK WRITE: 3.75 K/s
  PID  PRIO  USER     DISK READ  DISK WRITE  SWAPIN     IO>    COMMAND                                                                       
10858 be/4 hdfs         22.56 G★     28.00 K  0.00 %  1.47 % java -Dproc_datanode -Xmx4096m -server -~che.hadoop.hdfs.server.datanode.DataNode
 7262 be/4 ganglia     564.00 K      2.03 M  0.00 %  0.02 % rrdcached -p /tmp/ganglia-rrdcached.pid ~ed/journal -z 1800 -w 1800 -f 3600 -B -g
14848 be/4 root          9.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:2]
 1815 be/3 root          0.00 B     16.00 K  0.00 %  0.00 % [jbd2/nvme0n1p1-]
  • sysdig shows that the HDFS DataNode process is opening the HDFS block files.
$ sudo csysdig
# press F2 and select "File Opens List"
# press F4 and filter by "hdfs"

f:id:yohei-a:20191208102847p:plain

  • Check which system calls are being issued.
# echo 3 > /proc/sys/vm/drop_caches # clear the page cache, then
presto:parquet> select count(review_body) from amazon_reviews_parquet; # run the query, and while it is running
$ sudo strace -fc -o strace_hdfs_count.log -p 10858 # attach strace
$ cat  strace_hdfs_count.log
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 61.25 3555.752632       42765     83146     11425 futex
 13.32  773.389390         689   1123106           epoll_wait
  8.37  485.990166         433   1122904           epoll_ctl
  4.42  256.738351         458    560237           sendfile ★
  4.30  249.436856         441    566137           write
  4.23  245.286061         433    566554           fstat
  2.50  144.961730     3716967        39        36 restart_syscall
  0.59   34.502151      442335        78           accept
  0.49   28.632010    28632010         1           poll
  0.14    8.069533         437     18455           fadvise64
  0.11    6.362389         421     15120      1186 read
  0.08    4.458707         395     11279           lseek
  0.06    3.725950         396      9401         6 stat
  0.04    2.456106         342      7190           close
  0.04    2.378046         391      6076           prctl
  0.04    2.331405         398      5856        12 open
(中略)
------ ----------- ----------- --------- --------- ----------------
100.00 5805.164067               4101929     12668 total
  • Trace the sendfile system calls.
# echo 3 > /proc/sys/vm/drop_caches # clear the page cache, then
presto:parquet> select count(review_body) from amazon_reviews_parquet;
   _col0   
-----------
 160789772 
(1 row)

Query 20191208_024302_00027_6zu9k, FINISHED, 1 node
Splits: 1,134 total, 1,134 done (100.00%)
0:17 [161M rows, 34GB] [9.43M rows/s, 2GB/s]

$ sudo strace -fe sendfile -s 200 -o strace_hdfs_review_body.log -p 10858
  • The strace log shows that reads are issued by the sendfile system call in 64 KiB (65,536-byte) units.
$ head strace_hdfs_review_body.log
3546  sendfile(984, 1042, [16796672], 65536★ <unfinished ...>
3438  sendfile(979, 1007, [86431232], 65536 <unfinished ...>
3546  <... sendfile resumed> )          = 65536
3522  sendfile(981, 1020, [36381184], 65536 <unfinished ...>
3438  <... sendfile resumed> )          = 65536
3522  <... sendfile resumed> )          = 65536
3434  sendfile(1003, 993, [68038656], 65536) = 65536
3546  sendfile(984, 1042, [16862208], 65536) = 65536
3438  sendfile(979, 1007, [86496768], 65536) = 65536
3422  sendfile(971, 1032, [101465600], 65536 <unfinished ...>
  • Almost everything is read in 64 KiB units (presumably the chunk size the DataNode's BlockSender passes to transferTo).
$ perl -lane '$F[1]=~/^sendfile/ and ($s)=$F[4]=~/^(\d+)/ and print $s' strace_hdfs_review_body.log|sort|uniq -c|sort -r|head -10
 465521 65536
     24 165
     22 177
     21 279
     20 382
     20 288
     20 24576
     19 240
     18 513
     17 505
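  • As a sanity check: 465,521 sendfile calls × 65,536 bytes ≈ 30.5 GB, the same order as the 34 GB the query reports scanning; the gap is plausibly the smaller trailing sendfile calls and data already in the page cache.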
  • Trace the sendfile system calls again, this time for count(*).
# echo 3 > /proc/sys/vm/drop_caches # clear the page cache, then
presto:parquet> select count(*) from amazon_reviews_parquet;
   _col0   
-----------
 160796570 
(1 row)

Query 20191208_024332_00028_6zu9k, FINISHED, 1 node
Splits: 1,124 total, 1,124 done (100.00%)
0:07 [161M rows, 0B] [23M rows/s, 0B/s]

$ sudo strace -fe sendfile -s 200 -o strace_hdfs_asterisk.log -p 10858
$ head strace_hdfs_asterisk.log 
14649 sendfile(1047, 1004, [73965568], 74) = 74
14596 sendfile(1001, 1007, [35630080], 37) = 37
14589 sendfile(971, 1025, [84801024], 60) = 60
14676 sendfile(1086, 1027, [93463040], 250) = 250
14637 sendfile(1060, 1028, [26791424], 331) = 331
14660 sendfile(1056, 1004, [8282112], 29) = 29
14651 sendfile(1006, 1039, [2904576], 35) = 35
14687 sendfile(1108, 1044, [114178560], 345) = 345
14631 sendfile(1054, 1039, [65089024], 361) = 361
14682 sendfile(1114, 972, [65166336], 126) = 126
  • This time the HDFS DataNode process reads almost nothing from disk.
$ sudo iotop -aoP
Total DISK READ: 11.24 K/s | Total DISK WRITE: 3.75 K/s
  PID  PRIO  USER     DISK READ  DISK WRITE  SWAPIN     IO>    COMMAND                                                                       
 7262 be/4 ganglia     640.00 K      2.39 M  0.00 %  0.00 % rrdcached -p /tmp/ganglia-rrdcached.pid ~ed/journal -z 1800 -w 1800 -f 3600 -B -g
26568 be/4 root         18.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:1]
 1815 be/3 root          0.00 B     16.00 K  0.00 %  0.00 % [jbd2/nvme0n1p1-]
 3245 be/4 root         64.00 K      0.00 B  0.00 %  0.00 % [xfsaild/nvme1n1]
12037 be/4 root          9.00 K      0.00 B  0.00 %  0.00 % [kworker/u64:0]
28666 be/4 hadoop      556.00 K     68.00 K  0.00 %  0.00 % java -cp /usr/share/aws/emr/hadoop-state~oop-state-pusher/hadoop-state-pusher.pid
10858 be/4 hdfs        708.00 K★     60.00 K  0.00 %  0.00 % java -Dproc_datanode -Xmx4096m -server -~che.hadoop.hdfs.server.datanode.DataNode
 4439 be/4 root          4.00 K      4.00 K  0.00 %  0.00 % rsyslogd -i /var/run/syslogd.pid -c 5
  • The I/O sizes are small and there are few calls — consistent with count(*) being answered from row-group metadata, so only footers and other small metadata reads remain.
$ perl -lane '$F[1]=~/^sendfile/ and ($s)=$F[4]=~/^(\d+)/ and print $s' strace_hdfs_asterisk.log|sort|uniq -c|sort -r|head -10
     23 165
     21 279
     21 177
     19 382
     19 288
     18 513
     18 240
     17 505
     17 29861
     17 29689
  • Capture the Flame Graphs
$ ps -fU hdfs,presto
UID        PID  PPID  C STIME TTY          TIME CMD
hdfs     10399     1  0 Dec07 ?        00:02:40 /usr/lib/jvm/java-openjdk/bin/java -Dproc_namenode -Xmx26419m -server -XX:OnOutOfMemoryError=
hdfs     26883     1  5 07:30 ?        00:02:04 /usr/lib/jvm/java-openjdk/bin/java -Dproc_datanode -Xmx4096m -XX:+PreserveFramePointer -serve
presto   29762     1 87 07:37 ?        00:28:04 java -cp /usr/lib/presto/lib/* -verbose:class -server -Xmx214026810294 -XX:+UseG1GC -XX:G1Hea
$ sudo su -
# cd /home/hadoop/perf-map-agent/bin
# export FLAMEGRAPH_DIR=/home/hadoop/FlameGraph/
# export PERF_RECORD_SECONDS=15
# ./perf-java-flames 26883 & ./perf-java-flames 29762
  • Capture blktrace data
$ sudo su -
# blktrace -w 15 -d /dev/nvme1n1p2 -o nvme1n1p2 & blktrace -w 15 -d /dev/nvme2n1 -o nvme2n1 &
  • Convert to text with btt
# ls nvme1n1p2.blktrace.*|while read LINE
 do
 btt -i ${LINE} -B ${LINE}.btt
 done
# ls nvme2n1.blktrace.*|while read LINE
 do
 btt -i ${LINE} -B ${LINE}.btt
 done
  • Combine into a single file per device
# cat nvme1n1p2.blktrace.*.btt_*_c.dat > nvme1n1p2_btt_c_all_c.dat
# cat nvme2n1.blktrace.*.btt_*_c.dat > nvme2n1_btt_c_all_c.dat
# bno_plot.py  nvme1n1p2_btt_c_all_c.dat
# bno_plot.py  nvme2n1_btt_c_all_c.dat
  • Locate the generated files
# cd /tmp
# ls -ltr
  • Edit the gnuplot commands
# vi plot.cmds
set terminal png # added
set output 'nvme1n1p2_btt_c_all_c.png'  # added
set title 'btt Generated Block Accesses'
set xlabel 'Time (secs)'
set ylabel 'Block Number'
set zlabel '# Blocks per IO'
set grid
splot 'nvme1n1p2_btt_c_all_c.dat'
set output  # added
  • Generate the image
# gnuplot plot.cmds
Reference: the sendfile(2) man page:

SYNOPSIS
       #include <sys/sendfile.h>

       ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
DESCRIPTION
       sendfile() copies data between one file descriptor and another.
       Because this copying is done within the kernel, sendfile() is more
       efficient than the combination of read(2) and write(2), which would
       require transferring data to and from user space.

       in_fd should be a file descriptor opened for reading and out_fd
       should be a descriptor opened for writing.

       If offset is not NULL, then it points to a variable holding the file
       offset from which sendfile() will start reading data from in_fd.
       When sendfile() returns, this variable will be set to the offset of
       the byte following the last byte that was read.  If offset is not
       NULL, then sendfile() does not modify the file offset of in_fd;
       otherwise the file offset is adjusted to reflect the number of bytes
       read from in_fd.

       If offset is NULL, then data will be read from in_fd starting at the
       file offset, and the file offset will be updated by the call.

       count is the number of bytes to copy between the file descriptors.
Reference: the Javadoc for java.nio.channels.FileChannel#transferTo:

transferTo
public abstract long transferTo(long position,
                                long count,
                                WritableByteChannel target)
                         throws IOException
Transfers bytes from this channel's file to the given writable byte channel.
An attempt is made to read up to count bytes starting at the given position in this channel's file and write them to the target channel. An invocation of this method may or may not transfer all of the requested bytes; whether or not it does so depends upon the natures and states of the channels. Fewer than the requested number of bytes are transferred if this channel's file contains fewer than count bytes starting at the given position, or if the target channel is non-blocking and it has fewer than count bytes free in its output buffer.

This method does not modify this channel's position. If the given position is greater than the file's current size then no bytes are transferred. If the target channel has a position then bytes are written starting at that position and then the position is incremented by the number of bytes written.

This method is potentially much more efficient than a simple loop that reads from this channel and writes to the target channel. Many operating systems can transfer bytes directly from the filesystem cache to the target channel without actually copying them.

The transferTo() method transfers data from the file channel to the given writable byte channel. Internally, it depends on the underlying operating system’s support for zero copy; in UNIX and various flavors of Linux, this call is routed to the sendfile() system call, shown in Listing 3, which transfers data from one file descriptor to another:

Efficient data transfer through zero copy – IBM Developer
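
Putting the pieces together: a minimal Java sketch (my addition, not from the post) of serving a file through FileChannel.transferTo() in 64 KiB chunks. On Linux each call is routed to sendfile(2), matching the 65,536-byte sendfile calls in the strace output above; the DataNode's BlockSender does something along these lines, though the real code also handles checksums and throttling.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChunkedTransfer {
    // Send the whole file to target in 64 KiB chunks. On Linux each
    // transferTo() call maps to sendfile(2), so the data never enters
    // user space (zero copy).
    static final int CHUNK = 64 * 1024;

    static void send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long pos = 0, size = ch.size();
            while (pos < size) {
                long n = ch.transferTo(pos, Math.min(CHUNK, size - pos), target);
                if (n <= 0) break;   // e.g. a non-blocking target with a full buffer
                pos += n;
            }
        }
    }
}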

To Do

  • Graph with blktrace + btt + gnuplot (for * and review_body)
  • Check with the Web UI
  • Check the CloudWatch metrics
  • Try with an older version of Presto
  • Check the inter-process communication
  • Athena, S3
  • Review the parquet reader code
  • Uber's talk