[Bug] paimon 0.8 spark read tag error #3280

Closed
eric666666 opened this issue Apr 28, 2024 · 3 comments · Fixed by #3289
Labels
bug Something isn't working

Comments

@eric666666
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

Paimon 0.8-SNAPSHOT

Compute Engine

Spark 3.3

Minimal reproduce step

select * from paimon.paimon.`ods_trade_orders_wm1$tags`
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:451)
	at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.$anonfun$run$2(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:286)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:984)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:191)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:214)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1072)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1081)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArithmeticException: long overflow
	at java.lang.Math.multiplyExact(Math.java:892)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:240)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:190)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(DateTimeUtils.scala)
	at org.apache.paimon.spark.SparkInternalRow.fromPaimon(SparkInternalRow.java:282)
	at org.apache.paimon.spark.SparkInternalRow.getTimestampMicros(SparkInternalRow.java:133)
	at org.apache.paimon.spark.SparkInternalRow.getLong(SparkInternalRow.java:128)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

It only fails on 0.8-SNAPSHOT; 0.7-incubating runs successfully.

What doesn't meet your expectations?

I have debugged it. It seems the create_time value is wrong. Please fix it.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@Zouxxyy
Contributor

Zouxxyy commented Apr 29, 2024

I haven't reproduced it. Can you provide more detailed steps to reproduce it? Is the tag written by Spark?

@eric666666
Contributor Author

I haven't reproduced it. Can you provide more detailed steps to reproduce it? Is the tag written by Spark?

I found the reason. The tag was created by Flink with a Paimon 0.8-SNAPSHOT build compiled before 2024-04-15, but after 2024-04-15 the Tag class gained two new fields.
My tag file content:

{
  "version" : 3,
  "id" : 1,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-0",
  "deltaManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-1",
  "changelogManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-2",
  "commitUser" : "59c08f33-b5de-4478-823c-5e681ee6e0b3",
  "commitIdentifier" : 1,
  "commitKind" : "APPEND",
  "timeMillis" : 1714379998639,
  "logOffsets" : { },
  "totalRecordCount" : 2531,
  "deltaRecordCount" : 2531,
  "changelogRecordCount" : 8227,
  "watermark" : 1714407456182
}

However, newer commits added two new fields to the Tag class:
[screenshot: the new Tag fields, tagCreateTime and tagTimeRetained]
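The old tag file above has neither of these keys, so a JSON deserializer that tolerates missing fields, as Paimon's Jackson-based one does, yields null for both. A toy illustration of this behavior (the POJO and JSON here are illustrative, not Paimon's actual Tag parsing):

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MissingFieldDemo {
    // Toy POJO mirroring only the fields relevant here; not Paimon's Tag class.
    public static class TagLike {
        public Long timeMillis;
        public String tagCreateTime;   // absent in old files -> stays null
        public String tagTimeRetained; // absent in old files -> stays null
    }

    public static void main(String[] args) throws Exception {
        String oldTagJson = "{\"timeMillis\":1714379998639}";
        ObjectMapper mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        TagLike tag = mapper.readValue(oldTagJson, TagLike.class);
        System.out.println(tag.tagCreateTime);   // null
        System.out.println(tag.tagTimeRetained); // null
    }
}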
The jar Spark uses to read the tags system table was compiled after 2024-04-15, so when the new code reads an old tag file it falls back to LocalDateTime.MIN for the missing tagCreateTime.
This directly causes the following exception:

Caused by: java.lang.ArithmeticException: long overflow
	at java.lang.Math.multiplyExact(Math.java:892)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:240)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:190)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(DateTimeUtils.scala)

Because LocalDateTime.MIN's timestamp, when its milliseconds are converted to microseconds, overflows the long range.
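To make the arithmetic concrete, here is a minimal, self-contained sketch (plain Java, not Paimon's code; Paimon produces the millisecond value through its own Timestamp path, but the magnitude is the same):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class LongOverflowDemo {
    public static void main(String[] args) {
        // LocalDateTime.MIN is year -999999999; its epoch-second value is
        // about -3.2e16, which still fits in a long.
        long epochSecond = LocalDateTime.MIN.toEpochSecond(ZoneOffset.UTC);
        System.out.println("epoch seconds = " + epochSecond);

        // The millisecond value already overflows: |epochSecond * 1000|
        // exceeds the long range, so this line throws
        // java.lang.ArithmeticException: long overflow.
        long millis = Math.multiplyExact(epochSecond, 1_000L);

        // Spark's DateTimeUtils.millisToMicros performs essentially this
        // second multiplication, which is where the reported trace fails.
        long micros = Math.multiplyExact(millis, 1_000L);
        System.out.println(micros);
    }
}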
I think a sentinel other than LocalDateTime.MIN could be used; it is only a marker meaning the old tag file has no tagCreateTime field, so something like the epoch would work.
So I will submit a PR to fix it.
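For comparison, an epoch sentinel converts safely. A quick self-contained check, assuming UTC (plain Java, not Paimon code):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochSentinelCheck {
    public static void main(String[] args) {
        // The proposed sentinel: 1970-01-01T00:00, i.e. the epoch.
        LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
        long millis = epoch.toEpochSecond(ZoneOffset.UTC) * 1_000L; // 0
        long micros = Math.multiplyExact(millis, 1_000L);           // 0, no overflow
        System.out.println("micros = " + micros);
    }
}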

@JingsongLi
Contributor

Thanks @eric666666 for the report and investigation. We should return null for these two fields when they are null.
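A minimal sketch of that suggestion (the helper below is hypothetical, not the actual Paimon patch): keep null as null instead of substituting a sentinel, so the tags table surfaces SQL NULL.

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class NullInsteadOfSentinel {
    // Hypothetical helper: convert a tag's createTime for the Spark row.
    // Old tag files deserialize tagCreateTime as null; pass that through
    // rather than substituting LocalDateTime.MIN.
    static Long createTimeMicrosOrNull(LocalDateTime tagCreateTime) {
        if (tagCreateTime == null) {
            return null; // rendered as SQL NULL, no overflow possible
        }
        return Math.addExact(
                Math.multiplyExact(tagCreateTime.toEpochSecond(ZoneOffset.UTC), 1_000_000L),
                tagCreateTime.getNano() / 1_000L);
    }

    public static void main(String[] args) {
        System.out.println(createTimeMicrosOrNull(null));                                // null
        System.out.println(createTimeMicrosOrNull(LocalDateTime.of(2024, 4, 29, 0, 0))); // 1714348800000000
    }
}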
