ablog

不器用で落着きのない技術者のメモ

gRPC と Protocol Buffers について

gRPC と Protocol Buffers とは

gRPC は、RPC (Remote Procedure Call) を実現するためにGoogleが開発したプロトコルの1つです。Protocol Buffers を使ってデータをシリアライズし、高速な通信を実現できる点が特長です。

gRPCでは、IDL(インターフェース定義言語)を使ってあらかじめAPI仕様を .proto ファイルとして定義し、そこからサーバー側&クライアント側に必要なソースコードのひな形を生成します。言語に依存しないIDLで先にインタフェースを定義することで、様々なプログラミング言語の実装を生成できるというメリットがあります。

gRPCって何? - Qiita

Like many RPC systems, gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types. By default, gRPC uses protocol buffers as the Interface Definition Language (IDL) for describing both the service interface and the structure of the payload messages. It is possible to use other alternatives if desired.

grpc / gRPC Concepts

gRPC で渡すデータサイズについて

gRPCで4MB以上のデータ転送をしようとすると

rpc error: code = ResourceExhausted desc = grpc: received message larger than max (xxxxxxx vs. 4194304)

のようなエラーが出ます。この上限はデフォルト値なので変更することもできるのですが、ドキュメントでも以下のように

Protocol Buffers are not designed to handle large messages. As a general rule of thumb, if you are dealing in messages larger than a megabyte each, it may be time to consider an alternate strategy.

Techniques  |  Protocol Buffers  |  Google Developers

と、1メッセージがメガバイトを超えるのであれば別の手段で転送することを考えるべき、と言っています。
なので

のようにストリームで送るのが良さそうです。

gRPCで大きなファイルのやり取りをする - Carpe Diem

Protocol Buffers are not designed to handle large messages. As a general rule of thumb, if you are dealing in messages larger than a megabyte each, it may be time to consider an alternate strategy.

Techniques  |  Protocol Buffers  |  Google Developers

The default is now 4MB, and if you want larger you need to override it. What code change made it work?

(中略)

It looks like our current API can't do that, because we encode the max message size as an int, so the maximum supported value is 2147483647. Do you actually need to use a max size larger than that? If so, we might need to make some API changes.

(中略)

gRPC currently needs to hold the entire serialized message in memory before deserialization, and so with multi gigabyte messages you're holding huge amounts of memory hostage, especially compared to a stream.

Can I use a parameter bigger than 20MB in one call · Issue #7882 · grpc/grpc · GitHub
  • TensorFlow Debugger (tfdbg)
    • Fix an issue in which the TensorBoard Debugger Plugin could not handle total source file size exceeding gRPC message size limit (4 MB).
Release TensorFlow 1.9.0 · tensorflow/tensorflow · GitHub

YCSB で Memcached に負荷をかける

YCSB で Amazon ElastiCache(Memcached) に負荷をかけてみたメモ。

準備

recordcount=10000
operationcount=10000
workload=com.yahoo.ycsb.workloads.CoreWorkload
readallfields=true
readproportion=0.33
updateproportion=0.33
scanproportion=0
insertproportion=0.33

負荷をかける

  • データをロードする。
$ ./bin/ycsb load memcached -s -P workloads/workload-mc-01 -p "memcached.hosts=memcached.******.cfg.apne1.cache.amazonaws.com"
  • 負荷をかける。
$ ./bin/ycsb run memcached -s -P workloads/workload-mc-01 -p "memcached.hosts=memcached.******.cfg.apne1.cache.amazonaws.com"

AWS CLI を実行すると "ImportError: cannot import name AliasedEventEmitter" と怒られる

事象

  • ある日、AWS CLI を実行すると "ImportError: cannot import name AliasedEventEmitter" と怒られるようになった。
$ aws s3 ls
Traceback (most recent call last):
  File "/usr/bin/aws", line 19, in <module>
    import awscli.clidriver
  File "/usr/lib/python2.7/dist-packages/awscli/clidriver.py", line 19, in <module>
    from botocore.hooks import AliasedEventEmitter
ImportError: cannot import name AliasedEventEmitter

解決策

  • AWS CLI をインストールし直したら、解消した。
$ python -m pip install --upgrade pip
$ sudo /usr/local/bin/pip uninstall awscli
$ sudo /usr/local/bin/pip install awscli

YCSBで DynamoDB に負荷をかけたメモ

設定

  • YCSB/dynamodb/conf/AWSCredentials.properties
accessKey = <アクセスキー>
secretKey = <シークレットキー>
  • YCSB/dynamodb/conf/dynamodb.properties
dynamodb.awsCredentialsFile = dynamodb/conf/AWSCredentials.properties
dynamodb.primaryKey = firstname
dynamodb.endpoint = http://dynamodb.ap-northeast-1.amazonaws.com
  • YCSB/workloads/workload02
recordcount=10000
table=ycsb02
operationcount=10000
workload=com.yahoo.ycsb.workloads.CoreWorkload
readallfields=true
readproportion=0.33
updateproportion=0.33
scanproportion=0
insertproportion=0.33

実行手順

  • データロード
$ ./bin/ycsb load dynamodb -P workloads/workload02 -P dynamodb/conf/dynamodb.properties
Command line: -db com.yahoo.ycsb.db.DynamoDBClient -P workloads/workload02 -P dynamodb/conf/dynamodb.properties -load
YCSB Client 0.16.0-SNAPSHOT
(中略)
Loading workload...
Starting test.
0    [Thread-1] INFO  com.yahoo.ycsb.db.DynamoDBClient  -dynamodb connection created with http://dynamodb.ap-northeast-1.amazonaws.com
DBWrapper: report latency for each error is false and specific error codes to track for latency are: []
[OVERALL], RunTime(ms), 98442
[OVERALL], Throughput(ops/sec), 101.58265780865891
[TOTAL_GCS_PS_Scavenge], Count, 17
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 52
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.05282298206050263
[TOTAL_GCS_PS_MarkSweep], Count, 0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 17
[TOTAL_GC_TIME], Time(ms), 52
[TOTAL_GC_TIME_%], Time(%), 0.05282298206050263
[CLEANUP], Operations, 1
[CLEANUP], AverageLatency(us), 4.0
[CLEANUP], MinLatency(us), 4
[CLEANUP], MaxLatency(us), 4
[CLEANUP], 95thPercentileLatency(us), 4
[CLEANUP], 99thPercentileLatency(us), 4
[INSERT], Operations, 10000
[INSERT], AverageLatency(us), 9763.9544
[INSERT], MinLatency(us), 8960
[INSERT], MaxLatency(us), 255359
[INSERT], 95thPercentileLatency(us), 10071
[INSERT], 99thPercentileLatency(us), 12527
[INSERT], Return=OK, 10000
  • 負荷をかける
$ ./bin/ycsb run dynamodb -P workloads/workload02 -P dynamodb/conf/dynamodb.properties -threads 4
(中略)
[OVERALL], RunTime(ms), 27116
[OVERALL], Throughput(ops/sec), 368.7859566307715
[TOTAL_GCS_PS_Scavenge], Count, 13
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 62
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.22864729311107834
[TOTAL_GCS_PS_MarkSweep], Count, 0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 13
[TOTAL_GC_TIME], Time(ms), 62
[TOTAL_GC_TIME_%], Time(%), 0.22864729311107834
[READ], Operations, 3326
[READ], AverageLatency(us), 8019.719783523752
[READ], MinLatency(us), 5880
[READ], MaxLatency(us), 227199
[READ], 95thPercentileLatency(us), 9455
[READ], 99thPercentileLatency(us), 16039
[READ], Return=OK, 3326
[CLEANUP], Operations, 4
[CLEANUP], AverageLatency(us), 1.5
[CLEANUP], MinLatency(us), 1
[CLEANUP], MaxLatency(us), 3
[CLEANUP], 95thPercentileLatency(us), 3
[CLEANUP], 99thPercentileLatency(us), 3
[INSERT], Operations, 3315
[INSERT], AverageLatency(us), 11822.618401206637
[INSERT], MinLatency(us), 10560
[INSERT], MaxLatency(us), 223487
[INSERT], 95thPercentileLatency(us), 12527
[INSERT], 99thPercentileLatency(us), 16655
[INSERT], Return=OK, 3315
[UPDATE], Operations, 3359
[UPDATE], AverageLatency(us), 11647.422447156892
[UPDATE], MinLatency(us), 10360
[UPDATE], MaxLatency(us), 108095
[UPDATE], 95thPercentileLatency(us), 12463
[UPDATE], 99thPercentileLatency(us), 19071
[UPDATE], Return=OK, 3359
$ ./bin/ycsb run dynamodb -P workloads/workload02 -P dynamodb/conf/dynamodb.properties -threads 4
(中略)
[OVERALL], RunTime(ms), 27247
[OVERALL], Throughput(ops/sec), 367.0128821521635
[TOTAL_GCS_PS_Scavenge], Count, 13
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 62
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.22754798693434142
[TOTAL_GCS_PS_MarkSweep], Count, 0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 13
[TOTAL_GC_TIME], Time(ms), 62
[TOTAL_GC_TIME_%], Time(%), 0.22754798693434142
[READ], Operations, 3331
[READ], AverageLatency(us), 8085.699789852897
[READ], MinLatency(us), 5840
[READ], MaxLatency(us), 231039
[READ], 95thPercentileLatency(us), 9495
[READ], 99thPercentileLatency(us), 19775
[READ], Return=OK, 3331
[CLEANUP], Operations, 4
[CLEANUP], AverageLatency(us), 1.75
[CLEANUP], MinLatency(us), 1
[CLEANUP], MaxLatency(us), 4
[CLEANUP], 95thPercentileLatency(us), 4
[CLEANUP], 99thPercentileLatency(us), 4
[UPDATE], Operations, 3336
[UPDATE], AverageLatency(us), 11837.420863309353
[UPDATE], MinLatency(us), 10312
[UPDATE], MaxLatency(us), 227711
[UPDATE], 95thPercentileLatency(us), 12503
[UPDATE], 99thPercentileLatency(us), 33151
[UPDATE], Return=OK, 3336
[INSERT], Operations, 3333
[INSERT], AverageLatency(us), 11616.116411641164
[INSERT], MinLatency(us), 10504
[INSERT], MaxLatency(us), 65919
[INSERT], 95thPercentileLatency(us), 12415
[INSERT], 99thPercentileLatency(us), 16463
[INSERT], Return=OK, 3333
$ ./bin/ycsb run dynamodb -P workloads/workload02 -P dynamodb/conf/dynamodb.properties -threads 4
(中略)
[OVERALL], RunTime(ms), 26427
[OVERALL], Throughput(ops/sec), 378.4008778900367
[TOTAL_GCS_PS_Scavenge], Count, 13
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 61
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.23082453551292237
[TOTAL_GCS_PS_MarkSweep], Count, 0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 13
[TOTAL_GC_TIME], Time(ms), 61
[TOTAL_GC_TIME_%], Time(%), 0.23082453551292237
[READ], Operations, 3335
[READ], AverageLatency(us), 8170.629085457272
[READ], MinLatency(us), 6060
[READ], MaxLatency(us), 239103
[READ], 95thPercentileLatency(us), 9703
[READ], 99thPercentileLatency(us), 12135
[READ], Return=OK, 3335
[CLEANUP], Operations, 4
[CLEANUP], AverageLatency(us), 1.25
[CLEANUP], MinLatency(us), 1
[CLEANUP], MaxLatency(us), 2
[CLEANUP], 95thPercentileLatency(us), 2
[CLEANUP], 99thPercentileLatency(us), 2
[INSERT], Operations, 3357
[INSERT], AverageLatency(us), 10820.53142686923
[INSERT], MinLatency(us), 9104
[INSERT], MaxLatency(us), 245375
[INSERT], 95thPercentileLatency(us), 11463
[INSERT], 99thPercentileLatency(us), 31871
[INSERT], Return=OK, 3357
[UPDATE], Operations, 3308
[UPDATE], AverageLatency(us), 10581.397823458283
[UPDATE], MinLatency(us), 8872
[UPDATE], MaxLatency(us), 231551
[UPDATE], 95thPercentileLatency(us), 11287
[UPDATE], 99thPercentileLatency(us), 19039
[UPDATE], Return=OK, 3308

環境

  • DynamoDB
    • テーブル名: ycsb02
    • パーティションキー: firstname
    • Item件数:1万件
    • Provisioned capacity
      • Read: 3000
      • Write: 1000
  • EC2インスタンス
    • Instance type: m4.xlarge
    • OS: Amazon Linux AMI release 2018.03(kernel 4.14.77-70.59.amzn1.x86_64)
    • AMI: amzn-ami-hvm-2018.03.0.20181129-x86_64-gp2 (ami-00a5245b4816c38e6)
    • Availability zone: ap-northeast-1a

Redshift のエラーハンドリングとエラーメッセージの確認方法

copy customer from 's3://awssampledbuswest2/ssbgz/customer'
credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftRole'
gzip compupdate off region 'us-west-2';
# create schema admin;
# create or replace view admin.v_my_last_copy_errors as 
select query, 
       starttime,
       filename, 
       line_number,
       err_reason,
       colname,
       type column_type,
       col_length,
       raw_field_value
from stl_load_errors le
where le.query = pg_last_copy_id();
  • 実行する
    • パスワードの指定は環境変数と .pgpass に記述する方法があり、.pgpass 推奨だがとりあえず。
$ export PGPASSWORD=<パスワード>
$ psql --set ON_ERROR_STOP=on "host=redshift-cluster-1.******.us-west-2.redshift.amazonaws.com user=awsuser dbname=dev port=5439" -f copy.sql
psql:copy.sql:3: ERROR:  Cannot COPY into nonexistent table customer ★エラーメッセージ
$ echo $?
3 ★戻り値(正常終了は0
  • エラーメッセージを参照する。
$ psql "host=redshift-cluster-1.******.us-west-2.redshift.amazonaws.com user=awsuser dbname=dev port=5439"
# select * from admin.v_my_last_copy_errors;

参考

終了状態
psqlは、正常に終了した時には0を、psqlにとって致命的なエラー(メモリ不足やファイルが見つからないなど)が発生した時には1を、セッションが対話式でない状態でサーバとの接続が不完全になった時には2を、ON_ERROR_STOP変数が設定されている状態でスクリプトでエラーが発生した時には3をシェルに返します。

https://www.postgresql.jp/document/8.1/html/app-psql.html

AWS CLI でS3バケットのライフサイクル設定を取得する

AWS CLI でS3バケットのライフサイクル設定を取得する例。

  • ライフサイクル設定を取得する。
$ aws s3api get-bucket-lifecycle-configuration --bucket az-cp-src
{
    "Rules": [
        {
            "Filter": {
                "Tag": {
                    "Value": "10y",
                    "Key": "Name"
                }
            },
            "Status": "Enabled",
            "Expiration": {
                "Days": 365
            },
            "ID": "test"
        },
        {
            "Filter": {
                "Prefix": ""
            },
            "Status": "Enabled",
            "Expiration": {
                "Days": 365
            },
            "ID": "test2"
        }
    ]
}
  • ルールIDでフィルタする。
$ aws s3api get-bucket-lifecycle-configuration --bucket az-cp-src|\
jq '.Rules[]|select(.ID == "test")' 
{
  "Filter": {
    "Tag": {
      "Value": "10y",
      "Key": "Name"
    }
  },
  "Status": "Enabled",
  "Expiration": {
    "Days": 365
  },
  "ID": "test"
}
  • さらにCSVで出力する。
$ aws s3api get-bucket-lifecycle-configuration --bucket az-cp-src|\
jq -r '.Rules[]|select(.ID == "test")|@text "\(.ID), \(.Status), \(.Expiration.Days), \(.Tag.Name), \(.Tag.Value)"'
test, Enabled, 365, null, null

EMRFS で KMS のカスタマー管理 CMK で暗号化する

EMR で KMS のカスタマー管理CMKを利用するには、S3 バケットのデフォルト暗号化で CMK を指定し、S3 と KMS に対する必要な権限を持つロールを EMR にアタッチすればよい。
また、S3 のバケットポリシーで特定の CMK 以外での PUT を禁止すると、「CMK 指定なし」と「許可した CMK 指定する」とアップロードできるが、「許可した CMK 以外を指定する」と失敗する。

検証結果

  • ファイルを作成する。
$ perl -le 'print for 1..10000' > number10000.txt
  • AWS CLIでCMK指定なしでのアップロードは成功する
$ aws s3 cp number10000.txt s3://az-emr-kms-test/
upload: ./number10000.txt to s3://az-emr-kms-test/number10000.txt
  • AWS CLIバケットポリシーで許可されたCMKを指定してのアップロードは成功する。
$ aws s3 cp number10000.txt s3://az-emr-kms-test/number10000.txt --sse aws:kms --sse-kms-key-id 'arn:aws:kms:ap-northeast-1:123456789012:key/c3******-168d-4***-9***-9***********'
upload: ./number10000.txt to s3://az-emr-kms-test/number10000.txt
  • AWS CLIで違うCMKを指定してのアップロードは失敗する。
$ aws s3 cp number10000.txt s3://az-emr-kms-test/number10000.txt --sse aws:kms --sse-kms-key-id 'arn:aws:kms:ap-northeast-1:123456789012:key/2a******-8b8f-4***-9***-0***********'
upload failed: ./number10000.txt to s3://az-emr-kms-test/number10000.txt An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
  • PySpark on EMR で、EMRFS で S3 に書ける(CMK指定なし)。
$ pyspark
Python 2.7.15 (default, Nov 28 2018, 22:38:08)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/01 07:54:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/01 07:54:31 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
19/04/01 07:54:31 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
19/04/01 07:54:34 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.15 (default, Nov 28 2018 22:38:08)
SparkSession available as 'spark'.
>>> df = spark.read.csv("s3://az-emr-kms-test/number10000.txt")
>>> df.head(10)
[Row(_c0=u'1'), Row(_c0=u'2'), Row(_c0=u'3'), Row(_c0=u'4'), Row(_c0=u'5'), Row(_c0=u'6'), Row(_c0=u'7'), Row(_c0=u'8'), Row(_c0=u'9'), Row(_c0=u'10')]
>>> df.write.mode('overwrite').parquet("s3://az-emr-kms-test/number10000.parquet")
>>> exit
  • Sparkから書いたファイルを確認する。
$ aws s3 ls --recursive s3://az-emr-kms-test/
2019-04-01 07:34:49      48894 num1
2019-04-01 05:39:51  888888898 number.txt
2019-04-01 07:55:47          0 number10000.parquet/_SUCCESS
2019-04-01 07:55:47      43888 number10000.parquet/part-00000-4c91ddce-b347-4239-85ec-0eba160c6c15-c000.snappy.parquet
2019-04-01 07:53:21      48894 number10000.txt

準備

  • EMRクラスタを作成する。
  • IAMポリシー「KMSUserPolicy1」を作成して、ロール「EMR_EC2_DefaultRole」にアタッチする
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey",
                "kms:ListAliases"
            ],
            "Resource": "*"
        }
    ]
}
  • S3バケットポリシーで指定したCMK以外での暗号化を禁止する。
{
    "Version": "2012-10-17",
    "Id": "S3KeyPolicy",
    "Statement": [
        {
            "Sid": "Force KMS Key",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::az-emr-kms-test/*",
            "Condition": {
                "StringNotEquals": {
                    "s3:x-amz-server-side-encryption-aws-kms-key-id": "arn:aws:kms:ap-northeast-1:123456789012:key/c3******-168d-4***-9***-9***********"
                }
            }
        }
    ]
}