/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.AwaitableBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class PipelinedSubpartitionWithReadViewTest {
    ResultPartition resultPartition;
    PipelinedSubpartition subpartition;
    AwaitableBufferAvailablityListener availablityListener;
    PipelinedSubpartitionView readView;
    @Parameterized.Parameter
    public boolean compressionEnabled;

    @Parameterized.Parameters(name="compressionEnabled = {0}")
    public static Boolean[] parameters() {
        return new Boolean[]{false, true};
    }

    @Before
    public void before() throws IOException {
        this.setup(ResultPartitionType.PIPELINED);
        this.subpartition = new PipelinedSubpartition(0, this.resultPartition);
        this.availablityListener = new AwaitableBufferAvailablityListener();
        this.readView = this.subpartition.createReadView((BufferAvailabilityListener)this.availablityListener);
    }

    @After
    public void tearDown() {
        this.readView.releaseAllResources();
        this.subpartition.release();
    }

    @Test(expected=IllegalStateException.class)
    public void testAddTwoNonFinishedBuffer() throws IOException {
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        Assert.assertNull((Object)this.readView.getNextBuffer());
    }

    @Test
    public void testRelease() {
        this.readView.releaseAllResources();
        this.resultPartition.close();
        Assert.assertFalse((boolean)this.resultPartition.getPartitionManager().getUnreleasedPartitions().contains(this.resultPartition.getPartitionId()));
    }

    @Test
    public void testAddEmptyNonFinishedBuffer() throws IOException {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        this.subpartition.add(bufferBuilder.createBufferConsumer());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        Assert.assertNull((Object)this.readView.getNextBuffer());
        bufferBuilder.finish();
        bufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        this.subpartition.add(bufferBuilder.createBufferConsumer());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertNull((Object)this.readView.getNextBuffer());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
    }

    @Test
    public void testAddNonEmptyNotFinishedBuffer() throws Exception {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, false);
    }

    @Test
    public void testUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        MatcherAssert.assertThat((Object)this.availablityListener.getNumNotifications(), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, false);
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        long oldNumNotifications = this.availablityListener.getNumNotifications();
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.flush();
        MatcherAssert.assertThat((Object)oldNumNotifications, (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        Assert.assertEquals((long)oldNumNotifications, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)2L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, true, 1, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, false);
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
        this.subpartition.flush();
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, false, 0, false, true);
        long oldNumNotifications = this.availablityListener.getNumNotifications();
        this.subpartition.flush();
        Assert.assertEquals((long)(oldNumNotifications + 1L), (long)this.availablityListener.getNumNotifications());
        this.subpartition.flush();
        Assert.assertEquals((long)(oldNumNotifications + 1L), (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, false);
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testMultipleEmptyBuffers() throws Exception {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)2L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, true);
    }

    @Test
    public void testEmptyFlush() {
        this.subpartition.flush();
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
    }

    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        Assert.assertEquals((long)1L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)0L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
        Assert.assertEquals((long)32768L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        Assert.assertEquals((long)2L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)32768L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
        Assert.assertEquals((long)65536L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        this.subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768, Buffer.DataType.EVENT_BUFFER));
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.isAvailable(0));
        Assert.assertEquals((long)5L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)65536L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, true, 0, true, true);
        Assert.assertEquals((long)98304L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, 32768, null, false, 0, false, true);
        Assert.assertEquals((long)131072L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
        Assert.assertEquals((long)163840L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)5L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)163840L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
    }

    @Test
    public void testBarrierOvertaking() throws Exception {
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        this.subpartition.setChannelStateWriter((ChannelStateWriter)channelStateWriter);
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumPriorityEvents());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumPriorityEvents());
        BufferConsumer eventBuffer = EventSerializer.toBufferConsumer((AbstractEvent)EndOfSuperstepEvent.INSTANCE, (boolean)false);
        this.subpartition.add(eventBuffer);
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumPriorityEvents());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumPriorityEvents());
        CheckpointOptions options = CheckpointOptions.unaligned((CheckpointStorageLocationReference)new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
        channelStateWriter.start(0L, options);
        BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer((AbstractEvent)new CheckpointBarrier(0L, 0L, options), (boolean)true);
        this.subpartition.add(barrierBuffer);
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumPriorityEvents());
        List inflight = channelStateWriter.getAddedOutput().get((Object)this.subpartition.getSubpartitionInfo());
        Assert.assertEquals(Arrays.asList(1, 2, 4), inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
        inflight.forEach(Buffer::recycleBuffer);
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 2, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1, true, 1, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 2, true, 0, true, true);
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, eventBuffer.getWrittenBytes(), EndOfSuperstepEvent.class, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 4, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testAvailabilityAfterPriority() throws Exception {
        this.subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        CheckpointOptions options = CheckpointOptions.unaligned((CheckpointStorageLocationReference)new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
        BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer((AbstractEvent)new CheckpointBarrier(0L, 0L, options), (boolean)true);
        this.subpartition.add(barrierBuffer);
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumPriorityEvents());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumPriorityEvents());
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumPriorityEvents());
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 1, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 1, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 2, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception {
        this.testBacklogConsistentWithNumberOfConsumableBuffers(false, false);
    }

    @Test
    public void testBacklogConsistentWithConsumableBuffersForFlushedPartition() throws Exception {
        this.testBacklogConsistentWithNumberOfConsumableBuffers(true, false);
    }

    @Test
    public void testBacklogConsistentWithConsumableBuffersForFinishedPartition() throws Exception {
        this.testBacklogConsistentWithNumberOfConsumableBuffers(false, true);
    }

    private void testBacklogConsistentWithNumberOfConsumableBuffers(boolean isFlushRequested, boolean isFinished) throws Exception {
        int numberOfAddedBuffers = 5;
        for (int i = 1; i <= 5; ++i) {
            if (i < 5 || isFinished) {
                this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
                continue;
            }
            this.subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        }
        if (isFlushRequested) {
            this.subpartition.flush();
        }
        if (isFinished) {
            this.subpartition.finish();
        }
        int backlog = this.subpartition.getBuffersInBacklog();
        int numberOfConsumableBuffers = 0;
        try (CloseableRegistry closeableRegistry = new CloseableRegistry();){
            while (this.readView.isAvailable(Integer.MAX_VALUE)) {
                ResultSubpartition.BufferAndBacklog bufferAndBacklog = this.readView.getNextBuffer();
                Assert.assertNotNull((Object)bufferAndBacklog);
                if (bufferAndBacklog.buffer().isBuffer()) {
                    ++numberOfConsumableBuffers;
                }
                closeableRegistry.registerCloseable(() -> ((Buffer)bufferAndBacklog.buffer()).recycleBuffer());
            }
            MatcherAssert.assertThat((Object)backlog, (Matcher)Matchers.is((Object)numberOfConsumableBuffers));
        }
    }

    @Test
    public void testBlockedByCheckpointAndResumeConsumption() throws IOException, InterruptedException {
        this.blockSubpartitionByCheckpoint(1);
        this.subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768, Buffer.DataType.EVENT_BUFFER));
        this.checkNumNotificationsAndAvailability(1);
        this.resumeConsumptionAndCheckAvailability(0, true);
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, 32768, null, false, 0, false, true);
        this.blockSubpartitionByCheckpoint(2);
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        this.subpartition.flush();
        this.checkNumNotificationsAndAvailability(2);
        this.resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
        this.blockSubpartitionByCheckpoint(3);
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        this.subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        this.checkNumNotificationsAndAvailability(3);
        this.resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
        PipelinedSubpartitionWithReadViewTest.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, 0, false, true);
    }

    private void blockSubpartitionByCheckpoint(int numNotifications) throws IOException, InterruptedException {
        this.subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768, Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER));
        Assert.assertEquals((long)numNotifications, (long)this.availablityListener.getNumNotifications());
        PipelinedSubpartitionWithReadViewTest.assertNextEvent((ResultSubpartitionView)this.readView, 32768, null, false, 0, false, true);
    }

    private void checkNumNotificationsAndAvailability(int numNotifications) throws IOException, InterruptedException {
        Assert.assertEquals((long)numNotifications, (long)this.availablityListener.getNumNotifications());
        Assert.assertFalse((boolean)this.readView.isAvailable(Integer.MAX_VALUE));
        PipelinedSubpartitionWithReadViewTest.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    private void resumeConsumptionAndCheckAvailability(int availableCredit, boolean dataAvailable) {
        this.readView.resumeConsumption();
        Assert.assertEquals((Object)dataAvailable, (Object)this.readView.isAvailable(availableCredit));
    }

    static void assertNextBuffer(ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        PipelinedSubpartitionWithReadViewTest.assertNextBufferOrEvent(readView, expectedReadableBufferSize, true, null, expectedIsDataAvailable, expectedBuffersInBacklog, expectedIsEventAvailable, expectedRecycledAfterRecycle);
    }

    static void assertNextEvent(ResultSubpartitionView readView, int expectedReadableBufferSize, Class<? extends AbstractEvent> expectedEventClass, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        PipelinedSubpartitionWithReadViewTest.assertNextBufferOrEvent(readView, expectedReadableBufferSize, false, expectedEventClass, expectedIsDataAvailable, expectedBuffersInBacklog, expectedIsEventAvailable, expectedRecycledAfterRecycle);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void assertNextBufferOrEvent(ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsBuffer, @Nullable Class<? extends AbstractEvent> expectedEventClass, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        Preconditions.checkArgument((expectedEventClass == null || !expectedIsBuffer ? 1 : 0) != 0);
        ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
        Assert.assertNotNull((Object)bufferAndBacklog);
        try {
            Assert.assertEquals((String)"buffer size", (long)expectedReadableBufferSize, (long)bufferAndBacklog.buffer().readableBytes());
            Assert.assertEquals((String)"buffer or event", (Object)expectedIsBuffer, (Object)bufferAndBacklog.buffer().isBuffer());
            if (expectedEventClass != null) {
                Assert.assertThat((Object)EventSerializer.fromBuffer((Buffer)bufferAndBacklog.buffer(), (ClassLoader)ClassLoader.getSystemClassLoader()), (Matcher)IsInstanceOf.instanceOf(expectedEventClass));
            }
            Assert.assertEquals((String)"data available", (Object)expectedIsDataAvailable, (Object)bufferAndBacklog.isDataAvailable());
            Assert.assertEquals((String)"data available", (Object)expectedIsDataAvailable, (Object)readView.isAvailable(Integer.MAX_VALUE));
            Assert.assertEquals((String)"backlog", (long)expectedBuffersInBacklog, (long)bufferAndBacklog.buffersInBacklog());
            Assert.assertEquals((String)"event available", (Object)expectedIsEventAvailable, (Object)bufferAndBacklog.isEventAvailable());
            Assert.assertEquals((String)"event available", (Object)expectedIsEventAvailable, (Object)readView.isAvailable(0));
            Assert.assertFalse((String)"not recycled", (boolean)bufferAndBacklog.buffer().isRecycled());
        }
        finally {
            bufferAndBacklog.buffer().recycleBuffer();
        }
        Assert.assertEquals((String)"recycled", (Object)expectedRecycledAfterRecycle, (Object)bufferAndBacklog.buffer().isRecycled());
    }

    static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
        Assert.assertNull((Object)readView.getNextBuffer());
    }

    void setup(ResultPartitionType resultPartitionType) throws IOException {
        this.resultPartition = PartitionTestUtils.createPartition(resultPartitionType, (FileChannelManager)NoOpFileChannelManager.INSTANCE, this.compressionEnabled, 32768);
        this.resultPartition.setup();
    }
}

