Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Postgres-CDC] Fix read data missing when restore #6785

Merged
merged 1 commit into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.cdc.base.dialect;

import org.apache.seatunnel.api.state.CheckpointListener;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

Expand All @@ -33,8 +33,7 @@
*
* @param <C> The source config of data source.
*/
public interface DataSourceDialect<C extends SourceConfig>
extends Serializable, CheckpointListener {
public interface DataSourceDialect<C extends SourceConfig> extends Serializable {

/** Get the name of dialect. */
String getName();
Expand All @@ -57,9 +56,6 @@ public interface DataSourceDialect<C extends SourceConfig>
/**
* We have an empty default implementation here because most dialects do not have to implement
* the method.
*
* @see CheckpointListener#notifyCheckpointComplete(long)
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
default void commitChangeLogOffset(Offset offset) throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class IncrementalSourceReader<T, C extends SourceConfig>

private final DataSourceDialect<C> dataSourceDialect;

private transient volatile Offset snapshotChangeLogOffset;

private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);

public IncrementalSourceReader(
Expand Down Expand Up @@ -113,7 +116,7 @@ public void pollNext(Collector<T> output) throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
dataSourceDialect.notifyCheckpointComplete(checkpointId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
}

@Override
Expand Down Expand Up @@ -238,7 +241,9 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
unfinishedSplits.addAll(finishedUnackedSplits.values());

if (isIncrementalSplitPhase(unfinishedSplits)) {
return snapshotCheckpointDataType(unfinishedSplits);
IncrementalSplit incrementalSplit = unfinishedSplits.get(0).asIncrementalSplit();
snapshotChangeLogOffset = incrementalSplit.getStartupOffset();
return snapshotCheckpointDataType(incrementalSplit);
}

return unfinishedSplits;
Expand All @@ -253,12 +258,7 @@ private boolean isIncrementalSplitPhase(List<SourceSplitBase> stateSplits) {
return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit();
}

private List<SourceSplitBase> snapshotCheckpointDataType(List<SourceSplitBase> stateSplits) {
if (!isIncrementalSplitPhase(stateSplits)) {
throw new IllegalStateException(
"The splits should be incremental split when snapshot checkpoint datatype");
}
IncrementalSplit incrementalSplit = stateSplits.get(0).asIncrementalSplit();
private List<SourceSplitBase> snapshotCheckpointDataType(IncrementalSplit incrementalSplit) {
// Snapshot current datatype to checkpoint
SeaTunnelDataType<T> checkpointDataType = debeziumDeserializationSchema.getProducedType();
IncrementalSplit newIncrementalSplit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
Expand All @@ -33,6 +34,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.enumerator.PostgresChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask;
Expand Down Expand Up @@ -161,9 +163,9 @@ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBas
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void commitChangeLogOffset(Offset offset) throws Exception {
if (postgresWalFetchTask != null) {
postgresWalFetchTask.commitCurrentOffset();
postgresWalFetchTask.commitCurrentOffset((LsnOffset) offset);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -180,28 +179,31 @@ public void configure(SourceSplitBase sourceSplitBase) {
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}

SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
final boolean doSnapshot = snapshotter.shouldSnapshot();
createReplicationConnection(
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking
// place
if (slotInfo == null) {
if (this.replicationConnection == null) {
this.replicationConnection =
createReplicationConnection(
this.taskContext,
snapshotter.shouldSnapshot(),
connectorConfig.maxRetries(),
connectorConfig.retryDelay());
try {
slotCreatedInfo =
replicationConnection.createReplicationSlot().orElse(null);
// create the slot if it doesn't exist, otherwise update slot to add new
// table(job restore and add table)
Comment on lines +194 to +195
Copy link
Contributor Author

@hailin0 hailin0 Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
throw new DebeziumException(message, ex);
}
} else {
slotCreatedInfo = null;
}
}

Expand Down Expand Up @@ -265,20 +267,6 @@ private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
engineHistory);
}

public void createReplicationConnection(
boolean doSnapshot, int maxRetries, Duration retryDelay) {
if (this.replicationConnection != null) {
return;
}
synchronized (this) {
if (this.replicationConnection == null) {
this.replicationConnection =
createReplicationConnection(
this.taskContext, doSnapshot, maxRetries, retryDelay);
}
}
}

@Override
public PostgresSourceConfig getSourceConfig() {
return (PostgresSourceConfig) sourceConfig;
Expand Down Expand Up @@ -350,7 +338,9 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
public void close() {
try {
this.dataConnection.close();
this.replicationConnection.close();
if (this.replicationConnection != null) {
this.replicationConnection.close();
}
} catch (Exception e) {
log.warn("Failed to close connection", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;

import io.debezium.connector.postgresql.PostgresOffsetContext;
Expand Down Expand Up @@ -73,12 +74,11 @@ public void execute(FetchTask.Context context) throws Exception {
streamingChangeEventSource.execute(changeEventSourceContext, offsetContext);
}

public void commitCurrentOffset() {
if (streamingChangeEventSource != null && offsetContext != null) {
public void commitCurrentOffset(LsnOffset offset) {
if (streamingChangeEventSource != null && offset != null) {

// only extracting and storing the lsn of the last commit
Long commitLsn =
(Long) offsetContext.getOffset().get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
Long commitLsn = offset.getLsn().asLong();
if (commitLsn != null
&& (lastCommitLsn == null
|| Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {
Expand Down