ablog

Notes from a clumsy, restless engineer

A Python script to import a CSV on S3 into DynamoDB

Source code

ddb_csv_importer.sh:

#!/usr/bin/env bash

export bucket=aws-s3-bucket
export key=test/test_table.csv
export table=testTable

python3 ./ddb_csv_importer.py
ddb_csv_importer.py:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import codecs
import csv
import os
import sys

import boto3

s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

bucket = os.environ['bucket']
key = os.environ['key']
table_name = os.environ['table']


def write_to_dynamo(table, rows):
   """Write a list of CSV rows to DynamoDB using a batch writer."""
   try:
      # batch_writer() buffers put_item calls into BatchWriteItem
      # requests and automatically resends unprocessed items.
      with table.batch_writer() as batch:
         for row in rows:
            # Cast every column name and value to str for simplicity.
            casted_row = {str(k): str(v) for k, v in row.items()}
            batch.put_item(Item=casted_row)
   except Exception:
      print("Error executing batch_writer")
      import traceback
      traceback.print_exc()
      sys.exit(1)


if __name__ == '__main__':
   try:
      # get() returns a StreamingBody, so the object is streamed
      # rather than read into memory all at once.
      obj = s3.Object(bucket, key).get()['Body']
   except Exception:
      print("S3 object could not be opened. Check the bucket/key environment variables.")
      sys.exit(1)

   try:
      table = dynamodb.Table(table_name)
      table.load()  # fails fast if the table does not exist
   except Exception:
      print("Error loading DynamoDB table. Check that the table exists and the environment variable is correct.")
      sys.exit(1)

   batch_size = 100
   batch = []

   # DictReader is a generator; rows are consumed lazily, not stored in memory.
   for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
      batch.append(row)
      if len(batch) >= batch_size:
         write_to_dynamo(table, batch)
         batch.clear()

   if batch:
      write_to_dynamo(table, batch)
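The chunk-and-cast logic above can be exercised locally without AWS. The sketch below (sample CSV data is made up) feeds csv.DictReader an in-memory string and shows how rows are stringified and grouped into batches before they would be handed to the batch writer:

```python
import csv
import io

SAMPLE_CSV = """id,name,score
1,alice,10
2,bob,20
3,carol,30
"""


def batched_rows(reader, batch_size):
    """Yield lists of at most batch_size stringified rows from a DictReader."""
    batch = []
    for row in reader:
        # Same cast as the importer: every key and value becomes a str.
        batch.append({str(k): str(v) for k, v in row.items()})
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the leftover partial batch


reader = csv.DictReader(io.StringIO(SAMPLE_CSV))
batches = list(batched_rows(reader, batch_size=2))
print(batches)
# three rows with batch_size=2 produce one full batch and one leftover batch
```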

Prerequisites

  • Create the table in DynamoDB beforehand.
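If the table does not exist yet, it can be created with boto3. The key schema below is an assumption (a single string partition key named id; adjust it to match the CSV columns), and on-demand billing is chosen so no capacity units need to be provisioned:

```python
# Hypothetical schema: adjust AttributeName to the CSV's key column.
table_params = {
    'TableName': 'testTable',
    'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}],
    'AttributeDefinitions': [{'AttributeName': 'id', 'AttributeType': 'S'}],
    'BillingMode': 'PAY_PER_REQUEST',
}

# Requires AWS credentials; uncomment to actually create the table:
# import boto3
# dynamodb = boto3.resource('dynamodb')
# table = dynamodb.create_table(**table_params)
# table.wait_until_exists()
```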

Run

$ ./ddb_csv_importer.sh