/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RecordWriterTest {
    private final boolean isBroadcastWriter;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public RecordWriterTest() {
        this(false);
    }

    RecordWriterTest(boolean isBroadcastWriter) {
        this.isBroadcastWriter = isBroadcastWriter;
    }

    @Test
    public void testBroadcastEventNoRecords() throws Exception {
        int numberOfChannels = 4;
        int bufferSize = 32;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numberOfChannels);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        CheckpointBarrier barrier = new CheckpointBarrier(2148402839L, 2166311875L, CheckpointOptions.forCheckpointWithDefaultLocation());
        writer.broadcastEvent((AbstractEvent)barrier);
        Assert.assertEquals((long)0L, (long)partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        for (int i = 0; i < numberOfChannels; ++i) {
            Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(i));
            ResultSubpartitionView view = partition.createSubpartitionView(i, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
            Assert.assertTrue((boolean)boe.isEvent());
            Assert.assertEquals((Object)barrier, (Object)boe.getEvent());
            Assert.assertFalse((boolean)view.isAvailable(Integer.MAX_VALUE));
        }
    }

    @Test
    public void testBroadcastEventMixedRecords() throws Exception {
        XORShiftRandom rand = new XORShiftRandom();
        int numberOfChannels = 4;
        int bufferSize = 32;
        int lenBytes = 4;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numberOfChannels);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        CheckpointBarrier barrier = new CheckpointBarrier(2147484939L, 2147483846L, CheckpointOptions.forCheckpointWithDefaultLocation());
        byte[] bytes = new byte[bufferSize / 2];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        bytes = new byte[bufferSize + 1];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        bytes = new byte[bufferSize - lenBytes];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        writer.broadcastEvent((AbstractEvent)barrier);
        if (this.isBroadcastWriter) {
            Assert.assertEquals((long)3L, (long)partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            for (int i = 0; i < numberOfChannels; ++i) {
                Assert.assertEquals((long)4L, (long)partition.getNumberOfQueuedBuffers(i));
                ResultSubpartitionView view = partition.createSubpartitionView(i, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
                for (int j = 0; j < 3; ++j) {
                    Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer());
                }
                BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
                Assert.assertTrue((boolean)boe.isEvent());
                Assert.assertEquals((Object)barrier, (Object)boe.getEvent());
            }
        } else {
            Assert.assertEquals((long)4L, (long)partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            ResultSubpartitionView[] views = new ResultSubpartitionView[4];
            Assert.assertEquals((long)2L, (long)partition.getNumberOfQueuedBuffers(0));
            views[0] = partition.createSubpartitionView(0, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer());
            Assert.assertEquals((long)3L, (long)partition.getNumberOfQueuedBuffers(1));
            views[1] = partition.createSubpartitionView(1, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
            Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
            Assert.assertEquals((long)2L, (long)partition.getNumberOfQueuedBuffers(2));
            views[2] = partition.createSubpartitionView(2, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer());
            views[3] = partition.createSubpartitionView(3, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(3));
            for (int i = 0; i < numberOfChannels; ++i) {
                BufferOrEvent boe = RecordWriterTest.parseBuffer(views[i].getNextBuffer().buffer(), i);
                Assert.assertTrue((boolean)boe.isEvent());
                Assert.assertEquals((Object)barrier, (Object)boe.getEvent());
            }
        }
    }

    @Test
    public void testBroadcastEventBufferReferenceCounting() throws Exception {
        int i;
        int bufferSize = 32768;
        int numSubpartitions = 2;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numSubpartitions);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        writer.broadcastEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE);
        Buffer[] buffers = new Buffer[numSubpartitions];
        for (i = 0; i < numSubpartitions; ++i) {
            Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(i));
            ResultSubpartitionView view = partition.createSubpartitionView(i, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            buffers[i] = view.getNextBuffer().buffer();
            Assert.assertTrue((boolean)RecordWriterTest.parseBuffer(buffers[i], i).isEvent());
        }
        for (i = 0; i < numSubpartitions; ++i) {
            Assert.assertTrue((boolean)buffers[i].isRecycled());
        }
    }

    @Test
    public void testBroadcastEventBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(true);
    }

    @Test
    public void testBroadcastEmitBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(false);
    }

    @Test
    public void testBroadcastEmitRecord() throws Exception {
        int numberOfChannels = 4;
        int bufferSize = 32;
        int numValues = 8;
        int serializationLength = 4;
        ResultPartition partition = RecordWriterTest.createResultPartition(32, 4);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
        Util.MockRecords records = Util.randomRecords((int)8, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT);
        for (SerializationTestType record : records) {
            serializedRecords.add(record);
            writer.broadcastEmit((IOReadableWritable)record);
        }
        int numRequiredBuffers = 2;
        if (this.isBroadcastWriter) {
            Assert.assertEquals((long)2L, (long)partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        } else {
            Assert.assertEquals((long)8L, (long)partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        }
        for (int i = 0; i < 4; ++i) {
            Assert.assertEquals((long)2L, (long)partition.getNumberOfQueuedBuffers(i));
            ResultSubpartitionView view = partition.createSubpartitionView(i, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            this.verifyDeserializationResults(view, (RecordDeserializer<SerializationTestType>)deserializer, (ArrayDeque<SerializationTestType>)serializedRecords.clone(), 2, 8);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIsAvailableOrNot() throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        ResultPartition resultPartition = new ResultPartitionBuilder().setBufferPoolFactory((SupplierWithException<BufferPool, IOException>)((SupplierWithException)() -> localPool)).build();
        resultPartition.setup();
        RecordWriter recordWriter = this.createRecordWriter((ResultPartitionWriter)resultPartition);
        try {
            Assert.assertTrue((boolean)recordWriter.getAvailableFuture().isDone());
            BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0);
            Assert.assertNotNull((Object)bufferBuilder);
            Assert.assertFalse((boolean)recordWriter.getAvailableFuture().isDone());
            Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
            buffer.recycleBuffer();
            Assert.assertTrue((boolean)recordWriter.getAvailableFuture().isDone());
            Assert.assertEquals((Object)RecordWriter.AVAILABLE, (Object)recordWriter.getAvailableFuture());
        }
        finally {
            localPool.lazyDestroy();
            globalPool.destroy();
        }
    }

    private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
        ResultPartition partition = RecordWriterTest.createResultPartition(4096, 2);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        if (broadcastEvent) {
            writer.broadcastEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE);
        } else {
            writer.broadcastEmit((IOReadableWritable)new IntValue(0));
        }
        Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(0));
        Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(1));
        ResultSubpartitionView view0 = partition.createSubpartitionView(0, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        ResultSubpartitionView view1 = partition.createSubpartitionView(1, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        Buffer buffer1 = view0.getNextBuffer().buffer();
        Buffer buffer2 = view1.getNextBuffer().buffer();
        Assert.assertEquals((long)0L, (long)buffer1.getReaderIndex());
        Assert.assertEquals((long)0L, (long)buffer2.getReaderIndex());
        buffer1.setReaderIndex(1);
        Assert.assertEquals((String)"Buffer 2 shares the same reader index as buffer 1", (long)0L, (long)buffer2.getReaderIndex());
    }

    protected void verifyDeserializationResults(ResultSubpartitionView view, RecordDeserializer<SerializationTestType> deserializer, ArrayDeque<SerializationTestType> expectedRecords, int numRequiredBuffers, int numValues) throws Exception {
        int assertRecords = 0;
        for (int j = 0; j < numRequiredBuffers; ++j) {
            Buffer buffer = view.getNextBuffer().buffer();
            deserializer.setNextBuffer(buffer);
            assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
        }
        Assert.assertFalse((boolean)view.isAvailable(Integer.MAX_VALUE));
        Assert.assertEquals((long)numValues, (long)assertRecords);
    }

    private RecordWriter createRecordWriter(ResultPartitionWriter writer) {
        if (this.isBroadcastWriter) {
            return new RecordWriterBuilder().setChannelSelector((ChannelSelector)new OutputEmitter(ShipStrategyType.BROADCAST, 0)).build(writer);
        }
        return new RecordWriterBuilder().build(writer);
    }

    public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException {
        NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build();
        ResultPartition partition = PartitionTestUtils.createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions);
        partition.setup();
        return partition;
    }

    static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException {
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel));
        }
        AbstractEvent event = EventSerializer.fromBuffer((Buffer)buffer, (ClassLoader)RecordWriterTest.class.getClassLoader());
        buffer.recycleBuffer();
        return new BufferOrEvent(event, new InputChannelInfo(0, targetChannel));
    }

    private static class ByteArrayIO
    implements IOReadableWritable {
        private final byte[] bytes;

        public ByteArrayIO(byte[] bytes) {
            this.bytes = bytes;
        }

        public void write(DataOutputView out) throws IOException {
            out.write(this.bytes);
        }

        public void read(DataInputView in) throws IOException {
            in.readFully(this.bytes);
        }
    }
}

