ablog

不器用で落着きのない技術者のメモ

PySpark on Glue で S3 上のファイル名をリネームする

PySpark on Glue で S3 上のファイルをリネームしてみた。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

URI = SparkContext._gateway.jvm.java.net.URI
Path = SparkContext._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = SparkContext._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem

fs = FileSystem.get(URI("s3://{}".format('raw_log_bucket')), glueContext._jsc.hadoopConfiguration())

fs.rename(
    Path("s3://raw_log_bucket/log_20200122"),
    Path("s3://raw_log_bucket/log_20200122_renamed")
)
  • パスを取得して変更する例、ファイルが複数あるケースには対応していない。
import boto3
df_sql_result.coalesce(1).write.mode('overwrite').csv("s3://s3-bucket-name/output/", header=True)
URI = SparkContext._gateway.jvm.java.net.URI
Path = SparkContext._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = SparkContext._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem

fs = FileSystem.get(URI("s3://{}".format('s3-bucket-name')), glueContext._jsc.hadoopConfiguration())

s3_resource = boto3.resource('s3')
s3_object_list = s3_resource.Bucket('s3-bucket-name').objects.filter(Prefix='output/')
s3_object_names = [item.key for item in s3_object_list]
fs.rename(
    Path("s3://s3-bucket-name/" + s3_object_names[0]),
    Path("s3://s3-bucket-name/output/output_01.csv")
)