
Amplifying Data with PySpark

A PySpark script for a Glue job that amplifies the data in the lineorder table from https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html. Execution progress can be checked in the Spark History UI (linked from the "Summary" tab of the EMR cluster in the AWS Management Console).

Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, LongType, StringType

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init('addYearColumn')
# Load the source table from the Glue Data Catalog and convert to a plain DataFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "ssbgz", table_name = "gz_lineorder", transformation_ctx = "datasource0")
df = datasource0.toDF()
df.cache()

# Output schema: the 17 lineorder columns plus a "year" partition column (as a string)
schema = StructType([
    StructField("col0",LongType(),True), 
    StructField("col1",LongType(),True),
    StructField("col2",LongType(),True),
    StructField("col3",LongType(),True),
    StructField("col4",LongType(),True),
    StructField("col5",LongType(),True),
    StructField("col6",StringType(),True),
    StructField("col7",LongType(),True),
    StructField("col8",LongType(),True),
    StructField("col9",LongType(),True),
    StructField("col10",LongType(),True),
    StructField("col11",LongType(),True),
    StructField("col12",LongType(),True),
    StructField("col13",LongType(),True),
    StructField("col14",LongType(),True),
    StructField("col15",LongType(),True),
    StructField("col16",StringType(),True),
    StructField("year",StringType(),True)
]) 

# Start from an empty DataFrame and append one full copy of lineorder per year value
yearAddedDf = spark.createDataFrame(sc.emptyRDD(), schema)

for item in range(1001, 1020):
    # lit(str(item)) makes the year an explicit string, matching the schema above
    yearAddedDf = yearAddedDf.union(df.withColumn("year", lit(str(item))))

yearAddedDf.repartition(8).write.partitionBy("year").mode("overwrite").csv("s3://<bucket name>/ssbgz/lineorder_partitioned/", compression="gzip")
job.commit()
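
To sanity-check the output, read the partitioned CSVs back and count rows per year; each year partition should hold one full copy of gz_lineorder. A minimal sketch, assuming the same output path as above:

# Spark recovers the "year" column from the year=NNNN directory names
check = spark.read.csv("s3://<bucket name>/ssbgz/lineorder_partitioned/")
check.groupBy("year").count().orderBy("year").show(20)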
Spark on EMR
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, LongType, StringType

# NOTE: spark.databricks.* settings are Databricks-specific; on EMR the Glue Data
# Catalog is normally enabled in the cluster/Hive configuration instead.
spark = (SparkSession.builder
    .appName("ExamplePySparkSubmitTask")
    .config("spark.databricks.hive.metastore.glueCatalog.enabled", "true")
    .enableHiveSupport()
    .getOrCreate())

df = spark.sql("SELECT * FROM ssbgz.gz_lineorder")
df.cache()

# Same output schema as the Glue version: 17 lineorder columns plus a string "year"
schema = StructType([
    StructField("col0",LongType(),True), 
    StructField("col1",LongType(),True),
    StructField("col2",LongType(),True),
    StructField("col3",LongType(),True),
    StructField("col4",LongType(),True),
    StructField("col5",LongType(),True),
    StructField("col6",StringType(),True),
    StructField("col7",LongType(),True),
    StructField("col8",LongType(),True),
    StructField("col9",LongType(),True),
    StructField("col10",LongType(),True),
    StructField("col11",LongType(),True),
    StructField("col12",LongType(),True),
    StructField("col13",LongType(),True),
    StructField("col14",LongType(),True),
    StructField("col15",LongType(),True),
    StructField("col16",StringType(),True),
    StructField("year",StringType(),True)
]) 

# There is no bare SparkContext here, so get it from the session for the empty RDD
yearAddedDf = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

for item in range(1001, 1020):
    # lit(str(item)) makes the year an explicit string, matching the schema above
    yearAddedDf = yearAddedDf.union(df.withColumn("year", lit(str(item))))

yearAddedDf.write.partitionBy("year").mode("overwrite").csv("s3://<bucket name>/ssbgz/lineorder_part_emr/", compression="gzip")
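
Unioning in a loop grows the query plan with each copy. An equivalent single-plan approach, not in the original script, is to cross join lineorder against a small DataFrame of year values; a sketch under the same schema assumptions:

from pyspark.sql.functions import col

# One row per target year, cast to string to match the partition column type
years = spark.range(1001, 1020).select(col("id").cast("string").alias("year"))

# Pairs every lineorder row with every year: 19 copies in a single plan
amplified = df.crossJoin(years)
amplified.write.partitionBy("year").mode("overwrite").csv("s3://<bucket name>/ssbgz/lineorder_part_emr/", compression="gzip")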
AWS CLI
  • lineorder_cp.sh
#!/bin/bash
# Copy the same source prefix into 1,000 year=NNNN prefixes (1,000x amplification)
for i in {1001..2000}
do
    aws s3 cp --recursive s3://<bucket name>/ssbgz/lineorder/ s3://<bucket name>/ssbgz/lineorder_part/year=${i}/
done
  • Run it
$ aws configure set default.s3.multipart_chunksize 64MB
$ aws configure set default.s3.max_concurrent_requests 16
$ nohup ./lineorder_cp.sh &
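  • For reference, the same fan-out can be done with server-side copies from boto3, so no data flows through the local machine. A sketch, not from the original post; the bucket and prefixes are placeholders mirroring the paths above:
import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("<bucket name>")
src_prefix = "ssbgz/lineorder/"

# List the source objects once, then issue server-side copies per year partition
objects = list(bucket.objects.filter(Prefix=src_prefix))
for year in range(1001, 2001):
    for obj in objects:
        dest_key = "ssbgz/lineorder_part/year={}/{}".format(year, obj.key[len(src_prefix):])
        bucket.copy({"Bucket": bucket.name, "Key": obj.key}, dest_key)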
  • Create the table in Athena
CREATE EXTERNAL TABLE ssbgz.gz_lineorder_part(
  `col0` bigint, 
  `col1` bigint, 
  `col2` bigint, 
  `col3` bigint, 
  `col4` bigint, 
  `col5` bigint, 
  `col6` string, 
  `col7` bigint, 
  `col8` bigint, 
  `col9` bigint, 
  `col10` bigint, 
  `col11` bigint, 
  `col12` bigint, 
  `col13` bigint, 
  `col14` bigint, 
  `col15` bigint, 
  `col16` string)
PARTITIONED BY (year INT)
ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '|'
 LINES TERMINATED BY '\n'
LOCATION 's3://<bucket name>/ssbgz/lineorder_part/';

MSCK REPAIR TABLE ssbgz.gz_lineorder_part;
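
Once MSCK REPAIR TABLE has registered the year partitions, a per-partition row count confirms the amplification; every year should match the original table's row count. A minimal check from PySpark, assuming the Glue Data Catalog integration described above:

spark.sql("""
    SELECT year, COUNT(*) AS cnt
    FROM ssbgz.gz_lineorder_part
    GROUP BY year
    ORDER BY year
""").show()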