Spark とは
- Spark とは
- SparkはBDAS(Berkeley Data Analytics Stack)の一部
- Haddop エコシステムとの対応
- Spark in Action - 1.4. SPARK ECOSYSTEM - Figure 1.6
アーキテクチャ
- コンポーネント
- Spark in Action - 1.2. SPARK COMPONENTS - Figure 1.2
- RDD(resilient distributed dataset)
- Spark in Action - 1.3. SPARK PROGRAM FLOW - Figure 1.5
- クライアントからクエリ実行時の構成例
- Spark in Action - 5.1. WORKING WITH DATAFRAMES - Figure 5.1
- Glue の Worker Type 別 CPU数とメモリサイズ
実行プロセス
- クエリ実行プロセス
- プロセスモデル
- Spark in Action - 10.1.1. Spark runtime components - Figure 10.1
- タスクスケジューリング(FIFO と Fair)
- Spark in Action - 10.2.2. Spark job scheduling - 10.3, 10.4
- Spark executer のメモリ内訳
- Spark in Action - 10.2.4. Spark memory scheduling - 10.5
- Spark executer のメモリ内訳
- Spark in Action - 12.1.6. Configuring resources for Spark jobs - 12.3
- プロセスモデル(Stand alone)
- Spark in Action - 11.1. SPARK STANDALONE CLUSTER COMPONENTS - 11.1
- プロセスモデル(YARN)
- Spark in Action - 12.1.1. YARN architecture - 12.1
分析方法
linux.wwing.net
ボトルネックの見つけ方
- Spark UI の [Jobs] - [Completed Jobs] - [Duration] で実行時間を確認する。
- [Stages] タブを選択する。
- Event Timeline で実行時間の内訳の何の割合が大きいか確認する。
- Tasks の Attempt と Status から Kill されていないか確認する。
- Duration で各タスクの実行時間に偏りがないか確認する。
- Input Size / Records でサイズに偏りがないか確認する。
- [Executers] タブを選択する。
- [Address] でどのノードで実行されたか確認する。
- [Storage Memory] でメモリ使用量を確認する。
- [Task Time] でタスクの実行時間を確認する。
- [Shuffle Read]、[Shuffle Write] でステージ間で転送されたデータサイズを確認する。
- [Storage] タブを選択する確認する。
チューニング方法
IO量を減らす
仕事量を均等分散する
Spark
- ヒント句
SELECT SQL Statements With Hints
SELECT SQL statement supports query hints as comments in SQL query that Spark SQL translates into a UnresolvedHint unary logical operator in a logical plan.
COALESCE and REPARTITION Hints
Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL comments):SELECT /*+ COALESCE(5) */ … SELECT /*+ REPARTITION(3) */ …Broadcast Hints
Spark SQL 2.2 supports BROADCAST hints using broadcast standard function or SQL comments:SELECT /*+ MAPJOIN(b) */ … SELECT /*+ BROADCASTJOIN(b) */ … SELECT /*+ BROADCAST(b) */ …Hint Framework · The Internals of Spark SQL
Glue
--enable-s3-parquet-optimized-committer — Parquet データを Amazon S3 に書き込むために EMRFS S3 最適化コミッターを有効にします。AWS Glue ジョブを作成または更新するときに、AWS Glue コンソールからパラメータ/値のペアを指定できます。値を true に設定すると、コミッターが有効になります。デフォルトでは、このフラグはオフになっています。
詳細については、「EMRFS S3 向けに最適化されたコミッターの使用」を参照してください。
AWS Glue で使用される特別なパラメータ - AWS Glue
format="glueparquet"
この値は、動的フレーム用に最適化されたカスタム Parquet ライタータイプをデータ形式として指定します。書き込む前に事前計算スキーマは必要ありません。データが到着すると、glueparquet はスキーマを動的に計算して変更します。format="glueparquet" には、以下の format_options 値を使用できます。
- compression — Parquet ファイルを書き込むときに使用する圧縮コーデックを指定します。デフォルト値は "snappy" です。
- blockSize — メモリにバッファされる行グループのサイズを指定します。デフォルト値は "128MB" です。
- pageSize — 単一のレコードにアクセスするために完全に読み取る必要がある最小単位のサイズを指定します。デフォルト値は "1MB" です。
制約事項:
AWS Glue での ETL 入力および出力の形式オプション - AWS Glue
from_options
from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx="")
指定された接続と形式を使用して DynamicFrame を書き込みます。
- frame – 書き込む DynamicFrame。
- connection_type – 接続タイプ。有効な値には、s3、mysql、postgresql、redshift、sqlserver、および oracle があります。
- connection_options – 接続オプション (パスやデータベーステーブルなど) (オプション)。s3 の connection_type では、Amazon S3 パスが定義されています。
connection_options = {"path": "s3://aws-glue-target/temp"}JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。
connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}dbtable プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。
詳細については、「AWS Glue での ETL の接続タイプとオプション」を参照してください。DynamicFrameWriter クラス - AWS Glue
- format – 形式の仕様 (オプション)。これは、複数の形式をサポートする Amazon Simple Storage Service (Amazon S3) または AWS Glue 接続に使用します。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。
- format_options – 指定した形式の形式オプション。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。
- transformation_ctx – 使用する変換コンテキスト (オプション)。
Glue カタログを使わずに S3 から読む例
# Create DynamicFrame from Options dyf = glue_context.create_dynamic_frame.from_options( connection_type='s3', connection_options={ 'paths': [’s3://….'], 'useS3ListImplementation': True, 'recurse': True }, format=‘parquet', transformation_ctx='dyf' )
EMR
- spark.sql.shuffle.partitions、spark.default.parallelism など
参考情報
- aws.amazon.com
- dev.classmethod.jp
- 作者:Zecevic, Petar,Bonaci, Marko
- 発売日: 2016/11/26
- メディア: ペーパーバック
- externaltable.blogspot.com
- Demystifying Spark Jobs to Optimize for Cost and Performance - Cloudera Blog
- Performance Analysis of Apache Spark and Presto in Cloud Environments
- https://dl.acm.org/doi/10.1145/3185768.3185772
- 5TB/日 のデータをAWS Glueでさばくためにやったこと(性能編) | フューチャー技術ブログ
- Modernizing Big Data Workload Using Amazon EMR & AWS Glue
- An Insider’s Guide to Maximizing Spark SQL Performance
- Sparkの性能向上のためのパラメータチューニングとバッチ処理向けの推奨構成 | Think IT(シンクイット)
- Amazon | High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark | Karau, Holden, Warren, Rachel | Software Development
- Spark performance tuning from the trenches - Teads Engineering - Medium
- What is Spark Tuning? - Databricks
- https://youtu.be/daXEp4HmS-E
- A Deep Dive into Query Execution Engine of Spark SQL - Maryann Xue - YouTube
- Apache Sparkコミッターが教える、Spark SQLの詳しい仕組みとパフォーマンスチューニング Part2 - ログミーTech
- Apache Sparkについて
- Apache Spark で分散処理入門 - Qiita
- SparkInternalsで知る、Sparkの内部構造概要(Architecture) - Qiita
SparkでJDBCデータソースからデータを並列で読み取りたいすべての人へ。このドキュメントを読んでnumPartitions/partitionColumn/upperBound/lowerBoundを設定するんだ。まずはそれからだ。https://t.co/n1HEy52rDL
— Noritaka Sekiyama (@moomindani) July 20, 2020
AWS GlueでJDBCデータソースからデータを並列で読み取りたいすべての人へ。このドキュメントを読んでhashfield/hashexpression/hashpartitionsを設定するんだ。まずはそれからだ。https://t.co/cVEAFEeYaq https://t.co/zoShmSZvwj
— Noritaka Sekiyama (@moomindani) July 20, 2020
Sparkクラスタ上でタスクがひとつだけ起動して、ひとつのExecutorからJDBCデータソースにSELECT * FROM tをクエリして、他のExecutorが全員暇してる光景はもう見たくないんだ・・・!
— Noritaka Sekiyama (@moomindani) July 20, 2020