Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink] Fix Multibyte Character Encoding in MultiTableCommittable Serialization #3028

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* {@link SimpleVersionedSerializer} for {@link MultiTableCommittable}. If a type info class is
Expand Down Expand Up @@ -53,9 +54,9 @@ public int getVersion() {
public byte[] serialize(MultiTableCommittable committable) throws IOException {
// first serialize all metadata
String database = committable.getDatabase();
int databaseLen = database.length();
int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
String table = committable.getTable();
int tableLen = table.length();
int tableLen = table.getBytes(StandardCharsets.UTF_8).length;

int multiTableMetaLen = databaseLen + tableLen + 2 * 4;

Expand Down Expand Up @@ -83,17 +84,18 @@ public MultiTableCommittable deserialize(int committableVersion, byte[] bytes)
int databaseLen = buffer.getInt();
byte[] databaseBytes = new byte[databaseLen];
buffer.get(databaseBytes, 0, databaseLen);
String database = new String(databaseBytes);
String database = new String(databaseBytes,StandardCharsets.UTF_8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`database.getBytes()` should use `database.getBytes(StandardCharsets.UTF_8)` too.


int tableLen = buffer.getInt();
byte[] tableBytes = new byte[tableLen];
buffer.get(tableBytes, 0, tableLen);
String table = new String(tableBytes);
int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
String table = new String(tableBytes,StandardCharsets.UTF_8);
int multiTableMetaLen = 4 + databaseLen + 4 + tableLen;

// use committable serializer (of the same version) to deserialize committable
byte[] serializedCommittable = new byte[bytes.length - multiTableMetaLen];

buffer.get(serializedCommittable, 0, bytes.length - multiTableMetaLen);
buffer.get(serializedCommittable, 0, serializedCommittable.length);
Committable committable = deserializeCommittable(committableVersion, serializedCommittable);

return MultiTableCommittable.fromCommittable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.NewFilesIncrement;
Expand All @@ -28,11 +30,14 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.function.Consumer;

import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;

class MultiTableCommittableSerializerTest {
private final CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
Expand All @@ -41,23 +46,73 @@ class MultiTableCommittableSerializerTest {
new MultiTableCommittableSerializer(fileSerializer);

@Test
public void testFileMetadata() throws IOException {
public void testDeserialize() throws IOException {
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);
String database = "database";
String table = "table";
MultiTableCommittable multiTableCommittable =
MultiTableCommittable.fromCommittable(
Identifier.create(database, table), committable);
MultiTableCommittable deserializeCommittable =
serializer.deserialize(2, serializer.serialize(multiTableCommittable));

assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class);

assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
assertThat(deserializeCommittable.getTable()).isEqualTo(table);

Lists.newArrayList(Tuple2.of("测试数据库","用户信息表"),Tuple2.of("database","table")).stream().forEach(new Consumer<Tuple2<String, String>>() {
@Override
public void accept(Tuple2<String, String> stringStringTuple2) {
String database = stringStringTuple2.f0;
String table = stringStringTuple2.f1;
MultiTableCommittable multiTableCommittable =
MultiTableCommittable.fromCommittable(
Identifier.create(database, table), committable);
MultiTableCommittable deserializeCommittable =
null;
try {
deserializeCommittable = serializer.deserialize(2, serializer.serialize(multiTableCommittable));
} catch (IOException e) {
throw new RuntimeException(e);
}

assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class);

assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
}
});

}
/**
 * Verifies that serializing a {@link MultiTableCommittable} succeeds and yields bytes for both
 * multibyte and ASCII database/table names.
 *
 * <p>Any exception thrown by {@code serialize} (for example a
 * {@link java.nio.BufferOverflowException}) is left to propagate so that the test fails with
 * the real cause. The previous {@code assert false} guards only fire when the JVM runs with
 * {@code -ea}, so they could not be relied on to report the failure.
 *
 * @throws IOException if the serializer reports an I/O failure
 */
@Test
public void testSerialize() throws IOException {
    NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
    CompactIncrement compactIncrement = randomCompactIncrement();
    CommitMessage commitMessage =
            new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement);
    Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);

    // {database, table} pairs: the multibyte names encode to more UTF-8 bytes than they have
    // characters; the ASCII pair is the baseline case.
    String[][] identifiers = {{"测试数据库", "用户信息表"}, {"database", "table"}};

    for (String[] identifier : identifiers) {
        String database = identifier[0];
        String table = identifier[1];

        MultiTableCommittable multiTableCommittable =
                MultiTableCommittable.fromCommittable(
                        Identifier.create(database, table), committable);

        byte[] serializedData = serializer.serialize(multiTableCommittable);

        // isNotEmpty() also rejects null, so it subsumes the old not-null check.
        assertThat(serializedData)
                .as("serialized data for %s.%s", database, table)
                .isNotEmpty();
    }
}
}