https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html の lineorder テーブルのデータを増幅する PySpark スクリプト for Glue ジョブ。実行状況は Spark History UI から確認する(AWS マネジメントコンソールの EMR クラスターの「まとめ」タブからのリンク)。
Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format, lit
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import LongType, StringType

# Glue job: amplify the SSB lineorder table by duplicating every source row
# once per synthetic "year" value (1001..1019) and writing the result to S3
# as gzip CSV, partitioned by year.

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init('addYearColumn')

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="ssbgz",
    table_name="gz_lineorder",
    transformation_ctx="datasource0",
)
df = datasource0.toDF()
# Cache: the same source rows are re-scanned once per year value below.
df.cache()

# 17 lineorder columns (col6/col16 are strings, the rest bigint) plus the
# synthetic partitioning column.
schema = StructType([
    StructField("col0", LongType(), True),
    StructField("col1", LongType(), True),
    StructField("col2", LongType(), True),
    StructField("col3", LongType(), True),
    StructField("col4", LongType(), True),
    StructField("col5", LongType(), True),
    StructField("col6", StringType(), True),
    StructField("col7", LongType(), True),
    StructField("col8", LongType(), True),
    StructField("col9", LongType(), True),
    StructField("col10", LongType(), True),
    StructField("col11", LongType(), True),
    StructField("col12", LongType(), True),
    StructField("col13", LongType(), True),
    StructField("col14", LongType(), True),
    StructField("col15", LongType(), True),
    StructField("col16", StringType(), True),
    StructField("year", StringType(), True),
])

yearAddedDf = spark.createDataFrame(sc.emptyRDD(), schema)
for item in range(1001, 1020):
    # BUG FIX: lit(item) yields an integer column, but the schema declares
    # "year" as StringType — union() rejects the type mismatch. Cast first.
    # NOTE(review): range(1001, 1020) produces 19 copies (1001..1019); the
    # CLI script below uses 1001..2000 — confirm which count is intended.
    yearAddedDf = yearAddedDf.union(df.withColumn("year", lit(str(item))))

yearAddedDf.repartition(8) \
    .write.partitionBy(["year"]) \
    .mode("overwrite") \
    .csv("s3://<bucket name>/ssbgz/lineorder_partitioned/", compression="gzip")

# BUG FIX: a Glue job that calls job.init() with a transformation_ctx source
# must call job.commit() so job bookmarks record the run.
job.commit()
Spark on EMR
import sys
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, date_format, lit
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import LongType, StringType

# spark-submit script for EMR: same amplification as the Glue job — duplicate
# every lineorder row once per synthetic "year" value (1001..1019) and write
# the result to S3 as gzip CSV, partitioned by year.

spark = SparkSession.builder. \
    appName("ExamplePySparkSubmitTask"). \
    config("spark.databricks.hive.metastore.glueCatalog.enabled", "true"). \
    enableHiveSupport(). \
    getOrCreate()

# BUG FIX: the original called bare sql(...) — that name exists only in the
# interactive shell. In a spark-submit script it must be spark.sql(...).
df = spark.sql("SELECT * FROM ssbgz.gz_lineorder")
# Cache: the same source rows are re-scanned once per year value below.
df.cache()

# 17 lineorder columns (col6/col16 are strings, the rest bigint) plus the
# synthetic partitioning column.
schema = StructType([
    StructField("col0", LongType(), True),
    StructField("col1", LongType(), True),
    StructField("col2", LongType(), True),
    StructField("col3", LongType(), True),
    StructField("col4", LongType(), True),
    StructField("col5", LongType(), True),
    StructField("col6", StringType(), True),
    StructField("col7", LongType(), True),
    StructField("col8", LongType(), True),
    StructField("col9", LongType(), True),
    StructField("col10", LongType(), True),
    StructField("col11", LongType(), True),
    StructField("col12", LongType(), True),
    StructField("col13", LongType(), True),
    StructField("col14", LongType(), True),
    StructField("col15", LongType(), True),
    StructField("col16", StringType(), True),
    StructField("year", StringType(), True),
])

# BUG FIX: sc was never defined in this script; obtain the SparkContext from
# the session instead.
yearAddedDf = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
for item in range(1001, 1020):
    # BUG FIX: lit(item) yields an integer column, but the schema declares
    # "year" as StringType — union() rejects the type mismatch. Cast first.
    yearAddedDf = yearAddedDf.union(df.withColumn("year", lit(str(item))))

yearAddedDf.write.partitionBy(["year"]) \
    .mode("overwrite") \
    .csv("s3://<bucket name>/ssbgz/lineorder_part_emr/", compression="gzip")
AWS CLI
- lineorder_cp.sh
#!/bin/bash
# Amplify lineorder by copying the source prefix into 1000 Hive-style
# partition prefixes (year=1001 .. year=2000) on S3.
set -euo pipefail

for i in {1001..2000}
do
    # BUG FIX: quote the S3 URIs — the unquoted <bucket name> placeholder
    # (or any key containing spaces) would be word-split by the shell.
    aws s3 cp --recursive \
        "s3://<bucket name>/ssbgz/lineorder/" \
        "s3://<bucket name>/ssbgz/lineorder_part/year=${i}/"
done
- 実行する
$ aws configure set default.s3.multipart_chunksize 64MB $ aws configure set default.s3.max_concurrent_requests 16 $ nohup ./lineorder_cp.sh &
- Athena でテーブル作成
-- External table over the pipe-delimited, year-partitioned lineorder copies
-- produced by lineorder_cp.sh.
CREATE EXTERNAL TABLE ssbgz.gz_lineorder_part(
  `col0`  bigint,
  `col1`  bigint,
  `col2`  bigint,
  `col3`  bigint,
  `col4`  bigint,
  `col5`  bigint,
  `col6`  string,
  `col7`  bigint,
  `col8`  bigint,
  `col9`  bigint,
  `col10` bigint,
  `col11` bigint,
  `col12` bigint,
  `col13` bigint,
  `col14` bigint,
  `col15` bigint,
  `col16` string)
PARTITIONED BY (year INT)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
  LINES TERMINATED BY '\n'
LOCATION 's3://<bucket name>/ssbgz/lineorder_part/';

-- Discover the existing year=<N> prefixes and register them as partitions.
MSCK REPAIR TABLE ssbgz.gz_lineorder_part;