/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.CountingAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class SortMergeSubpartitionReaderTest
extends TestLogger {
    private static final int bufferSize = 1024;
    private static final byte[] dataBytes = new byte[1024];
    private static final int numSubpartitions = 10;
    private static final int numBuffersPerSubpartition = 10;
    private PartitionedFile partitionedFile;
    private FileChannel dataFileChannel;
    private FileChannel indexFileChannel;
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public Timeout timeout = new Timeout(60L, TimeUnit.SECONDS);

    @Before
    public void before() throws Exception {
        Random random = new Random();
        random.nextBytes(dataBytes);
        this.partitionedFile = PartitionTestUtils.createPartitionedFile(this.temporaryFolder.newFile().getAbsolutePath(), 10, 10, 1024, dataBytes);
        this.dataFileChannel = SortMergeSubpartitionReaderTest.openFileChannel(this.partitionedFile.getDataFilePath());
        this.indexFileChannel = SortMergeSubpartitionReaderTest.openFileChannel(this.partitionedFile.getIndexFilePath());
    }

    @After
    public void after() {
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{this.dataFileChannel, this.indexFileChannel});
        this.partitionedFile.deleteQuietly();
    }

    @Test
    public void testReadBuffers() throws Exception {
        CountingAvailabilityListener listener = new CountingAvailabilityListener();
        SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(listener);
        Assert.assertEquals((long)0L, (long)listener.numNotifications);
        Assert.assertEquals((long)0L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(2);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assert.assertEquals((long)1L, (long)listener.numNotifications);
        Assert.assertEquals((long)2L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertEquals((long)0L, (long)segments.size());
        segments = SortMergeSubpartitionReaderTest.createsMemorySegments(2);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assert.assertEquals((long)1L, (long)listener.numNotifications);
        Assert.assertEquals((long)4L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertEquals((long)0L, (long)segments.size());
        while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 0) {
            ((ResultSubpartition.BufferAndBacklog)Preconditions.checkNotNull((Object)subpartitionReader.getNextBuffer())).buffer().recycleBuffer();
        }
        segments = SortMergeSubpartitionReaderTest.createsMemorySegments(10);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assert.assertEquals((long)2L, (long)listener.numNotifications);
        Assert.assertEquals((long)6L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertEquals((long)4L, (long)segments.size());
    }

    @Test
    public void testPollBuffers() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(new CountingAvailabilityListener());
        Assert.assertNull((Object)subpartitionReader.getNextBuffer());
        Assert.assertFalse((boolean)subpartitionReader.isAvailable(Integer.MAX_VALUE));
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(10);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        for (int i = 9; i >= 0; --i) {
            Assert.assertTrue((boolean)subpartitionReader.isAvailable(i));
            ResultSubpartition.BufferAndBacklog bufferAndBacklog = (ResultSubpartition.BufferAndBacklog)Preconditions.checkNotNull((Object)subpartitionReader.getNextBuffer());
            Assert.assertEquals((Object)ByteBuffer.wrap(dataBytes), (Object)bufferAndBacklog.buffer().getNioBufferReadable());
            Assert.assertEquals((long)bufferAndBacklog.buffersInBacklog(), (long)(i == 0 ? 0L : (long)(i - 1)));
            Buffer.DataType dataType = i == 0 ? Buffer.DataType.NONE : (i > 1 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER);
            Assert.assertEquals((Object)dataType, (Object)bufferAndBacklog.getNextDataType());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFail() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(listener);
            subpartitionReader.readBuffers(segments, segments::add);
            Assert.assertEquals((long)1L, (long)listener.numNotifications);
            Assert.assertEquals((long)5L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
            subpartitionReader.fail((Throwable)new RuntimeException("Test exception."));
            Assert.assertTrue((boolean)subpartitionReader.getReleaseFuture().isDone());
            Assert.assertEquals((long)0L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
            Assert.assertTrue((boolean)subpartitionReader.isAvailable(0));
            Assert.assertTrue((boolean)subpartitionReader.isReleased());
            Assert.assertEquals((long)2L, (long)listener.numNotifications);
            Assert.assertNotNull((Object)subpartitionReader.getFailureCause());
        }
        finally {
            Assert.assertEquals((long)numSegments, (long)segments.size());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReleaseAllResources() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(listener);
            subpartitionReader.readBuffers(segments, segments::add);
            Assert.assertEquals((long)1L, (long)listener.numNotifications);
            Assert.assertEquals((long)5L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
            subpartitionReader.releaseAllResources();
            Assert.assertTrue((boolean)subpartitionReader.getReleaseFuture().isDone());
            Assert.assertEquals((long)0L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
            Assert.assertTrue((boolean)subpartitionReader.isAvailable(0));
            Assert.assertTrue((boolean)subpartitionReader.isReleased());
            Assert.assertEquals((long)1L, (long)listener.numNotifications);
            Assert.assertNull((Object)subpartitionReader.getFailureCause());
        }
        finally {
            Assert.assertEquals((long)numSegments, (long)segments.size());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IllegalStateException.class)
    public void testReadBuffersAfterReleased() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(numSegments);
        try {
            SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(new CountingAvailabilityListener());
            subpartitionReader.readBuffers(segments, segments::add);
            subpartitionReader.releaseAllResources();
            subpartitionReader.readBuffers(segments, segments::add);
        }
        finally {
            Assert.assertEquals((long)numSegments, (long)segments.size());
        }
    }

    @Test
    public void testPollBuffersAfterReleased() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.createSortMergeSubpartitionReader(new CountingAvailabilityListener());
        Queue<MemorySegment> segments = SortMergeSubpartitionReaderTest.createsMemorySegments(10);
        subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assert.assertTrue((boolean)subpartitionReader.isAvailable(Integer.MAX_VALUE));
        subpartitionReader.releaseAllResources();
        Assert.assertNull((Object)subpartitionReader.getNextBuffer());
    }

    private SortMergeSubpartitionReader createSortMergeSubpartitionReader(BufferAvailabilityListener listener) throws Exception {
        PartitionedFileReader fileReader = new PartitionedFileReader(this.partitionedFile, 0, this.dataFileChannel, this.indexFileChannel);
        Assert.assertTrue((boolean)fileReader.hasRemaining());
        return new SortMergeSubpartitionReader(listener, fileReader);
    }

    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    private static Queue<MemorySegment> createsMemorySegments(int numSegments) {
        ArrayDeque<MemorySegment> segments = new ArrayDeque<MemorySegment>();
        for (int i = 0; i < numSegments; ++i) {
            segments.add(MemorySegmentFactory.allocateUnpooledSegment((int)1024));
        }
        return segments;
    }
}

