New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support clone latest snapshot #3287
base: master
Are you sure you want to change the base?
support clone latest snapshot #3287
Conversation
bd32ebd
to
b67e04e
Compare
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
Outdated
Show resolved
Hide resolved
...k/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfoTypeInfo.java
Outdated
Show resolved
Hide resolved
...link/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
Outdated
Show resolved
Hide resolved
...link/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
Outdated
Show resolved
Hide resolved
...imon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
Outdated
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/clone/CopyFileUtils.java
Outdated
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/clone/CopyFileUtils.java
Outdated
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/clone/CopyFileUtils.java
Outdated
Show resolved
Hide resolved
paimon-core/src/main/java/org/apache/paimon/clone/CloneFileInfo.java
Outdated
Show resolved
Hide resolved
...nk/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
Outdated
Show resolved
Hide resolved
b67e04e
to
034753f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your IT cases should also include copying changelog, index files, etc. Everything you copy must be tested.
You'll also need a random test with very fast expiration speed to make sure your code really works when the source table is modified at the same time.
+ "--target_database <target_database_name> " | ||
+ "--target_table <target_table_name> " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
target_database
and target_table
are also optional.
if (!sourceCatalogConfig.isEmpty()) { | ||
this.sourceCatalogConfig = sourceCatalogConfig; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!sourceCatalogConfig.isEmpty()) { | |
this.sourceCatalogConfig = sourceCatalogConfig; | |
} | |
this.sourceCatalogConfig = sourceCatalogConfig; |
if (!targetCatalogConfig.isEmpty()) { | ||
this.targetCatalogConfig = targetCatalogConfig; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!targetCatalogConfig.isEmpty()) { | |
this.targetCatalogConfig = targetCatalogConfig; | |
} | |
this.targetCatalogConfig = targetCatalogConfig; |
|
||
/** | ||
* Pick the tables to be cloned based on the user input parameters. The record type of the build | ||
* DataStream is Tuple2. The left element is the identifier of source table and the right element is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* DataStream is Tuple2. The left element is the identifier of source table and the right element is | |
* DataStream is {@link Tuple2}. The left element is the identifier of source table and the right element is |
} catch (Exception e) { | ||
// ignore | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't igonre exceptions. Throw it out.
|
||
import org.apache.paimon.table.sink.ChannelComputer; | ||
|
||
/** SnapshotHintChannelComputer. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Write meaningful comments please.
* When the files copy finished of a table, then create snapshot hint, it means that this table can | ||
* be used now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* When the files copy finished of a table, then create snapshot hint, it means that this table can | |
* be used now. | |
* Creates snapshot hint files after copying a table. |
LOG.info( | ||
"Skipping target file {} because it already exists and has the same size.", | ||
targetPath); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider copying the whole database. There are two tables in this database. All files of table A has been copied, so no record about table A will be sent to SnapshotHintOperator
. Snapshot hint files of table A will not be created.
+ " dt STRING," | ||
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED" | ||
+ ") PARTITIONED BY (dt, hh) WITH (" | ||
+ " 'write-only' = 'true'," |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why write only?
prepareTable( | ||
Arrays.asList("dt", "hh"), | ||
Arrays.asList("dt", "hh", "k"), | ||
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why write only?
Run the following command to submit a clone job for the table's latest Snapshot. | ||
|
||
```bash | ||
CALL sys.clone('source_warehouse', 'source_database', 'source_table', '', 'target_warehouse', 'target_database', 'target_table', '', '') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just support Flink 1.19. Using named argurment.
Purpose
Linked issue: close #xxx
Tests
API and Format
Documentation