Skip to content

Commit

Permalink
[Hotfix][Postgres-CDC] Fix read(new table) data missing when restore …
Browse files Browse the repository at this point in the history
…& add new tables

Prerequisites for triggering issues:
1. restore & add new tables
2. new data is written(new table) in the gap between snapshot switching to stream
  • Loading branch information
hailin0 committed Apr 30, 2024
1 parent 6d71069 commit 44f3804
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -180,28 +179,36 @@ public void configure(SourceSplitBase sourceSplitBase) {
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}

SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
final boolean doSnapshot = snapshotter.shouldSnapshot();
createReplicationConnection(
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking
// place
if (slotInfo == null) {
if (this.replicationConnection == null) {
this.replicationConnection =
createReplicationConnection(
this.taskContext,
snapshotter.shouldSnapshot(),
connectorConfig.maxRetries(),
connectorConfig.retryDelay());
try {
slotCreatedInfo =
replicationConnection.createReplicationSlot().orElse(null);
// create the slot if it doesn't exist, otherwise update slot to add new
// table(job restore and add table)
replicationConnection.createReplicationSlot().orElse(null);
log.info(
"Updated replication slot: {}, publication: {}, tables: {}",
connectorConfig.slotName(),
connectorConfig.publicationName(),
connectorConfig.tableIncludeList());
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
throw new DebeziumException(message, ex);
}
} else {
slotCreatedInfo = null;
}
}

Expand Down Expand Up @@ -350,7 +357,9 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
public void close() {
try {
this.dataConnection.close();
this.replicationConnection.close();
if (this.replicationConnection != null) {
this.replicationConnection.close();
}
} catch (Exception e) {
log.warn("Failed to close connection", e);
}
Expand Down

0 comments on commit 44f3804

Please sign in to comment.