[Spark] Implement Purge table for SparkCatalog #3015

Open: wants to merge 82 commits into base: master

Commits (82):
5bf3fc0
[PAIMON-1401] [Feature] Implement Purge table for SparkCatalog
zhu3pang Mar 15, 2024
db88321
[hotfix] Fix unstable testRandomCdcEventsUnawareBucket (#3014)
yuzelin Mar 15, 2024
94a15a0
[flink] Refactor: flink tests use common CatalogITCaseBase (#3016)
yuzelin Mar 15, 2024
dd0ef63
[core] Support compiled class cache in CodeGenUtils
xiangyuf Mar 14, 2024
dbdcffd
[core] Optimize compiled class cache in CodeGenUtils
JingsongLi Mar 15, 2024
3939cd3
[core] AvroBulkFormat should not get file-size by file-io repeatly (#…
leaves12138 Mar 15, 2024
ae4ecc0
[hive] Cache TableSchema into Configuration to avoid loading read sch…
wg1026688210 Mar 15, 2024
4508589
[core] Support configuring lock in paimon catalog (#2933)
FangYongs Mar 15, 2024
c647733
[codegen] EqualiserCodeGenerator supports ARRAY<ROW> (#3023)
yuzelin Mar 15, 2024
a30275f
[core] Unified Naming of JdbcCatalog Fields (#3025)
sunxiaojian Mar 18, 2024
d0f684e
[hotfix] upgrade json-path and avro version (#2987)
zhuangchong Mar 18, 2024
55304dc
[core] Dv table supports value filter pushdown (#3024)
Zouxxyy Mar 18, 2024
edbc555
[hive] Fix hive writer in tez-mr (#2954)
Alibaba-HZY Mar 18, 2024
aea5509
[hive] Add license for TezUtil
JingsongLi Mar 18, 2024
c2b2cc8
[core] Reduce schema read for readers (#3021)
JingsongLi Mar 18, 2024
b86a8a6
[core] Add more validation for sequence field during create table. (#…
hzjhjjyy Mar 18, 2024
5712563
[doc] Updated Spark quickstart documentation for using --packages (#2…
cxzl25 Mar 18, 2024
66eb936
[core] Use var length encoding for row position (#3031)
JingsongLi Mar 18, 2024
a0817bd
[oss] use multi-release mechanism to ship jaxb (#2658)
zhoulii Mar 18, 2024
731f3ef
[core] Data files with delete records should not be upgraded directly…
tsreaper Mar 18, 2024
b88ba74
[Flink] Introduce range strategy for sort compaction. (#2749)
wg1026688210 Mar 18, 2024
e76d242
[spark] spark sql support get/list function (#2880)
waywtdcc Mar 18, 2024
1e22661
[docs] Trino docs update for sharing file system. (#3035)
leaves12138 Mar 18, 2024
b8ebe59
[core] Fix dv table with partial-update and aggregate (#3036)
Zouxxyy Mar 18, 2024
5dacfb5
[core] Introduce BatchTableCommit.truncateTable (#3037)
JingsongLi Mar 18, 2024
ede603e
[core] orc/parquet reader obtain the fileSize from metadata (#2918)
wg1026688210 Mar 19, 2024
abc5b8c
[core] Optimize write performance (#3039)
JingsongLi Mar 19, 2024
5d5dcc4
[core] Reduce getFileSize for avro reader (#3040)
JingsongLi Mar 19, 2024
4df3140
[core] support create tag based on tag (#3044)
Aitozi Mar 19, 2024
b721428
[flink] Remove compatibility utils that are used for flink-1.14 (#3054)
yuzelin Mar 20, 2024
453d1e0
[core] Add validation for fields prefix (#3052)
Zouxxyy Mar 20, 2024
77f050c
[core] add initial map size to solve load hash index slowly (#3051)
Stephen0421 Mar 20, 2024
ea88e87
[core] Introduce Int2ShortHashMap.Builder to to accelerate init
JingsongLi Mar 20, 2024
32378ad
[core] support to drop partition when delete tag (#3042)
Aitozi Mar 20, 2024
3ebe41a
[core] Optimize first_row batch read (#3055)
JingsongLi Mar 20, 2024
961bd64
[flink] Bump flink version to 1.19 (#3049)
yuzelin Mar 20, 2024
e001946
[orc] ORC Support ZStandard compression using zstd-jni (#3056)
zyl891229 Mar 20, 2024
ba8e999
[license] Add license to orc copied files
JingsongLi Mar 20, 2024
a9ace38
[doc] Introduce orc options documentation page (#3061)
JingsongLi Mar 21, 2024
e38916e
[doc] Add flink 1.19 to the documentation. (#3064)
zhuangchong Mar 21, 2024
df25ef2
[hotfix] Remove redundant and unused test code (#3066)
yuzelin Mar 21, 2024
332a994
[doc] Fix version in trino documentation (#3072)
Zouxxyy Mar 21, 2024
92f0ad5
[core] Skip rewrite when delete row count is 0 with dv (#3070)
Zouxxyy Mar 21, 2024
7559efa
[hotfix] Add project root path to files to facilitate compiling modul…
Zouxxyy Mar 21, 2024
2aad966
[flink] Compact procedure does not process 'ALL' word (#3065)
leaves12138 Mar 21, 2024
6575b81
[fix] Fix unstable test TagAutoCreationTest#testModifyTagPeriod (#3073)
yuzelin Mar 22, 2024
5579dee
[spark] Minor refactor SparkWriter to separate commit (#3067)
YannByron Mar 22, 2024
afa1b10
[flink] Fix that compact actions haven't handled scan parallelism (#3…
yuzelin Mar 22, 2024
f16ccdd
[core] JdbcCatalog supports custom configuration for the length of lo…
sunxiaojian Mar 22, 2024
5bc04c6
[doc] Introduce the minimum version of Maven. (#3082)
wg1026688210 Mar 25, 2024
ee4fa5d
[doc] Apache Paimon became top level project. (#3085)
leaves12138 Mar 25, 2024
ece3375
[incubating] Remove incubator disclaimer
JingsongLi Mar 25, 2024
e745c2a
[flink] Compact procedure supports named arguments (#3078)
yuzelin Mar 25, 2024
5c78be1
[cdc] Update the latest dynamic options to the table schema file when…
zhuangchong Mar 25, 2024
de61a18
[flink] Fix performance issue in ContinuousFileSplitEnumerator (#3071)
yuzelin Mar 25, 2024
0939a82
[Refactor] Minor refactor update the latest dynamic options using the…
yuzelin Mar 25, 2024
8ce013c
[cdc] Extract common code (#3050)
zhangjun0x01 Mar 26, 2024
7e33038
[cdc] Fix Mysql sync_table unable to synchronize newly added tables w…
huyuanfeng2018 Mar 26, 2024
effc57e
[hotfix] Apply spotless to fix checkstyle violation in flink-cdc modu…
yuzelin Mar 26, 2024
2e65cc5
[test] Append only table can't test failing IO re-insert yet (#3089)
leaves12138 Mar 26, 2024
8ce82a7
[core] Introduce SplitGroup for SplitGenerator to optimize more rawFi…
Zouxxyy Mar 26, 2024
f2a4e38
[hotfix] Fix unstable test MySqlSyncTableActionITCase#testSyncShards …
yuzelin Mar 26, 2024
c928967
[core] Refactor FileStorePathFactory to clean test methods (#3093)
JingsongLi Mar 26, 2024
096c433
[core] Add TableFormatBenchmark to test format only
JingsongLi Mar 26, 2024
cbe121b
[spark][core] Support input_file_name UDF (#3094)
YannByron Mar 26, 2024
7f27f04
[core] Unify FileRecordReader and reduce file access (#3098)
JingsongLi Mar 26, 2024
59d9ec7
[core] Support common jdbc catalog lock for filesystem catalog. (#3076)
sunxiaojian Mar 26, 2024
78b1ace
[core] Fix OrphanFilesClean has a chance to delete normal file (#3075)
wxplovecc Mar 27, 2024
4c17a3a
[catalog] Refactor Catalog Factory and revert jdbc lock in FileSystem…
JingsongLi Mar 27, 2024
a0809a2
[core] Shade roaringbitmap dependency into paimon-common (#3100)
Zouxxyy Mar 27, 2024
ac57e65
[cdc] Fix cdc job mistakenly changes immutable options of existing ta…
zhuangchong Mar 27, 2024
c265c6f
[core] Support dv with avro format (#3105)
Zouxxyy Mar 27, 2024
22832e9
[core] Introduce File index format for data skipping (#3068)
leaves12138 Mar 27, 2024
a5eff5f
[test] Fix flaky test in `KeyValueFileStoreScanTest.testWithValueFilt…
Zouxxyy Mar 28, 2024
e110f57
[doc] Introduce deletion vector documentation page (#3107)
Zouxxyy Mar 28, 2024
4b99a3f
[core] Rename to compactDvIndexFiles in FileStoreCommitImpl
JingsongLi Mar 28, 2024
5fa79b8
[test] Fix test in KeyValueFileStoreScanTest, make it more accurate (…
leaves12138 Mar 28, 2024
9a89253
[format] Set orc.compression.zstd.level to 3 by default (#3114)
zyl891229 Mar 29, 2024
a9ff40a
[core] add deletedFiles in NewFilesIncrement (#3117)
YannByron Mar 29, 2024
fa9c6e7
[cdc] Fix that kafka message value might be null which causes NPE (#3…
yuzelin Mar 29, 2024
ea3797d
[PAIMON-1401] [Feature] Implement Purge table for SparkCatalog, add s…
zhu3pang Mar 29, 2024
9db339a
Merge remote-tracking branch 'origin/master' into PAIMON-1401
zhu3pang Mar 29, 2024
@@ -173,6 +173,12 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
     @Override
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
+        dropTable(identifier, ignoreIfNotExists, false);
+    }
+
+    @Override
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists, boolean ifPurge)
+            throws TableNotExistException {
         checkNotSystemTable(identifier, "dropTable");
         if (!tableExists(identifier)) {
             if (ignoreIfNotExists) {
@@ -181,10 +187,10 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throw new TableNotExistException(identifier);
         }

-        dropTableImpl(identifier);
+        dropTableImpl(identifier, ifPurge);
     }

-    protected abstract void dropTableImpl(Identifier identifier);
+    protected abstract void dropTableImpl(Identifier identifier, boolean ifPurge);

     @Override
     public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)

@@ -169,6 +169,13 @@ default boolean tableExists(Identifier identifier) {
      */
     void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException;

+    /**
+     * @param ifPurge completely purge the table (skipping trash) while removing data from warehouse
+     * @see #dropTable(Identifier, boolean)
+     */
+    void dropTable(Identifier identifier, boolean ignoreIfNotExists, boolean ifPurge)
+            throws TableNotExistException;
+
     /**
      * Create a new table.
      *

@@ -135,9 +135,21 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
     }

     @Override
-    protected void dropTableImpl(Identifier identifier) {
+    protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
         Path path = getDataTableLocation(identifier);
-        uncheck(() -> fileIO.delete(path, true));
+        if (ifPurge) {
+            uncheck(() -> fileIO.delete(path, true));
+        } else {
+            uncheck(
+                    () ->
+                            fileIO.rename(
+                                    path,
+                                    new Path(
+                                            trash(),
+                                            identifier.getDatabaseName()
+                                                    + ".db/"
+                                                    + identifier.getObjectName())));
+        }
     }

     @Override
@@ -188,4 +200,8 @@ public String warehouse() {
     public boolean caseSensitive() {
         return catalogOptions.get(CASE_SENSITIVE);
     }
+
+    public String trash() {
+        return warehouse + "/.Trash";
+    }
 }
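The non-purge branch above moves the table directory to `<warehouse>/.Trash/<database>.db/<table>` instead of deleting it. Below is a minimal local-filesystem sketch of the same branching, written against `java.nio.file` rather than Paimon's `FileIO`. The class `TrashSketch` and its helper names are hypothetical, and creating the trash parent directory before the move is an assumption of this sketch, not something the diff shows.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TrashSketch {

    /** Hypothetical helper: resolves ".Trash" under the warehouse root, like trash() in the diff. */
    static Path trashDir(Path warehouse) {
        return warehouse.resolve(".Trash");
    }

    /** Purge deletes the table directory; a plain drop moves it under the trash directory. */
    static void dropTable(Path warehouse, String database, String table, boolean ifPurge)
            throws IOException {
        Path tablePath = warehouse.resolve(database + ".db").resolve(table);
        if (ifPurge) {
            deleteRecursively(tablePath);
        } else {
            Path target = trashDir(warehouse).resolve(database + ".db").resolve(table);
            // Assumption of this sketch: make sure ".Trash/<database>.db" exists before moving.
            Files.createDirectories(target.getParent());
            Files.move(tablePath, target);
        }
    }

    /** Deletes a directory tree, children first. */
    static void deleteRecursively(Path root) throws IOException {
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order puts nested entries before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path warehouse = Files.createTempDirectory("warehouse");
        Files.createDirectories(warehouse.resolve("default.db").resolve("CT"));
        dropTable(warehouse, "default", "CT", false);
        System.out.println(
                Files.exists(trashDir(warehouse).resolve("default.db").resolve("CT")));
    }
}
```

In this sketch, a second non-purge drop of a table with the same name fails with `FileAlreadyExistsException` because the trash entry already exists. How `fileIO.rename` behaves in that situation depends on the underlying file system.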

@@ -220,7 +220,8 @@ protected List<String> listTablesImpl(String databaseName) {
     }

     @Override
-    protected void dropTableImpl(Identifier identifier) {
+    protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
+        //fixme purge for jdbc is not implemented
         try {
             int deletedRecords =
                     execute(

@@ -344,10 +344,10 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
     }

     @Override
-    protected void dropTableImpl(Identifier identifier) {
+    protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
         try {
             client.dropTable(
-                    identifier.getDatabaseName(), identifier.getObjectName(), true, false, true);
+                    identifier.getDatabaseName(), identifier.getObjectName(), true, false, ifPurge);

             // When drop a Hive external table, only the hive metadata is deleted and the data files
             // are not deleted.

@@ -36,6 +36,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;

 import static org.assertj.core.api.Assertions.assertThat;
@@ -105,6 +107,42 @@ public void testCsvTable() {
                 .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
     }

+    @Test
+    public void testDropTable() throws Exception {
+        spark.sql(
+                "CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+        writeTable(
+                "CT",
+                GenericRow.of(1, 2, BinaryString.fromString("3")),
+                GenericRow.of(4, 5, BinaryString.fromString("6")));
+        spark.sql("DROP TABLE CT").collect();
+        assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty();
+        assertThat(tableInTrash("CT")).isTrue();
+    }
+
+    @Test
+    public void testPurgeTable() throws Exception {
+        spark.sql(
+                "CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+        writeTable(
+                "CT",
+                GenericRow.of(1, 2, BinaryString.fromString("3")),
+                GenericRow.of(4, 5, BinaryString.fromString("6")));
+        spark.sql("DROP TABLE CT PURGE").collect();
+        assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty();
+        assertThat(tableInTrash("CT")).isFalse();
+    }
+
+    private static boolean tableInTrash(String table) {
+        return Files.exists(
+                Paths.get(
+                        new Path(warehousePath, String.format(".Trash/default.db/%s", table))
+                                .toUri()
+                                .getPath()));
+    }
+
     private static void writeTable(String tableName, GenericRow... rows) throws Exception {
         FileStoreTable fileStoreTable =
                 FileStoreTableFactory.create(

@@ -294,7 +294,17 @@ public SparkTable createTable(
     @Override
     public boolean dropTable(Identifier ident) {
         try {
-            catalog.dropTable(toIdentifier(ident), false);
+            catalog.dropTable(toIdentifier(ident), false, false);
+            return true;
+        } catch (Catalog.TableNotExistException | NoSuchTableException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+        try {
+            catalog.dropTable(toIdentifier(ident), true, true);
             return true;
         } catch (Catalog.TableNotExistException | NoSuchTableException e) {
             return false;
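The two Spark entry points above pass different flags to the catalog: `dropTable` uses `(false, false)` and `purgeTable` uses `(true, true)`. The sketch below traces what those flag combinations return, using an in-memory stand-in for the catalog. `PurgeReturnSketch`, `InMemoryCatalog`, and the local `TableNotExistException` are hypothetical names, and the sketch assumes that the elided `ignoreIfNotExists` branch of `AbstractCatalog.dropTable` simply returns without throwing.

```java
import java.util.HashSet;
import java.util.Set;

class PurgeReturnSketch {

    /** Hypothetical stand-in for Catalog.TableNotExistException. */
    static class TableNotExistException extends Exception {
        TableNotExistException(String table) {
            super("Table " + table + " does not exist.");
        }
    }

    /** Hypothetical in-memory catalog that tracks live tables and trashed tables by name. */
    static class InMemoryCatalog {
        final Set<String> tables = new HashSet<>();
        final Set<String> trash = new HashSet<>();

        // Two-argument overload delegates with ifPurge = false, as in the first hunk of the diff.
        void dropTable(String table, boolean ignoreIfNotExists) throws TableNotExistException {
            dropTable(table, ignoreIfNotExists, false);
        }

        void dropTable(String table, boolean ignoreIfNotExists, boolean ifPurge)
                throws TableNotExistException {
            if (!tables.contains(table)) {
                if (ignoreIfNotExists) {
                    return; // assumption: the elided branch returns silently
                }
                throw new TableNotExistException(table);
            }
            tables.remove(table);
            if (!ifPurge) {
                trash.add(table); // a plain drop keeps the table recoverable
            }
        }
    }

    /** Same flags as SparkCatalog.dropTable in the diff: (false, false). */
    static boolean dropTable(InMemoryCatalog catalog, String table) {
        try {
            catalog.dropTable(table, false, false);
            return true;
        } catch (TableNotExistException e) {
            return false;
        }
    }

    /** Same flags as SparkCatalog.purgeTable in the diff: (true, true). */
    static boolean purgeTable(InMemoryCatalog catalog, String table) {
        try {
            catalog.dropTable(table, true, true);
            return true;
        } catch (TableNotExistException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.tables.add("CT");
        System.out.println(dropTable(catalog, "CT"));
        System.out.println(dropTable(catalog, "CT"));
        System.out.println(purgeTable(catalog, "missing"));
    }
}
```

Under that assumption, `purgeTable` on a table that does not exist returns `true` in this sketch, because `ignoreIfNotExists = true` suppresses the exception, while `dropTable` on the same table returns `false`.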