Skip to content

Commit

Permalink
[Hotfix][Postgres-CDC/OpenGauss-CDC] Fix missing data when reading after a job restore
Browse files Browse the repository at this point in the history
1. The job did not add new tables to the `publication` before starting the snapshot read.
2. Deletion of database logs did not use the checkpoint offset.
  • Loading branch information
hailin0 committed May 6, 2024
1 parent 6d71069 commit cd2acad
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.cdc.base.dialect;

import org.apache.seatunnel.api.state.CheckpointListener;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

Expand All @@ -33,8 +33,7 @@
*
* @param <C> The source config of data source.
*/
public interface DataSourceDialect<C extends SourceConfig>
extends Serializable, CheckpointListener {
public interface DataSourceDialect<C extends SourceConfig> extends Serializable {

/** Get the name of dialect. */
String getName();
Expand All @@ -57,9 +56,6 @@ public interface DataSourceDialect<C extends SourceConfig>
/**
* We have an empty default implementation here because most dialects do not have to implement
* the method.
*
* @see CheckpointListener#notifyCheckpointComplete(long)
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
default void commitChangeLogOffset(Offset offset) throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class IncrementalSourceReader<T, C extends SourceConfig>

private final DataSourceDialect<C> dataSourceDialect;

private transient volatile Offset snapshotChangeLogOffset;

private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);

public IncrementalSourceReader(
Expand Down Expand Up @@ -113,7 +116,7 @@ public void pollNext(Collector<T> output) throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
dataSourceDialect.notifyCheckpointComplete(checkpointId);
dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
}

@Override
Expand Down Expand Up @@ -238,7 +241,9 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
unfinishedSplits.addAll(finishedUnackedSplits.values());

if (isIncrementalSplitPhase(unfinishedSplits)) {
return snapshotCheckpointDataType(unfinishedSplits);
IncrementalSplit incrementalSplit = unfinishedSplits.get(0).asIncrementalSplit();
snapshotChangeLogOffset = incrementalSplit.getStartupOffset();
return snapshotCheckpointDataType(incrementalSplit);
}

return unfinishedSplits;
Expand All @@ -253,12 +258,7 @@ private boolean isIncrementalSplitPhase(List<SourceSplitBase> stateSplits) {
return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit();
}

private List<SourceSplitBase> snapshotCheckpointDataType(List<SourceSplitBase> stateSplits) {
if (!isIncrementalSplitPhase(stateSplits)) {
throw new IllegalStateException(
"The splits should be incremental split when snapshot checkpoint datatype");
}
IncrementalSplit incrementalSplit = stateSplits.get(0).asIncrementalSplit();
private List<SourceSplitBase> snapshotCheckpointDataType(IncrementalSplit incrementalSplit) {
// Snapshot current datatype to checkpoint
SeaTunnelDataType<T> checkpointDataType = debeziumDeserializationSchema.getProducedType();
IncrementalSplit newIncrementalSplit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
Expand All @@ -33,6 +34,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.enumerator.PostgresChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask;
Expand Down Expand Up @@ -161,9 +163,9 @@ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBas
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void commitChangeLogOffset(Offset offset) throws Exception {
if (postgresWalFetchTask != null) {
postgresWalFetchTask.commitCurrentOffset();
postgresWalFetchTask.commitCurrentOffset((LsnOffset) offset);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -180,28 +179,31 @@ public void configure(SourceSplitBase sourceSplitBase) {
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}

SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
final boolean doSnapshot = snapshotter.shouldSnapshot();
createReplicationConnection(
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking
// place
if (slotInfo == null) {
if (this.replicationConnection == null) {
this.replicationConnection =
createReplicationConnection(
this.taskContext,
snapshotter.shouldSnapshot(),
connectorConfig.maxRetries(),
connectorConfig.retryDelay());
try {
slotCreatedInfo =
replicationConnection.createReplicationSlot().orElse(null);
// create the slot if it doesn't exist, otherwise update slot to add new
// table(job restore and add table)
replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
throw new DebeziumException(message, ex);
}
} else {
slotCreatedInfo = null;
}
}

Expand Down Expand Up @@ -265,20 +267,6 @@ private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
engineHistory);
}

/**
 * Lazily initializes {@code replicationConnection} if it has not been set yet.
 *
 * <p>The field is inspected once without holding a lock and, only when it is still {@code null},
 * a second time while holding the monitor of this instance. The connection itself is built by
 * the four-argument {@code createReplicationConnection} overload using {@code taskContext}.
 *
 * @param doSnapshot forwarded unchanged to the four-argument overload
 * @param maxRetries forwarded unchanged to the four-argument overload
 * @param retryDelay forwarded unchanged to the four-argument overload
 */
public void createReplicationConnection(
        boolean doSnapshot, int maxRetries, Duration retryDelay) {
    // Unlocked check first: nothing to do when a connection is already present.
    if (this.replicationConnection == null) {
        synchronized (this) {
            // Re-check under the lock before assigning.
            if (this.replicationConnection == null) {
                this.replicationConnection =
                        createReplicationConnection(
                                this.taskContext, doSnapshot, maxRetries, retryDelay);
            }
        }
    }
}

@Override
public PostgresSourceConfig getSourceConfig() {
return (PostgresSourceConfig) sourceConfig;
Expand Down Expand Up @@ -350,7 +338,9 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
public void close() {
try {
this.dataConnection.close();
this.replicationConnection.close();
if (this.replicationConnection != null) {
this.replicationConnection.close();
}
} catch (Exception e) {
log.warn("Failed to close connection", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;

import io.debezium.connector.postgresql.PostgresOffsetContext;
Expand Down Expand Up @@ -73,12 +74,11 @@ public void execute(FetchTask.Context context) throws Exception {
streamingChangeEventSource.execute(changeEventSourceContext, offsetContext);
}

public void commitCurrentOffset() {
if (streamingChangeEventSource != null && offsetContext != null) {
public void commitCurrentOffset(LsnOffset offset) {
if (streamingChangeEventSource != null && offset != null) {

// only extracting and storing the lsn of the last commit
Long commitLsn =
(Long) offsetContext.getOffset().get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
Long commitLsn = offset.getLsn().asLong();
if (commitLsn != null
&& (lastCommitLsn == null
|| Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {
Expand Down

0 comments on commit cd2acad

Please sign in to comment.