Skip to content

Commit

Permalink
feat: Filter out invalid splits and commit messages to improve flink …
Browse files Browse the repository at this point in the history
…database compaction efficiency
  • Loading branch information
zhourui999 committed Apr 21, 2024
1 parent e27ceb4 commit 6c75f2c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.runtime.state.StateInitializationContext;
Expand Down Expand Up @@ -181,6 +182,17 @@ protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long
StoreSinkWrite write = entry.getValue();
committables.addAll(
write.prepareCommit(waitCompaction, checkpointId).stream()
.filter(
committable -> {
if (committable.kind() == Committable.Kind.FILE) {
CommitMessageImpl commitMessage =
(CommitMessageImpl)
committable.wrappedCommittable();
return !commitMessage.newFilesIncrement().isEmpty()
&& !commitMessage.compactIncrement().isEmpty();
}
return true;
})
.map(
committable ->
MultiTableCommittable.fromCommittable(key, committable))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;

Expand Down Expand Up @@ -81,8 +83,19 @@ public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception {
for (Map.Entry<Identifier, StreamTableScan> entry : scansMap.entrySet()) {
Identifier identifier = entry.getKey();
StreamTableScan scan = entry.getValue();
int maxLevel = ((InnerStreamTableScanImpl) scan).options().numLevels() - 1;
splits.addAll(
scan.plan().splits().stream()
.filter(
split -> {
DataSplit dataSplit = (DataSplit) split;
if (dataSplit.dataFiles().isEmpty()) {
return false;
}
return dataSplit.dataFiles().stream()
.map(DataFileMeta::level)
.anyMatch(level -> level != maxLevel);
})
.map(split -> new Tuple2<>(split, identifier.getFullName()))
.collect(Collectors.toList()));
}
Expand Down

0 comments on commit 6c75f2c

Please sign in to comment.