Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Flink read / write data branch #3029

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

sunxiaojian
Copy link
Contributor

@sunxiaojian sunxiaojian commented Mar 16, 2024

Purpose

Linked issue: close (#2861)

When writing and reading branch data for Flink, it was found that if the "TableCommitImpl newcommit (String commitUser, String branchName)" interface is added like this The parameter "branchName" has a wide range of impacts, and when adding features in the future, we will also consider whether to support the branch, which is a challenge for developers. Therefore, in order to make developers not need to worry about the branch function and not need to consider branch writing and reading when adding interfaces, I have made the following refactoring and optimization

  1. Refactoring TagManager, SnapshotManager, and SchemeManager to support specifying default branch parameters,

The implementation logic of 'SnapshotManager snapshotManager()', 'TagManager tagManager()', and 'BranchManager branchManager()' directly specifies the default written branch, so that we no longer need to specify the 'branch' parameter when declaring new interfaces for subsequent interfaces.

  1. Add necessary unit test tests, such as CDC testing, to support Flink CDC writing to branches

  2. After this PR, remove interfaces with the branchName parameter such as' TableCommitImpl newcommit (String commitUser, String branchName) '

Tests

API and Format

Documentation

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch from 2a0fd59 to 2ab54f9 Compare March 16, 2024 15:59
@sunxiaojian
Copy link
Contributor Author

@FangYongs @schnappi17 PTAL cc @JingsongLi

@sunxiaojian
Copy link
Contributor Author

@FangYongs @schnappi17 When you have time, please take a look at this PR

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch from e16fa74 to b4da7b7 Compare March 27, 2024 11:53
@sunxiaojian
Copy link
Contributor Author

@FangYongs @schnappi17 Do we need to send an email to discuss the part of this optimization ?

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 4 times, most recently from 36c1484 to c9069e6 Compare April 15, 2024 15:13
@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 5 times, most recently from e9005ef to c98f80b Compare April 18, 2024 13:18
@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 10 times, most recently from a98a702 to ed4a620 Compare April 19, 2024 09:03
}

public static String branch(Map<String, String> options) {
if (options.containsKey(BRANCH.key())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use options.get() will handle with the default value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the previous description was incorrect, Most of the parameters for this method come from "org.apache.paimon.table.FileStoreTable #options", 'branch' parameter may be empty.

@@ -78,61 +79,75 @@ public class SchemaManager implements Serializable {

private final FileIO fileIO;
private final Path tableRoot;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do redundant modifications.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

@@ -104,7 +104,8 @@ public TableSchema(
this.highestFieldId = highestFieldId;
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.options = Collections.unmodifiableMap(options);
Objects.requireNonNull(options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this? The @param options is not marked as @NotNull

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this? The @param options is not marked as @NotNull

removed

@@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() {

@Override
public SnapshotReader newSnapshotReader() {
return newSnapshotReader(DEFAULT_MAIN_BRANCH);
return newSnapshotReader(CoreOptions.branch(options()));
}

@Override
public SnapshotReader newSnapshotReader(String branchName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the branchName here is not used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the branchName here is not used.

Yes, I will contribute a PR and delete it uniformly

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 2 times, most recently from 01402fa to 5fb55e0 Compare April 24, 2024 09:52
@schnappi17
Copy link
Contributor

@JingsongLi Please help to take a look at this, thanks a lot!

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch from 5fb55e0 to 7714477 Compare April 25, 2024 01:59
@schnappi17
Copy link
Contributor

@FangYongs Please also help to take a look at~Thanks!

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 2 times, most recently from e7fbe3e to 734ecec Compare May 8, 2024 10:39
@@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(CoreOptions.BRANCH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the scope of branch option?
Catalog? Table? Why catalog need to know?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the scope of branch option? Catalog? Table? Why catalog need to know?

The scope is Table, but when using getTable() or manipulating the schema in the catalog, it is necessary to specify the branch.

@@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() {

@Override
public SnapshotReader newSnapshotReader() {
return newSnapshotReader(DEFAULT_MAIN_BRANCH);
return newSnapshotReader(CoreOptions.branch(options()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use coreOptions()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use coreOptions()?

Yes, the table already specified the branch during initialization

@@ -85,55 +92,74 @@ public Path changelogDirectory() {
return new Path(tablePath + "/changelog");
}

public Path longLivedChangelogPath(long snapshotId) {
return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId);
public Path changelogDirectory(String branchName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the design? Why do need to pass branch here when there is already a branch in the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mostly opened up methods with and without branch parameters. The methods with branch parameters are mainly for BranchManager and test preparation

@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch 3 times, most recently from 363691d to 1642152 Compare May 14, 2024 11:27
@sunxiaojian sunxiaojian force-pushed the support-flink-write-branch-v2 branch from 1642152 to ecbc3bc Compare May 14, 2024 11:38
@sunxiaojian
Copy link
Contributor Author

@JingsongLi PTAL

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants