/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration tests that run streaming jobs on a mini cluster configured with
 * {@link SchedulerExecutionMode#REACTIVE} and observe how many parallel source
 * instances are running while TaskManagers are added or terminated.
 *
 * <p>The running parallelism is observed through static {@link InstanceParallelismTracker}
 * instances that the test sources update from {@code run()} and {@code close()}.
 */
public class ReactiveModeITCase extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;

    private static final Configuration configuration = getReactiveModeConfiguration();

    /** Mini cluster created for each test with the initial TaskManager/slot layout. */
    @Rule
    public final MiniClusterResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(INITIAL_NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /**
     * Builds the cluster configuration used by all tests.
     *
     * @return a fresh configuration with the scheduler mode set to reactive
     */
    private static Configuration getReactiveModeConfiguration() {
        final Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
        return conf;
    }

    /** Skips every test unless declarative resource management is enabled for the configuration. */
    @Before
    public void assumeDeclarativeResourceManagement() {
        Assume.assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
    }

    /**
     * Starts a second TaskManager, submits a job whose source is limited to max parallelism 1 and
     * waits until exactly one source instance is reported as running. The source itself throws
     * from {@code open()} if it sees more than one parallel subtask.
     */
    @Test
    public void testScaleLimitByMaxParallelism() throws Exception {
        // Same preparation as the other tests: start from a clean instance count.
        FailOnParallelExecutionSource.resetParallelismTracker();
        startAdditionalTaskManager();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final SingleOutputStreamOperator<String> input =
                env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
        input.addSink(new DiscardingSink<>());

        env.executeAsync();

        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
    }

    /**
     * Submits a job on the initial cluster, waits until the number of running source instances
     * equals the slots of the initial TaskManager(s), then starts one more TaskManager and waits
     * until the instance count equals the slots of the enlarged cluster.
     */
    @Test
    public void testScaleUpOnAdditionalTaskManager() throws Exception {
        ParallelismTrackingSource.resetParallelismTracker();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<String> input = env.addSource(new ParallelismTrackingSource());
        input.addSink(new DiscardingSink<>());

        env.executeAsync();

        ParallelismTrackingSource.waitForScaleUpToParallelism(
                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);

        // Add one TaskManager without waiting for its registration; the tracker wait covers it.
        miniClusterResource.getMiniCluster().startTaskManager();

        ParallelismTrackingSource.waitForScaleUpToParallelism(
                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
    }

    /**
     * Starts a second TaskManager, submits a job and waits for it to use all slots, then
     * terminates the TaskManager with index 0 and waits until the number of running source
     * instances equals the slots of the remaining TaskManager(s).
     */
    @Test
    public void testScaleDownOnTaskManagerLoss() throws Exception {
        ParallelismTrackingSource.resetParallelismTracker();
        startAdditionalTaskManager();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One restart attempt with no delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        final DataStreamSource<String> input = env.addSource(new ParallelismTrackingSource());
        input.addSink(new DiscardingSink<>());

        env.executeAsync();

        ParallelismTrackingSource.waitForScaleUpToParallelism(
                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));

        // Block until the termination future of TaskManager 0 completes.
        miniClusterResource.getMiniCluster().terminateTaskManager(0).get();

        ParallelismTrackingSource.waitForScaleUpToParallelism(
                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
    }

    /**
     * Queries the mini cluster's overview.
     *
     * @return the number of TaskManagers the overview reports as connected
     * @throws ExecutionException if the overview request fails
     * @throws InterruptedException if interrupted while waiting for the overview
     */
    private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
        return miniClusterResource
                .getMiniCluster()
                .requestClusterOverview()
                .get()
                .getNumTaskManagersConnected();
    }

    /**
     * Starts one TaskManager in addition to the initial ones and polls (with a 10 second
     * deadline) until the cluster overview reports all of them as connected.
     */
    private void startAdditionalTaskManager() throws Exception {
        miniClusterResource.getMiniCluster().startTaskManager();
        CommonTestUtils.waitUntilCondition(
                () -> getNumberOfConnectedTaskManagers() == INITIAL_NUMBER_TASK_MANAGERS + 1,
                Deadline.fromNow(Duration.ofMillis(10000L)));
    }

    /**
     * Parallel source that throws from {@link #open(Configuration)} when the runtime context
     * reports more than one parallel subtask. It emits {@code "test"} roughly every 100 ms until
     * cancelled and records its lifecycle in a static tracker.
     */
    private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();

        private volatile boolean running = true;

        /** Blocks until the tracker's instance count equals {@code parallelism}. */
        public static void waitForScaleUpToParallelism(int parallelism)
                throws InterruptedException {
            tracker.waitForScaleUpToParallelism(parallelism);
        }

        /** Sets the tracker's instance count back to zero. */
        public static void resetParallelismTracker() {
            tracker.reset();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
                throw new IllegalStateException("This is not supposed to be executed in parallel, despite extending the right base class.");
            }
        }

        @Override
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            tracker.reportNewInstance();
            while (running) {
                // Emit while holding the checkpoint lock, sleep outside of it.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("test");
                }
                Thread.sleep(100L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        // NOTE(review): decrements even if run() never reported this instance - confirm that
        // close() cannot be reached without a prior run() in the scenarios under test.
        @Override
        public void close() throws Exception {
            tracker.reportStoppedInstance();
        }
    }

    /**
     * Counter of currently reported source instances, guarded by an internal lock.
     *
     * <p>Waiters are woken only when an instance is added; {@link #reportStoppedInstance()} and
     * {@link #reset()} change the count without notifying.
     */
    private static class InstanceParallelismTracker {
        private final Object lock = new Object();
        private int instances = 0;

        /** Decrements the instance count; does not wake waiting threads. */
        public void reportStoppedInstance() {
            synchronized (lock) {
                instances--;
            }
        }

        /** Increments the instance count and wakes all waiting threads. */
        public void reportNewInstance() {
            synchronized (lock) {
                instances++;
                lock.notifyAll();
            }
        }

        /**
         * Blocks until the instance count equals {@code parallelism}. The count is re-checked
         * after every wake-up; there is no timeout.
         *
         * @param parallelism the exact instance count to wait for
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
            synchronized (lock) {
                while (instances != parallelism) {
                    lock.wait();
                }
            }
        }

        /** Sets the instance count to zero; does not wake waiting threads. */
        public void reset() {
            synchronized (lock) {
                instances = 0;
            }
        }
    }

    /**
     * Parallel source that emits {@code "test"} roughly every 10 ms until cancelled and records
     * its lifecycle in a static tracker, so tests can wait for a given number of instances.
     */
    private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();

        private volatile boolean running = true;

        /** Blocks until the tracker's instance count equals {@code parallelism}. */
        public static void waitForScaleUpToParallelism(int parallelism)
                throws InterruptedException {
            tracker.waitForScaleUpToParallelism(parallelism);
        }

        /** Sets the tracker's instance count back to zero. */
        public static void resetParallelismTracker() {
            tracker.reset();
        }

        @Override
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            tracker.reportNewInstance();
            while (running) {
                // Emit while holding the checkpoint lock, sleep outside of it.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("test");
                }
                Thread.sleep(10L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        // NOTE(review): decrements even if run() never reported this instance - confirm that
        // close() cannot be reached without a prior run() in the scenarios under test.
        @Override
        public void close() throws Exception {
            tracker.reportStoppedInstance();
        }
    }
}

