package com.ververica.cdc.connectors.postgres.source.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.Utils;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Objects;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FetchTask} that reads one {@link SnapshotSplit} of a PostgreSQL table.
 *
 * <p>{@code execute} first runs a {@link PostgresSnapshotSplitReadTask}, which records a low and
 * a high watermark around the split scan. It then either emits an END watermark directly (when
 * the high watermark is not after the low watermark) or runs a stream read task over the
 * interval between the two watermarks.
 */
public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresScanFetchTask.class);
    // The snapshot split this task reads; handed back by getSplit().
    private final SnapshotSplit split;
    // Set to true when execute(...) starts; reset by close(), by a failed snapshot read, by a
    // skipped backfill, and by PostgresChangeEventSourceContext.finished(). Volatile, so
    // presumably read from another thread — confirm against the fetcher that polls isRunning().
    private volatile boolean taskRunning = false;

    /**
     * Change-event-source context passed to the backfill stream read task. Its running state is
     * the enclosing task's {@code taskRunning} flag.
     */
    public class PostgresChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {
        PostgresChangeEventSourceContext() {
        }

        /** Clears the enclosing task's running flag. */
        public void finished() {
            taskRunning = false;
        }

        /** @return the enclosing task's current running flag */
        @Override
        public boolean isRunning() {
            return taskRunning;
        }
    }

    /**
     * Snapshot change-event source that reads the rows of a single {@link SnapshotSplit}.
     *
     * <p>{@code doExecute} makes sure the replication slot {@code slotName} exists, dispatches a
     * LOW watermark, scans the split, dispatches a HIGH watermark, and closes the replication
     * connection with {@code close(true)} when the high watermark is not after the low one.
     */
    public static class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
        // JDBC connection used for schema refresh, current-offset lookups, slot-state queries and
        // the split scan statement.
        private final PostgresConnection jdbcConnection;
        // Replication connection used to create the slot and later closed with close(true).
        private final PostgresReplicationConnection replicationConnection;
        // Connector configuration; supplies the query fetch size for the split scan.
        private final PostgresConnectorConfig connectorConfig;
        // Dispatches watermark events and snapshot row events.
        private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
        // The split whose key range is scanned.
        private final SnapshotSplit snapshotSplit;
        // Offset context installed into the snapshot context at the start of doExecute.
        private final PostgresOffsetContext offsetContext;
        // Schema that is refreshed before the scan and used to resolve the split's table.
        private final PostgresSchema databaseSchema;
        // Receives rowsScanned(...) progress callbacks while the split is exported.
        private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;
        // Time source for event timestamps, durations and the progress-log timer (Clock.SYSTEM).
        private final Clock clock;
        // Name of the replication slot looked up / waited for before the scan.
        private final String slotName;
        // Plugin name passed along with slotName to the slot-state queries.
        private final String pluginName;

        /**
         * Snapshot context for a {@link PostgresPartition}. It forwards the partition and an
         * empty string to the relational snapshot context constructor.
         */
        public static class PostgresSnapshotContext
                extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
                        PostgresPartition, PostgresOffsetContext> {

            /**
             * @param postgresPartition partition this snapshot context belongs to
             * @throws SQLException declared by the super constructor
             */
            public PostgresSnapshotContext(PostgresPartition postgresPartition) throws SQLException {
                super(postgresPartition, "");
            }
        }

        /**
         * Creates a read task for one snapshot split.
         *
         * @param postgresConnection JDBC connection used for queries and slot-state lookups
         * @param postgresReplicationConnection replication connection used to create the slot
         * @param postgresConnectorConfig connector configuration (also passed to the super class)
         * @param postgresSchema database schema used to resolve the split's table
         * @param postgresOffsetContext offset context installed into the snapshot context
         * @param jdbcSourceEventDispatcher dispatcher for watermark and snapshot events
         * @param snapshotProgressListener progress listener (also passed to the super class)
         * @param snapshotSplit the split to read
         * @param str replication slot name
         * @param str2 replication plugin name
         */
        public PostgresSnapshotSplitReadTask(
                PostgresConnection postgresConnection,
                PostgresReplicationConnection postgresReplicationConnection,
                PostgresConnectorConfig postgresConnectorConfig,
                PostgresSchema postgresSchema,
                PostgresOffsetContext postgresOffsetContext,
                JdbcSourceEventDispatcher jdbcSourceEventDispatcher,
                SnapshotProgressListener snapshotProgressListener,
                SnapshotSplit snapshotSplit,
                String str,
                String str2) {
            super(postgresConnectorConfig, snapshotProgressListener);
            // Connections.
            this.jdbcConnection = postgresConnection;
            this.replicationConnection = postgresReplicationConnection;
            // Replication slot identity.
            this.slotName = str;
            this.pluginName = str2;
            // Configuration and schema.
            this.connectorConfig = postgresConnectorConfig;
            this.databaseSchema = postgresSchema;
            // Event plumbing.
            this.dispatcher = jdbcSourceEventDispatcher;
            this.snapshotProgressListener = snapshotProgressListener;
            // Split state.
            this.snapshotSplit = snapshotSplit;
            this.offsetContext = postgresOffsetContext;
            this.clock = Clock.SYSTEM;
        }

        /**
         * Reads the snapshot split between a low and a high watermark.
         *
         * <p>Steps, in order: install this task's offset context into the snapshot context; make
         * sure the replication slot exists; refresh the schema; record and dispatch the LOW
         * watermark; export the split's rows; record and dispatch the HIGH watermark. If the high
         * watermark is not after the low watermark, the replication connection is closed via
         * {@code dropSlotForBackFillReadTask()}.
         *
         * @param changeEventSourceContext must be a {@link SnapshotSplitChangeEventSourceContext};
         *     receives both watermarks
         * @param postgresOffsetContext not used; the offset context given to the constructor is
         *     used instead
         * @param snapshotContext must be a {@link PostgresSnapshotContext}
         * @param snapshottingTask not used
         * @return a completed snapshot result carrying the snapshot context's offset
         * @throws Exception if any step fails
         */
        @Override
        public SnapshotResult<PostgresOffsetContext> doExecute(
                ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
                PostgresOffsetContext postgresOffsetContext,
                AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
                AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
                throws Exception {
            final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext;
            ctx.offset = offsetContext;

            createSlotForBackFillReadTask();
            Utils.refreshSchema(databaseSchema, jdbcConnection, true);

            final PostgresOffset lowWatermark = Utils.currentOffset(jdbcConnection);
            LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit);
            final SnapshotSplitChangeEventSourceContext splitContext =
                    (SnapshotSplitChangeEventSourceContext) changeEventSourceContext;
            splitContext.setLowWatermark(lowWatermark);
            dispatcher.dispatchWatermarkEvent(
                    snapshotContext.partition.getSourcePartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW);

            LOG.info("Snapshot step 2 - Snapshotting data");
            createDataEvents(ctx, snapshotSplit.getTableId());

            final PostgresOffset highWatermark = Utils.currentOffset(jdbcConnection);
            LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit);
            splitContext.setHighWatermark(highWatermark);
            dispatcher.dispatchWatermarkEvent(
                    snapshotContext.partition.getSourcePartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);

            // No backfill will follow, so the replication connection is no longer needed.
            final boolean backfillNeeded = PostgresScanFetchTask.isBackFillRequired(lowWatermark, highWatermark);
            if (!backfillNeeded) {
                dropSlotForBackFillReadTask();
            }
            return SnapshotResult.completed(ctx.offset);
        }

        /**
         * Exports the rows of the split's table and then signals snapshot completion to the
         * snapshot receiver.
         *
         * @param postgresSnapshotContext snapshot context providing partition and offset
         * @param tableId id of the table to export; it must be known to the database schema
         * @throws InterruptedException propagated from event dispatching
         * @throws NullPointerException if the schema has no table for {@code tableId}
         */
        private void createDataEvents(PostgresSnapshotContext postgresSnapshotContext, TableId tableId) throws InterruptedException {
            final EventDispatcher.SnapshotReceiver<PostgresPartition> receiver =
                    dispatcher.getSnapshotChangeEventReceiver();
            LOG.info("Snapshotting table {}", tableId);

            final Table table = (Table) Objects.requireNonNull(databaseSchema.tableFor(tableId));
            createDataEventsForTable(postgresSnapshotContext, receiver, table);
            receiver.completeSnapshot();
        }

        /**
         * Runs the split scan query and dispatches one snapshot event per returned row.
         *
         * <p>The statement and its result set are managed by try-with-resources, so both are
         * closed on every exit path and a failure while closing is attached to the primary
         * exception as a suppressed exception instead of replacing it.
         *
         * @param postgresSnapshotContext snapshot context providing partition and offset
         * @param snapshotReceiver receiver the snapshot events are dispatched to
         * @param table table definition used to map result-set columns to row positions
         * @throws InterruptedException propagated from event dispatching
         * @throws FlinkRuntimeException if the scan fails with a {@link SQLException}
         */
        private void createDataEventsForTable(
                PostgresSnapshotContext postgresSnapshotContext,
                EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver,
                Table table)
                throws InterruptedException {
            final long exportStart = clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

            // A null boundary means the split is unbounded on that side.
            final boolean startUnbounded = snapshotSplit.getSplitStart() == null;
            final boolean endUnbounded = snapshotSplit.getSplitEnd() == null;

            final String selectSql =
                    PostgresQueryUtils.buildSplitScanQuery(
                            snapshotSplit.getTableId(),
                            snapshotSplit.getSplitKeyType(),
                            startUnbounded,
                            endUnbounded);
            LOG.debug("For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), table.id(), selectSql);

            try (PreparedStatement selectStatement =
                            PostgresQueryUtils.readTableSplitDataStatement(
                                    jdbcConnection,
                                    selectSql,
                                    startUnbounded,
                                    endUnbounded,
                                    snapshotSplit.getSplitStart(),
                                    snapshotSplit.getSplitEnd(),
                                    snapshotSplit.getSplitKeyType().getFieldCount(),
                                    connectorConfig.getQueryFetchSize());
                    ResultSet rs = selectStatement.executeQuery()) {

                final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
                long rows = 0;
                Threads.Timer logTimer = getTableScanLogTimer();

                while (rs.next()) {
                    rows++;
                    // Place each selected column at its table position (positions are 1-based).
                    final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                    for (int i = 0; i < columnArray.getColumns().length; i++) {
                        row[columnArray.getColumns()[i].position() - 1] = rs.getObject(i + 1);
                    }

                    // Periodic progress report; the timer is re-armed after each report.
                    if (logTimer.expired()) {
                        LOG.info("Exported {} records for split '{}' after {}", rows, snapshotSplit.splitId(), Strings.duration(clock.currentTimeInMillis() - exportStart));
                        snapshotProgressListener.rowsScanned(postgresSnapshotContext.partition, table.id(), rows);
                        logTimer = getTableScanLogTimer();
                    }

                    postgresSnapshotContext.offset.event(table.id(), clock.currentTime());
                    final SnapshotChangeRecordEmitter<PostgresPartition> emitter =
                            new SnapshotChangeRecordEmitter<>(
                                    postgresSnapshotContext.partition,
                                    postgresSnapshotContext.offset,
                                    row,
                                    clock);
                    dispatcher.dispatchSnapshotEvent(
                            postgresSnapshotContext.partition, table.id(), emitter, snapshotReceiver);
                }
                LOG.info("Finished exporting {} records for split '{}', total duration '{}'", rows, snapshotSplit.splitId(), Strings.duration(clock.currentTimeInMillis() - exportStart));
            } catch (SQLException e) {
                throw new FlinkRuntimeException("Snapshotting of table " + table.id() + " failed", e);
            }
        }

        /**
         * Makes sure the replication slot {@code slotName} exists and waits until it is ready.
         *
         * <p>If the slot state cannot be loaded, or no slot is found, a slot is created through
         * the replication connection. Failures are reported as {@link FlinkRuntimeException}; a
         * descriptive exception raised while creating the slot is propagated unchanged rather
         * than being wrapped a second time.
         *
         * @throws FlinkRuntimeException if the slot cannot be created or does not become ready
         */
        private void createSlotForBackFillReadTask() {
            try {
                SlotState slotInfo = null;
                try {
                    slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName);
                } catch (SQLException e) {
                    // Best effort: fall through to slot creation, but keep the cause in the log.
                    LOG.info("Unable to load info of replication slot, will try to create the slot", e);
                }

                if (slotInfo == null) {
                    try {
                        replicationConnection.createReplicationSlot();
                    } catch (SQLException e) {
                        String message = "Creation of replication slot failed";
                        // getMessage() may be null; guard so the original SQLException is not
                        // masked by a NullPointerException.
                        final String detail = e.getMessage();
                        if (detail != null && detail.contains("already exists")) {
                            message += "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                        }
                        throw new FlinkRuntimeException(message, e);
                    }
                }

                PostgresObjectUtils.waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName);
            } catch (FlinkRuntimeException e) {
                // Already carries a descriptive message; do not wrap it again.
                throw e;
            } catch (Throwable t) {
                throw new FlinkRuntimeException(t);
            }
        }

        /**
         * Closes the replication connection with {@code close(true)}.
         *
         * <p>NOTE(review): the {@code true} argument presumably requests that the slot be dropped
         * as well — confirm against {@code PostgresReplicationConnection#close(boolean)}.
         *
         * @throws FlinkRuntimeException wrapping anything thrown while closing
         */
        private void dropSlotForBackFillReadTask() {
            try {
                replicationConnection.close(true);
            } catch (Throwable failure) {
                throw new FlinkRuntimeException(failure);
            }
        }

        /**
         * @return a fresh timer over this task's clock that expires after {@code LOG_INTERVAL};
         *     used to pace the progress log lines during the split scan
         */
        private Threads.Timer getTableScanLogTimer() {
            final Threads.Timer timer = Threads.timer(clock, LOG_INTERVAL);
            return timer;
        }

        /**
         * Always returns a snapshotting task constructed with {@code (false, true)}, regardless
         * of the given partition and offset.
         *
         * <p>NOTE(review): the two flags presumably mean "snapshot schema = false, snapshot data =
         * true" — confirm against the Debezium version in use.
         */
        @Override
        public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(
                PostgresPartition postgresPartition, PostgresOffsetContext postgresOffsetContext) {
            final AbstractSnapshotChangeEventSource.SnapshottingTask task =
                    new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
            return task;
        }

        /**
         * Builds the snapshot context for the given partition.
         *
         * @param postgresPartition partition the snapshot runs for
         * @return a new {@link PostgresSnapshotContext} bound to the partition
         * @throws Exception if the context cannot be created
         */
        @Override
        public PostgresSnapshotContext prepare(PostgresPartition postgresPartition) throws Exception {
            final PostgresSnapshotContext context = new PostgresSnapshotContext(postgresPartition);
            return context;
        }
    }

    /**
     * Change-event-source context for the snapshot read of one split. It carries the low and
     * high watermarks recorded around the split scan.
     */
    public static class SnapshotSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {
        /** Offset recorded before the split scan; {@code null} until set. */
        private PostgresOffset lowWatermark;
        /** Offset recorded after the split scan; {@code null} until set. */
        private PostgresOffset highWatermark;

        SnapshotSplitChangeEventSourceContext() {
        }

        /** @return the low watermark, or {@code null} if it has not been set */
        public PostgresOffset getLowWatermark() {
            return lowWatermark;
        }

        /** @param postgresOffset offset to store as the low watermark */
        public void setLowWatermark(PostgresOffset postgresOffset) {
            lowWatermark = postgresOffset;
        }

        /** @return the high watermark, or {@code null} if it has not been set */
        public PostgresOffset getHighWatermark() {
            return highWatermark;
        }

        /** @param postgresOffset offset to store as the high watermark */
        public void setHighWatermark(PostgresOffset postgresOffset) {
            highWatermark = postgresOffset;
        }

        /** @return {@code true} only once both watermarks have been set */
        @Override
        public boolean isRunning() {
            return lowWatermark != null && highWatermark != null;
        }
    }

    /**
     * Creates a fetch task for the given snapshot split.
     *
     * @param snapshotSplit the split this task reads
     */
    public PostgresScanFetchTask(SnapshotSplit snapshotSplit) {
        split = snapshotSplit;
    }

    /** @return the snapshot split this task was created for */
    @Override
    public SourceSplitBase getSplit() {
        return split;
    }

    /** Clears the running flag; nothing else is released here. */
    @Override
    public void close() {
        taskRunning = false;
    }

    /** @return the current value of the running flag */
    @Override
    public boolean isRunning() {
        return taskRunning;
    }

    /**
     * Reads the snapshot split and then runs the backfill step.
     *
     * <p>Marks the task as running, executes a {@link PostgresSnapshotSplitReadTask} built from
     * the fetch-task context, and hands over to {@code executeBackfillTask} when the snapshot
     * result is completed or skipped.
     *
     * @param context must be a {@code PostgresSourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot read neither completed nor was skipped; the
     *     running flag is cleared first
     * @throws Exception if the snapshot read or the backfill fails
     */
    @Override
    public void execute(FetchTask.Context context) throws Exception {
        LOG.info("Execute ScanFetchTask for split: {}", split);
        final PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
        taskRunning = true;

        final PostgresSnapshotSplitReadTask readTask =
                new PostgresSnapshotSplitReadTask(
                        ctx.getConnection(),
                        (PostgresReplicationConnection) ctx.getReplicationConnection(),
                        ctx.getDbzConnectorConfig(),
                        ctx.getDatabaseSchema(),
                        ctx.getOffsetContext(),
                        ctx.getDispatcher(),
                        ctx.getSnapshotChangeEventSourceMetrics(),
                        split,
                        ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
                        ctx.getPluginName());

        final SnapshotSplitChangeEventSourceContext watermarkContext =
                new SnapshotSplitChangeEventSourceContext();
        final SnapshotResult<PostgresOffsetContext> snapshotResult =
                readTask.execute(watermarkContext, ctx.getPartition(), ctx.getOffsetContext());

        if (!snapshotResult.isCompletedOrSkipped()) {
            taskRunning = false;
            throw new IllegalStateException(String.format("Read snapshot for postgres split %s fail", split));
        }
        executeBackfillTask(ctx, watermarkContext);
    }

    /**
     * Replays the changes between the split's low and high watermark, if there are any.
     *
     * <p>A stream split spanning the two watermarks is built. When the high watermark is not
     * after the low watermark, an END watermark event is dispatched and the running flag is
     * cleared. Otherwise a stream read task is executed over that split.
     *
     * @param postgresSourceFetchTaskContext fetch-task context supplying connections, dispatcher
     *     and configuration
     * @param snapshotSplitChangeEventSourceContext holder of the watermarks recorded by the
     *     snapshot read
     * @throws InterruptedException propagated from event dispatching or the stream read task
     */
    private void executeBackfillTask(PostgresSourceFetchTaskContext postgresSourceFetchTaskContext, SnapshotSplitChangeEventSourceContext snapshotSplitChangeEventSourceContext) throws InterruptedException {
        final StreamSplit backfillSplit =
                new StreamSplit(
                        split.splitId(),
                        snapshotSplitChangeEventSourceContext.getLowWatermark(),
                        snapshotSplitChangeEventSourceContext.getHighWatermark(),
                        new ArrayList<>(),
                        split.getTableSchemas(),
                        0);

        // Nothing to replay: close the split with an END watermark and stop.
        if (!isBackFillRequired(backfillSplit.getStartingOffset(), backfillSplit.getEndingOffset())) {
            LOG.info("Skip the backfill {} for split {}: low watermark >= high watermark", backfillSplit, split);
            postgresSourceFetchTaskContext
                    .getDispatcher()
                    .dispatchWatermarkEvent(
                            postgresSourceFetchTaskContext.getPartition().getSourcePartition(),
                            backfillSplit,
                            backfillSplit.getEndingOffset(),
                            WatermarkKind.END);
            taskRunning = false;
            return;
        }

        final PostgresOffsetContext backfillOffsetContext =
                PostgresOffsetUtils.getPostgresOffsetContext(
                        new PostgresOffsetContext.Loader(postgresSourceFetchTaskContext.getDbzConnectorConfig()),
                        backfillSplit.getStartingOffset());
        final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
                new PostgresStreamFetchTask.StreamSplitReadTask(
                        postgresSourceFetchTaskContext.getDbzConnectorConfig(),
                        postgresSourceFetchTaskContext.getSnapShotter(),
                        postgresSourceFetchTaskContext.getConnection(),
                        postgresSourceFetchTaskContext.getDispatcher(),
                        postgresSourceFetchTaskContext.getPostgresDispatcher(),
                        postgresSourceFetchTaskContext.getErrorHandler(),
                        postgresSourceFetchTaskContext.getTaskContext().getClock(),
                        postgresSourceFetchTaskContext.getDatabaseSchema(),
                        postgresSourceFetchTaskContext.getTaskContext(),
                        postgresSourceFetchTaskContext.getReplicationConnection(),
                        backfillSplit);
        LOG.info("Execute backfillReadTask for split {} with slot name {}", split, ((PostgresSourceConfig) postgresSourceFetchTaskContext.getSourceConfig()).getSlotNameForBackfillTask());

        final ChangeEventSource.ChangeEventSourceContext streamContext = new PostgresChangeEventSourceContext();
        backfillReadTask.execute(streamContext, postgresSourceFetchTaskContext.getPartition(), backfillOffsetContext);
    }

    /**
     * Decides whether changes between two watermarks have to be replayed.
     *
     * @param offset the low watermark
     * @param offset2 the high watermark
     * @return {@code true} if {@code offset2} is after {@code offset}
     */
    public static boolean isBackFillRequired(Offset offset, Offset offset2) {
        final boolean highIsAfterLow = offset2.isAfter(offset);
        return highIsAfterLow;
    }
}
