Skip to content

Commit

Permalink
[BugFix][Oracle-cdc] Supports the incremental phase of the pdb
Browse files Browse the repository at this point in the history
  • Loading branch information
Carl-Zhou-CN committed Apr 2, 2024
1 parent cbddcbc commit de5e152
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public void execute(ChangeEventSourceContext context, OracleOffsetContext offset
// log before proceeding.
if (archiveLogOnlyMode && startScn.equals(endScn)) {
pauseBetweenMiningSessions();
dispatcher.dispatchHeartbeatEvent(offsetContext);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config;

import io.debezium.config.Configuration;

public class OracleConnectorConfig extends io.debezium.connector.oracle.OracleConnectorConfig {

public OracleConnectorConfig(Configuration config) {
super(config);
}

@Override
public String getCatalogName() {
// Do not use pdb as catalog name for incremental data, currently using cdb name as the
// catalog name.
return getDatabaseName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.relational.RelationalTableFilters;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
Expand All @@ -40,7 +41,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleErrorHandler;
import io.debezium.connector.oracle.OracleOffsetContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.scan.OracleSnapshotFetchTask;
Expand All @@ -33,7 +34,6 @@
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
Expand All @@ -55,7 +55,7 @@ public OracleRedoLogFetchTask(IncrementalSplit split) {
}

@Override
public void execute(FetchTask.Context context) throws Exception {
public void execute(Context context) throws Exception {
OracleSourceFetchTaskContext sourceFetchContext = (OracleSourceFetchTaskContext) context;
taskRunning = true;
RedoLogSplitReadTask redoLogSplitReadTask =
Expand Down Expand Up @@ -99,6 +99,8 @@ public static class RedoLogSplitReadTask extends LogMinerStreamingChangeEventSou
private final IncrementalSplit redoLogSplit;
private final JdbcSourceEventDispatcher dispatcher;
private final ErrorHandler errorHandler;
private final OracleConnectorConfig connectorConfig;
private final OracleConnection connection;
private ChangeEventSourceContext context;

public RedoLogSplitReadTask(
Expand All @@ -122,11 +124,16 @@ public RedoLogSplitReadTask(
this.redoLogSplit = redoLogSplit;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.connection = connection;
this.connectorConfig = connectorConfig;
}

@Override
public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
this.context = context;
if (connectorConfig.getPdbName() != null) {
connection.resetSessionToCdb();
}
super.execute(context, offsetContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.heartbeat.Heartbeat;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void shutdown() {
}

@Override
public void execute(FetchTask.Context context) throws Exception {
public void execute(Context context) throws Exception {
OracleSourceFetchTaskContext sourceFetchContext = (OracleSourceFetchTaskContext) context;
taskRunning = true;
snapshotSplitReadTask =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
Expand All @@ -34,7 +35,6 @@

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleValueConverters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleConnectorConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;

import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.OracleValueConverters;
Expand Down

0 comments on commit de5e152

Please sign in to comment.