AWS Glue で開発エンドポイントを作成して、Zeppelin のノートブックで PySpark を実行して S3にある CSV を加工(行をフィルタ)してS3に書いてみた。S3 から読んだ CSV は Glue の DynamicFrame から SparkSQL DataFrame に変換してフィルタした後、DynamicFrame に変換してS3に書いた。DynamicFrame には単純なメソッドしかないため、SparkSQL を使って高度なデータ処理をしたい場合は、DataFrame に変換してやればよい。
準備
テストデータ作成
- テストデータ(CSV)を作成する。
% perl -e 'printf(qq/%d,%d,%d,%d\n/,$_,2..4) for 1..100' > number.csv % wc -l number.csv 100 number.csv % head -3 number.csv 1,2,3,4 2,2,3,4 3,2,3,4 % tail -3 number.csv 98,2,3,4 99,2,3,4 100,2,3,4
S3バケットにデータを置く
Glue で開発エンドポイントと Zeppelin ノートブックを作成
- AWSマネジメントコンソールから AWS Glue を選択し、開発エンドポイントを作成する。
- [ノートブックサーバー]の[HTTPS URL]にブラウザでアクセスしてログインする。
- [Create new note]をクリックして、任意の名前でノートブックを作成する。
- [Default Interpreter] は Spark を選択する
実行
- Zeppelin ノートブックを開いて以下の PySpark のコードを実行する。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from awsglue.job import Job from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import StringType glueContext = GlueContext(SparkContext.getOrCreate()) # S3 location for input input_dir = "s3://az-handson/input" # S3 location for output output_dir = "s3://az-handson/output" # Read CSV dyf = glueContext.create_dynamic_frame.from_options(connection_type = "s3", connection_options = {"paths": [ input_dir ]}, format="csv", format_options={ "withHeader": False}) # Convert to DataFrame df = dyf.toDF() # Filter filtered_df = df.where(df['col0'] > 50) # Turn it back to a dynamic frame output_dyf = DynamicFrame.fromDF(filtered_df, glueContext, "nested") # Write it out in CSV glueContext.write_dynamic_frame.from_options(frame = output_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "csv")
結果
- S3 に加工結果が出力されている
- ダウンロードして開くとフィルタされた結果になっている。
- Zeppelin でフィルタ後の DataFrame を表示してみる。
filtered_df.show()
メモ
- DynamicFrame の定義を確認する
dyf.printSchema()
- dataframe の定義を確認する
df.describe()
参考
- CSV のダウンロード - AWS 請求情報とコスト管理
- DynamicFrame Class - AWS Glue
- DynamicFrameWriter クラス - AWS Glue
- Best Practices When Using Athena with AWS Glue - Amazon Athena
- https://github.com/aws-samples/aws-glue-samples/blob/master/examples/data_cleaning_and_lambda.py
- https://gist.github.com/crawles/b47e23da8218af0b9bd9d47f5242d189
- ZeppelinでAWS BillingのCSVを分析する | レコチョクのエンジニアブログ
- AWS Glue の基本的な使い方 - Qoosky
- pyspark.sql module — PySpark 2.1.0 documentation