package org.apache.flink.iteration.datacache.nonkeyed;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.class */
public class DataCacheWriter<T> {
    private final TypeSerializer<T> serializer;
    private final FileSystem fileSystem;
    private final SupplierWithException<Path, IOException> pathGenerator;
    private final List<Segment> finishSegments;
    private DataCacheWriter<T>.SegmentWriter currentSegment;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter$SegmentWriter.class */
    public class SegmentWriter {
        private final Path path;
        private final FSDataOutputStream outputStream;
        private final DataOutputView outputView;
        private int currentSegmentCount;

        public SegmentWriter(Path path) throws IOException {
            this.path = path;
            this.outputStream = DataCacheWriter.this.fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
            this.outputView = new DataOutputViewStreamWrapper(this.outputStream);
        }

        public void addRecord(T t) throws IOException {
            DataCacheWriter.this.serializer.serialize(t, this.outputView);
            this.currentSegmentCount++;
        }

        public Optional<Segment> finish() throws IOException {
            this.outputStream.flush();
            long pos = this.outputStream.getPos();
            this.outputStream.close();
            if (this.currentSegmentCount > 0) {
                return Optional.of(new Segment(this.path, this.currentSegmentCount, pos));
            }
            DataCacheWriter.this.fileSystem.delete(this.path, false);
            return Optional.empty();
        }
    }

    public DataCacheWriter(TypeSerializer<T> typeSerializer, FileSystem fileSystem, SupplierWithException<Path, IOException> supplierWithException) throws IOException {
        this(typeSerializer, fileSystem, supplierWithException, Collections.emptyList());
    }

    public DataCacheWriter(TypeSerializer<T> typeSerializer, FileSystem fileSystem, SupplierWithException<Path, IOException> supplierWithException, List<Segment> list) throws IOException {
        this.serializer = typeSerializer;
        this.fileSystem = fileSystem;
        this.pathGenerator = supplierWithException;
        this.finishSegments = new ArrayList(list);
        this.currentSegment = new SegmentWriter((Path) supplierWithException.get());
    }

    public void addRecord(T t) throws IOException {
        this.currentSegment.addRecord(t);
    }

    public void finishCurrentSegment() throws IOException {
        finishCurrentSegment(true);
    }

    public List<Segment> finish() throws IOException {
        finishCurrentSegment(false);
        return this.finishSegments;
    }

    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    public List<Segment> getFinishSegments() {
        return this.finishSegments;
    }

    private void finishCurrentSegment(boolean z) throws IOException {
        if (this.currentSegment != null) {
            Optional<Segment> finish = this.currentSegment.finish();
            List<Segment> list = this.finishSegments;
            list.getClass();
            finish.ifPresent((v1) -> {
                r1.add(v1);
            });
            this.currentSegment = null;
        }
        if (z) {
            this.currentSegment = new SegmentWriter((Path) this.pathGenerator.get());
        }
    }

    public void cleanup() throws IOException {
        finishCurrentSegment();
        Iterator<Segment> it = this.finishSegments.iterator();
        while (it.hasNext()) {
            this.fileSystem.delete(it.next().getPath(), false);
        }
    }
}
