AWS Glue で Amazon S3 にある Parquet を Amazon DynamoDB にロードしてみた。
- DynamoDB にテーブルを作成する。
- プライマリキーの項目名と型を設定する。
- Parquet ファイルを S3 バケットにアップロードする。
- AWS Glue でクロールしてテーブルを作成する。
- Glue Spark ジョブを作成する。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "default", table_name = "timeseries", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "timeseries", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("ts", "long", "ts", "long"), ("col1", "double", "col1", "double"), ("col2", "double", "col2", "double"), ("col3", "double", "col3", "double")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [(ts", "long", "ts", "long"), ("col1", "double", "col1", "double"), ("col2", "double", "col2", "double"), ("col3", "double", "col3", "double")], transformation_ctx = "applymapping1") datasink2 = glueContext.write_dynamic_frame_from_options( frame=applymapping1, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "timeseries", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Glue ジョブを実行すると DynamoDB のテーブルにデータがロードされた。