/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.oracle.source.reader.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask;
import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleScanFetchTask
implements FetchTask<SourceSplitBase> {
    /** Snapshot split handled by this task; fixed at construction time. */
    private final SnapshotSplit split;

    /**
     * Running flag: raised at the start of {@code execute} and cleared when it returns
     * normally, when the snapshot result is rejected, or when {@code close()} is called.
     * Starts out {@code false} (the field default).
     */
    private volatile boolean taskRunning;

    /** Snapshot read task built by the most recent {@code execute} call. */
    private OracleSnapshotSplitReadTask snapshotSplitReadTask;

    /**
     * Creates a scan task bound to one snapshot split.
     *
     * @param split the snapshot split this task will read
     */
    public OracleScanFetchTask(SnapshotSplit split) {
        this.split = split;
    }

    /**
     * Returns the snapshot split this task was constructed with.
     *
     * @return the split passed to the constructor
     */
    @Override
    public SnapshotSplit getSplit() {
        return split;
    }

    /**
     * Clears the running flag.
     *
     * <p>The flag is what {@link #isRunning()} and
     * {@link SnapshotBinlogSplitChangeEventSourceContext#isRunning()} report; this method
     * does nothing beyond resetting it.
     */
    @Override
    public void close() {
        taskRunning = false;
    }

    /**
     * Reports the current value of the running flag.
     *
     * @return {@code true} between the start of {@code execute} and the point where the
     *     flag is cleared again
     */
    @Override
    public boolean isRunning() {
        return taskRunning;
    }

    /**
     * Reads the snapshot split and, if the redo log advanced during the scan, replays the
     * redo log between the two watermarks captured by the scan.
     *
     * <ol>
     *   <li>Builds an {@link OracleSnapshotSplitReadTask} from the fetch context and runs it;
     *       the read task stores the low and high watermark on the
     *       {@link SnapshotSplitChangeEventSourceContext} handed to it.</li>
     *   <li>Wraps the two watermarks in a backfill {@link StreamSplit}.</li>
     *   <li>If the ending offset is not after the starting offset, only dispatches the END
     *       watermark event and returns.</li>
     *   <li>Otherwise checks the snapshot result and runs a redo-log read task starting at
     *       the backfill split's starting offset.</li>
     * </ol>
     *
     * <p>The running flag is cleared on every exit path written here; an exception
     * propagating out of one of the read tasks leaves it set.
     *
     * @param context fetch context; cast to {@link OracleSourceFetchTaskContext}
     * @throws IllegalStateException if backfill is required but the snapshot result is
     *     neither completed nor skipped
     * @throws Exception whatever the snapshot or redo-log read task throws
     */
    @Override
    public void execute(FetchTask.Context context) throws Exception {
        final OracleSourceFetchTaskContext fetchContext = (OracleSourceFetchTaskContext) context;
        taskRunning = true;

        // Step 1: scan the split.
        snapshotSplitReadTask =
                new OracleSnapshotSplitReadTask(
                        fetchContext.getDbzConnectorConfig(),
                        fetchContext.getOffsetContext(),
                        fetchContext.getSnapshotChangeEventSourceMetrics(),
                        fetchContext.getDatabaseSchema(),
                        fetchContext.getConnection(),
                        fetchContext.getDispatcher(),
                        split);
        final SnapshotSplitChangeEventSourceContext watermarkHolder =
                new SnapshotSplitChangeEventSourceContext();
        final SnapshotResult<OracleOffsetContext> snapshotResult =
                snapshotSplitReadTask.execute(
                        (ChangeEventSource.ChangeEventSourceContext) watermarkHolder,
                        fetchContext.getPartition(),
                        fetchContext.getOffsetContext());

        // Step 2: describe the low..high watermark range as a stream split.
        final StreamSplit backfillSplit = createBackfillRedoLogSplit(watermarkHolder);

        // Step 3: nothing to replay when the high watermark is not past the low watermark.
        final boolean backfillRequired =
                backfillSplit.getEndingOffset().isAfter(backfillSplit.getStartingOffset());
        if (!backfillRequired) {
            dispatchBinlogEndEvent(
                    backfillSplit,
                    fetchContext.getPartition().getSourcePartition(),
                    fetchContext.getDispatcher());
            taskRunning = false;
            return;
        }

        if (!snapshotResult.isCompletedOrSkipped()) {
            taskRunning = false;
            throw new IllegalStateException(
                    String.format("Read snapshot for oracle split %s fail", split));
        }

        // Step 4: replay the redo log for the backfill range.
        final OracleStreamFetchTask.RedoLogSplitReadTask backfillReadTask =
                createBackfillRedoLogReadTask(backfillSplit, fetchContext);
        final OffsetContext backfillStartOffset =
                new LogMinerOracleOffsetContextLoader(fetchContext.getDbzConnectorConfig())
                        .load((Map) backfillSplit.getStartingOffset().getOffset());
        backfillReadTask.execute(
                (ChangeEventSource.ChangeEventSourceContext)
                        new SnapshotBinlogSplitChangeEventSourceContext(),
                fetchContext.getPartition(),
                (OracleOffsetContext) backfillStartOffset);
        taskRunning = false;
    }

    /**
     * Builds the stream split that covers the redo-log range observed during the snapshot.
     *
     * <p>The split reuses this task's split id and table schemas, starts at the low
     * watermark, ends at the high watermark, and carries an empty list of finished
     * snapshot split infos. The trailing {@code 0} is passed through unchanged —
     * NOTE(review): presumably the total finished split count; confirm against
     * {@code StreamSplit}.
     *
     * @param sourceContext context holding the watermarks recorded by the snapshot read
     * @return a new stream split spanning low watermark to high watermark
     */
    private StreamSplit createBackfillRedoLogSplit(SnapshotSplitChangeEventSourceContext sourceContext) {
        final String splitId = split.splitId();
        final RedoLogOffset startingOffset = sourceContext.getLowWatermark();
        final RedoLogOffset endingOffset = sourceContext.getHighWatermark();
        return new StreamSplit(
                splitId,
                startingOffset,
                endingOffset,
                new ArrayList<FinishedSnapshotSplitInfo>(),
                split.getTableSchemas(),
                0);
    }

    /**
     * Creates the redo-log read task used to backfill changes for this task's split.
     *
     * <p>The task is given a connector config derived from the source's Debezium
     * configuration with two overrides: {@code table.include.list} is set to this split's
     * table id, and the heartbeat interval is set to {@code 0}. It also receives a newly
     * created Oracle connection built from the unmodified source configuration.
     *
     * <p>NOTE(review): the connection opened here is never closed in this class — confirm
     * that {@code RedoLogSplitReadTask} (or its caller) owns and closes it.
     *
     * @param backfillBinlogSplit stream split describing the offset range to replay
     * @param context fetch context supplying configuration, dispatcher, error handler,
     *     schema and streaming metrics
     * @return a redo-log read task bound to {@code backfillBinlogSplit}
     */
    private OracleStreamFetchTask.RedoLogSplitReadTask createBackfillRedoLogReadTask(StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
        // The original also built an OracleConnectorConfig and a LogMinerOracleOffsetContextLoader
        // here that were never read; those dead locals have been removed.
        Configuration dezConf =
                ((Configuration.Builder) context.getSourceConfig()
                                .getDbzConfiguration()
                                .edit()
                                .with("table.include.list", this.split.getTableId().toString())
                                .with(Heartbeat.HEARTBEAT_INTERVAL, 0))
                        .build();
        return new OracleStreamFetchTask.RedoLogSplitReadTask(
                new OracleConnectorConfig(dezConf),
                OracleConnectionUtils.createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
                context.getDispatcher(),
                context.getErrorHandler(),
                context.getDatabaseSchema(),
                context.getSourceConfig().getOriginDbzConnectorConfig(),
                context.getStreamingChangeEventSourceMetrics(),
                backfillBinlogSplit);
    }

    /**
     * Emits the END watermark event for a backfill split, using the split's ending offset
     * as the watermark position.
     *
     * @param backFillBinlogSplit backfill split whose ending offset is reported
     * @param sourcePartition source partition map attached to the event
     * @param eventDispatcher dispatcher that emits the watermark event
     * @throws InterruptedException if the dispatcher is interrupted while emitting
     */
    private void dispatchBinlogEndEvent(StreamSplit backFillBinlogSplit, Map<String, ?> sourcePartition, JdbcSourceEventDispatcher<OraclePartition> eventDispatcher) throws InterruptedException {
        eventDispatcher.dispatchWatermarkEvent(
                sourcePartition,
                backFillBinlogSplit,
                backFillBinlogSplit.getEndingOffset(),
                WatermarkKind.END);
    }

    /**
     * Change-event-source context handed to the backfill redo-log read task. It is an inner
     * (non-static) class because it reads and writes the enclosing task's running flag.
     */
    public class SnapshotBinlogSplitChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        /** Clears the enclosing task's running flag. */
        public void finished() {
            taskRunning = false;
        }

        /**
         * @return the enclosing task's running flag
         */
        @Override
        public boolean isRunning() {
            return taskRunning;
        }
    }

    /**
     * Change-event-source context used during the snapshot scan. It is a simple holder for
     * the low and high redo-log watermarks recorded around the scan.
     */
    public static class SnapshotSplitChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        /** Redo-log offset recorded before the scan; {@code null} until set. */
        private RedoLogOffset lowWatermark;

        /** Redo-log offset recorded after the scan; {@code null} until set. */
        private RedoLogOffset highWatermark;

        /** @return the low watermark, or {@code null} if it has not been set */
        public RedoLogOffset getLowWatermark() {
            return lowWatermark;
        }

        /** @param lowWatermark offset to store as the low watermark */
        public void setLowWatermark(RedoLogOffset lowWatermark) {
            this.lowWatermark = lowWatermark;
        }

        /** @return the high watermark, or {@code null} if it has not been set */
        public RedoLogOffset getHighWatermark() {
            return highWatermark;
        }

        /** @param highWatermark offset to store as the high watermark */
        public void setHighWatermark(RedoLogOffset highWatermark) {
            this.highWatermark = highWatermark;
        }

        /**
         * @return {@code true} only once both watermarks have been set; {@code false} while
         *     either one is still {@code null}
         */
        @Override
        public boolean isRunning() {
            return !(lowWatermark == null || highWatermark == null);
        }
    }

    public static class OracleSnapshotSplitReadTask
    extends AbstractSnapshotChangeEventSource<OraclePartition, OracleOffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(OracleSnapshotSplitReadTask.class);

        /** Interval handed to the table-scan log timer (10 seconds). */
        private static final Duration LOG_INTERVAL = Duration.ofSeconds(10L);

        private final OracleConnectorConfig connectorConfig;
        private final OracleDatabaseSchema databaseSchema;
        private final OracleConnection jdbcConnection;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
        private final Clock clock;
        private final SnapshotSplit snapshotSplit;
        /** Offset context supplied at construction; installed on the snapshot context in {@code doExecute}. */
        private final OracleOffsetContext offsetContext;
        private final SnapshotProgressListener<OraclePartition> snapshotProgressListener;

        /**
         * Creates a read task for one snapshot split.
         *
         * @param connectorConfig connector configuration (also passed to the superclass)
         * @param previousOffset offset context stored and later used as the snapshot offset
         * @param snapshotProgressListener listener notified of scan progress (also passed
         *     to the superclass)
         * @param databaseSchema schema used to look up the table and convert rows
         * @param jdbcConnection connection used to query watermarks and table data
         * @param dispatcher dispatcher for watermark and snapshot events
         * @param snapshotSplit the split to read
         */
        public OracleSnapshotSplitReadTask(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, SnapshotProgressListener<OraclePartition> snapshotProgressListener, OracleDatabaseSchema databaseSchema, OracleConnection jdbcConnection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, SnapshotSplit snapshotSplit) {
            super(connectorConfig, snapshotProgressListener);
            this.connectorConfig = connectorConfig;
            this.snapshotProgressListener = snapshotProgressListener;
            this.databaseSchema = databaseSchema;
            this.jdbcConnection = jdbcConnection;
            this.dispatcher = dispatcher;
            this.snapshotSplit = snapshotSplit;
            this.offsetContext = previousOffset;
            this.clock = Clock.SYSTEM;
        }

        /**
         * Prepares a snapshot context and runs the split snapshot.
         *
         * @param context change-event-source context forwarded to {@code doExecute}
         * @param partition partition the snapshot context is prepared for
         * @param previousOffset offset forwarded to {@code getSnapshottingTask} and {@code doExecute}
         * @return the result produced by {@code doExecute}
         * @throws InterruptedException if {@code doExecute} is interrupted (logged, then rethrown)
         * @throws RuntimeException if preparing the snapshot context fails (cause preserved)
         * @throws DebeziumException wrapping any other exception raised by {@code doExecute}
         */
        @Override
        public SnapshotResult<OracleOffsetContext> execute(ChangeEventSource.ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext previousOffset) throws InterruptedException {
            final AbstractSnapshotChangeEventSource.SnapshottingTask task =
                    getSnapshottingTask(partition, previousOffset);

            AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext;
            try {
                snapshotContext = prepare(partition);
            } catch (Exception prepareFailure) {
                LOG.error("Failed to initialize snapshot context.", prepareFailure);
                throw new RuntimeException(prepareFailure);
            }

            try {
                return doExecute(context, previousOffset, snapshotContext, task);
            } catch (InterruptedException interrupted) {
                LOG.warn("Snapshot was interrupted before completion");
                throw interrupted;
            } catch (Exception failure) {
                throw new DebeziumException(failure);
            }
        }

        /**
         * Performs the three snapshot steps for the split: record and dispatch the low
         * watermark, export the split's rows, then record and dispatch the high watermark.
         *
         * <p>The snapshot context's offset is set to the offset context supplied at
         * construction; the {@code previousOffset} and {@code snapshottingTask} arguments
         * are not read here.
         *
         * @param context must be a {@link SnapshotSplitChangeEventSourceContext}; receives
         *     both watermarks
         * @param previousOffset unused
         * @param snapshotContext must be the {@code OracleSnapshotContext} created by {@code prepare}
         * @param snapshottingTask unused
         * @return a completed snapshot result carrying the snapshot context's offset
         * @throws Exception if reading the redo-log offset, dispatching, or exporting rows fails
         */
        @Override
        protected SnapshotResult<OracleOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext context, OracleOffsetContext previousOffset, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
            final OracleSnapshotContext oracleContext = (OracleSnapshotContext) snapshotContext;
            oracleContext.offset = offsetContext;

            // Step 1: low watermark, taken before any row is read.
            final RedoLogOffset low = OracleConnectionUtils.currentRedoLogOffset(jdbcConnection);
            LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", low, snapshotSplit);
            ((SnapshotSplitChangeEventSourceContext) context).setLowWatermark(low);
            dispatcher.dispatchWatermarkEvent(
                    oracleContext.partition.getSourcePartition(), snapshotSplit, low, WatermarkKind.LOW);

            // Step 2: export the rows of the split.
            LOG.info("Snapshot step 2 - Snapshotting data");
            createDataEvents(oracleContext, snapshotSplit.getTableId());

            // Step 3: high watermark, taken after the export finished.
            final RedoLogOffset high = OracleConnectionUtils.currentRedoLogOffset(jdbcConnection);
            LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", high, snapshotSplit);
            ((SnapshotSplitChangeEventSourceContext) context).setHighWatermark(high);
            dispatcher.dispatchWatermarkEvent(
                    oracleContext.partition.getSourcePartition(), snapshotSplit, high, WatermarkKind.HIGH);

            return SnapshotResult.completed(oracleContext.offset);
        }

        /**
         * Always returns a snapshotting task constructed with {@code (false, true)},
         * regardless of the arguments.
         *
         * <p>NOTE(review): presumably this means "skip schema snapshot, snapshot data" —
         * confirm against Debezium's {@code SnapshottingTask} constructor.
         *
         * @param partition unused
         * @param previousOffset unused
         * @return a new {@code SnapshottingTask(false, true)}
         */
        @Override
        protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset) {
            final AbstractSnapshotChangeEventSource.SnapshottingTask task =
                    new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
            return task;
        }

        /**
         * Creates the snapshot context for a partition.
         *
         * @param partition partition the context is bound to
         * @return a new {@code OracleSnapshotContext} for {@code partition}
         * @throws Exception if the context constructor fails
         */
        @Override
        protected AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> prepare(OraclePartition partition) throws Exception {
            final OracleSnapshotContext snapshotContext = new OracleSnapshotContext(partition);
            return snapshotContext;
        }

        /**
         * Exports the split's rows for one table and then signals snapshot completion on
         * the dispatcher's snapshot receiver.
         *
         * @param snapshotContext snapshot context passed through to the per-table export
         * @param tableId id of the table to look up in the database schema
         * @throws Exception if the export or the completion signal fails
         */
        private void createDataEvents(OracleSnapshotContext snapshotContext, TableId tableId) throws Exception {
            final EventDispatcher.SnapshotReceiver<OraclePartition> receiver =
                    dispatcher.getSnapshotChangeEventReceiver();
            LOG.debug("Snapshotting table {}", tableId);
            final Table table = databaseSchema.tableFor(tableId);
            createDataEventsForTable(snapshotContext, receiver, table);
            receiver.completeSnapshot();
        }

        /**
         * Runs the split scan query and dispatches one snapshot event per returned row.
         *
         * <p>The query is built from the split's table id and split-key type; a {@code null}
         * split start or end is passed to the query builder and statement factory as a
         * flag. Each time the log timer expires, progress is logged and reported to the
         * snapshot progress listener, and the timer is restarted.
         *
         * @param snapshotContext snapshot context whose partition is attached to events
         * @param snapshotReceiver receiver the snapshot events are dispatched to
         * @param table table being exported
         * @throws InterruptedException if dispatching an event is interrupted
         * @throws ConnectException wrapping any {@link SQLException} raised while querying
         */
        private void createDataEventsForTable(OracleSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<OraclePartition> snapshotReceiver, Table table) throws InterruptedException {
            final long startedAt = clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

            final boolean unboundedStart = snapshotSplit.getSplitStart() == null;
            final boolean unboundedEnd = snapshotSplit.getSplitEnd() == null;
            final String selectSql =
                    OracleUtils.buildSplitScanQuery(
                            snapshotSplit.getTableId(),
                            snapshotSplit.getSplitKeyType(),
                            unboundedStart,
                            unboundedEnd);
            LOG.info("For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), selectSql);

            try (PreparedStatement statement =
                            OracleUtils.readTableSplitDataStatement(
                                    jdbcConnection,
                                    selectSql,
                                    unboundedStart,
                                    unboundedEnd,
                                    snapshotSplit.getSplitStart(),
                                    snapshotSplit.getSplitEnd(),
                                    snapshotSplit.getSplitKeyType().getFieldCount(),
                                    connectorConfig.getQueryFetchSize());
                    ResultSet resultSet = statement.executeQuery()) {
                final ColumnUtils.ColumnArray columns = ColumnUtils.toArray(resultSet, table);
                long exported = 0L;
                Threads.Timer progressTimer = getTableScanLogTimer();

                while (resultSet.next()) {
                    exported++;
                    final Object[] row = jdbcConnection.rowToArray(table, databaseSchema, resultSet, columns);

                    // Periodic progress report, then restart the timer.
                    if (progressTimer.expired()) {
                        final long now = clock.currentTimeInMillis();
                        LOG.info("Exported {} records for split '{}' after {}", exported, snapshotSplit.splitId(), Strings.duration(now - startedAt));
                        snapshotProgressListener.rowsScanned(snapshotContext.partition, table.id(), exported);
                        progressTimer = getTableScanLogTimer();
                    }

                    dispatcher.dispatchSnapshotEvent(
                            snapshotContext.partition,
                            table.id(),
                            getChangeRecordEmitter(snapshotContext, table.id(), row),
                            snapshotReceiver);
                }

                LOG.info("Finished exporting {} records for split '{}', total duration '{}'", exported, snapshotSplit.splitId(), Strings.duration(clock.currentTimeInMillis() - startedAt));
            } catch (SQLException sqlFailure) {
                throw new ConnectException("Snapshotting of table " + table.id() + " failed", sqlFailure);
            }
        }

        /**
         * Records the event on the snapshot offset and builds the emitter for one row.
         *
         * @param snapshotContext snapshot context supplying the partition and offset
         * @param tableId table the row belongs to
         * @param row column values of the row
         * @return a snapshot change-record emitter for {@code row}
         */
        protected ChangeRecordEmitter<OraclePartition> getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, TableId tableId, Object[] row) {
            final OracleOffsetContext offset = snapshotContext.offset;
            offset.event(tableId, clock.currentTime());
            return new SnapshotChangeRecordEmitter<OraclePartition>(
                    snapshotContext.partition, snapshotContext.offset, row, clock);
        }

        /**
         * Creates a fresh timer over this task's clock with the {@code LOG_INTERVAL} duration.
         *
         * @return a new timer; callers check {@code expired()} to decide when to log progress
         */
        private Threads.Timer getTableScanLogTimer() {
            final Threads.Timer timer = Threads.timer(clock, LOG_INTERVAL);
            return timer;
        }

        /**
         * Snapshot context for Oracle split reads; adds nothing beyond the relational
         * snapshot context it extends.
         */
        private static class OracleSnapshotContext
                extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> {

            /**
             * @param partition partition forwarded to the superclass
             * @throws SQLException declared by this constructor; nothing here throws it directly
             */
            public OracleSnapshotContext(OraclePartition partition) throws SQLException {
                // Second superclass argument is an empty string.
                // NOTE(review): presumably the catalog name — confirm against RelationalSnapshotContext.
                super(partition, "");
            }
        }
    }
}

