/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;

public class AkkaRpcServiceTest
extends TestLogger {
    private static final Time TIMEOUT = Time.milliseconds((long)10000L);
    private static ActorSystem actorSystem;
    private static AkkaRpcService akkaRpcService;

    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        akkaRpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture rpcTerminationFuture = akkaRpcService.stopService();
        CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava((Future)actorSystem.terminate());
        FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        actorSystem = null;
        akkaRpcService = null;
    }

    @Test
    public void testScheduleRunnable() throws Exception {
        OneShotLatch latch = new OneShotLatch();
        long delay = 100L;
        long start = System.nanoTime();
        ScheduledFuture scheduledFuture = akkaRpcService.scheduleRunnable(() -> ((OneShotLatch)latch).trigger(), 100L, TimeUnit.MILLISECONDS);
        scheduledFuture.get();
        Assert.assertTrue((boolean)latch.isTriggered());
        long stop = System.nanoTime();
        Assert.assertTrue((String)"call was not properly delayed", ((stop - start) / 1000000L >= 100L ? 1 : 0) != 0);
    }

    @Test
    public void testExecuteRunnable() throws Exception {
        OneShotLatch latch = new OneShotLatch();
        akkaRpcService.execute(() -> ((OneShotLatch)latch).trigger());
        latch.await(30L, TimeUnit.SECONDS);
    }

    @Test
    public void testExecuteCallable() throws Exception {
        OneShotLatch latch = new OneShotLatch();
        int expected = 42;
        CompletableFuture result = akkaRpcService.execute(() -> {
            latch.trigger();
            return 42;
        });
        int actual = (Integer)result.get(30L, TimeUnit.SECONDS);
        Assert.assertEquals((long)42L, (long)actual);
        Assert.assertTrue((boolean)latch.isTriggered());
    }

    @Test
    public void testGetAddress() {
        Assert.assertEquals((Object)AkkaUtils.getAddress((ActorSystem)actorSystem).host().get(), (Object)akkaRpcService.getAddress());
    }

    @Test
    public void testGetPort() {
        Assert.assertEquals((Object)AkkaUtils.getAddress((ActorSystem)actorSystem).port().get(), (Object)akkaRpcService.getPort());
    }

    @Test(timeout=60000L)
    public void testTerminationFuture() throws Exception {
        AkkaRpcService rpcService = this.startAkkaRpcService();
        CompletableFuture terminationFuture = rpcService.getTerminationFuture();
        Assert.assertFalse((boolean)terminationFuture.isDone());
        rpcService.stopService();
        terminationFuture.get();
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServiceSimpleSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        OneShotLatch latch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.schedule(() -> ((OneShotLatch)latch).trigger(), 10L, TimeUnit.MILLISECONDS);
        future.get();
        Assert.assertTrue((boolean)latch.isTriggered());
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServicePeriodicSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleAtFixedRate(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        countDownLatch.await();
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        long finalTime = System.nanoTime() - currentTime;
        Assert.assertTrue((finalTime >= 40L ? 1 : 0) != 0);
        future.cancel(true);
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        countDownLatch.await();
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        long finalTime = System.nanoTime() - currentTime;
        Assert.assertTrue((finalTime >= 40L ? 1 : 0) != 0);
        future.cancel(true);
    }

    @Test
    public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        long delay = 10L;
        OneShotLatch futureTask = new OneShotLatch();
        OneShotLatch latch = new OneShotLatch();
        OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> {
            try {
                if (futureTask.isTriggered()) {
                    shouldNotBeTriggeredLatch.trigger();
                } else {
                    futureTask.trigger();
                    latch.await();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }, delay, delay, TimeUnit.MILLISECONDS);
        futureTask.await();
        future.cancel(false);
        latch.trigger();
        try {
            shouldNotBeTriggeredLatch.await(5L * delay, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The shouldNotBeTriggeredLatch should never be triggered.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAkkaRpcServiceShutDownWithRpcEndpoints() throws Exception {
        AkkaRpcService akkaRpcService = this.startAkkaRpcService();
        try {
            int numberActors = 5;
            CompletableFuture terminationFuture = akkaRpcService.getTerminationFuture();
            Collection<CompletableFuture<Void>> onStopFutures = this.startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService, 5);
            for (CompletableFuture<Void> onStopFuture : onStopFutures) {
                onStopFuture.complete(null);
            }
            terminationFuture.get();
            Assert.assertThat((Object)akkaRpcService.getActorSystem().whenTerminated().isCompleted(), (Matcher)Matchers.is((Object)true));
        }
        finally {
            RpcUtils.terminateRpcService((RpcService)akkaRpcService, (Time)TIMEOUT);
        }
    }

    @Test
    public void testAkkaRpcServiceShutDownWithFailingRpcEndpoints() throws Exception {
        AkkaRpcService akkaRpcService = this.startAkkaRpcService();
        int numberActors = 5;
        CompletableFuture terminationFuture = akkaRpcService.getTerminationFuture();
        Collection<CompletableFuture<Void>> onStopFutures = this.startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService, 5);
        Iterator<CompletableFuture<Void>> iterator = onStopFutures.iterator();
        for (int i = 0; i < 4; ++i) {
            iterator.next().complete(null);
        }
        iterator.next().completeExceptionally((Throwable)((Object)new OnStopException("onStop exception occurred.")));
        for (CompletableFuture<Void> onStopFuture : onStopFutures) {
            onStopFuture.complete(null);
        }
        try {
            terminationFuture.get();
            Assert.fail((String)"Expected the termination future to complete exceptionally.");
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, OnStopException.class).isPresent(), (Matcher)Matchers.is((Object)true));
        }
        Assert.assertThat((Object)akkaRpcService.getActorSystem().whenTerminated().isCompleted(), (Matcher)Matchers.is((Object)true));
    }

    private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints(AkkaRpcService akkaRpcService, int numberActors) throws Exception {
        ArrayList<CompletableFuture<Void>> onStopFutures = new ArrayList<CompletableFuture<Void>>(numberActors);
        CountDownLatch countDownLatch = new CountDownLatch(numberActors);
        for (int i = 0; i < numberActors; ++i) {
            CompletableFuture<Void> onStopFuture = new CompletableFuture<Void>();
            CountingAsynchronousOnStopEndpoint endpoint = new CountingAsynchronousOnStopEndpoint((RpcService)akkaRpcService, onStopFuture, countDownLatch);
            endpoint.start();
            onStopFutures.add(onStopFuture);
        }
        CompletableFuture terminationFuture = akkaRpcService.stopService();
        Assert.assertThat((Object)terminationFuture.isDone(), (Matcher)Matchers.is((Object)false));
        Assert.assertThat((Object)akkaRpcService.getActorSystem().whenTerminated().isCompleted(), (Matcher)Matchers.is((Object)false));
        countDownLatch.await();
        return onStopFutures;
    }

    @Nonnull
    private AkkaRpcService startAkkaRpcService() {
        ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
        return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    private static class OnStopException
    extends FlinkException {
        private static final long serialVersionUID = 7136609202083168954L;

        public OnStopException(String message) {
            super(message);
        }
    }

    private static class CountingAsynchronousOnStopEndpoint
    extends AkkaRpcActorTest.AsynchronousOnStopEndpoint {
        private final CountDownLatch countDownLatch;

        protected CountingAsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture, CountDownLatch countDownLatch) {
            super(rpcService, onStopFuture);
            this.countDownLatch = countDownLatch;
        }

        @Override
        public CompletableFuture<Void> onStop() {
            this.countDownLatch.countDown();
            return super.onStop();
        }
    }
}

