
Spark: Dropping partition column from old partition table corrupts entire table #10234

Open
EXPEbdodla opened this issue Apr 26, 2024 · 8 comments · May be fixed by #10352
Labels
bug Something isn't working

Comments

@EXPEbdodla

Apache Iceberg version

1.5.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

Problem: Dropping a column that is still referenced by an old partition spec corrupts the table. Metadata queries fail, SELECT fails, and DataFrame appends fail. Verified on Iceberg 1.2.1, 1.4.3, and 1.5.1.

What to expect:

  • If this is a valid scenario, INSERT and SELECT should continue to work as expected
  • If dropping an old partition spec column is not supported, the DROP COLUMN command should fail instead of corrupting the table

How to reproduce:
Try_Iceberg_partition_column_delete.md

Exported the notebook as a Markdown file and attached it above.
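For reference, a minimal Spark SQL sequence along these lines reproduces the same failure. The table and column names are illustrative only and are not taken from the attached notebook; event_date matches the partition field named in the append stack trace below.

-- illustrative sketch (assumes the session catalog is an Iceberg catalog with the Iceberg SQL extensions enabled)
create table db.events (id int, event_date date, msg string) partitioned by (event_date);
insert into db.events values (1, date '2024-04-01', 'a');
-- evolve the spec so event_date is only referenced by the older spec, then drop the column itself
alter table db.events replace partition field event_date with bucket(4, id);
alter table db.events drop column event_date;
-- from here on, SELECT, metadata queries, and DataFrame appends fail as shown below
select * from db.events;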

StackTrace:
SELECT:

Caused by: java.lang.NullPointerException: Type cannot be null
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
	at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447)
	at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416)
	at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132)
	at org.apache.iceberg.Partitioning.buildPartitionProjectionType(Partitioning.java:286)
	at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:254)
	at org.apache.iceberg.spark.source.SparkTable.metadataColumns(SparkTable.java:251)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput$lzycompute(DataSourceV2Relation.scala:61)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput(DataSourceV2Relation.scala:51)
	at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.metadataOutput(basicLogicalOperators.scala:1339)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3(Analyzer.scala:966)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3$adapted(Analyzer.scala:966)
	at scala.collection.Iterator.exists(Iterator.scala:969)
	at scala.collection.Iterator.exists$(Iterator.scala:967)
	at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
	at scala.collection.IterableLike.exists(IterableLike.scala:79)
	at scala.collection.IterableLike.exists$(IterableLike.scala:78)
	at scala.collection.AbstractIterable.exists(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2(Analyzer.scala:966)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2$adapted(Analyzer.scala:961)
	at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:346)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
	at scala.collection.Iterator.exists(Iterator.scala:969)
	at scala.collection.Iterator.exists$(Iterator.scala:967)
	at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
	at scala.collection.IterableLike.exists(IterableLike.scala:79)
	at scala.collection.IterableLike.exists$(IterableLike.scala:78)
	at scala.collection.AbstractIterable.exists(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
	at scala.collection.Iterator.exists(Iterator.scala:969)
	at scala.collection.Iterator.exists$(Iterator.scala:967)
	at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
	at scala.collection.IterableLike.exists(IterableLike.scala:79)
	at scala.collection.IterableLike.exists$(IterableLike.scala:78)
	at scala.collection.AbstractIterable.exists(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1(Analyzer.scala:961)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1$adapted(Analyzer.scala:961)
	at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
	at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
	at scala.collection.immutable.Stream.exists(Stream.scala:204)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.org$apache$spark$sql$catalyst$analysis$Analyzer$AddMetadataColumns$$hasMetadataCol(Analyzer.scala:961)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:931)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:928)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:928)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:924)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:227)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:227)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:212)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	... 25 more

For Spark Dataframe Append:

org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: event_date: identity(6)
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:587)
	at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:568)
	at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
	at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
	at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
	at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:95)
	at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
	at org.apache.iceberg.SerializableTable.lambda$specs$1(SerializableTable.java:192)
	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
	at org.apache.iceberg.SerializableTable.specs(SerializableTable.java:190)
	at org.apache.iceberg.SerializableTable.spec(SerializableTable.java:180)
	at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:638)
	at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:632)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:430)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

@EXPEbdodla EXPEbdodla added the bug Something isn't working label Apr 26, 2024
@EXPEbdodla EXPEbdodla changed the title Dropping partition column from old partition table corrupts table Spark: Dropping partition column from old partition table corrupts entire table Apr 26, 2024
@manuzhang
Contributor

Which Spark version are you using?

@EXPEbdodla
Author

Which Spark version are you using?

I was originally trying this with Spark 3.3.0 and Iceberg 1.2.1.

Later I tried the Spark-Iceberg Docker images tabulario/spark-iceberg:latest, tabulario/spark-iceberg:3.5.1_1.4.3, and tabulario/spark-iceberg:3.3.2_1.2.1.

@manuzhang
Contributor

I don't think DROP COLUMN should be allowed while there is still data partitioned by that column.

@EXPEbdodla
Author

Once a partition column has been dropped or replaced in the partition spec, dropping the column itself should behave like a regular DROP COLUMN. Is that a correct assumption?

@Fokko
Contributor

Fokko commented May 2, 2024

@EXPEbdodla To clarify, are you dropping a column that's part of the current partition spec?

I did some work to fix this on previous partition specs: #5399

@Fokko
Contributor

Fokko commented May 2, 2024

Thanks for sharing the steps on how to reproduce this. I think the problem here is that the column is not in the current spec, but it is still in use by an older spec. This looks like a bug indeed and should be fixed.
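For context, one way to see that an older spec is still in use (before the column is dropped) is the files metadata table, which records the spec each data file was written with. A sketch against the table from the reproduction in the issue description, assuming a recent Iceberg version where the files table exposes spec_id:

-- files written before the REPLACE PARTITION FIELD still carry the old spec_id,
-- so the old spec (and its source column) is still needed to read them
select file_path, spec_id from db.events.files;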

@manuzhang
Contributor

manuzhang commented May 3, 2024

The issue is that we don't persist the partition spec along with the schema it was created against in the metadata.json file, and we always use the current schema when rebuilding partition specs from the metadata file.
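To illustrate with an abridged, hypothetical metadata.json excerpt (field names per the Iceberg table spec; the IDs match the error above): each spec entry only records a source-id, with no pointer to the schema version it was created against, so it can only be re-bound against the current schema.

"partition-specs": [
  {
    "spec-id": 0,
    "fields": [
      { "name": "event_date", "transform": "identity", "source-id": 6, "field-id": 1000 }
    ]
  }
]

Once field 6 is removed from the current schema, binding this spec fails with "Cannot find source column for partition field: 1000: event_date: identity(6)".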

@nastra
Contributor

nastra commented May 17, 2024

I was able to reproduce this with this short example and I agree that this is a bug that we need to fix:

create schema iceberg;
create table iceberg.tbl (id int, trip_id int, distance int) partitioned by (bucket(8, trip_id));
insert into iceberg.tbl values (1, 1, 10);
alter table iceberg.tbl replace partition field trip_id_bucket with bucket(4, distance);
alter table iceberg.tbl drop column trip_id;
select * from iceberg.tbl;
Caused by: org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: trip_id_bucket: bucket[8](2)
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:562)
	at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:543)
	at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
	at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
	at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
	at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:98)
	at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
	at org.apache.iceberg.BaseContentScanTask.spec(BaseContentScanTask.java:71)
	at org.apache.iceberg.BaseFileScanTask.spec(BaseFileScanTask.java:27)
	at org.apache.iceberg.BaseFileScanTask$SplitScanTask.spec(BaseFileScanTask.java:127)
	at org.apache.iceberg.util.PartitionUtil.constantsMap(PartitionUtil.java:49)
	at org.apache.iceberg.util.PartitionUtil.constantsMap(PartitionUtil.java:42)
	at org.apache.iceberg.spark.source.BaseReader.constantsMap(BaseReader.java:198)
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:91)
	at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
