ablog

Notes from a clumsy, restless engineer

Wrapping every column in double quotes when writing CSV from Glue PySpark

To wrap every column in double quotes when writing CSV from Glue PySpark, specify quoteAll=True when writing the DataFrame.

outputDf = newDf.repartition(1)
s3OutputPath = "s3://dl-sfdc-dm/test/newline_test"
outputDf.write.mode('append').csv(s3OutputPath, quoteAll=True, escape='"')
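
With quoteAll=True every field is wrapped in double quotes, and escape='"' switches the escaping of embedded quotes from Spark's default backslash style to RFC 4180 style (doubling the quote). For example, a row whose third field contains say "hi" would come out like this (hypothetical sample):

"a","b","say ""hi""","c"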

Notes

Glue's DynamicFrame uses a double quote (") as the default quoteChar, but it did not put double quotes around every column, so I used a DataFrame instead.

datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = result, 
    connection_type = "s3", 
    connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"}, 
    format = "csv", 
    transformation_ctx = "datasink2")

format="csv"

This value designates comma-separated-values as the data format (for example, see RFC 4180 and RFC 7111).

You can use the following format_options values with format="csv":

  • separator — Specifies the delimiter character. The default is a comma: ",", but any other character can be specified.
  • escaper — Specifies a character to use for escaping. This option is used only when reading CSV files. The default value is none. If enabled, the character which immediately follows is used as-is, except for a small set of well-known escapes (\n, \r, \t, and \0).
  • quoteChar — Specifies the character to use for quoting. The default is a double quote: '"'. Set this to -1 to turn off quoting entirely.
  • multiLine — A Boolean value that specifies whether a single record can span multiple lines. This can occur when a field contains a quoted new-line character. You must set this option to True if any record spans multiple lines. The default value is False, which allows for more aggressive file-splitting during parsing.
  • withHeader — A Boolean value that specifies whether to treat the first line as a header. The default value is False. This option can be used in the DynamicFrameReader class.
  • writeHeader — A Boolean value that specifies whether to write the header to output. The default value is True. This option can be used in the DynamicFrameWriter class.
  • skipFirst — A Boolean value that specifies whether to skip the first data line. The default value is False.

The following example shows how to specify the format options within an AWS Glue ETL job script.

glueContext.write_dynamic_frame.from_options(
    frame = datasource1,
    connection_type = "s3", 
    connection_options = {
        "path": "s3://s3path"
        }, 
    format = "csv", 
    format_options={
        "quoteChar": -1, 
        "separator": "|"
        }, 
    transformation_ctx = "datasink2")
Format Options for ETL Inputs and Outputs in AWS Glue - AWS Glue
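
For the read side, here is a minimal sketch of passing format_options to create_dynamic_frame.from_options (the bucket path and option choices are hypothetical, not taken from the quoted docs):

datasource = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": ["s3://examplebucket/input/"]},  # hypothetical path
    format = "csv",
    format_options = {
        "withHeader": True,  # treat the first line as a header (DynamicFrameReader)
        "multiLine": True    # allow records whose quoted fields span multiple lines
    },
    transformation_ctx = "datasource0")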

References

pyspark.sql.DataFrameWriter.csv
DataFrameWriter.csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None)
Saves the content of the DataFrame in CSV format at the specified path.
(snip)

  • quoteAll : str or bool, optional
    • a flag indicating whether all values should always be enclosed in quotes. If None is set, it uses the default value false, only escaping values containing a quote character.
pyspark.sql.DataFrameWriter.csv — PySpark 3.1.2 documentation
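
A minimal local sketch of the quoteAll difference (the DataFrame contents and output paths are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 'say "hi"', "c")], ["col0", "col1", "col2"])

# Default: only fields that need it are quoted, e.g. a,"say \"hi\"",c
df.write.mode('overwrite').csv('/tmp/quote_default')

# quoteAll=True with escape='"': every field is quoted and embedded
# quotes are doubled, e.g. "a","say ""hi""","c"
df.write.mode('overwrite').csv('/tmp/quote_all', quoteAll=True, escape='"')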

Full code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import *


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "newline_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "newline_test", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string")], transformation_ctx = "applymapping1")

df = DynamicFrame.toDF(applymapping1)
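# Replace embedded newlines/carriage returns in col2 with spaces so that
# each record stays on a single line in the CSV output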
newDf = df.withColumn("col2", regexp_replace(col("col2"), "\\n|\\r", " "))
result = DynamicFrame.fromDF(newDf, glueContext, "result")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = result]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"}, format = "csv", transformation_ctx = "datasink2")

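# Write with the DataFrame API instead, so quoteAll=True can wrap every column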
outputDf = newDf.repartition(1)
s3OutputPath = "s3://dl-sfdc-dm/test/newline_test"
outputDf.write.mode('append').csv(s3OutputPath, quoteAll=True, escape='"')

job.commit()