ablog

不器用で落着きのない技術者のメモ

Athena で改行を含む CSV を扱いたい場合は Glue ジョブで Parquet に変換する

データの中身に改行を含む CSV を Athena でクエリすると正しく扱えなかったが、Glue ジョブで CSV を Parquet に変換すると改行を含むデータを扱うことができた。おそらく OpenCSVSerDe は改行に対応していないが、Parquet SerDe は改行に対応しているからではないかと思われる。

  • cr.csv を用意する。
c1,c2,c3_string
1,1,"test string"
2,2,"text string"
3,3,"string
with cr"
4,4,"text string"
  • S3 にアップロードする。
  • Glue のクローラで CSV をカタログに登録する。
  • Athena からCSV を参照すると改行で表示が崩れている。

f:id:yohei-a:20191015001912p:plain

  • Glue ジョブで Parquet に変換する。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

## initialize
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init('csv2parquet')

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "cr_csv", transformation_ctx = "datasource0")

## Convert to standard Spark DataFrame to do trasformation to be continued
df = datasource0.toDF()

## For large data sets, try to cache the data will accelerate later execution.
df.cache()

castedDf = df.withColumn("c1", df.c1.cast("decimal(38,0)")) \
    .withColumn("c2", df.c2.cast("decimal(38,0)")) \
    .withColumn("c3_string", df.c3_string.cast("varchar(30)"))

castedDf.write.partitionBy(["c1"]).mode("overwrite").parquet("s3://az-cr-test/parquet/",compression='snappy')

job.commit()
  • Glue のクローラで Parquet をカタログに登録する。
  • Athena からParquet を参照すると改行のある行が1行として扱われている。

f:id:yohei-a:20191015001834p:plain

  • 結果セットをダウンロードしてみると改行は残っている。
"c2","c3_string","c1"
"3","string
with cr","3"
"4","text string","4"
"1","test string","1"
"2","text string","2"