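I verified something that should be obvious: if you crawl Parquet files on S3 with a Glue Crawler to create a table and then change a string column to a numeric type in the catalog, queries against that column fail. This is expected, because Parquet is a binary format that stores each column's type in the file itself. To change a column's type, you have to rewrite the Parquet files.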
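Rewriting the files could look roughly like the sketch below. It is a minimal illustration and not the procedure used in this post: `build_cast_exprs` and `rewrite_parquet` are hypothetical helper names, and the paths in the usage comment are placeholders. It reads the files by path (so the catalog's column types are not involved), casts the chosen columns, and writes new files to a different prefix.

```python
def build_cast_exprs(columns, casts):
    """Return one Spark SQL select expression per column.

    Columns named in `casts` (column name -> target type) are wrapped in CAST;
    all other columns are passed through unchanged.
    """
    exprs = []
    for name in columns:
        if name in casts:
            exprs.append(f"CAST(`{name}` AS {casts[name]}) AS `{name}`")
        else:
            exprs.append(f"`{name}`")
    return exprs


def rewrite_parquet(spark, src_path, dst_path, casts):
    """Read Parquet files from src_path, cast columns, and write new files to dst_path."""
    df = spark.read.parquet(src_path)  # schema comes from the files themselves
    exprs = build_cast_exprs(df.columns, casts)
    # "errorifexists" refuses to write into a prefix that already holds data.
    df.selectExpr(*exprs).write.mode("errorifexists").parquet(dst_path)


# Hypothetical usage in a PySpark session (placeholder paths):
# rewrite_parquet(spark, "s3://<bucket>/<old-prefix>/", "s3://<bucket>/<new-prefix>/",
#                 {"col0": "decimal(10,0)"})
```

After a rewrite like this, the table would still need to point at the new prefix, for example by re-running the crawler against it.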
string->decimal
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder. \
        appName("ExamplePySparkSubmitTask"). \
        config("spark.databricks.hive.metastore.glueCatalog.enabled", "true"). \
        enableHiveSupport(). \
        getOrCreate()
>>> sql("SELECT cast(col0 as decimal(10)) FROM `tpc-h_10gb_parquet`.supplier_tbl").show(5)
+----+
|col0|
+----+
|1000|
| 999|
| 998|
| 997|
| 996|
+----+
only showing top 5 rows
- In the Glue catalog, change col0 of supplier_tbl from string to decimal.
- Querying col0 of supplier_tbl then fails with "Parquet column cannot be converted in file s3://.... Column: [col0], Expected: decimal(10,0), Found: BINARY".
>>> sql("SELECT cast(col0 as decimal(10)) FROM `tpc-h_10gb_parquet`.supplier_tbl").show(5)
23/05/30 15:39:15 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6) (ip-10-0-26-236.ec2.internal executor 3): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://.../TPC-H/2.18/parquet-10GB/run-1685084488806-part-block-0-r-00000-snappy.parquet. Column: [col0], Expected: decimal(10,0), Found: BINARY
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:724)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:703)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
...
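The post does not say how the column type was changed in the Glue catalog. If you wanted to script that step to reproduce the error, one option is boto3's Glue client (`get_table` followed by `update_table`). The sketch below is an assumption-laden illustration: `table_input_with_type` and `change_column_type` are hypothetical helper names, and the set of keys kept for `TableInput` is the subset I believe `update_table` accepts, so check it against the API reference before relying on it.

```python
import copy

# Keys copied from the get_table response into TableInput.
# Assumption: get_table returns extra read-only fields (DatabaseName, CreateTime, ...)
# that update_table rejects, so only these are kept.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def table_input_with_type(table, column, new_type):
    """Build a TableInput dict from a get_table 'Table' dict with one column retyped."""
    table_input = {k: copy.deepcopy(v) for k, v in table.items() if k in TABLE_INPUT_KEYS}
    matches = [c for c in table_input["StorageDescriptor"]["Columns"] if c["Name"] == column]
    if not matches:
        raise KeyError(f"column not found: {column}")
    for col in matches:
        col["Type"] = new_type
    return table_input


def change_column_type(glue, database, table_name, column, new_type):
    """Change a column's type in the catalog only; the Parquet files are not touched."""
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(
        DatabaseName=database,
        TableInput=table_input_with_type(table, column, new_type),
    )


# Hypothetical usage:
# import boto3
# change_column_type(boto3.client("glue"), "tpc-h_10gb_parquet", "supplier_tbl",
#                    "col0", "decimal(10,0)")
```

Because this only edits catalog metadata, the files still hold col0 as a BINARY string, which is exactly the mismatch the error above reports.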
decimal(10,3)->decimal(10,4)
- Originally, col0 is decimal(10,3).
- Query the table with PySpark.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder. \
        appName("ExamplePySparkSubmitTask"). \
        config("spark.databricks.hive.metastore.glueCatalog.enabled", "true"). \
        enableHiveSupport(). \
        getOrCreate()
>>> sql("SELECT * FROM `tpc-h_10gb_parquet`.nation_tbl").show(5)
+-----+---------+----+--------------------+
| col0|     col1|col2|                col3|
+-----+---------+----+--------------------+
|0.000|  ALGERIA|   0| haggle. carefull...|
|1.000|ARGENTINA|   1|al foxes promise ...|
|2.000|   BRAZIL|   1|y alongside of th...|
|3.000|   CANADA|   1|eas hang ironic, ...|
|4.000|    EGYPT|   4|y above the caref...|
+-----+---------+----+--------------------+
only showing top 5 rows
- In the Glue catalog, change col0 from decimal(10,3) to decimal(10,4).
- Querying the table with PySpark now fails.
>>> sql("SELECT * FROM `tpc-h_10gb_parquet`.nation_tbl").show(5)
23/05/31 00:42:57 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 15) (ip-10-0-26-145.ec2.internal executor 8): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://.../TPC-H/2.18/parquet-10GB/sales/run-1685493062322-part-block-0-r-00000-snappy.parquet. Column: [col0], Expected: decimal(10,4), Found: INT64
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:724)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:703)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
- Check the schema stored in the Parquet file.
% parquet meta run-1685493062322-part-block-0-r-00000-snappy.parquet

File path:  run-1685493062322-part-block-0-r-00000-snappy.parquet
Created by: parquet-glue version 1.8.2
Properties: (none)
Schema:
message glue_schema {
  optional int64 col0 (DECIMAL(10,3));
  optional binary col1 (STRING);
  optional int64 col2;
  optional binary col3 (STRING);
}

Row group 0:  count: 25  77.32 B records  start: 4  total(compressed): 1.888 kB total(uncompressed):2.811 kB
--------------------------------------------------------------------------------
      type      encodings count     avg size   nulls   min / max
col0  INT64     S   _     25        8.08 B     0       "0.000" / "24.000"
col1  BINARY    S   _     25        11.80 B    0       "ALGERIA" / "VIETNAM"
col2  INT64     S _ R     25        5.04 B     0       "0" / "4"
col3  BINARY    S   _     25        52.40 B    0       " haggle. carefully final ..." / "y final packages. slow fo..."
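The schema above shows col0 stored as an int64 annotated with DECIMAL(10,3). In Parquet, a decimal on an integer physical type holds the unscaled value, and the scale lives in the file's schema. The short sketch below uses Python's standard `decimal` module to illustrate why relabelling the column as decimal(10,4) in the catalog cannot be a metadata-only change. `decode_unscaled` is a name made up for this example, and this is my reading of why the reader refuses the conversion rather than something Spark's error message states.

```python
from decimal import Decimal


def decode_unscaled(unscaled, scale):
    """Interpret a stored unscaled integer as a decimal with the given scale."""
    # scaleb shifts the exponent: 24000 with scale 3 becomes 24.000.
    return Decimal(unscaled).scaleb(-scale)


stored = 24000  # what the file would hold for the max value "24.000" at scale 3

print(decode_unscaled(stored, 3))  # 24.000 (the scale recorded in the file)
print(decode_unscaled(stored, 4))  # 2.4000 (same bytes read with the catalog's new scale)
```

Reading the same stored integer with a different scale would silently change every value by a factor of ten, so failing with an error is the safer behavior.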
Adding a column
- When a column is added, rows from files written before the column existed return null for it.
>>> sql("SELECT * FROM `tpc-h_10gb_parquet`.nation_tbl").show(1000)
+------+--------------+----+--------------------+
|  col0|          col1|col2|                col3|
+------+--------------+----+--------------------+
| 0.000|       ALGERIA|   0| haggle. carefull...|
| 1.000|     ARGENTINA|   1|al foxes promise ...|
| 2.000|        BRAZIL|   1|y alongside of th...|
| 3.000|        CANADA|   1|eas hang ironic, ...|
| 4.000|         EGYPT|   4|y above the caref...|
| 5.000|      ETHIOPIA|   0|ven packages wake...|
| 6.000|        FRANCE|   3|refully final req...|
| 7.000|       GERMANY|   3|l platelets. regu...|
| 8.000|         INDIA|   2|ss excuses cajole...|
| 9.000|     INDONESIA|   2| slyly express as...|
|10.000|          IRAN|   4|efully alongside ...|
|11.000|          IRAQ|   4|nic deposits boos...|
|12.000|         JAPAN|   2|ously. final, exp...|
|13.000|        JORDAN|   4|ic deposits are b...|
|14.000|         KENYA|   0| pending excuses ...|
|15.000|       MOROCCO|   0|rns. blithely bol...|
|16.000|    MOZAMBIQUE|   0|s. ironic, unusua...|
|17.000|          PERU|   1|platelets. blithe...|
|18.000|         CHINA|   2|c dependencies. f...|
|19.000|       ROMANIA|   3|ular asymptotes a...|
|20.000|  SAUDI ARABIA|   4|ts. silent reques...|
|21.000|       VIETNAM|   2|hely enticingly e...|
|22.000|        RUSSIA|   3| requests against...|
|23.000|UNITED KINGDOM|   3|eans boost carefu...|
|24.000| UNITED STATES|   1|y final packages....|
| 0.000|          null|   0| haggle. carefull...|
| 1.000|          null|   1|al foxes promise ...|
| 2.000|          null|   1|y alongside of th...|
| 3.000|          null|   1|eas hang ironic, ...|
| 4.000|          null|   4|y above the caref...|
| 5.000|          null|   0|ven packages wake...|
| 6.000|          null|   3|refully final req...|
| 7.000|          null|   3|l platelets. regu...|
| 8.000|          null|   2|ss excuses cajole...|
| 9.000|          null|   2| slyly express as...|
|10.000|          null|   4|efully alongside ...|
|11.000|          null|   4|nic deposits boos...|
|12.000|          null|   2|ously. final, exp...|
|13.000|          null|   4|ic deposits are b...|
|14.000|          null|   0| pending excuses ...|
|15.000|          null|   0|rns. blithely bol...|
|16.000|          null|   0|s. ironic, unusua...|
|17.000|          null|   1|platelets. blithe...|
|18.000|          null|   2|c dependencies. f...|
|19.000|          null|   3|ular asymptotes a...|
|20.000|          null|   4|ts. silent reques...|
|21.000|          null|   2|hely enticingly e...|
|22.000|          null|   3| requests against...|
|23.000|          null|   3|eans boost carefu...|
|24.000|          null|   1|y final packages....|
+------+--------------+----+--------------------+
- File written before the column was added
% parquet meta run-1685494785647-part-block-0-r-00000-snappy.parquet

File path:  run-1685494785647-part-block-0-r-00000-snappy.parquet
Created by: parquet-glue version 1.8.2
Properties: (none)
Schema:
message glue_schema {
  optional int64 col0 (DECIMAL(10,3));
  optional int64 col2;
  optional binary col3 (STRING);
}

Row group 0:  count: 25  65.52 B records  start: 4  total(compressed): 1.600 kB total(uncompressed):2.494 kB
--------------------------------------------------------------------------------
      type      encodings count     avg size   nulls   min / max
col0  INT64     S   _     25        8.08 B     0       "0.000" / "24.000"
col2  INT64     S _ R     25        5.04 B     0       "0" / "4"
col3  BINARY    S   _     25        52.40 B    0       " haggle. carefully final ..." / "y final packages. slow fo..."
- File written after the column (col1) was added
% parquet meta run-1685493062322-part-block-0-r-00000-snappy.parquet

File path:  run-1685493062322-part-block-0-r-00000-snappy.parquet
Created by: parquet-glue version 1.8.2
Properties: (none)
Schema:
message glue_schema {
  optional int64 col0 (DECIMAL(10,3));
  optional binary col1 (STRING);
  optional int64 col2;
  optional binary col3 (STRING);
}

Row group 0:  count: 25  77.32 B records  start: 4  total(compressed): 1.888 kB total(uncompressed):2.811 kB
--------------------------------------------------------------------------------
      type      encodings count     avg size   nulls   min / max
col0  INT64     S   _     25        8.08 B     0       "0.000" / "24.000"
col1  BINARY    S   _     25        11.80 B    0       "ALGERIA" / "VIETNAM"
col2  INT64     S _ R     25        5.04 B     0       "0" / "4"
col3  BINARY    S   _     25        52.40 B    0       " haggle. carefully final ..." / "y final packages. slow fo..."
- Glue catalog
Dropping a column (col1)
- With the same files as above, dropping only the column from the Glue catalog makes that column disappear from the query results.
>>> sql("SELECT * FROM `tpc-h_10gb_parquet`.nation_tbl").show(1000)
+------+----+--------------------+
|  col0|col2|                col3|
+------+----+--------------------+
| 0.000|   0| haggle. carefull...|
| 1.000|   1|al foxes promise ...|
| 2.000|   1|y alongside of th...|
| 3.000|   1|eas hang ironic, ...|
| 4.000|   4|y above the caref...|
| 5.000|   0|ven packages wake...|
| 6.000|   3|refully final req...|
| 7.000|   3|l platelets. regu...|
| 8.000|   2|ss excuses cajole...|
| 9.000|   2| slyly express as...|
|10.000|   4|efully alongside ...|
|11.000|   4|nic deposits boos...|
|12.000|   2|ously. final, exp...|
|13.000|   4|ic deposits are b...|
|14.000|   0| pending excuses ...|
|15.000|   0|rns. blithely bol...|
|16.000|   0|s. ironic, unusua...|
|17.000|   1|platelets. blithe...|
|18.000|   2|c dependencies. f...|
|19.000|   3|ular asymptotes a...|
|20.000|   4|ts. silent reques...|
|21.000|   2|hely enticingly e...|
|22.000|   3| requests against...|
|23.000|   3|eans boost carefu...|
|24.000|   1|y final packages....|
| 0.000|   0| haggle. carefull...|
| 1.000|   1|al foxes promise ...|
| 2.000|   1|y alongside of th...|
| 3.000|   1|eas hang ironic, ...|
| 4.000|   4|y above the caref...|
| 5.000|   0|ven packages wake...|
| 6.000|   3|refully final req...|
| 7.000|   3|l platelets. regu...|
| 8.000|   2|ss excuses cajole...|
| 9.000|   2| slyly express as...|
|10.000|   4|efully alongside ...|
|11.000|   4|nic deposits boos...|
|12.000|   2|ously. final, exp...|
|13.000|   4|ic deposits are b...|
|14.000|   0| pending excuses ...|
|15.000|   0|rns. blithely bol...|
|16.000|   0|s. ironic, unusua...|
|17.000|   1|platelets. blithe...|
|18.000|   2|c dependencies. f...|
|19.000|   3|ular asymptotes a...|
|20.000|   4|ts. silent reques...|
|21.000|   2|hely enticingly e...|
|22.000|   3| requests against...|
|23.000|   3|eans boost carefu...|
|24.000|   1|y final packages....|
+------+----+--------------------+
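Both the column-add and column-drop results are consistent with the reader projecting each file onto the catalog's column list by name: a catalog column missing from a file comes back as null, and a file column missing from the catalog is simply not read. The toy model below illustrates that idea in plain Python. It is not Spark's implementation; `project` and the sample rows are made up for the example.

```python
def project(rows, catalog_columns):
    """Project file rows onto the catalog's columns, matching by name.

    Columns the file lacks become None; columns the catalog lacks are dropped.
    """
    return [{name: row.get(name) for name in catalog_columns} for row in rows]


# One row from a file written before col1 existed, one from a file written after.
old_file = [{"col0": "0.000", "col2": 0}]
new_file = [{"col0": "0.000", "col1": "ALGERIA", "col2": 0}]

# Catalog includes col1: the old file's row gets None for it.
print(project(old_file + new_file, ["col0", "col1", "col2"]))

# Catalog without col1: the column is ignored even though new_file contains it.
print(project(old_file + new_file, ["col0", "col2"]))
```

Unlike the type changes earlier in the post, neither case requires reinterpreting stored bytes, which fits with these queries succeeding without any file rewrite.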