Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge branch procedure #3086

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
21 changes: 21 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -158,6 +159,12 @@ default void deleteQuietly(Path file) {
}
}

default void deleteFilesQuietly(List<Path> files) {
for (Path file : files) {
deleteQuietly(file);
}
}

default void deleteDirectoryQuietly(Path directory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete " + directory.toString());
Expand Down Expand Up @@ -255,6 +262,20 @@ default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOExceptio
return writeFileUtf8(targetPath, content);
}

/** Copy all files in sourceDirectory to directory targetDirectory. */
default void copyFilesUtf8(Path sourceDirectory, Path targetDirectory) throws IOException {
FileStatus[] fileStatuses = listStatus(sourceDirectory);
List<Path> copyFiles =
Arrays.stream(fileStatuses)
.map(fileStatus -> fileStatus.getPath())
.collect(Collectors.toList());
for (Path file : copyFiles) {
String fileName = file.getName();
Path targetPath = new Path(targetDirectory.toString() + "/" + fileName);
copyFileUtf8(file, targetPath);
}
}

/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
}
}

private Path schemaDirectory() {
public Path schemaDirectory() {
return new Path(tableRoot + "/schema");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.schema;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
Expand All @@ -26,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -289,6 +292,15 @@ public static TableSchema fromJson(String json) {
return JsonSerdeUtil.fromJson(json, TableSchema.class);
}

public static TableSchema fromPath(FileIO fileIO, Path path) {
try {
String json = fileIO.readFileUtf8(path);
return TableSchema.fromJson(json);
} catch (IOException e) {
throw new RuntimeException("Fails to read schema from path " + path, e);
}
}

@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,11 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void mergeBranch(String branchName) {
branchManager().mergeBranch(branchName);
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void mergeBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mergeBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

/** Merge a branch to main branch. */
@Experimental
void mergeBranch(String branchName);

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

Expand All @@ -34,6 +36,7 @@
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -142,6 +145,74 @@ public boolean fileExists(Path path) {
}
}

public void mergeBranch(String branchName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
"Branch name '%s' do not use in merge branch.",
branchName);
checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
Long earliestSnapshotId = snapshotManager.earliestSnapshotId(branchName);
Snapshot earliestSnapshot = snapshotManager.snapshot(branchName, earliestSnapshotId);
long earliestSchemaId = earliestSnapshot.schemaId();

try {
// Delete snapshot, schema, and tag from the main branch which occurs after
// earliestSnapshotId
List<Path> deleteSnapshotPaths =
listVersionedFileStatus(
fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());
List<Path> deleteSchemaPaths =
listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
.map(FileStatus::getPath)
.filter(
path ->
TableSchema.fromPath(fileIO, path).id()
>= earliestSchemaId)
.collect(Collectors.toList());
List<Path> deleteTagPaths =
listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());

List<Path> deletePaths =
Stream.concat(
Stream.concat(
deleteSnapshotPaths.stream(),
deleteSchemaPaths.stream()),
deleteTagPaths.stream())
.collect(Collectors.toList());

// Delete latest snapshot
snapshotManager.deleteLatestHint();

fileIO.deleteFilesQuietly(deletePaths);
fileIO.copyFilesUtf8(
snapshotManager.branchSnapshotDirectory(branchName),
snapshotManager.snapshotDirectory());
fileIO.copyFilesUtf8(
schemaManager.branchSchemaDirectory(branchName),
schemaManager.schemaDirectory());
fileIO.copyFilesUtf8(
tagManager.branchTagDirectory(branchName), tagManager.tagDirectory());
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when merge branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

/** Check if a branch exists. */
public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,16 @@ private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
.orElse(null);
}

public void deleteLatestHint() throws IOException {
deleteLatestHint(DEFAULT_MAIN_BRANCH);
}

public void deleteLatestHint(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path hintFile = new Path(snapshotDir, LATEST);
fileIO.delete(hintFile, false);
}

public void commitLatestHint(long snapshotId) throws IOException {
commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public Path tagPath(String tagName) {
return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
}

/** Return the path of tag directory in branch. */
public Path branchTagDirectory(String branchName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag");
}

/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,134 @@ public void testDeleteBranch() throws Exception {
"Branch name 'branch1' doesn't exist."));
}

@Test
public void testMergeBranch() throws Exception {
FileStoreTable table = createFileStoreTable();

generateBranch(table);

// Verify branch1 and the main branch have the same data
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");

// Test for unsupported branch name
assertThatThrownBy(() -> table.mergeBranch("test-branch"))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Branch name 'test-branch' doesn't exist."));

assertThatThrownBy(() -> table.mergeBranch("main"))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Branch name 'main' do not use in merge branch."));

// Write data to branch1
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(false, 2));
}

// Validate data in branch1
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");

// Validate data in main branch not changed
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");

// Merge branch1 to main branch
table.mergeBranch("branch1");

// After merge branch1, verify branch1 and the main branch have the same data
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");

// verify snapshot in branch1 and main branch is same
SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
Snapshot branchSnapshot =
Snapshot.fromPath(
new TraceableFileIO(), snapshotManager.branchSnapshotPath("branch1", 2));
Snapshot snapshot =
Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2));
assertThat(branchSnapshot.equals(snapshot)).isTrue();

// verify schema in branch1 and main branch is same
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
TableSchema branchSchema =
SchemaManager.fromPath(
new TraceableFileIO(), schemaManager.branchSchemaPath("branch1", 0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();

// Write two rows data to branch1 again
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
write.write(rowData(3, 30, 300L));
write.write(rowData(4, 40, 400L));
commit.commit(2, write.prepareCommit(false, 3));
}

// Verify data in branch1
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset",
"3|30|300|binary|varbinary|mapKey:mapVal|multiset",
"4|40|400|binary|varbinary|mapKey:mapVal|multiset");

// Verify data in main branch not changed
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");

// Merge branch1 to main branch again
table.mergeBranch("branch1");

// Verify data in main branch is same to branch1
assertThat(
getResult(
table.newRead(),
toSplits(table.newSnapshotReader().read().dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset",
"3|30|300|binary|varbinary|mapKey:mapVal|multiset",
"4|40|400|binary|varbinary|mapKey:mapVal|multiset");
}

@Test
public void testUnsupportedTagName() throws Exception {
FileStoreTable table = createFileStoreTable();
Expand Down Expand Up @@ -1553,7 +1681,6 @@ protected void generateBranch(FileStoreTable table) throws Exception {
table.createBranch(BRANCH_NAME, "tag1");

// verify that branch1 file exist
TraceableFileIO fileIO = new TraceableFileIO();
BranchManager branchManager = table.branchManager();
assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();

Expand Down