package org.apache.flink.iteration.operator;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.iteration.operator.HeadOperator;
import org.apache.flink.iteration.operator.event.CoordinatorCheckpointEvent;
import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;

/* loaded from: input_file:org/apache/flink/iteration/operator/HeadOperatorCheckpointAligner.class */
/**
 * Tracks, per checkpoint id, whether a checkpoint has been announced both through the input
 * channels and by the coordinator, and holds back {@link GloballyAlignedEvent}s that arrive while
 * the latest coordinator-announced checkpoint has not yet been announced through the channels.
 *
 * <p>The class performs no synchronization of its own.
 */
class HeadOperatorCheckpointAligner {
    /** Alignment state per checkpoint id; sorted so that all ids up to a bound can be dropped. */
    private final TreeMap<Long, CheckpointAlignment> checkpointAlignmments = new TreeMap<>();

    /** Largest checkpoint id passed to {@link #coordinatorNotify} so far (0 if none). */
    private long latestCheckpointFromCoordinator;

    /** Largest checkpoint id passed to {@link #onCheckpointAborted} so far (0 if none). */
    private long latestAbortedCheckpoint;

    /** Mutable alignment record of a single checkpoint. */
    public static class CheckpointAlignment {
        /** Events held back until the checkpoint is snapshotted or aborted. */
        final List<GloballyAlignedEvent> pendingGlobalEvents = new ArrayList<>();

        boolean notifiedFromChannels;
        boolean notifiedFromCoordinator;

        /**
         * @param notifiedFromChannels initial value of the channel-side flag
         * @param notifiedFromCoordinator initial value of the coordinator-side flag
         */
        public CheckpointAlignment(boolean notifiedFromChannels, boolean notifiedFromCoordinator) {
            this.notifiedFromChannels = notifiedFromChannels;
            this.notifiedFromCoordinator = notifiedFromCoordinator;
        }
    }

    /**
     * Registers the channel-side notification of a checkpoint and repeatedly runs {@code
     * defaultAction} until the coordinator-side flag of that checkpoint is set.
     *
     * <p>If no record exists yet for the checkpoint, one is created whose coordinator-side flag is
     * already set whenever {@code status} is not {@code RUNNING}, in which case the action is
     * never run.
     *
     * @param status current status of the head operator
     * @param checkpointId id of the checkpoint announced through the channels
     * @param defaultAction action executed on every iteration of the wait loop
     * @throws Exception whatever {@code defaultAction} throws
     */
    public void waitTillCoordinatorNotified(
            HeadOperator.HeadOperatorStatus status,
            long checkpointId,
            RunnableWithException defaultAction)
            throws Exception {
        CheckpointAlignment alignment =
                checkpointAlignmments.computeIfAbsent(
                        checkpointId,
                        ignored ->
                                new CheckpointAlignment(
                                        true, status != HeadOperator.HeadOperatorStatus.RUNNING));

        // The loop only ends once this very record gets its coordinator flag set (or the action
        // throws).
        // NOTE(review): if the record is dropped by onCheckpointAborted while waiting, nothing in
        // this class sets the flag afterwards - confirm the caller cannot hit that case.
        while (!alignment.notifiedFromCoordinator) {
            defaultAction.run();
        }
        alignment.notifiedFromChannels = true;
    }

    /**
     * Registers the coordinator-side notification of a checkpoint.
     *
     * <p>Notifications for checkpoints at or below the latest aborted checkpoint only advance
     * {@code latestCheckpointFromCoordinator}; no record is created or updated for them.
     *
     * @param checkpointEvent the coordinator event carrying the checkpoint id
     * @throws IllegalStateException if the id is not greater than every previously notified id
     */
    public void coordinatorNotify(CoordinatorCheckpointEvent checkpointEvent) {
        long checkpointId = checkpointEvent.getCheckpointId();
        Preconditions.checkState(checkpointId > latestCheckpointFromCoordinator);
        latestCheckpointFromCoordinator = checkpointId;

        if (latestCheckpointFromCoordinator <= latestAbortedCheckpoint) {
            return;
        }

        CheckpointAlignment alignment =
                checkpointAlignmments.computeIfAbsent(
                        checkpointId, ignored -> new CheckpointAlignment(false, true));
        alignment.notifiedFromCoordinator = true;
    }

    /**
     * Decides whether a globally aligned event may be processed now or has to be held back.
     *
     * <p>The event is held back when a record exists for the latest coordinator-notified
     * checkpoint and that record has not been notified from the channels yet.
     *
     * @param globallyAlignedEvent the event to check
     * @return the event itself if it can be processed immediately, or an empty optional if it was
     *     added to the pending events of the latest coordinator-notified checkpoint
     */
    public Optional<GloballyAlignedEvent> checkHoldingGloballyAlignedEvent(
            GloballyAlignedEvent globallyAlignedEvent) {
        CheckpointAlignment alignment = checkpointAlignmments.get(latestCheckpointFromCoordinator);
        if (alignment != null && !alignment.notifiedFromChannels) {
            alignment.pendingGlobalEvents.add(globallyAlignedEvent);
            return Optional.empty();
        }
        return Optional.of(globallyAlignedEvent);
    }

    /**
     * Removes the record of the given checkpoint and hands out the events held back for it.
     *
     * @param checkpointId id of the checkpoint being snapshotted
     * @return the events that were held back for this checkpoint, possibly empty
     * @throws IllegalStateException if no record exists for the checkpoint or it has not been
     *     notified from both the coordinator and the channels
     */
    public List<GloballyAlignedEvent> onStateSnapshot(long checkpointId) {
        CheckpointAlignment alignment = checkpointAlignmments.remove(checkpointId);
        // A missing record means the checkpoint was never registered (or was already dropped);
        // report it through the same state check instead of failing with a bare NPE.
        Preconditions.checkState(
                alignment != null
                        && alignment.notifiedFromCoordinator
                        && alignment.notifiedFromChannels,
                "Checkpoint " + checkpointId + " is not fully aligned");
        return alignment.pendingGlobalEvents;
    }

    /**
     * Drops the records of all checkpoints with an id up to and including the aborted one and
     * hands out every event that was held back for them.
     *
     * @param checkpointId id of the aborted checkpoint
     * @return the held-back events of all dropped checkpoints, in ascending checkpoint-id order
     * @throws IllegalStateException if the id is not greater than every previously aborted id
     */
    public List<GloballyAlignedEvent> onCheckpointAborted(long checkpointId) {
        Preconditions.checkState(checkpointId > latestAbortedCheckpoint);
        latestAbortedCheckpoint = checkpointId;

        // headMap is a live view: clearing it removes the entries from the backing map.
        NavigableMap<Long, CheckpointAlignment> abortedAlignments =
                checkpointAlignmments.headMap(latestAbortedCheckpoint, true);

        List<GloballyAlignedEvent> releasedEvents = new ArrayList<>();
        for (CheckpointAlignment alignment : abortedAlignments.values()) {
            releasedEvents.addAll(alignment.pendingGlobalEvents);
        }
        abortedAlignments.clear();
        return releasedEvents;
    }
}
