ablog

不器用で落着きのない技術者のメモ

AWS Glue で Amazon S3 にある Parquet を Amazon DynamoDB にロードする

AWS Glue で Amazon S3 にある Parquet を Amazon DynamoDB にロードしてみた。

  • DynamoDB にテーブルを作成する。
    • プライマリキーの項目名と型を設定する。
  • Parquet ファイルを S3 バケットにアップロードする。
  • AWS Glue でクロールしてテーブルを作成する。
  • Glue Spark ジョブを作成する。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "timeseries", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "timeseries", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("ts", "long", "ts", "long"), ("col1", "double", "col1", "double"), ("col2", "double", "col2", "double"), ("col3", "double", "col3", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [(ts", "long", "ts", "long"), ("col1", "double", "col1", "double"), ("col2", "double", "col2", "double"), ("col3", "double", "col3", "double")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame_from_options(
    frame=applymapping1,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "timeseries",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()

Glue ジョブを実行すると DynamoDB のテーブルにデータがロードされた。