Skip to content

Commit

Permalink
[core] Fix SST files written using SstFileWriter don't work with TtlDB
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Apr 30, 2024
1 parent 7b378dc commit 654d8d1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
19 changes: 19 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
import org.rocksdb.TtlDB;

import java.io.File;
import java.util.ArrayList;
Expand All @@ -39,18 +40,22 @@ public class BulkLoader {
private final ColumnFamilyHandle columnFamily;
private final String path;
private final RocksDB db;
private final boolean isTtlEnabled;
private final Options options;
private final List<String> files = new ArrayList<>();
private final int currentTimeSeconds;

private SstFileWriter writer = null;
private int sstIndex = 0;
private long recordNum = 0;

public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle columnFamily, String path) {
this.db = db;
this.isTtlEnabled = db instanceof TtlDB;
this.options = options;
this.columnFamily = columnFamily;
this.path = path;
this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
}

public void write(byte[] key, byte[] value) throws WriteException {
Expand All @@ -62,6 +67,10 @@ public void write(byte[] key, byte[] value) throws WriteException {
files.add(path);
}

if (isTtlEnabled) {
value = appendTimestamp(value);
}

try {
writer.put(key, value);
} catch (RocksDBException e) {
Expand All @@ -80,6 +89,16 @@ public void write(byte[] key, byte[] value) throws WriteException {
}
}

private byte[] appendTimestamp(byte[] value) {
byte[] newValue = new byte[value.length + 4];
System.arraycopy(value, 0, newValue, 0, value.length);
newValue[value.length] = (byte) (currentTimeSeconds & 0xff);
newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff);
newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) & 0xff);
newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) & 0xff);
return newValue;
}

public void finish() {
try {
if (writer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,27 @@ public void testBootstrapRecords() throws Exception {
output.clear();
assigner.close();
}

@Test
public void testBootstrapWithTTL() throws Exception {
// enableTtl is true
GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, true);
List<List<Integer>> output = new ArrayList<>();
assigner.open(
0,
ioManager(),
2,
0,
(row, bucket) ->
output.add(
Arrays.asList(
row.getInt(0), row.getInt(1), row.getInt(2), bucket)));

// assigner.bootstrapKey can trigger the problem
assigner.bootstrapKey(GenericRow.of(1, 1, 1));
assigner.processInput(GenericRow.of(1, 1, 1));
assigner.endBoostrap(true);

assertThat(output).containsExactlyInAnyOrder(Arrays.asList(1, 1, 1, 1));
}
}

0 comments on commit 654d8d1

Please sign in to comment.