Symptom
- Querying a Parquet table from Hive fails with "java.io.IOException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file".
$ hive
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
hive> select count(*) from sales;
FAILED: SemanticException [Error 10001]: Line 1:21 Table not found 'sales'
hive> select count(*) from sh10_option.sales;
Query ID = hadoop_20181208023457_ca9f628a-2f32-493a-8900-f22f51ff774b
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1543787350286_0018)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1            container       RUNNING     28          0        0       28      46       0
Reducer 2        container        INITED      1          0        0        1       0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/02  [>>--------------------------] 0%   ELAPSED TIME: 139.23 s
----------------------------------------------------------------------------------------------
Status: Failed
Vertex failed, vertexName=Map 1, vertexId=vertex_1543787350286_0018_1_00, diagnostics=[Task failed, taskId=task_1543787350286_0018_1_00_000008, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_1543787350286_0018_1_00_000008_0:java.lang.RuntimeException:java.lang.RuntimeException: java.io.IOException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://bigdata-handson-123456789012/data/parquet_pyspark/sh10/sales/year=2012/month=6/part-00048-86118253-0ee3-45f9-8a9b-c0df410712e0.c000.snappy.parquet
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.io.IOException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://bigdata-handson-123456789012/data/parquet_pyspark/sh10/sales/year=2012/month=6/part-00048-86118253-0ee3-45f9-8a9b-c0df410712e0.c000.snappy.parquet
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:206)
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.<init>(TezGroupedSplitsInputFormat.java:145)
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.getRecordReader(TezGroupedSplitsInputFormat.java:111)
        at org.apache.tez.mapreduce.lib.MRReaderMapred.setupOldRecordReader(MRReaderMapred.java:157)
        at org.apache.tez.mapreduce.lib.MRReaderMapred.setSplit(MRReaderMapred.java:83)
        at org.apache.tez.mapreduce.input.MRInput.initFromEventInternal(MRInput.java:694)
        at org.apache.tez.mapreduce.input.MRInput.initFromEvent(MRInput.java:653)
        at org.apache.tez.mapreduce.input.MRInputLegacy.checkAndAwaitRecordReaderInitialization(MRInputLegacy.java:145)
        at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:109)
        at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.getMRInput(MapRecordProcessor.java:525)
        at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:171)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:184)
        ... 14 more
Cause
- In Hive, the decimal type is represented as fixed bytes (INT32), whereas in Spark 1.4 and later the representation depends on the precision and becomes either INT32 or INT64. When the column ends up as INT64, the error above occurs (a sketch of the precision-to-type mapping follows the quoted explanation below).
This issue is caused by the different Parquet conventions used in Hive and Spark. In Hive, the decimal datatype is represented as fixed bytes (INT 32). In Spark 1.4 or later the default convention is to use the Standard Parquet representation for the decimal data type. As per the Standard Parquet representation, the underlying representation changes based on the precision of the column:

e.g. DECIMAL can be used to annotate the following types:
int32: for 1 <= precision <= 9
int64: for 1 <= precision <= 18; precision < 10 will produce a warning

Hence this issue happens only with datatypes that have different representations in the two Parquet conventions. If the datatype is DECIMAL(10,3), both conventions represent it as INT32, so there is no issue. If you are not aware of the internal representation of the datatypes, it is safe to read with the same convention that was used for writing. With Hive, you do not have the flexibility to choose the Parquet convention, but with Spark, you do.
java - parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file - Stack Overflow
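To see the precision-dependent mapping in practice, here is a minimal PySpark sketch (the output path /tmp/decimal_repr_check and the column names are made up for illustration). Writing with the default convention produces int32/int64 columns, while setting spark.sql.parquet.writeLegacyFormat=true produces the fixed-length byte representation that Hive expects.

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DecimalType

spark = SparkSession.builder.appName("decimal-repr-check").getOrCreate()

schema = StructType([
    StructField("small_dec", DecimalType(9, 2)),   # precision <= 9  -> int32 under the standard convention
    StructField("large_dec", DecimalType(18, 2)),  # precision <= 18 -> int64 under the standard convention
])
df = spark.createDataFrame([(Decimal("1.23"), Decimal("456.78"))], schema)

# Default in Spark 2.x: spark.sql.parquet.writeLegacyFormat=false (standard representation)
df.write.mode("overwrite").parquet("/tmp/decimal_repr_check")

The physical types of the written files can then be inspected with, for example, the parquet-tools schema command.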
Solution
- Either call sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true") in the PySpark code, or set the following in /etc/spark/conf/spark-defaults.conf (a rewrite sketch follows the setting below).
spark.sql.parquet.writeLegacyFormat true
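Note that the setting only affects files written after it takes effect, so Parquet files already written with the standard representation have to be re-written before Hive can read them. The following is a minimal PySpark sketch under that assumption; the output location data/parquet_hive_compat and the year/month partition columns are illustrative, based on the paths in the error message above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rewrite-parquet-legacy").getOrCreate()

# Write decimals in the Hive-compatible (legacy) representation
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

# Re-write the affected data to a new location (output path is an example)
df = spark.read.parquet("s3://bigdata-handson-123456789012/data/parquet_pyspark/sh10/sales/")
df.write.mode("overwrite").partitionBy("year", "month").parquet("s3://bigdata-handson-123456789012/data/parquet_hive_compat/sh10/sales/")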
- Verify that the setting is applied (an alternative check via spark.conf.get is sketched after the session output below).
$ pyspark
>>> from pyspark.context import SparkContext
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.functions import year, month, date_format
>>> sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
>>> sc._conf.getAll()
[(u'spark.eventLog.enabled', u'true'), (u'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem', u'2'), (snip) (u'spark.sql.parquet.writeLegacyFormat', u'true'), (snip) (u'spark.blacklist.decommissioning.enabled', u'true')]
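The effective value can also be read back from the runtime SQL configuration; a small sketch for the Spark 2.x pyspark shell, where the session is available as spark:

>>> spark.conf.get("spark.sql.parquet.writeLegacyFormat")  # expected to return u'true' once the setting is applied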
Environment
- emr-5.19.0
- Spark 2.3.2