rewrite_data_files does not respect table sort order #10346

Open
bk-mz opened this issue May 17, 2024 · 5 comments
Labels
bug Something isn't working

Comments

bk-mz commented May 17, 2024

Apache Iceberg version

1.5.2 (latest release)

Query engine

Spark

Please describe the bug 🐞


// let's take this partition, which has 77 files
val partitionId = 476167 // 2024-04-27 07:00:00
val sortFieldId = 44 // key='sid'
val table_name = "" // elided

import spark.implicits._ // for the $"..." column syntax (implicit in spark-shell)

// Counts pairs of data files in the partition whose [lower, upper] bounds
// for the sort column overlap: 0 means the files cover disjoint sorted
// ranges, anything higher means file ranges overlap.
def countOverlappingRanges: Long = {
  val df = spark.sql(s"""
    select
      string(element_at(lower_bounds, $sortFieldId)) as lower,
      string(element_at(upper_bounds, $sortFieldId)) as upper
    from $table_name.files
    where not contains(file_path, 'deletes')
      and partition.data_load_ts_hour = $partitionId
  """)

  // Cross-join the per-file ranges with themselves; the df1.lower < df2.lower
  // guard drops self-pairs and counts each overlapping pair only once.
  df.as("df1")
    .crossJoin(df.as("df2"))
    .filter($"df1.upper" >= $"df2.lower" && $"df1.lower" <= $"df2.upper" && $"df1.lower" < $"df2.lower")
    .count()
}


// let's rewrite with an explicit sort order
spark.sql(s"""CALL system.rewrite_data_files(table => '$table_name', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'), sort_order => 'sid ASC')""").show(false);

countOverlappingRanges

// let's rewrite without specifying a sort order
spark.sql(s"""CALL system.rewrite_data_files(table => '$table_name', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'))""").show(false);

countOverlappingRanges

Output

scala> spark.sql("""CALL system.rewrite_data_files(table => '...', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'), sort_order => 'sid ASC')""").show(false);
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|80                        |73                    |9848363198           |0                      |
+--------------------------+----------------------+---------------------+-----------------------+


scala>

scala> countOverlappingRanges
res37: Long = 0

scala>

scala> // let's rewrite w/o specifying sort order

scala> spark.sql("""CALL system.rewrite_data_files(table => '...', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'))""").show(false);
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|73                        |81                    |9869637801           |0                      |
+--------------------------+----------------------+---------------------+-----------------------+


scala>

scala> countOverlappingRanges
res39: Long = 1385
@RussellSpitzer (Member)

What plan was used when the sort order wasn't specified? We should be able to tell which sort order was used from the plan alone, without checking any data files.

This should be easily visible in the Spark UI.
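
For reference, the table's default sort order can also be read programmatically rather than from the UI. A minimal sketch using Iceberg's Spark3Util helper; glue.db.tbl is a placeholder name, not the real table from this issue:

import org.apache.iceberg.spark.Spark3Util

// Load the Iceberg table behind the Spark catalog entry and print the
// default sort order that rewrite_data_files falls back to when no
// sort_order argument is given ("glue.db.tbl" is a placeholder).
val icebergTable = Spark3Util.loadIcebergTable(spark, "glue.db.tbl")
println(icebergTable.sortOrder())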

bk-mz commented May 17, 2024

[Spark UI screenshot]

With an explicit sort order:

== Parsed Logical Plan ==
AppendData RelationV2[ ... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[ ... 22 more fields] default_cache_iceberg.`id1` glue.table

== Analyzed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table

== Optimized Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true, IcebergWrite(table=glue.table, format=PARQUET), Sort [sid#155 ASC NULLS FIRST], false
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[... 22 more fields] glue.table

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$4781/1107532386@2e3397b1, IcebergWrite(table=glue.table, format=PARQUET)
+- Sort [sid#155 ASC NULLS FIRST], false, 0
   +- Exchange rangepartitioning(sid#155 ASC NULLS FIRST, 73), REPARTITION_BY_NUM, [plan_id=17]
      +- Project [... 22 more fields]
         +- BatchScan glue.table[... 22 more fields] glue.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []

Without an explicit sort order:

== Parsed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true
+- RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table

== Analyzed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true
+- RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table

== Optimized Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true, IcebergWrite(table=glue.table, format=PARQUET), RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table
+- Sort [staticinvoke(class org.apache.iceberg.spark.functions.HoursFunction$TimestampToHoursFunction, IntegerType, invoke, data_load_ts#641, TimestampType, false, true, true) ASC NULLS FIRST, sid#639 ASC NULLS FIRST], false
   +- RelationV2[... 22 more fields] glue.table

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$4781/1107532386@204b1e50, IcebergWrite(table=glue.table, format=PARQUET)
+- Sort [staticinvoke(class org.apache.iceberg.spark.functions.HoursFunction$TimestampToHoursFunction, IntegerType, invoke, data_load_ts#641, TimestampType, false, true, true) ASC NULLS FIRST, sid#639 ASC NULLS FIRST], false, 0
   +- Project [... 22 more fields]
      +- BatchScan glue.table[... 22 more fields] glue.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []

bk-mz commented May 17, 2024

Okay, I admit that calling this non-working was a bit premature.

Still, the fact remains that the partition ends up with a lot of overlapping files after a rewrite without an explicitly set sort order.

In any case, data_load_ts is a hidden-partition timestamp truncated to the hour, so it is constant for all of the records within a partition.
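
For context, a minimal sketch of the kind of table setup being described, with placeholder names and assuming Iceberg's Spark SQL extensions are enabled (this is not the real schema from the issue):

// Hypothetical table: hidden hourly partitioning on data_load_ts,
// plus a table-level default sort order on sid.
spark.sql("""
  CREATE TABLE glue.db.tbl (
    sid string,
    data_load_ts timestamp
    -- ... more columns
  )
  USING iceberg
  PARTITIONED BY (hours(data_load_ts))
""")

// The default sort order that rewrite_data_files should pick up when
// no sort_order argument is passed:
spark.sql("ALTER TABLE glue.db.tbl WRITE ORDERED BY sid ASC")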

@RussellSpitzer (Member)

So it's definitely using the default sort order, as evidenced by the plan, but something in our sort request to Spark isn't working properly. While the two plans are different (the explicit-sort plan adds an Exchange rangepartitioning step, while the default one only has a local Sort), I feel like they should both produce correct output. We'll probably need to debug a bit more.

@RussellSpitzer (Member)

It's a little odd that the first plan doesn't include the partitioning transform, which it probably should ...
