package com.ververica.cdc.connectors.oracle.source.reader.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask;
import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.class */
public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
    private final SnapshotSplit split;
    private volatile boolean taskRunning = false;
    private OracleSnapshotSplitReadTask snapshotSplitReadTask;

    /* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask$OracleSnapshotSplitReadTask.class */
    public static class OracleSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource<OraclePartition, OracleOffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(OracleSnapshotSplitReadTask.class);
        private static final Duration LOG_INTERVAL = Duration.ofMillis(10000);
        private final OracleConnectorConfig connectorConfig;
        private final OracleDatabaseSchema databaseSchema;
        private final OracleConnection jdbcConnection;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
        private final Clock clock;
        private final SnapshotSplit snapshotSplit;
        private final OracleOffsetContext offsetContext;
        private final SnapshotProgressListener<OraclePartition> snapshotProgressListener;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask$OracleSnapshotSplitReadTask$OracleSnapshotContext.class */
        public static class OracleSnapshotContext extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> {
            public OracleSnapshotContext(OraclePartition oraclePartition) throws SQLException {
                super(oraclePartition, "");
            }
        }

        public OracleSnapshotSplitReadTask(OracleConnectorConfig oracleConnectorConfig, OracleOffsetContext oracleOffsetContext, SnapshotProgressListener<OraclePartition> snapshotProgressListener, OracleDatabaseSchema oracleDatabaseSchema, OracleConnection oracleConnection, JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher, SnapshotSplit snapshotSplit) {
            super(oracleConnectorConfig, snapshotProgressListener);
            this.offsetContext = oracleOffsetContext;
            this.connectorConfig = oracleConnectorConfig;
            this.databaseSchema = oracleDatabaseSchema;
            this.jdbcConnection = oracleConnection;
            this.dispatcher = jdbcSourceEventDispatcher;
            this.clock = Clock.SYSTEM;
            this.snapshotSplit = snapshotSplit;
            this.snapshotProgressListener = snapshotProgressListener;
        }

        @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource, io.debezium.pipeline.source.spi.SnapshotChangeEventSource
        public SnapshotResult<OracleOffsetContext> execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) throws InterruptedException {
            try {
                try {
                    return doExecute2(changeEventSourceContext, oracleOffsetContext, (AbstractSnapshotChangeEventSource.SnapshotContext) prepare(oraclePartition), getSnapshottingTask(oraclePartition, oracleOffsetContext));
                } catch (InterruptedException e) {
                    LOG.warn("Snapshot was interrupted before completion");
                    throw e;
                } catch (Exception e2) {
                    throw new DebeziumException(e2);
                }
            } catch (Exception e3) {
                LOG.error("Failed to initialize snapshot context.", e3);
                throw new RuntimeException(e3);
            }
        }

        /* renamed from: doExecute, reason: avoid collision after fix types in other method */
        protected SnapshotResult<OracleOffsetContext> doExecute2(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleOffsetContext oracleOffsetContext, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
            OracleSnapshotContext oracleSnapshotContext = (OracleSnapshotContext) snapshotContext;
            oracleSnapshotContext.offset = this.offsetContext;
            RedoLogOffset currentRedoLogOffset = OracleConnectionUtils.currentRedoLogOffset(this.jdbcConnection);
            LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", currentRedoLogOffset, this.snapshotSplit);
            ((SnapshotSplitChangeEventSourceContext) changeEventSourceContext).setLowWatermark(currentRedoLogOffset);
            this.dispatcher.dispatchWatermarkEvent(snapshotContext.partition.getSourcePartition(), this.snapshotSplit, currentRedoLogOffset, WatermarkKind.LOW);
            LOG.info("Snapshot step 2 - Snapshotting data");
            createDataEvents(oracleSnapshotContext, this.snapshotSplit.getTableId());
            RedoLogOffset currentRedoLogOffset2 = OracleConnectionUtils.currentRedoLogOffset(this.jdbcConnection);
            LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", currentRedoLogOffset2, this.snapshotSplit);
            ((SnapshotSplitChangeEventSourceContext) changeEventSourceContext).setHighWatermark(currentRedoLogOffset2);
            this.dispatcher.dispatchWatermarkEvent(snapshotContext.partition.getSourcePartition(), this.snapshotSplit, currentRedoLogOffset2, WatermarkKind.HIGH);
            return SnapshotResult.completed((OracleOffsetContext) oracleSnapshotContext.offset);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
        public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) {
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
        public AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> prepare(OraclePartition oraclePartition) throws Exception {
            return new OracleSnapshotContext(oraclePartition);
        }

        private void createDataEvents(OracleSnapshotContext oracleSnapshotContext, TableId tableId) throws Exception {
            EventDispatcher.SnapshotReceiver<OraclePartition> snapshotChangeEventReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
            LOG.debug("Snapshotting table {}", tableId);
            createDataEventsForTable(oracleSnapshotContext, snapshotChangeEventReceiver, this.databaseSchema.tableFor(tableId));
            snapshotChangeEventReceiver.completeSnapshot();
        }

        private void createDataEventsForTable(OracleSnapshotContext oracleSnapshotContext, EventDispatcher.SnapshotReceiver<OraclePartition> snapshotReceiver, Table table) throws InterruptedException {
            long currentTimeInMillis = this.clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());
            String buildSplitScanQuery = OracleUtils.buildSplitScanQuery(this.snapshotSplit.getTableId(), this.snapshotSplit.getSplitKeyType(), this.snapshotSplit.getSplitStart() == null, this.snapshotSplit.getSplitEnd() == null);
            LOG.info("For split '{}' of table {} using select statement: '{}'", new Object[]{this.snapshotSplit.splitId(), table.id(), buildSplitScanQuery});
            try {
                PreparedStatement readTableSplitDataStatement = OracleUtils.readTableSplitDataStatement(this.jdbcConnection, buildSplitScanQuery, this.snapshotSplit.getSplitStart() == null, this.snapshotSplit.getSplitEnd() == null, this.snapshotSplit.getSplitStart(), this.snapshotSplit.getSplitEnd(), this.snapshotSplit.getSplitKeyType().getFieldCount(), this.connectorConfig.getQueryFetchSize());
                try {
                    ResultSet executeQuery = readTableSplitDataStatement.executeQuery();
                    try {
                        ColumnUtils.ColumnArray array = ColumnUtils.toArray(executeQuery, table);
                        long j = 0;
                        Threads.Timer tableScanLogTimer = getTableScanLogTimer();
                        while (executeQuery.next()) {
                            j++;
                            Object[] rowToArray = this.jdbcConnection.rowToArray(table, this.databaseSchema, executeQuery, array);
                            if (tableScanLogTimer.expired()) {
                                LOG.info("Exported {} records for split '{}' after {}", new Object[]{Long.valueOf(j), this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - currentTimeInMillis)});
                                this.snapshotProgressListener.rowsScanned((OraclePartition) oracleSnapshotContext.partition, table.id(), j);
                                tableScanLogTimer = getTableScanLogTimer();
                            }
                            this.dispatcher.dispatchSnapshotEvent((OraclePartition) oracleSnapshotContext.partition, table.id(), getChangeRecordEmitter(oracleSnapshotContext, table.id(), rowToArray), snapshotReceiver);
                        }
                        LOG.info("Finished exporting {} records for split '{}', total duration '{}'", new Object[]{Long.valueOf(j), this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - currentTimeInMillis)});
                        if (executeQuery != null) {
                            executeQuery.close();
                        }
                        if (readTableSplitDataStatement != null) {
                            readTableSplitDataStatement.close();
                        }
                    } catch (Throwable th) {
                        if (executeQuery != null) {
                            try {
                                executeQuery.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
            }
        }

        protected ChangeRecordEmitter<OraclePartition> getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, TableId tableId, Object[] objArr) {
            snapshotContext.offset.event(tableId, this.clock.currentTime());
            return new SnapshotChangeRecordEmitter(snapshotContext.partition, snapshotContext.offset, objArr, this.clock);
        }

        private Threads.Timer getTableScanLogTimer() {
            return Threads.timer(this.clock, LOG_INTERVAL);
        }

        @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
        protected /* bridge */ /* synthetic */ SnapshotResult<OracleOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleOffsetContext oracleOffsetContext, AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
            return doExecute2(changeEventSourceContext, oracleOffsetContext, (AbstractSnapshotChangeEventSource.SnapshotContext) snapshotContext, snapshottingTask);
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask$SnapshotBinlogSplitChangeEventSourceContext.class */
    public class SnapshotBinlogSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {
        public SnapshotBinlogSplitChangeEventSourceContext() {
        }

        public void finished() {
            OracleScanFetchTask.this.taskRunning = false;
        }

        @Override // io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext
        public boolean isRunning() {
            return OracleScanFetchTask.this.taskRunning;
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask$SnapshotSplitChangeEventSourceContext.class */
    public static class SnapshotSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {
        private RedoLogOffset lowWatermark;
        private RedoLogOffset highWatermark;

        public RedoLogOffset getLowWatermark() {
            return this.lowWatermark;
        }

        public void setLowWatermark(RedoLogOffset redoLogOffset) {
            this.lowWatermark = redoLogOffset;
        }

        public RedoLogOffset getHighWatermark() {
            return this.highWatermark;
        }

        public void setHighWatermark(RedoLogOffset redoLogOffset) {
            this.highWatermark = redoLogOffset;
        }

        @Override // io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext
        public boolean isRunning() {
            return (this.lowWatermark == null || this.highWatermark == null) ? false : true;
        }
    }

    public OracleScanFetchTask(SnapshotSplit snapshotSplit) {
        this.split = snapshotSplit;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask
    /* renamed from: getSplit, reason: merged with bridge method [inline-methods] */
    public SourceSplitBase getSplit2() {
        return this.split;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask
    public void close() {
        this.taskRunning = false;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.FetchTask
    public void execute(FetchTask.Context context) throws Exception {
        OracleSourceFetchTaskContext oracleSourceFetchTaskContext = (OracleSourceFetchTaskContext) context;
        this.taskRunning = true;
        this.snapshotSplitReadTask = new OracleSnapshotSplitReadTask(oracleSourceFetchTaskContext.getDbzConnectorConfig(), oracleSourceFetchTaskContext.getOffsetContext(), oracleSourceFetchTaskContext.getSnapshotChangeEventSourceMetrics(), oracleSourceFetchTaskContext.getDatabaseSchema(), oracleSourceFetchTaskContext.getConnection(), oracleSourceFetchTaskContext.getDispatcher(), this.split);
        SnapshotSplitChangeEventSourceContext snapshotSplitChangeEventSourceContext = new SnapshotSplitChangeEventSourceContext();
        SnapshotResult<OracleOffsetContext> execute = this.snapshotSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext) snapshotSplitChangeEventSourceContext, oracleSourceFetchTaskContext.getPartition(), oracleSourceFetchTaskContext.getOffsetContext());
        StreamSplit createBackfillRedoLogSplit = createBackfillRedoLogSplit(snapshotSplitChangeEventSourceContext);
        if (!createBackfillRedoLogSplit.getEndingOffset().isAfter(createBackfillRedoLogSplit.getStartingOffset())) {
            dispatchBinlogEndEvent(createBackfillRedoLogSplit, oracleSourceFetchTaskContext.getPartition().getSourcePartition(), ((OracleSourceFetchTaskContext) context).getDispatcher());
            this.taskRunning = false;
        } else {
            if (!execute.isCompletedOrSkipped()) {
                this.taskRunning = false;
                throw new IllegalStateException(String.format("Read snapshot for oracle split %s fail", this.split));
            }
            createBackfillRedoLogReadTask(createBackfillRedoLogSplit, oracleSourceFetchTaskContext).execute((ChangeEventSource.ChangeEventSourceContext) new SnapshotBinlogSplitChangeEventSourceContext(), oracleSourceFetchTaskContext.getPartition(), new LogMinerOracleOffsetContextLoader(((OracleSourceFetchTaskContext) context).getDbzConnectorConfig()).load((Map<String, ?>) createBackfillRedoLogSplit.getStartingOffset().getOffset()));
            this.taskRunning = false;
        }
    }

    private StreamSplit createBackfillRedoLogSplit(SnapshotSplitChangeEventSourceContext snapshotSplitChangeEventSourceContext) {
        return new StreamSplit(this.split.splitId(), snapshotSplitChangeEventSourceContext.getLowWatermark(), snapshotSplitChangeEventSourceContext.getHighWatermark(), new ArrayList(), this.split.getTableSchemas(), 0);
    }

    private OracleStreamFetchTask.RedoLogSplitReadTask createBackfillRedoLogReadTask(StreamSplit streamSplit, OracleSourceFetchTaskContext oracleSourceFetchTaskContext) {
        return new OracleStreamFetchTask.RedoLogSplitReadTask(new OracleConnectorConfig(oracleSourceFetchTaskContext.getSourceConfig().getDbzConfiguration().edit().with("table.include.list", this.split.getTableId().toString()).with(Heartbeat.HEARTBEAT_INTERVAL, 0).build()), oracleSourceFetchTaskContext.getConnection(), oracleSourceFetchTaskContext.getDispatcher(), oracleSourceFetchTaskContext.getErrorHandler(), oracleSourceFetchTaskContext.getDatabaseSchema(), oracleSourceFetchTaskContext.getSourceConfig().getOriginDbzConnectorConfig(), oracleSourceFetchTaskContext.getStreamingChangeEventSourceMetrics(), streamSplit);
    }

    private void dispatchBinlogEndEvent(StreamSplit streamSplit, Map<String, ?> map, JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher) throws InterruptedException {
        jdbcSourceEventDispatcher.dispatchWatermarkEvent(map, streamSplit, streamSplit.getEndingOffset(), WatermarkKind.END);
    }
}
