PySpark on Glue で S3 上のファイルをリネームしてみた。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate()) URI = SparkContext._gateway.jvm.java.net.URI Path = SparkContext._gateway.jvm.org.apache.hadoop.fs.Path FileSystem = SparkContext._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem fs = FileSystem.get(URI("s3://{}".format('raw_log_bucket')), glueContext._jsc.hadoopConfiguration()) fs.rename( Path("s3://raw_log_bucket/log_20200122"), Path("s3://raw_log_bucket/log_20200122_renamed") )
- パスを取得して変更する例、ファイルが複数あるケースには対応していない。
import boto3 df_sql_result.coalesce(1).write.mode('overwrite').csv("s3://s3-bucket-name/output/", header=True) URI = SparkContext._gateway.jvm.java.net.URI Path = SparkContext._gateway.jvm.org.apache.hadoop.fs.Path FileSystem = SparkContext._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem fs = FileSystem.get(URI("s3://{}".format('s3-bucket-name')), glueContext._jsc.hadoopConfiguration()) s3_resource = boto3.resource('s3') s3_object_list = s3_resource.Bucket('s3-bucket-name').objects.filter(Prefix='output/') s3_object_names = [item.key for item in s3_object_list] fs.rename( Path("s3://s3-bucket-name/" + s3_object_names[0]), Path("s3://s3-bucket-name/output/output_01.csv") )
参考
- http://hatunina.hatenablog.com/entry/2018/08/06/200915
- Glue上のPySparkでS3上のファイルをリネーム - Qiita
- pysparkでデータハンドリングする時によく使うやつメモ - Qiita
- spark copy files to s3 using hadoop api - Big Data
- boto3 で S3 オブジェクトのコピー - set setting reset
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects