ablog

不器用で落着きのない技術者のメモ

AWS Glue の Zeppelin ノートブックで PySpark を実行して CSV を加工してみた

AWS Glue で開発エンドポイントを作成して、Zeppelin のノートブックで PySpark を実行して S3にある CSV を加工(行をフィルタ)してS3に書いてみた。S3 から読んだ CSV は Glue の DynamicFrame から SparkSQL DataFrame に変換してフィルタした後、DynamicFrame に変換してS3に書いた。DynamicFrame には単純なメソッドしかないため、SparkSQL を使って高度なデータ処理をしたい場合は、DataFrame に変換してやればよい。

準備

テストデータ作成
  • テストデータ(CSV)を作成する。
% perl -e 'printf(qq/%d,%d,%d,%d\n/,$_,2..4) for 1..100' > number.csv
% wc -l number.csv
     100 number.csv
% head -3 number.csv
1,2,3,4
2,2,3,4
3,2,3,4
% tail -3 number.csv
98,2,3,4
99,2,3,4
100,2,3,4
S3バケットにデータを置く
  • S3バケット 「az-handson」を作成する。
  • 「az-handson」バケットの下に 「input」、「output」 フォルダを作成する。
  • 「az-handson/input」に「number.csv」をアップロードする。
Glue で開発エンドポイントと Zeppelin ノートブックを作成
  • AWSマネジメントコンソールから AWS Glue を選択し、開発エンドポイントを作成する。
  • [ノートブックサーバー]の[HTTPS URL]にブラウザでアクセスしてログインする。
  • [Create new note]をクリックして、任意の名前でノートブックを作成する。

実行

  • Zeppelin ノートブックを開いて以下の PySpark のコードを実行する。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

glueContext = GlueContext(SparkContext.getOrCreate())

# S3 location for input
input_dir = "s3://az-handson/input"

# S3 location for output
output_dir = "s3://az-handson/output"

# Read CSV
dyf = glueContext.create_dynamic_frame.from_options(connection_type = "s3", 
    connection_options = {"paths": [ input_dir ]}, format="csv", format_options={ "withHeader": False})

# Convert to DataFrame
df = dyf.toDF()

# Filter
filtered_df = df.where(df['col0'] > 50)

# Turn it back to a dynamic frame
output_dyf = DynamicFrame.fromDF(filtered_df, glueContext, "nested")

# Write it out in CSV
glueContext.write_dynamic_frame.from_options(frame = output_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "csv")

結果

  • S3 に加工結果が出力されている

  • ダウンロードして開くとフィルタされた結果になっている。

  • Zeppelin でフィルタ後の DataFrame を表示してみる。
filtered_df.show()

メモ

  • DynamicFrame の定義を確認する
dyf.printSchema()
  • dataframe の定義を確認する
df.describe()