package org.apache.flink.iteration.checkpoint;

import java.io.IOException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheSnapshot;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/iteration/checkpoint/Checkpoints.class */
public class Checkpoints<T> implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);
    private final TypeSerializer<T> typeSerializer;
    private final FileSystem fileSystem;
    private final SupplierWithException<Path, IOException> pathSupplier;
    private final ConcurrentHashMap<Long, Tuple2<Checkpoints<T>.PendingCheckpoint, Boolean>> uncompletedCheckpoints = new ConcurrentHashMap<>();
    private final TreeMap<Long, Checkpoints<T>.PendingCheckpoint> sortedUncompletedCheckpoints = new TreeMap<>();
    private long latestCompletedCheckpointId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/iteration/checkpoint/Checkpoints$PendingCheckpoint.class */
    public class PendingCheckpoint {
        final DataCacheWriter<T> dataCacheWriter;
        final OperatorStateCheckpointOutputStream checkpointOutputStream;
        final ResourceGuard.Lease snapshotLease;

        public PendingCheckpoint(DataCacheWriter<T> dataCacheWriter, OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream, ResourceGuard.Lease lease) {
            this.dataCacheWriter = dataCacheWriter;
            this.checkpointOutputStream = operatorStateCheckpointOutputStream;
            this.snapshotLease = lease;
        }
    }

    public Checkpoints(TypeSerializer<T> typeSerializer, FileSystem fileSystem, SupplierWithException<Path, IOException> supplierWithException) {
        this.typeSerializer = typeSerializer;
        this.fileSystem = fileSystem;
        Preconditions.checkState(!fileSystem.isDistributedFS(), "Currently only local fs is supported");
        this.pathSupplier = supplierWithException;
    }

    public TypeSerializer<T> getTypeSerializer() {
        return this.typeSerializer;
    }

    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    public SupplierWithException<Path, IOException> getPathSupplier() {
        return this.pathSupplier;
    }

    public void startLogging(long j, OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream) throws IOException {
        if (j <= this.latestCompletedCheckpointId) {
            return;
        }
        Tuple2<Checkpoints<T>.PendingCheckpoint, Boolean> computeIfAbsent = this.uncompletedCheckpoints.computeIfAbsent(Long.valueOf(j), l -> {
            try {
                return new Tuple2(new PendingCheckpoint(new DataCacheWriter(this.typeSerializer, this.fileSystem, this.pathSupplier), operatorStateCheckpointOutputStream, operatorStateCheckpointOutputStream.acquireLease()), false);
            } catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        });
        if (((Boolean) computeIfAbsent.f1).booleanValue()) {
            return;
        }
        this.sortedUncompletedCheckpoints.put(Long.valueOf(j), computeIfAbsent.f0);
    }

    public void abort(long j) {
        this.uncompletedCheckpoints.compute(Long.valueOf(j), (l, tuple2) -> {
            if (tuple2 == null) {
                return new Tuple2((Object) null, true);
            }
            ((PendingCheckpoint) tuple2.f0).snapshotLease.close();
            return new Tuple2(tuple2.f0, true);
        });
    }

    public void append(T t) throws IOException {
        Iterator<Checkpoints<T>.PendingCheckpoint> it = this.sortedUncompletedCheckpoints.values().iterator();
        while (it.hasNext()) {
            it.next().dataCacheWriter.addRecord(t);
        }
    }

    public void commitCheckpointsUntil(long j) {
        if (this.latestCompletedCheckpointId < j) {
            this.latestCompletedCheckpointId = j;
        }
        NavigableMap<Long, Checkpoints<T>.PendingCheckpoint> headMap = this.sortedUncompletedCheckpoints.headMap(Long.valueOf(j), true);
        headMap.values().forEach(pendingCheckpoint -> {
            try {
                try {
                    pendingCheckpoint.dataCacheWriter.finish();
                    DataCacheSnapshot dataCacheSnapshot = new DataCacheSnapshot(this.fileSystem, null, pendingCheckpoint.dataCacheWriter.getFinishSegments());
                    pendingCheckpoint.checkpointOutputStream.startNewPartition();
                    dataCacheSnapshot.writeTo(pendingCheckpoint.checkpointOutputStream);
                    pendingCheckpoint.dataCacheWriter.cleanup();
                    pendingCheckpoint.snapshotLease.close();
                } catch (Exception e) {
                    LOG.error("Failed to commit checkpoint until " + j, e);
                    throw new FlinkRuntimeException(e);
                }
            } catch (Throwable th) {
                pendingCheckpoint.snapshotLease.close();
                throw th;
            }
        });
        headMap.clear();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.sortedUncompletedCheckpoints.forEach((l, pendingCheckpoint) -> {
            pendingCheckpoint.snapshotLease.close();
            try {
                pendingCheckpoint.dataCacheWriter.cleanup();
            } catch (IOException e) {
                LOG.error("Failed to cleanup " + l, e);
            }
        });
        this.sortedUncompletedCheckpoints.clear();
        this.uncompletedCheckpoints.clear();
    }
}
