
[SUPPORT] ClassCastException when reading a Hudi dataset with a timestamp partition field #11140

Open
harichandanp opened this issue May 2, 2024 · 1 comment
Labels: on-call-triaged, priority:critical (production down; pipelines stalled; need help ASAP), reader-core

Comments

@harichandanp


Describe the problem you faced
I am getting a class cast exception (java.lang.ClassCastException: class java.lang.Long cannot be cast to class org.apache.spark.unsafe.types.UTF8String) when I try to read a Hudi dataset with a timestamp partition field.

To Reproduce

Steps to reproduce the behavior:

Generated dummy data as follows:

case class A(id: Long, ts: Long)

scala> List(A(1L, 1713033644000L), A(2L, 1713033887000L), A(3L, 1713033890000L)).toDF
      .write
      .format("hudi")
      .option("hoodie.table.name", "dummy3")
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
      .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMddHH")
      .option("hoodie.keygen.timebased.timestamp.type", "EPOCHMILLISECONDS")
      .option("hoodie.datasource.write.partitionpath.field", "ts")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .mode("overwrite")
      .save("/tmp/dummy3")

When I try to read this data using Hudi 0.13.0 or later (tested 0.13.0 through 0.14.1), I get the following error:

$ /usr/local/spark-3.3.0-bin-hadoop3/bin/spark-shell --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

scala> spark.read.format("hudi").load("/tmp/dummy3").show
24/05/02 10:54:51 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/05/02 10:54:51 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/05/02 10:54:56 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ClassCastException: class java.lang.Long cannot be cast to class org.apache.spark.unsafe.types.UTF8String (java.lang.Long is in module java.base of loader 'bootstrap'; org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app')
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:72)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:269)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:280)
	at org.apache.spark.sql.execution.datasources.parquet.Spark33LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark33LegacyHoodieParquetFileFormat.scala:316)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:553)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

The schema of the data is:

scala> spark.read.format("hudi").load("/tmp/dummy3").printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- ts: string (nullable = true)

The ts field is stored as a long in the underlying parquet file:

scala> spark.read.format("parquet").load("/tmp/dummy3/2024041311").printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ts: long (nullable = true)
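
This mismatch looks like the root of the exception: the Hudi table schema reports ts as string (matching the formatted partition path), but the data files store it as long, and ColumnVectorUtils.populate then calls getUTF8String on a partition value that is actually a java.lang.Long. Since the failure happens while Spark's vectorized parquet reader populates partition columns, disabling vectorization may sidestep the failing code path; this is an untested suggestion, not a fix:

// Untested workaround sketch: force the non-vectorized parquet read path,
// which avoids ColumnVectorUtils.populate (the frame that throws above).
// This may carry a performance cost and is a guess, not a verified fix.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.format("hudi").load("/tmp/dummy3").show()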

Expected behavior

I am able to read the data when using Hudi 0.12.3 or below (tested back to 0.9.0):

$ /usr/local/spark-3.3.0-bin-hadoop3/bin/spark-shell --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.3 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

scala> spark.read.format("hudi").load("/tmp/dummy3").show
24/05/02 10:55:26 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/05/02 10:55:26 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+
|  20240502105345612|20240502105345612...|                 2|            2024041311|cfcdc5ab-77da-4eb...|  2|2024041311|
|  20240502105345612|20240502105345612...|                 3|            2024041311|cfcdc5ab-77da-4eb...|  3|2024041311|
|  20240502105345612|20240502105345612...|                 1|            2024041311|cfcdc5ab-77da-4eb...|  1|2024041311|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----------+
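
On the affected versions, a possible stopgap is to read the underlying parquet files directly, since they store ts as the original long; note this bypasses Hudi's timeline/snapshot handling, so it is only reasonable for a simple copy-on-write table like this one:

// Stopgap sketch: bypass the Hudi reader entirely. This skips Hudi's snapshot
// resolution (no reconciliation across file versions), so use with care.
spark.read.format("parquet").load("/tmp/dummy3/2024041311").select("id", "ts").show()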

Environment Description

  • Hudi version : 0.13.0 and later (0.13.0, 0.13.1, 0.14.0, 0.14.1)

  • Spark version : 3.3.0 and 3.4.1

  • Hive version : -

  • Hadoop version : -

  • Storage (HDFS/S3/GCS..) : Tested with Local Disk and S3

  • Running on Docker? (yes/no) : no

@ad1happy2go
Contributor

Thanks for raising this, @harichandanp. I was able to reproduce the issue and it looks like a valid bug. Created a JIRA to track the fix: https://issues.apache.org/jira/browse/HUDI-7709

Feel free to contribute a fix if you are interested. Thanks.

@codope added the on-call-triaged, priority:critical (production down; pipelines stalled; need help ASAP), and reader-core labels on May 3, 2024
Status: 🏁 Triaged