/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.NeverCompletingChannelFuture;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class PartitionRequestClientFactoryTest {
    // Port obtained once, during class initialization, from NetUtils.getAvailablePort().
    // Within this file it is read only by the static createNettyServerAndClient(NettyProtocol) helper.
    private static final int SERVER_PORT = NetUtils.getAvailablePort();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that an interrupted connection attempt does not poison later requests: a first
     * request is interrupted while the client hands out a connect future that never completes,
     * then a second request for the same connection must return without throwing once real
     * connects are allowed again.
     */
    @Test
    public void testInterruptsNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        try {
            AwaitingNettyClient awaitingClient = new AwaitingNettyClient(serverAndClient.client());
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(awaitingClient, 0);

            // Phase 1: connects never complete, so the request blocks until it is interrupted.
            awaitingClient.awaitForInterrupts = true;
            this.connectAndInterrupt(factory, serverAndClient.getConnectionID(0));

            // Phase 2: connects are delegated to the real client; any exception fails the test.
            awaitingClient.awaitForInterrupts = false;
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Issues a partition request client creation on a helper thread, interrupts that thread once
     * it has started, and blocks until the helper thread reports how the call ended.
     *
     * @param factory factory whose {@code createPartitionRequestClient} call gets interrupted
     * @param connectionId connection passed to the factory
     * @throws Exception if the call on the helper thread ends with anything other than an
     *     {@link InterruptedException}, including returning normally
     */
    private void connectAndInterrupt(PartitionRequestClientFactory factory, ConnectionID connectionId) throws Exception {
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> interrupted = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                started.complete(null);
                factory.createPartitionRequestClient(connectionId);
                // The call was expected to be interrupted. Report the normal return as a failure;
                // otherwise the waiting test thread would block on 'interrupted' forever.
                interrupted.completeExceptionally(
                        new AssertionError("Expected createPartitionRequestClient to be interrupted."));
            } catch (InterruptedException e) {
                interrupted.complete(null);
            } catch (Exception e) {
                interrupted.completeExceptionally(e);
            }
        });
        thread.start();
        // Only interrupt after the helper thread is running its body.
        started.get();
        thread.interrupt();
        interrupted.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a failed connection attempt is not remembered by the factory: the first request
     * must fail with a {@link RemoteTransportException}, and a second request for the same
     * connection must then return without throwing.
     */
    @Test
    public void testExceptionsAreNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        try {
            // The wrapper throws on its first connect call and delegates afterwards.
            UnstableNettyClient failsOnce = new UnstableNettyClient(serverAndClient.client(), 1);
            // NOTE(review): the second factory argument is presumably the retry count - confirm.
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(failsOnce, 0);
            ConnectionID connectionId = serverAndClient.getConnectionID(0);

            try {
                factory.createPartitionRequestClient(connectionId);
                Assert.fail("Expected the first request to fail.");
            } catch (RemoteTransportException expected) {
                // The first attempt is supposed to fail; nothing to do here.
            }

            factory.createPartitionRequestClient(connectionId);
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Creates a partition request client through a factory constructed with the value 2 while the
     * underlying client throws on its first two connect calls; the request must still return
     * without throwing.
     *
     * <p>Client and server are shut down in a {@code finally} block so they are released even
     * when the request fails.
     */
    @Test
    public void testNettyClientConnectRetry() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        try {
            UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableNettyClient, 2);
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Issues two requests for an equal connection against a client whose connect always throws.
     * Whatever the first request throws is discarded; the second request is the one that must
     * surface an {@link IOException} for the test to pass.
     */
    @Test(expected = IOException.class)
    public void testFailureReportedToSubsequentRequests() throws Exception {
        FailingNettyClient alwaysFailing = new FailingNettyClient();
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(alwaysFailing, 2);

        try {
            InetSocketAddress firstAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            factory.createPartitionRequestClient(new ConnectionID(firstAddress, 0));
        } catch (Exception ignored) {
            // Deliberately ignored: only the outcome of the second request is under test.
        }

        InetSocketAddress secondAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
        factory.createPartitionRequestClient(new ConnectionID(secondAddress, 0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Requests a client through a factory constructed with the value 2 while the underlying
     * client throws on its first three connect calls; the request is expected to end in an
     * {@link IOException}. Client and server are shut down regardless of the outcome.
     */
    @Test(expected = IOException.class)
    public void testNettyClientConnectRetryFailure() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyPair = this.createNettyServerAndClient();
        UnstableNettyClient failsThreeTimes = new UnstableNettyClient(nettyPair.client(), 3);
        try {
            new PartitionRequestClientFactory(failsThreeTimes, 2)
                    .createPartitionRequestClient(nettyPair.getConnectionID(0));
        } finally {
            nettyPair.client().shutdown();
            nettyPair.server().shutdown();
        }
    }

    /**
     * Submits ten concurrent requests for the same connection to one factory whose underlying
     * client throws on its first two connect calls, and requires every request to yield a
     * non-null client.
     *
     * <p>A failure inside a worker surfaces through {@link Future#get()} with its original cause
     * attached. The executor, client and server are released in a {@code finally} block so a
     * failing run does not leak threads or network resources.
     */
    @Test
    public void testNettyClientConnectRetryMultipleThread() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
        try {
            UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableNettyClient, 2);

            List<Future<NettyPartitionRequestClient>> futures = new ArrayList<>(10);
            for (int i = 0; i < 10; ++i) {
                futures.add(threadPoolExecutor.submit(
                        () -> factory.createPartitionRequestClient(serverAndClient.getConnectionID(0))));
            }

            for (Future<NettyPartitionRequestClient> future : futures) {
                // get() rethrows a worker failure as ExecutionException, keeping the cause.
                Assert.assertNotNull(future.get());
            }
        } finally {
            // shutdownNow also interrupts workers still blocked after an earlier failure.
            threadPoolExecutor.shutdownNow();
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Builds a server/client pair via {@code NettyTestUtil.initServerAndClient} using a minimal
     * protocol: no server-side channel handlers and a single mocked
     * {@link NetworkClientHandler} on the client side.
     *
     * @return the initialized server and client pair; callers are responsible for shutting both down
     * @throws Exception if initialization fails
     */
    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        return NettyTestUtil.initServerAndClient(new NettyProtocol(null, null){

            public ChannelHandler[] getServerChannelHandlers() {
                // No server handlers are needed: return an empty array rather than an array of null slots.
                return new ChannelHandler[0];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{Mockito.mock(NetworkClientHandler.class)};
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a {@link NettyServer} and a {@link NettyClient} on the local host at
     * {@code SERVER_PORT} and initializes both with the given protocol and a shared buffer pool.
     *
     * <p>If initialization does not complete, both endpoints are shut down before the failure
     * propagates; the client is shut down even when shutting down the server throws.
     *
     * @param protocol protocol handed to both {@code init} calls
     * @return the initialized server (f0) and client (f1)
     * @throws IOException if the local host cannot be resolved or initialization fails
     */
    private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol protocol) throws IOException {
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32768, 1, new Configuration());
        NettyServer server = new NettyServer(config);
        NettyClient client = new NettyClient(config);
        boolean success = false;
        try {
            NettyBufferPool bufferPool = new NettyBufferPool(1);
            server.init(protocol, bufferPool);
            client.init(protocol, bufferPool);
            success = true;
        } finally {
            if (!success) {
                try {
                    server.shutdown();
                } finally {
                    client.shutdown();
                }
            }
        }
        return new Tuple2<>(server, client);
    }

    /**
     * Uncaught-exception handler that records every throwable it is handed, in arrival order,
     * so a test can inspect them afterwards. The backing list is a plain {@link ArrayList}.
     */
    private static class UncaughtTestExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private final List<Throwable> errors = new ArrayList<>(1);

        private UncaughtTestExceptionHandler() {
        }

        /** Appends the reported throwable to the recorded errors; the thread itself is not used. */
        @Override
        public void uncaughtException(Thread thread, Throwable error) {
            errors.add(error);
        }

        /** Returns the live list of recorded throwables (not a copy). */
        private List<Throwable> getErrors() {
            return errors;
        }
    }

    /**
     * Outbound handler that counts down a latch each time a connect request reaches it. The
     * request is not forwarded and the promise is left untouched.
     */
    private static class CountDownLatchOnConnectHandler
    extends ChannelOutboundHandlerAdapter {
        private final CountDownLatch connectLatch;

        public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
            connectLatch = syncOnConnect;
        }

        /** Signals the latch; none of the arguments are used. */
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            connectLatch.countDown();
        }
    }

    /**
     * {@link NettyClient} wrapper with a switch: while {@code awaitForInterrupts} is set, connect
     * returns a {@link NeverCompletingChannelFuture}; otherwise connect is delegated to the wrapped
     * client.
     */
    private static class AwaitingNettyClient
    extends NettyClient {
        // Toggled by the test thread and read by whichever thread calls connect, hence volatile.
        private volatile boolean awaitForInterrupts;
        private final NettyClient delegate;

        AwaitingNettyClient(NettyClient client) {
            super(null);
            delegate = client;
        }

        /**
         * Returns a future that never completes while the switch is on; otherwise forwards to the
         * wrapped client, rethrowing any exception it raises wrapped in a {@link RuntimeException}.
         */
        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (!awaitForInterrupts) {
                try {
                    return delegate.connect(serverSocketAddress);
                } catch (Exception cause) {
                    throw new RuntimeException(cause);
                }
            }
            return new NeverCompletingChannelFuture();
        }
    }

    /**
     * {@link NettyClient} stub, constructed with a {@code null} argument to the superclass, whose
     * connect throws on every call.
     */
    private static class FailingNettyClient
    extends NettyClient {
        FailingNettyClient() {
            super(null);
        }

        /** Always throws a {@link ChannelException}; the address is never used. */
        ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
            throw new ChannelException("Simulate connect failure");
        }
    }

    /**
     * {@link NettyClient} wrapper that throws a {@link ChannelException} for a configured number
     * of connect calls and delegates every later call to the wrapped client. The counter is a
     * plain {@code int} with no synchronization.
     */
    private static class UnstableNettyClient
    extends NettyClient {
        private final NettyClient delegate;
        // Number of connect calls that will still be answered with a simulated failure.
        private int remainingFailures;

        UnstableNettyClient(NettyClient nettyClient, int retry) {
            super(null);
            delegate = nettyClient;
            remainingFailures = retry;
        }

        /** Fails while simulated failures remain, consuming one per call; delegates afterwards. */
        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (remainingFailures <= 0) {
                return delegate.connect(serverSocketAddress);
            }
            remainingFailures--;
            throw new ChannelException("Simulate connect failure");
        }
    }
}

