データの中身に改行を含む CSV を Athena でクエリすると正しく扱えなかったが、Glue ジョブで CSV を Parquet に変換すると改行を含むデータを扱うことができた。おそらく OpenCSVSerDe は改行に対応していないが、Parquet SerDe は改行に対応しているからではないかと思われる。
- cr.csv を用意する。
c1,c2,c3_string 1,1,"test string" 2,2,"text string" 3,3,"string with cr" 4,4,"text string"
- Glue ジョブで Parquet に変換する。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql import SQLContext from pyspark.sql.functions import year, month, date_format ## initialize sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init('csv2parquet') datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "cr_csv", transformation_ctx = "datasource0") ## Convert to standard Spark DataFrame to do trasformation to be continued df = datasource0.toDF() ## For large data sets, try to cache the data will accelerate later execution. df.cache() castedDf = df.withColumn("c1", df.c1.cast("decimal(38,0)")) \ .withColumn("c2", df.c2.cast("decimal(38,0)")) \ .withColumn("c3_string", df.c3_string.cast("varchar(30)")) castedDf.write.partitionBy(["c1"]).mode("overwrite").parquet("s3://az-cr-test/parquet/",compression='snappy') job.commit()
- Glue のクローラで Parquet をカタログに登録する。
- Athena からParquet を参照すると改行のある行が1行として扱われている。
- 結果セットをダウンロードしてみると改行は残っている。
"c2","c3_string","c1" "3","string with cr","3" "4","text string","4" "1","test string","1" "2","text string","2"