/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot change event source for the Oracle connector.
 *
 * <p>Provides the Oracle-specific hooks of {@link RelationalSnapshotChangeEventSource}: deciding what
 * to snapshot, optionally switching the session to a pluggable database, locking captured tables behind
 * a savepoint, determining the snapshot offset, reading table structure, emitting schema change events
 * and building {@code AS OF SCN} select statements.
 */
public class OracleSnapshotChangeEventSource
        extends RelationalSnapshotChangeEventSource<OraclePartition, OracleOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class);
    private final OracleConnectorConfig connectorConfig;
    private final OracleConnection jdbcConnection;
    private final OracleDatabaseSchema databaseSchema;

    /**
     * Creates the snapshot source; all arguments are forwarded to the superclass and the
     * configuration, connection and schema are additionally kept for use by the overrides below.
     *
     * @param connectorConfig connector configuration
     * @param jdbcConnection connection used for all snapshot statements
     * @param schema database schema of the connector
     * @param dispatcher dispatcher used to emit schema change events
     * @param clock clock used to timestamp offset events
     * @param snapshotProgressListener listener forwarded to the superclass
     */
    public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher<OraclePartition, TableId> dispatcher, Clock clock, SnapshotProgressListener<OraclePartition> snapshotProgressListener) {
        super(connectorConfig, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
        this.connectorConfig = connectorConfig;
        this.jdbcConnection = jdbcConnection;
        this.databaseSchema = schema;
    }

    /**
     * Decides whether schema and/or data have to be snapshotted.
     *
     * <p>With a previous offset that is not flagged as "snapshot running", data is never snapshotted and
     * the schema is snapshotted only if {@code isStorageInitializationExecuted()} reports {@code true}.
     * Otherwise the schema is always snapshotted and data according to the configured snapshot mode.
     *
     * @param partition the partition being snapshotted (not consulted here)
     * @param previousOffset offset from a previous run, or {@code null}
     * @return the task describing what to snapshot
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset) {
        boolean snapshotSchema = true;
        final boolean snapshotData;
        if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
            LOGGER.info("The previous offset has been found.");
            snapshotSchema = this.databaseSchema.isStorageInitializationExecuted();
            snapshotData = false;
        } else {
            // This branch is also taken when an offset exists but still reports a running snapshot.
            LOGGER.info("No previous offset has been found.");
            snapshotData = this.connectorConfig.getSnapshotMode().includeData();
        }
        if (snapshotData && snapshotSchema) {
            LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
        } else if (snapshotSchema) {
            LOGGER.info("According to the connector configuration only schema will be snapshot.");
        }
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(snapshotSchema, snapshotData);
    }

    /**
     * Creates the snapshot context, first pointing the session at the configured PDB when a PDB name
     * is set in the connector configuration.
     *
     * @param partition the partition being snapshotted
     * @return a new {@link OracleSnapshotContext} for the configured catalog name
     * @throws Exception if switching the session or creating the context fails
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> prepare(OraclePartition partition) throws Exception {
        final String pdbName = this.connectorConfig.getPdbName();
        if (pdbName != null) {
            this.jdbcConnection.setSessionToPdb(pdbName);
        }
        return new OracleSnapshotContext(partition, this.connectorConfig.getCatalogName());
    }

    /**
     * Returns the table ids reported by the connection for the context's catalog name.
     *
     * @param ctx the current snapshot context
     * @return table ids as returned by {@code OracleConnection#getAllTableIds}
     * @throws Exception if the lookup fails
     */
    @Override
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx) throws Exception {
        return this.jdbcConnection.getAllTableIds(ctx.catalogName);
    }

    /**
     * Locks every captured table {@code IN ROW SHARE MODE} when the configured locking mode uses
     * locking. A savepoint is set beforehand and stored on the context so that
     * {@link #releaseSchemaSnapshotLocks} can roll back to it.
     *
     * @param sourceContext consulted before each table to detect that the source was stopped
     * @param snapshotContext the current snapshot context; must be an {@link OracleSnapshotContext}
     * @throws SQLException if setting the savepoint or executing a lock statement fails
     * @throws InterruptedException if the source is no longer running while tables are being locked
     */
    @Override
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) throws SQLException, InterruptedException {
        if (!this.connectorConfig.getSnapshotLockingMode().usesLocking()) {
            LOGGER.info("Schema locking was disabled in connector configuration");
            return;
        }
        ((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = this.jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
        try (Statement statement = this.jdbcConnection.connection().createStatement()) {
            for (TableId tableId : snapshotContext.capturedTables) {
                if (!sourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while locking table " + tableId);
                }
                LOGGER.debug("Locking table {}", (Object)tableId);
                // Identifiers cannot be bound as statement parameters, hence the quoted concatenation.
                statement.execute("LOCK TABLE " + OracleSnapshotChangeEventSource.quote(tableId) + " IN ROW SHARE MODE");
            }
        }
    }

    /**
     * Rolls back to the savepoint recorded by {@link #lockTablesForSchemaSnapshot} when the configured
     * locking mode uses locking; does nothing otherwise.
     *
     * @param snapshotContext the current snapshot context; must be an {@link OracleSnapshotContext}
     * @throws SQLException if the rollback fails
     */
    @Override
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) throws SQLException {
        if (this.connectorConfig.getSnapshotLockingMode().usesLocking()) {
            final Savepoint savepoint = ((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint;
            this.jdbcConnection.connection().rollback(savepoint);
        }
    }

    /**
     * Sets the offset on the snapshot context: a non-null previous offset is reused (and
     * {@code tryStartingSnapshot} is invoked), otherwise the configured adapter determines it.
     *
     * @param ctx the current snapshot context whose {@code offset} is assigned
     * @param previousOffset offset from a previous run, or {@code null}
     * @throws Exception if the offset cannot be determined
     */
    @Override
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx, OracleOffsetContext previousOffset) throws Exception {
        if (previousOffset != null) {
            ctx.offset = previousOffset;
            this.tryStartingSnapshot(ctx);
            return;
        }
        ctx.offset = this.connectorConfig.getAdapter().determineSnapshotOffset(ctx, this.connectorConfig, this.jdbcConnection);
    }

    /**
     * Reads the structure of every database schema that contains a table to be captured into
     * {@code snapshotContext.tables}.
     *
     * <p>The table set is {@code capturedTables} when the database schema stores only captured tables,
     * and {@code capturedSchemaTables} otherwise.
     *
     * @param sourceContext consulted before each schema to detect that the source was stopped
     * @param snapshotContext the current snapshot context
     * @param offsetContext not used by this implementation
     * @throws SQLException if reading a schema fails
     * @throws InterruptedException if the source is no longer running while schemas are being read
     */
    @Override
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, OracleOffsetContext offsetContext) throws SQLException, InterruptedException {
        final Set<TableId> capturedSchemaTables;
        if (this.databaseSchema.storeOnlyCapturedTables()) {
            capturedSchemaTables = snapshotContext.capturedTables;
            LOGGER.info("Only captured tables schema should be captured, capturing: {}", capturedSchemaTables);
        } else {
            capturedSchemaTables = snapshotContext.capturedSchemaTables;
            LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
        }
        // Typed set: iterating a raw Set with a String loop variable does not compile.
        final Set<String> schemas = capturedSchemaTables.stream().map(TableId::schema).collect(Collectors.toSet());
        for (String schema : schemas) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + schema);
            }
            this.jdbcConnection.readSchema(snapshotContext.tables, null, schema, null, null, false);
        }
    }

    /**
     * Replaces the configured token in a user-supplied select statement with {@code " AS OF SCN <scn>"},
     * where the SCN is the {@code "scn"} entry of the snapshot offset. The statement is returned
     * unchanged when no token is configured.
     *
     * @param snapshotContext the current snapshot context
     * @param overriddenSelect the user-supplied select statement
     * @param tableId the table the statement belongs to (not consulted here)
     * @return the statement with the token substituted, or the original statement
     */
    @Override
    protected String enhanceOverriddenSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, String overriddenSelect, TableId tableId) {
        final String token = this.connectorConfig.getTokenToReplaceInSnapshotPredicate();
        if (token == null) {
            // No token configured: do not touch the offset at all.
            return overriddenSelect;
        }
        final String snapshotOffset = (String)((OracleOffsetContext)snapshotContext.offset).getOffset().get("scn");
        // NOTE(review): String#replaceAll interprets the token as a regular expression; kept as is
        // for compatibility with existing configurations.
        return overriddenSelect.replaceAll(token, " AS OF SCN " + snapshotOffset);
    }

    /**
     * Dispatches a schema change event for each table in {@code capturedSchemaTables}, provided the
     * schema is historized. For the last table, when no data is snapshotted, the context is first
     * marked via {@code lastSnapshotRecord}.
     *
     * @param sourceContext consulted before each table to detect that the source was stopped
     * @param snapshotContext the current snapshot context
     * @param snapshottingTask describes whether data is snapshotted as well
     * @throws Exception if dispatching fails or the source is no longer running
     */
    @Override
    protected void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        this.tryStartingSnapshot(snapshotContext);
        // An explicit iterator is needed to recognise the last table via hasNext().
        final Iterator<TableId> tableIds = snapshotContext.capturedSchemaTables.iterator();
        while (tableIds.hasNext()) {
            final TableId tableId = tableIds.next();
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while capturing schema of table " + tableId);
            }
            LOGGER.info("Capturing structure of table {}", (Object)tableId);
            final Table table = snapshotContext.tables.forTable(tableId);
            if (!this.schema().isHistorized()) {
                continue;
            }
            ((OracleOffsetContext)snapshotContext.offset).event(tableId, this.getClock().currentTime());
            if (!snapshottingTask.snapshotData() && !tableIds.hasNext()) {
                this.lastSnapshotRecord(snapshotContext);
            }
            this.dispatcher.dispatchSchemaChangeEvent((OraclePartition)snapshotContext.partition, table.id(), receiver -> {
                try {
                    receiver.schemaChangeEvent(this.getCreateTableEvent(snapshotContext, table));
                }
                catch (Exception e) {
                    // The receiver callback cannot propagate checked exceptions.
                    throw new DebeziumException(e);
                }
            });
        }
    }

    /**
     * Builds the CREATE schema change event for a table, using the DDL obtained from
     * {@code OracleConnection#getTableMetadataDdl}.
     *
     * @param snapshotContext the current snapshot context
     * @param table the table to describe
     * @return the schema change event
     * @throws SQLException if the table DDL cannot be read
     */
    @Override
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, Table table) throws SQLException {
        final String ddl = this.jdbcConnection.getTableMetadataDdl(table.id());
        return SchemaChangeEvent.ofCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table.id().schema(), ddl, table, true);
    }

    /**
     * Resolves the timestamp that corresponds to the SCN of the snapshot offset.
     *
     * @param snapshotContext the current snapshot context
     * @param tableId the table being snapshotted (not consulted here)
     * @return the instant the database reports for the snapshot SCN
     * @throws ConnectException if the database reports no timestamp or the query fails
     */
    @Override
    protected Instant getSnapshotSourceTimestamp(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, TableId tableId) {
        try {
            final Optional<OffsetDateTime> snapshotTs = this.jdbcConnection.getScnToTimestamp(((OracleOffsetContext)snapshotContext.offset).getScn());
            return snapshotTs
                    .orElseThrow(() -> new ConnectException("Failed reading SCN timestamp from source database"))
                    .toInstant();
        }
        catch (SQLException e) {
            throw new ConnectException("Failed reading SCN timestamp from source database", e);
        }
    }

    /**
     * Builds the select statement used to snapshot a table as of the snapshot SCN.
     *
     * @param snapshotContext the current snapshot context
     * @param tableId the table to read
     * @param columns the column expressions to select, joined with {@code ", "}
     * @return a non-empty optional holding {@code SELECT <columns> FROM <table> AS OF SCN <scn>}
     */
    @Override
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext, TableId tableId, List<String> columns) {
        final OracleOffsetContext offset = (OracleOffsetContext)snapshotContext.offset;
        final String snapshotOffset = offset.getScn().toString();
        final String snapshotSelectColumns = String.join(", ", columns);
        assert (snapshotOffset != null);
        return Optional.of(String.format("SELECT %s FROM %s AS OF SCN %s", snapshotSelectColumns, OracleSnapshotChangeEventSource.quote(tableId), snapshotOffset));
    }

    /**
     * Resets the session to the CDB when a PDB name is configured, mirroring {@link #prepare}.
     *
     * @param snapshotContext the snapshot context (not consulted here)
     */
    @Override
    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {
        if (this.connectorConfig.getPdbName() != null) {
            this.jdbcConnection.resetSessionToCdb();
        }
    }

    /**
     * Renders {@code schema.table} of the given id in double-quoted form; the catalog part is dropped.
     */
    private static String quote(TableId tableId) {
        final String schemaAndTable = tableId.schema() + "." + tableId.table();
        return TableId.parse(schemaAndTable, true).toDoubleQuotedString();
    }

    /**
     * Snapshot context that additionally carries the savepoint set before tables are locked.
     */
    private static class OracleSnapshotContext
            extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> {
        /** Savepoint set by {@code lockTablesForSchemaSnapshot}; stays {@code null} when locking is disabled. */
        private Savepoint preSchemaSnapshotSavepoint;

        public OracleSnapshotContext(OraclePartition partition, String catalogName) throws SQLException {
            super(partition, catalogName);
        }
    }
}

