package io.debezium.connector.postgresql.connection.wal2json;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.TransactionMessage;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.function.Function;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.class */
/**
 * Message decoder for replication slots configured with the {@code write-in-chunks} option
 * (see {@link #optionsWithoutMetadata}). A transaction arrives as a sequence of byte chunks
 * rather than one JSON document, so this class keeps a small state machine across calls:
 *
 * <ul>
 *   <li>a chunk starting with <code>{</code> and carrying no {@code "kind"} field opens a
 *       transaction; its {@code "xid"} and {@code "timestamp"} fields are remembered and a
 *       {@code BEGIN} message is emitted;</li>
 *   <li>a chunk starting with <code>{</code> while a transaction is open is buffered as the
 *       current change;</li>
 *   <li>a chunk starting with {@code ,} flushes the buffered change and buffers the new one;</li>
 *   <li>a chunk starting with {@code ]} flushes the buffered change as the last one of the
 *       transaction and emits a {@code COMMIT} message.</li>
 * </ul>
 *
 * <p>Each change is held back by one chunk so that the final change of a transaction can be
 * flagged as such when it is handed to the processor.
 *
 * <p>The instance holds mutable per-stream state and performs no synchronization of its own.
 */
public class StreamingWal2JsonMessageDecoder extends AbstractMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class);

    // ASCII codes of the bytes the chunk scanner cares about.
    private static final byte TAB = 9;
    private static final byte CR = 13;
    private static final byte SPACE = 32;
    private static final byte COMMA = 44;
    private static final byte RIGHT_BRACKET = 93;
    private static final byte LEFT_BRACE = 123;
    private static final byte RIGHT_BRACE = 125;

    /** Transaction id recorded for the artificial transaction opened by an out-of-order chunk. */
    private static final long UNDEFINED_LONG = -1;

    private final DateTimeFormat dateTime = DateTimeFormat.get();

    /** Forwarded unchanged to every {@link Wal2JsonReplicationMessage} created by this decoder. */
    private boolean containsMetadata = false;

    /** {@code true} between the opening chunk of a transaction and its closing {@code ]} chunk. */
    private boolean messageInProgress = false;

    /** The most recently received change that has not been handed to the processor yet; may be {@code null}. */
    private byte[] currentChunk;

    /** Id of the transaction currently being decoded. */
    private Long txId;

    /** Commit timestamp of the transaction currently being decoded. */
    private Instant commitTime;

    /**
     * Consumes one chunk received from the replication stream and advances the state machine.
     *
     * @param byteBuffer array-backed buffer holding the chunk
     * @param replicationMessageProcessor receiver of the decoded messages
     * @param typeRegistry passed through to the created change messages
     * @throws IllegalStateException if the buffer is not backed by an accessible array
     * @throws IllegalArgumentException if the chunk consists of whitespace only
     * @throws ConnectException if the chunk cannot be parsed or arrives in an unexpected state
     * @throws SQLException propagated from the processor
     * @throws InterruptedException propagated from the processor
     */
    @Override // io.debezium.connector.postgresql.connection.AbstractMessageDecoder
    public void processNotEmptyMessage(ByteBuffer byteBuffer, ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
        try {
            if (!byteBuffer.hasArray()) {
                throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
            }
            final byte[] source = byteBuffer.array();
            // Copy with two spare trailing bytes: they default to spaces and may later be
            // overwritten with "]}" to turn an unterminated opening chunk into valid JSON.
            final byte[] content = Arrays.copyOfRange(source, byteBuffer.arrayOffset(), source.length + 2);
            final int lastPos = content.length - 1;
            content[lastPos - 1] = SPACE;
            content[lastPos] = SPACE;
            if (LOGGER.isTraceEnabled()) {
                // Decode explicitly as UTF-8 so the log text does not depend on the JVM default charset.
                LOGGER.trace("Chunk arrived from database {}", new String(content, StandardCharsets.UTF_8));
            }

            if (this.messageInProgress) {
                nonInitialChunk(replicationMessageProcessor, typeRegistry, content);
                return;
            }
            if (getFirstNonWhiteChar(content) != LEFT_BRACE) {
                // No transaction is open and this is not an opening chunk.
                outOfOrderChunk(content);
                nonInitialChunk(replicationMessageProcessor, typeRegistry, content);
                return;
            }

            if (getLastNonWhiteChar(content) != RIGHT_BRACE) {
                // Close the still-open array and object so the chunk can be parsed on its own.
                content[lastPos - 1] = RIGHT_BRACKET;
                content[lastPos] = RIGHT_BRACE;
            }
            final Document message = DocumentReader.defaultReader().read(content);
            if (message.has("kind")) {
                // A "kind" field marks a change record, not a transaction header.
                outOfOrderChunk(content);
                nonInitialChunk(replicationMessageProcessor, typeRegistry, content);
                return;
            }

            this.txId = message.getLong("xid");
            this.commitTime = this.dateTime.systemTimestampToInstant(message.getString("timestamp"));
            this.messageInProgress = true;
            this.currentChunk = null;
            replicationMessageProcessor.process(new TransactionMessage(ReplicationMessage.Operation.BEGIN, this.txId.longValue(), this.commitTime));
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Handles a chunk received while a transaction is open, dispatching on its first
     * non-whitespace byte.
     *
     * @param replicationMessageProcessor receiver of the decoded messages
     * @param typeRegistry passed through to the created change messages
     * @param bArr the chunk; a leading comma is overwritten with a space in place
     * @throws ConnectException if the chunk starts with none of <code>{</code>, {@code ,} or {@code ]}
     */
    protected void nonInitialChunk(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor, TypeRegistry typeRegistry, byte[] bArr) throws IOException, SQLException, InterruptedException {
        final byte firstChar = getFirstNonWhiteChar(bArr);
        if (firstChar == LEFT_BRACE) {
            // A change record with no separator in front: just buffer it.
            this.currentChunk = bArr;
            return;
        }
        if (firstChar == COMMA) {
            // Another change follows, so the buffered one is not the last of the transaction.
            if (this.currentChunk != null) {
                doProcessMessage(replicationMessageProcessor, typeRegistry, this.currentChunk, false);
            }
            // Blank out the separator so the chunk parses as a standalone JSON object.
            replaceFirstNonWhiteChar(bArr, SPACE);
            this.currentChunk = bArr;
            return;
        }
        if (firstChar != RIGHT_BRACKET) {
            throw new ConnectException("Chunk arrived in unexpected state");
        }
        // End of the change array: flush the buffered change (possibly none) as the last one.
        doProcessMessage(replicationMessageProcessor, typeRegistry, this.currentChunk, true);
        this.messageInProgress = false;
        replicationMessageProcessor.process(new TransactionMessage(ReplicationMessage.Operation.COMMIT, this.txId.longValue(), this.commitTime));
    }

    /**
     * Opens an artificial transaction for a chunk that arrived while no transaction was open.
     * The transaction id is set to {@link #UNDEFINED_LONG} and the commit time to the current
     * instant; no {@code BEGIN} message is emitted.
     *
     * @param bArr the offending chunk, used for logging only
     */
    protected void outOfOrderChunk(byte[] bArr) {
        if (LOGGER.isWarnEnabled()) {
            // Decode explicitly as UTF-8 so the log text does not depend on the JVM default charset.
            LOGGER.warn("Got out of order chunk {}, recording artifical TX", new String(bArr, StandardCharsets.UTF_8));
        }
        this.txId = UNDEFINED_LONG;
        this.commitTime = Instant.now();
        this.messageInProgress = true;
        this.currentChunk = null;
    }

    /**
     * Returns the last byte of {@code bArr} that is not whitespace.
     *
     * @throws IllegalArgumentException if every byte is whitespace or the array is empty
     */
    private byte getLastNonWhiteChar(byte[] bArr) throws IllegalArgumentException {
        for (int i = bArr.length - 1; i >= 0; i--) {
            if (!isWhitespace(bArr[i])) {
                return bArr[i];
            }
        }
        throw new IllegalArgumentException("No non-white char");
    }

    /**
     * Returns the first byte of {@code bArr} that is not whitespace.
     *
     * @throws IllegalArgumentException if every byte is whitespace or the array is empty
     */
    private byte getFirstNonWhiteChar(byte[] bArr) throws IllegalArgumentException {
        for (final byte current : bArr) {
            if (!isWhitespace(current)) {
                return current;
            }
        }
        throw new IllegalArgumentException("No non-white char");
    }

    /**
     * Overwrites the first non-whitespace byte of {@code bArr} with {@code b}; does nothing
     * when the array contains whitespace only.
     */
    private void replaceFirstNonWhiteChar(byte[] bArr, byte b) {
        for (int i = 0; i < bArr.length; i++) {
            if (!isWhitespace(bArr[i])) {
                bArr[i] = b;
                return;
            }
        }
    }

    /** Returns {@code true} for a space and for the control bytes from TAB (9) through CR (13). */
    private boolean isWhitespace(byte b) {
        return (b >= TAB && b <= CR) || b == SPACE;
    }

    /**
     * Parses a buffered change and hands it to the processor; a {@code null} chunk is
     * reported as a no-op message instead.
     *
     * @param bArr bytes of a single change record, or {@code null} if nothing was buffered
     * @param z whether this is the last change of the current transaction
     */
    private void doProcessMessage(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor, TypeRegistry typeRegistry, byte[] bArr, boolean z) throws IOException, SQLException, InterruptedException {
        if (bArr == null) {
            LOGGER.trace("Empty change arrived");
            replicationMessageProcessor.process(new ReplicationMessage.NoopMessage(this.txId, this.commitTime));
            return;
        }
        final Document change = DocumentReader.floatNumbersAsTextReader().read(bArr);
        LOGGER.trace("Change arrived for decoding {}", change);
        replicationMessageProcessor.process(new Wal2JsonReplicationMessage(this.txId, this.commitTime, change, this.containsMetadata, z, typeRegistry));
    }

    /**
     * Applies the options of {@link #optionsWithoutMetadata} and additionally sets
     * {@code include-not-null} to {@code "true"}.
     *
     * @param chainedLogicalStreamBuilder builder to configure
     * @param function forwarded to {@link #optionsWithoutMetadata}, which does not use it
     * @return the configured builder
     */
    @Override // io.debezium.connector.postgresql.connection.MessageDecoder
    public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder chainedLogicalStreamBuilder, Function<Integer, Boolean> function) {
        return optionsWithoutMetadata(chainedLogicalStreamBuilder, function).withSlotOption("include-not-null", "true");
    }

    /**
     * Enables the slot options this decoder relies on: {@code pretty-print},
     * {@code write-in-chunks}, {@code include-xids} and {@code include-timestamp}.
     *
     * @param chainedLogicalStreamBuilder builder to configure
     * @param function not used by this implementation
     * @return the configured builder
     */
    @Override // io.debezium.connector.postgresql.connection.MessageDecoder
    public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder chainedLogicalStreamBuilder, Function<Integer, Boolean> function) {
        return chainedLogicalStreamBuilder
                .withSlotOption("pretty-print", 1)
                .withSlotOption("write-in-chunks", 1)
                .withSlotOption("include-xids", 1)
                .withSlotOption("include-timestamp", 1);
    }

    /**
     * Records whether the stream was opened with metadata options; the flag is passed to
     * every change message created afterwards.
     */
    @Override // io.debezium.connector.postgresql.connection.MessageDecoder
    public void setContainsMetadata(boolean z) {
        this.containsMetadata = z;
    }
}
