ablog

不器用で落着きのない技術者のメモ

Apache Spark パフォーマンス分析・チューニング

Spark とは

  • Spark とは

  • SparkはBDAS(Berkeley Data Analytics Stack)の一部

  • Haddop エコシステムとの対応


アーキテクチャ

  • RDD(resilient distributed dataset)

f:id:yohei-a:20200523175319p:plain

  • クライアントからクエリ実行時の構成例

f:id:yohei-a:20200523175323p:plain

  • Glue の Worker Type 別 CPU数とメモリサイズ

実行プロセス
  • クエリ実行プロセス

  • タスクスケジューリング(FIFO と Fair)


  • Spark executer のメモリ内訳

  • Spark executer のメモリ内訳


分析方法

linux.wwing.net

  • Spark SQL のクエリは複数のジョブから構成される。
  • 一つのジョブにおける RDD の依存関係は DAG で表される。
ボトルネックの見つけ方

f:id:yohei-a:20200504141235p:plain

  • Spark UI の [Jobs] - [Completed Jobs] - [Duration] で実行時間を確認する。

f:id:yohei-a:20200504141032p:plain

  • [Stages] タブを選択する。
    • Event Timeline で実行時間の内訳の何の割合が大きいか確認する。
    • Tasks の Attempt と Status から Kill されていないか確認する。
    • Duration で各タスクの実行時間に偏りがないか確認する。
    • Input Size / Records でサイズに偏りがないか確認する。

f:id:yohei-a:20200504141505p:plain
f:id:yohei-a:20200504141624p:plain

  • [Executers] タブを選択する。
    • [Address] でどのノードで実行されたか確認する。
    • [Storage Memory] でメモリ使用量を確認する。
    • [Task Time] でタスクの実行時間を確認する。
    • [Shuffle Read]、[Shuffle Write] でステージ間で転送されたデータサイズを確認する。

f:id:yohei-a:20200504142553p:plain

  • [Storage] タブを選択する確認する。

チューニング方法

IO量を減らす
仕事量を均等分散する
Spark
  • ヒント句

SELECT SQL Statements With Hints
SELECT SQL statement supports query hints as comments in SQL query that Spark SQL translates into a UnresolvedHint unary logical operator in a logical plan.
COALESCE and REPARTITION Hints
Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL comments):

SELECT /*+ COALESCE(5) */ …​
SELECT /*+ REPARTITION(3) */ …​

Broadcast Hints
Spark SQL 2.2 supports BROADCAST hints using broadcast standard function or SQL comments:

SELECT /*+ MAPJOIN(b) */ …​
SELECT /*+ BROADCASTJOIN(b) */ …​
SELECT /*+ BROADCAST(b) */ …​
Hint Framework · The Internals of Spark SQL
Glue

--enable-s3-parquet-optimized-committer — Parquet データを Amazon S3 に書き込むために EMRFS S3 最適化コミッターを有効にします。AWS Glue ジョブを作成または更新するときに、AWS Glue コンソールからパラメータ/値のペアを指定できます。値を true に設定すると、コミッターが有効になります。デフォルトでは、このフラグはオフになっています。

詳細については、「EMRFS S3 向けに最適化されたコミッターの使用」を参照してください。

AWS Glue で使用される特別なパラメータ - AWS Glue

format="glueparquet"
この値は、動的フレーム用に最適化されたカスタム Parquet ライタータイプをデータ形式として指定します。書き込む前に事前計算スキーマは必要ありません。データが到着すると、glueparquet はスキーマを動的に計算して変更します。

format="glueparquet" には、以下の format_options 値を使用できます。

  • compression — Parquet ファイルを書き込むときに使用する圧縮コーデックを指定します。デフォルト値は "snappy" です。
  • blockSize — メモリにバッファされる行グループのサイズを指定します。デフォルト値は "128MB" です。
  • pageSize — 単一のレコードにアクセスするために完全に読み取る必要がある最小単位のサイズを指定します。デフォルト値は "1MB" です。

制約事項:

  • glueparquet スキーマの縮小または拡張のみをサポートし、タイプの変更はサポートしません。
  • glueparquet はスキーマ専用ファイルを格納できません。
AWS Glue での ETL 入力および出力の形式オプション - AWS Glue

from_options
from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx="")
指定された接続と形式を使用して DynamicFrame を書き込みます。

  • frame – 書き込む DynamicFrame。
  • connection_type – 接続タイプ。有効な値には、s3、mysqlpostgresql、redshift、sqlserver、および oracle があります。
  • connection_options – 接続オプション (パスやデータベーステーブルなど) (オプション)。s3 の connection_type では、Amazon S3 パスが定義されています。
connection_options = {"path": "s3://aws-glue-target/temp"}

JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

dbtable プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。
詳細については、「AWS Glue での ETL の接続タイプとオプション」を参照してください。

  • format – 形式の仕様 (オプション)。これは、複数の形式をサポートする Amazon Simple Storage Service (Amazon S3) または AWS Glue 接続に使用します。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。
  • format_options – 指定した形式の形式オプション。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。
  • transformation_ctx – 使用する変換コンテキスト (オプション)。
DynamicFrameWriter クラス - AWS Glue

Glue カタログを使わずに S3 から読む例

# Create DynamicFrame from Options
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={
        'paths': [’s3://….'],
        'useS3ListImplementation': True,
        'recurse': True
    },
    format=‘parquet',
    transformation_ctx='dyf'
)
EMR

参考情報