ablog

不器用で落着きのない技術者のメモ

Spark

Parquet はファイルでカラムの型を持っているため、Glue カタログだけ変更しても型を変えることはできない

S3 にある Parquet ファイルを Glue の Crawler でクロールしてテーブルを作成し、文字列型のカラムを数値型に変更するとエラーになるという当たりまり前(Parquet は項目定義に型を持っているバイナリファイルのため)のことを検証した。型を変えたい場合は…

Spark on EMR から Glue カタログにアクセスできない

事象 Spark on EMR で Glue カタログのデータベース名を表示しようとすると、"because no identity-based policy allows the glue:GetDatabase action" で AccessDeniedException が発生する。 $ pyspark >>> from pyspark.sql import SparkSession >>> spar…

PySpark でタイムスタンプを UTC から JST に変換する

PySpark でタイムスタンプを UTC から JST に変換する例。 # 文字列をタイムスタンプ型に変換 df = df.withColumn("timestamp", col("timestamp").cast("Timestamp")) # UTC から JST に変換 df = df.withColumn("timestamp", from_utc_timestamp(col("times…

Glue PySpark で CSV 出力時に全カラムをダブルクオートで囲む

Glue PySpark で CSV 出力時に全カラムをダブルクオートで囲みたいときは DataDrame で write するときに quoteAll=True を指定してやればよい。 outputDf = newDf.repartition(1) s3OutputPath ="s3://dl-sfdc-dm/test/newline_test" outputDf.write.mode('…

Glue PySpark で CSV のカラム内の改行コードを置換する

Glue PySpark で CSV のカラム内の改行コードを置換する例。Spark では正規表現は Java の記法になる。 newDf = df.withColumn("col2", regexp_replace(col("col2"), "\\n|\\r", " ")) サンプルコード全量 import sys from awsglue.transforms import * from…

PySpark は Java の正規表現記法を使う

PySpark では Java の正規表現を使う Regex in pyspark internally uses java regex.One of the common issue with regex is escaping backslash as it uses java regex and we will pass raw python string to spark.sql we can see it with a sample examp…

Spark関連情報メモ

Spark 2.4.3 pyspark.sql module

pyspark.sql module の select で DataFrame に複数カラムを連結したカラムを追加する

pyspark.sql module の select、concat、col で DataFrame に複数カラムを連結したカラムを追加する。.alias("...") で連結したカラムに別名をつけている。 from pyspark.sql.functions import concat, col, lit df = df.select(col("col1"), col("col2"), c…

pyspark.sql module の select で DataFrame の全カラムを取得する

pyspark.sql module の select で DataFrame の全カラムを取得する。 df = df.select([column for column in df.columns]) 参考 drop_list = ['a column', 'another column', ...] df.select([column for column in df.columns if column not in drop_list])…

SparkSQL メモ

DataFrame を SparkSQL で操作する サンプル df.registerTempTable('table1') df_res = spark.sql('select * from table1') df_res.show() 参考: PySpark の DataFrame を SparkSQL で操作する - CUBE SUGAR CONTAINER Timestamp 型に変更する やりたいこと …

PySparkメモ

カウントする df.count() スキーマを表示する Spark DataframeのSample Code集 - Qiita print df.printSchema() DynamicFrame Dataframe 変換 from awsglue.dynamicframe import DynamicFrame # DynamicFrame -> Spark DataFrame df = DynamicFrame.toDF(<DynamicFrame>) #</dynamicframe>…

PySpark でデータを増幅する

https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html の lineorder テーブルのデータを増幅する PySpark スクリプト for Glue ジョブ。実行状況は Spark History UI から確認する(AWS マネジメントコンソ…

PySpark で DataFrame にリテラルで列を追加しようとすると "col should be Column" と怒られる

事象 PySpark で DataFrame にリテラルで列を追加しようとすると "col should be Column" と怒られる。 コード import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from …

Spark Meetup Tokyo #2 (Spark+AI Summit EU 2019) に参加してきた

Spark Meetup Tokyo #2 (Spark+AI Summit EU 2019) - connpass に参加してきた。今度は Spark について、どうでもいことに Dive deep して話してみるのも面白そう。 Spark+AI Summit Europe 2019 セッションハイライト by 萩原 悠二/Yuji Hagiwara and 酒井 …

Spark DataFrame の repartition メソッドは何をするものか

Spark DataFrame の repartition(パーティション数, カラム名) とすると指定したカラムで指定したパーティション数にパーティショニングする。パーティション数を省略するとデフォルト値(Spark 2.3.2 では 200)になる。 repartition(numPartitions, *cols)…