/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.io.File;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class AdaptiveSchedulerITCase
extends TestLogger {
    /** Per-test scratch directory; the savepoint tests create their target folder in here. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Number of task managers started by the shared mini cluster. */
    private static final int NUMBER_TASK_MANAGERS = 2;

    /** Number of slots offered by each task manager of the shared mini cluster. */
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;

    /** Parallelism of the test jobs; equals the 2 x 2 slots configured above. */
    private static final int PARALLELISM = 4;

    /** Cluster configuration shared by the mini cluster and the adaptive-scheduler precondition. */
    private static final Configuration configuration = getConfiguration();

    /** Mini cluster shared by all tests of this class. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /**
     * Builds the cluster configuration used by this test class.
     *
     * @return a fresh {@link Configuration} that selects the adaptive scheduler and switches on
     *     declarative resource management
     */
    private static Configuration getConfiguration() {
        final Configuration clusterConfig = new Configuration();

        clusterConfig.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        clusterConfig.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);

        return clusterConfig;
    }

    /**
     * Skips (rather than fails) each test when {@link ClusterOptions#isAdaptiveSchedulerEnabled}
     * reports that the adaptive scheduler is not enabled for the shared configuration.
     */
    @Before
    public void ensureAdaptiveSchedulerEnabled() {
        final boolean adaptiveSchedulerEnabled =
                ClusterOptions.isAdaptiveSchedulerEnabled(configuration);
        Assume.assumeTrue(adaptiveSchedulerEnabled);
    }

    /**
     * Runs after every test and asks the shared mini cluster to cancel all jobs, so that a job
     * left running by one test (several tests end with the job still RUNNING) is not carried
     * over into the next one.
     */
    @After
    public void cancelRunningJobs() {
        MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobs();
    }

    /**
     * Runs a checkpointed job built from {@link SimpleSource} and a discarding sink.
     *
     * <p>{@link SimpleSource} throws once after a checkpoint completion has been notified and
     * stops emitting when it finds its own marker in restored state. The test contains no
     * explicit assertions: it passes when {@code execute()} returns without throwing.
     */
    @Test
    public void testGlobalFailoverCanRecoverState() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);

        env.addSource(new SimpleSource()).addSink(new DiscardingSink<>());

        env.execute();
    }

    /**
     * Stops a job whose source never fails on purpose and checks the happy path: the returned
     * savepoint path lies below the requested directory and the job status is FINISHED.
     */
    @Test
    public void testStopWithSavepointNoError() throws Exception {
        final StreamExecutionEnvironment env =
                getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);

        // Arm the latch before submitting so that awaitRunning() sees every subtask start.
        DummySource.resetForParallelism(PARALLELISM);
        final JobClient client = env.executeAsync();
        DummySource.awaitRunning();

        final String targetPath = tempFolder.newFolder("savepoint").getAbsolutePath();
        final String savepoint = client.stopWithSavepoint(false, targetPath).get();

        Assert.assertThat(savepoint, CoreMatchers.containsString(targetPath));
        Assert.assertThat(client.getJobStatus().get(), CoreMatchers.is(JobStatus.FINISHED));
    }

    /**
     * Stops a job whose source throws from {@code snapshotState} on every invocation.
     *
     * <p>Expects the stop-with-savepoint future to fail with a {@link FlinkException} in its
     * cause chain, and then polls the job status for up to one minute until it reports
     * {@link JobStatus#RUNNING}.
     */
    @Test
    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
        final StreamExecutionEnvironment env =
                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        DummySource.resetForParallelism(PARALLELISM);
        final JobClient client = env.executeAsync();
        DummySource.awaitRunning();

        try {
            final String targetPath = tempFolder.newFolder("savepoint").getAbsolutePath();
            client.stopWithSavepoint(false, targetPath).get();
            Assert.fail("Expect exception");
        } catch (ExecutionException expected) {
            Assert.assertThat(expected, FlinkMatchers.containsCause(FlinkException.class));
        }

        CommonTestUtils.waitUntilCondition(
                () -> JobStatus.RUNNING == client.getJobStatus().get(),
                Deadline.fromNow(Duration.ofMinutes(1L)));
    }

    /**
     * Stops a job whose source throws from {@code cancel()} once it has been notified of a
     * completed checkpoint.
     *
     * <p>Expects the stop-with-savepoint future to fail with a {@link FlinkException} in its
     * cause chain, and then polls the job status for up to one minute until it reports
     * {@link JobStatus#RUNNING}.
     */
    @Test
    public void testStopWithSavepointFailOnStop() throws Exception {
        final StreamExecutionEnvironment env =
                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        DummySource.resetForParallelism(PARALLELISM);
        final JobClient client = env.executeAsync();
        DummySource.awaitRunning();

        try {
            final String targetPath = tempFolder.newFolder("savepoint").getAbsolutePath();
            client.stopWithSavepoint(false, targetPath).get();
            Assert.fail("Expect exception");
        } catch (ExecutionException expected) {
            Assert.assertThat(expected, FlinkMatchers.containsCause(FlinkException.class));
        }

        CommonTestUtils.waitUntilCondition(
                () -> JobStatus.RUNNING == client.getJobStatus().get(),
                Deadline.fromNow(Duration.ofMinutes(1L)));
    }

    /**
     * Checks that a failed stop-with-savepoint leaves no files behind and that a second attempt,
     * issued after the job has restarted, succeeds.
     *
     * <p>The source throws from {@code snapshotState} only for checkpoint id 1, so the first
     * stop-with-savepoint is expected to fail with a {@link FlinkException} in its cause chain.
     * After all source subtasks are running again, the savepoint directory must be empty and a
     * second stop-with-savepoint must return a path below that directory.
     *
     * <p>The emptiness check is retried for a bounded time instead of being evaluated once.
     * NOTE(review): this assumes cleanup of the failed savepoint may finish after the failed
     * future and the restart are observed; confirm against the checkpoint coordinator.
     */
    @Test
    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        env.setParallelism(PARALLELISM);
        env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
                .addSink(new DiscardingSink<>());

        DummySource.resetForParallelism(PARALLELISM);
        final JobClient client = env.executeAsync();
        DummySource.awaitRunning();

        // Re-arm the latch before provoking the failure so the restarted subtasks can be awaited.
        DummySource.resetForParallelism(PARALLELISM);
        final File savepointDirectory = tempFolder.newFolder("savepoint");
        try {
            client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
            Assert.fail("Expect failure of operation");
        } catch (ExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsCause(FlinkException.class));
        }

        // Wait until every source subtask of the restarted job has entered run().
        DummySource.awaitRunning();

        // Poll for a bounded time: a single snapshot of the directory is racy if the files of
        // the failed savepoint are removed slightly after the job is running again.
        final Deadline cleanupDeadline = Deadline.fromNow(Duration.ofSeconds(10L));
        File[] leftovers = savepointDirectory.listFiles();
        while (leftovers != null && leftovers.length > 0 && cleanupDeadline.hasTimeLeft()) {
            Thread.sleep(50L);
            leftovers = savepointDirectory.listFiles();
        }

        if (leftovers == null) {
            // listFiles() returns null on I/O errors or when the path is not a directory;
            // report that explicitly instead of failing with a NullPointerException.
            Assert.fail(
                    "Could not list savepoint directory: " + savepointDirectory.getAbsolutePath());
        } else if (leftovers.length > 0) {
            Assert.fail(
                    "Found unexpected files: "
                            + Arrays.stream(leftovers)
                                    .map(File::getAbsolutePath)
                                    .collect(Collectors.joining(", ")));
        }

        final String savepoint =
                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
        Assert.assertThat(
                savepoint, CoreMatchers.containsString(savepointDirectory.getAbsolutePath()));
    }

    /**
     * Creates an execution environment whose job consists of a {@link DummySource} feeding a
     * discarding sink, with the parallelism set to {@code PARALLELISM}.
     *
     * @param behavior failure behaviour handed to the {@link DummySource}
     * @return the prepared, not yet executed environment
     */
    private static StreamExecutionEnvironment getEnvWithSource(StopWithSavepointTestBehavior behavior) {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(PARALLELISM);

        final DummySource source = new DummySource(behavior);
        environment.addSource(source).addSink(new DiscardingSink<>());

        return environment;
    }

    public static final class SimpleSource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointListener,
    CheckpointedFunction {
        private static final ListStateDescriptor<Boolean> unionStateListDescriptor = new ListStateDescriptor("state", Boolean.class);
        private volatile boolean running = true;
        @Nullable
        private ListState<Boolean> unionListState = null;
        private boolean hasFailedBefore = false;
        private boolean fail = false;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running && !this.hasFailedBefore) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.getRuntimeContext().getIndexOfThisSubtask());
                    Thread.sleep(5L);
                }
                if (!this.fail) continue;
                throw new FlinkException("Test failure.");
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.fail = true;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.unionListState = context.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
            for (Boolean previousState : (Iterable)this.unionListState.get()) {
                this.hasFailedBefore |= previousState.booleanValue();
            }
            this.unionListState.clear();
            this.unionListState.add((Object)true);
        }
    }

    private static final class DummySource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointedFunction,
    CheckpointListener {
        private final StopWithSavepointTestBehavior behavior;
        private volatile boolean running = true;
        private static volatile CountDownLatch instancesRunning;
        private volatile boolean checkpointComplete = false;

        public DummySource(StopWithSavepointTestBehavior behavior) {
            this.behavior = behavior;
        }

        private static void resetForParallelism(int para) {
            instancesRunning = new CountDownLatch(para);
        }

        private static void awaitRunning() throws InterruptedException {
            Preconditions.checkNotNull((Object)instancesRunning);
            instancesRunning.await();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Preconditions.checkNotNull((Object)instancesRunning);
            instancesRunning.countDown();
            int i = Integer.MIN_VALUE;
            while (this.running) {
                Thread.sleep(10L);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)i++);
                }
            }
        }

        public void cancel() {
            this.running = false;
            if (this.checkpointComplete && this.behavior == StopWithSavepointTestBehavior.FAIL_ON_STOP) {
                throw new RuntimeException(this.behavior.name());
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT) {
                throw new RuntimeException(this.behavior.name());
            }
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY && context.getCheckpointId() == 1L) {
                throw new RuntimeException(this.behavior.name());
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.checkpointComplete = true;
        }
    }

    private static enum StopWithSavepointTestBehavior {
        NO_FAILURE,
        FAIL_ON_CHECKPOINT,
        FAIL_ON_STOP,
        FAIL_ON_FIRST_CHECKPOINT_ONLY;

    }
}

