/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks records that have been submitted but not yet acknowledged, grouped by source partition,
 * and works out which offsets may be committed.
 *
 * <p>Every partition maps to a deque of {@link SubmittedRecord}s in submission order. An offset
 * becomes committable only once it, and every record ahead of it in the same deque, has been
 * acknowledged.
 *
 * <p>Only the un-acked counter and the drain latch are guarded by this object's monitor; the
 * {@link #records} map itself is read and written without synchronization.
 */
class SubmittedRecords {
    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
    /** Per-partition queues of in-flight records, oldest first. */
    final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap<>();
    /** Number of submitted records that have been neither acknowledged nor removed. Guarded by {@code this}. */
    private int numUnackedMessages = 0;
    /** Latch created by the most recent {@link #awaitAllMessages} call, if any. Guarded by {@code this}. */
    private CountDownLatch messageDrainLatch;

    /**
     * Enqueues a record using its own source partition and source offset.
     *
     * @param record the source record about to be dispatched
     * @return the tracking handle for the record
     */
    public SubmittedRecord submit(SourceRecord record) {
        return this.submit(record.sourcePartition(), record.sourceOffset());
    }

    /**
     * Enqueues a tracking handle at the tail of the deque for {@code partition} and bumps the
     * un-acked counter.
     *
     * @param partition the source partition of the record
     * @param offset the source offset of the record
     * @return the newly created tracking handle
     */
    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
        SubmittedRecord result = new SubmittedRecord(partition, offset);
        this.records.computeIfAbsent(result.partition(), p -> new LinkedList<>()).add(result);
        synchronized (this) {
            ++this.numUnackedMessages;
        }
        return result;
    }

    /**
     * Removes the last occurrence of {@code record} from its partition's deque, dropping the deque
     * from the map when it becomes empty.
     *
     * <p>On success the record stops counting as un-acked. The counter is decremented at most once
     * per record, even if the same record is also acknowledged through {@link SubmittedRecord#ack()}
     * before or after removal.
     *
     * @param record the handle previously returned by {@code submit}
     * @return {@code true} if the record was found and removed, {@code false} otherwise
     */
    public boolean removeLastOccurrence(SubmittedRecord record) {
        Deque<SubmittedRecord> deque = this.records.get(record.partition());
        if (deque == null) {
            log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
            return false;
        }
        boolean removed = deque.removeLastOccurrence(record);
        if (deque.isEmpty()) {
            this.records.remove(record.partition());
        }
        if (removed) {
            // Go through ack() rather than calling messageAcked() directly: its compare-and-set
            // guard keeps the counter from being decremented twice for a record that is both
            // removed and acknowledged, which would otherwise drive the counter negative.
            record.ack();
        } else {
            log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
        }
        return removed;
    }

    /**
     * Drains every acknowledged run of records from the head of each partition's deque and reports
     * the latest drained offset per partition together with summary counts.
     *
     * <p>Deques left empty by the drain are removed from the map before the deque count is taken.
     *
     * @return the committable offsets and metadata describing what is still pending
     */
    public CommittableOffsets committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        int totalCommittableMessages = 0;
        int totalUncommittableMessages = 0;
        int largestDequeSize = 0;
        Map<String, Object> largestDequePartition = null;
        for (Map.Entry<Map<String, Object>, Deque<SubmittedRecord>> entry : this.records.entrySet()) {
            Map<String, Object> partition = entry.getKey();
            Deque<SubmittedRecord> queuedRecords = entry.getValue();
            int initialDequeSize = queuedRecords.size();
            if (this.canCommitHead(queuedRecords)) {
                offsets.put(partition, this.committableOffset(queuedRecords));
            }
            // Whatever is still queued after draining is blocked behind an un-acked head.
            int uncommittableMessages = queuedRecords.size();
            totalCommittableMessages += initialDequeSize - uncommittableMessages;
            totalUncommittableMessages += uncommittableMessages;
            if (uncommittableMessages > largestDequeSize) {
                largestDequeSize = uncommittableMessages;
                largestDequePartition = partition;
            }
        }
        this.records.values().removeIf(Collection::isEmpty);
        return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, this.records.size(), largestDequeSize, largestDequePartition);
    }

    /**
     * Blocks until every record that is un-acked at the time of the call has been acknowledged or
     * removed, or until the timeout elapses.
     *
     * <p>If the waiting thread is interrupted this method returns {@code false}; the interrupt
     * status is not restored here.
     *
     * @param timeout the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return {@code true} if the count reached zero in time, {@code false} on timeout or interrupt
     */
    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
        // Keep a local reference so the wait happens outside the monitor on a stable latch.
        CountDownLatch drainLatch;
        synchronized (this) {
            drainLatch = new CountDownLatch(this.numUnackedMessages);
            this.messageDrainLatch = drainLatch;
        }
        try {
            return drainLatch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    /**
     * Polls acknowledged records off the head of the deque and returns the offset of the last one
     * polled, or {@code null} if the head was not acknowledged.
     */
    private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
        Map<String, Object> result = null;
        while (this.canCommitHead(queuedRecords)) {
            result = queuedRecords.poll().offset();
        }
        return result;
    }

    /** Returns whether the deque is non-empty and its head record has been acknowledged. */
    private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) {
        SubmittedRecord head = queuedRecords.peek();
        return head != null && head.acked();
    }

    /** Decrements the un-acked counter and counts down the drain latch if one has been created. */
    private synchronized void messageAcked() {
        --this.numUnackedMessages;
        if (this.messageDrainLatch != null) {
            this.messageDrainLatch.countDown();
        }
    }

    /**
     * Snapshot of committable offsets plus counters describing what was drained and what remains
     * queued. The offsets map is copied on construction and exposed read-only.
     */
    static class CommittableOffsets {
        public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
        private final Map<Map<String, Object>, Map<String, Object>> offsets;
        private final int numCommittableMessages;
        private final int numUncommittableMessages;
        private final int numDeques;
        private final int largestDequeSize;
        private final Map<String, Object> largestDequePartition;

        /**
         * @param offsets partition-to-offset map; copied, or replaced by an empty map when {@code null}
         * @param numCommittableMessages number of records drained as committable
         * @param numUncommittableMessages number of records still queued
         * @param numDeques number of non-empty partition deques remaining
         * @param largestDequeSize size of the largest remaining deque
         * @param largestDequePartition partition owning the largest remaining deque, possibly {@code null}
         */
        CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> offsets, int numCommittableMessages, int numUncommittableMessages, int numDeques, int largestDequeSize, Map<String, Object> largestDequePartition) {
            this.offsets = offsets != null ? new HashMap<>(offsets) : Collections.emptyMap();
            this.numCommittableMessages = numCommittableMessages;
            this.numUncommittableMessages = numUncommittableMessages;
            this.numDeques = numDeques;
            this.largestDequeSize = largestDequeSize;
            this.largestDequePartition = largestDequePartition;
        }

        /** Returns a read-only view of the committable offset for each partition. */
        public Map<Map<String, Object>, Map<String, Object>> offsets() {
            return Collections.unmodifiableMap(this.offsets);
        }

        /** Returns the number of records counted as committable. */
        public int numCommittableMessages() {
            return this.numCommittableMessages;
        }

        /** Returns the number of records counted as not yet committable. */
        public int numUncommittableMessages() {
            return this.numUncommittableMessages;
        }

        /** Returns the number of partition deques recorded in this snapshot. */
        public int numDeques() {
            return this.numDeques;
        }

        /** Returns the size of the largest deque recorded in this snapshot. */
        public int largestDequeSize() {
            return this.largestDequeSize;
        }

        /** Returns the partition of the largest deque, or {@code null} if none was recorded. */
        public Map<String, Object> largestDequePartition() {
            return this.largestDequePartition;
        }

        /** Returns whether at least one record is still uncommittable. */
        public boolean hasPending() {
            return this.numUncommittableMessages > 0;
        }

        /** Returns whether this snapshot carries no offsets and no message counts at all. */
        public boolean isEmpty() {
            return this.numCommittableMessages == 0 && this.numUncommittableMessages == 0 && this.offsets.isEmpty();
        }

        /**
         * Combines this snapshot with a newer one: offsets are merged with the newer values taking
         * precedence, committable counts are summed, and all remaining metadata is taken from
         * {@code newerOffsets}.
         *
         * @param newerOffsets the more recent snapshot
         * @return a new merged snapshot; neither input is modified
         */
        public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) {
            Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(this.offsets);
            merged.putAll(newerOffsets.offsets);
            return new CommittableOffsets(merged, this.numCommittableMessages + newerOffsets.numCommittableMessages, newerOffsets.numUncommittableMessages, newerOffsets.numDeques, newerOffsets.largestDequeSize, newerOffsets.largestDequePartition);
        }
    }

    /**
     * Tracking handle for a single submitted record. It is an inner (non-static) class because
     * acknowledging it updates the counters of the enclosing {@link SubmittedRecords}.
     */
    class SubmittedRecord {
        private final Map<String, Object> partition;
        private final Map<String, Object> offset;
        private final AtomicBoolean acked;

        public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
            this.acked = new AtomicBoolean(false);
        }

        /**
         * Marks this record as acknowledged. Only the first call has any effect; it notifies the
         * enclosing instance so the un-acked counter and drain latch are updated exactly once.
         */
        public void ack() {
            if (this.acked.compareAndSet(false, true)) {
                SubmittedRecords.this.messageAcked();
            }
        }

        private boolean acked() {
            return this.acked.get();
        }

        private Map<String, Object> partition() {
            return this.partition;
        }

        private Map<String, Object> offset() {
            return this.offset;
        }
    }
}

