/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

public class ChannelStateWriteRequestDispatcherImplTest {
    @Test
    public void testPartialInputChannelStateWrite() throws Exception {
        this.testBuffersRecycled(buffers -> ChannelStateWriteRequest.write((long)1L, (InputChannelInfo)new InputChannelInfo(1, 2), (CloseableIterator)CloseableIterator.ofElements(Buffer::recycleBuffer, (Object[])buffers)));
    }

    @Test
    public void testPartialResultSubpartitionStateWrite() throws Exception {
        this.testBuffersRecycled(buffers -> ChannelStateWriteRequest.write((long)1L, (ResultSubpartitionInfo)new ResultSubpartitionInfo(1, 2), (Buffer[])buffers));
    }

    private void testBuffersRecycled(Function<NetworkBuffer[], ChannelStateWriteRequest> requestBuilder) throws Exception {
        ChannelStateWriteRequestDispatcherImpl dispatcher = new ChannelStateWriteRequestDispatcherImpl(0, (CheckpointStorageWorkerView)new MemoryBackendCheckpointStorageAccess(new JobID(), null, null, 1), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        dispatcher.dispatch(ChannelStateWriteRequest.start((long)1L, (ChannelStateWriter.ChannelStateWriteResult)result, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        result.getResultSubpartitionStateHandles().completeExceptionally(new TestException());
        result.getInputChannelStateHandles().completeExceptionally(new TestException());
        NetworkBuffer[] buffers = new NetworkBuffer[]{this.buffer(), this.buffer()};
        dispatcher.dispatch(requestBuilder.apply(buffers));
        for (NetworkBuffer buffer : buffers) {
            Assert.assertTrue((boolean)buffer.isRecycled());
        }
    }

    private NetworkBuffer buffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)10), FreeingBufferRecycler.INSTANCE);
    }
}

