/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SavepointITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
    // Class-wide executor resource; its executor drives the job-status polling in restoreJobAndVerifyState.
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    // Source of the temporary directories used by the tests.
    @Rule
    public final TemporaryFolder folder = new TemporaryFolder();
    // Both directories are (re)created below a fresh temporary folder by setUp().
    private File checkpointDir;
    private File savepointDir;
    // Used to register objects (e.g. a CountDownLatch) that are referenced from inside job functions.
    @Rule
    public SharedObjects sharedObjects = SharedObjects.create();
    // NOTE(review): the static latches/arrays below are mutable state shared across tests;
    // presumably they coordinate the failing/succeeding pipeline and iteration tests
    // defined further down in this class - confirm against their users.
    private static OneShotLatch failingPipelineLatch;
    private static OneShotLatch succeedingPipelineLatch;
    private static final int ITER_TEST_PARALLELISM = 1;
    private static OneShotLatch[] iterTestSnapshotWait;
    private static OneShotLatch[] iterTestRestoreWait;
    private static int[] iterTestCheckpointVerify;

    /**
     * Creates the {@code checkpoints} and {@code savepoints} directories below a fresh temporary
     * folder and fails the test if either of them cannot be created.
     */
    @Before
    public void setUp() throws Exception {
        final File root = folder.newFolder();
        checkpointDir = new File(root, "checkpoints");
        savepointDir = new File(root, "savepoints");

        // Short-circuits exactly like the original: the second directory is only attempted
        // once the first one has been created.
        final boolean directoriesCreated = checkpointDir.mkdir() && savepointDir.mkdirs();
        if (!directoriesCreated) {
            Assert.fail("Test setup failed: failed to create temporary directories.");
        }
    }

    /** Runs the stop-with-savepoint scenario for the sequence source with draining enabled. */
    @Test
    public void testStopWithSavepointForFlip27SourceWithDrain() throws Exception {
        final boolean drain = true;
        testStopWithSavepointForFlip27Source(drain);
    }

    /** Runs the stop-with-savepoint scenario for the sequence source with draining disabled. */
    @Test
    public void testStopWithSavepointForFlip27SourceWithoutDrain() throws Exception {
        final boolean drain = false;
        testStopWithSavepointForFlip27Source(drain);
    }

    /**
     * Runs a job that reads from {@code env.fromSequence} through a {@code
     * BoundedPassThroughOperator}, stops it with a savepoint and checks the operator's {@code
     * inputEnded} flag.
     *
     * @param drain forwarded to {@code stopWithSavepoint}; {@code inputEnded} is expected to be
     *     {@code true} exactly when this is {@code true}
     */
    private void testStopWithSavepointForFlip27Source(boolean drain) throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final BoundedPassThroughOperator operator =
                new BoundedPassThroughOperator(ChainingStrategy.ALWAYS);
        final SingleOutputStreamOperator stream =
                env.fromSequence(0L, Long.MAX_VALUE)
                        .transform("pass-through", (TypeInformation) BasicTypeInfo.LONG_TYPE_INFO, operator);
        stream.addSink((SinkFunction) new DiscardingSink());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        final JobID jobId = jobGraph.getJobID();

        final MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        final ClusterClient client = cluster.getClusterClient();
        try {
            BoundedPassThroughOperator.resetForTest(1, true);
            client.submitJob(jobGraph).get();

            // Wait until the operator reports progress and all tasks are running before stopping.
            BoundedPassThroughOperator.getProgressLatch().await();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);

            client.stopWithSavepoint(jobId, drain, null, SavepointFormatType.CANONICAL).get();

            if (!drain) {
                Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
            } else {
                Assert.assertTrue(BoundedPassThroughOperator.inputEnded);
            }
        } finally {
            cluster.after();
        }
    }

    /**
     * Stops a job (infinite source feeding a parallel {@code FinishingSink}) with a draining
     * savepoint and expects the stop to complete without error.
     *
     * <p>NOTE(review): the ordering check implied by the test name presumably lives inside
     * {@code FinishingSink} - confirm there.
     */
    @Test
    public void testStopWithSavepointWithDrainCallsFinishBeforeSnapshotState() throws Exception {
        final int sinkParallelism = 5;
        final MiniClusterResourceConfiguration clusterConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberSlotsPerTaskManager(sinkParallelism + 1)
                        .build();
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(clusterConfig);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.addSource((SourceFunction) new InfiniteTestSource())
                .setParallelism(1)
                .name("Infinite Source")
                .addSink(new FinishingSink())
                .setParallelism(sinkParallelism);
        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        cluster.before();
        try {
            final ClusterClient client = cluster.getClusterClient();
            client.submitJob(jobGraph).get();
            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());

            final String targetDirectory = savepointDir.getAbsolutePath();
            client.stopWithSavepoint(jobGraph.getJobID(), true, targetDirectory, SavepointFormatType.CANONICAL)
                    .get();
        } finally {
            cluster.after();
        }
    }

    /**
     * Triggers a checkpoint, then stops the job with a draining savepoint while a {@code
     * FailingOnCompletedSavepointMapFunction} is part of the pipeline.
     *
     * <p>The stop request must fail with a {@code StopWithSavepointStoppingException} whose message
     * starts with {@code "A savepoint has been created at: "}, and the job must afterwards be
     * either FAILED or FAILING.
     */
    @Test
    public void testStopWithSavepointFailsOverToSavepoint() throws Throwable {
        final int sinkParallelism = 5;
        final MiniClusterResourceConfiguration clusterConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberSlotsPerTaskManager(sinkParallelism + 1)
                        .build();
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(clusterConfig);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10L));
        env.setParallelism(1);
        env.addSource((SourceFunction) new InfiniteTestSource())
                .name("Infinite Source")
                .map((MapFunction) new FailingOnCompletedSavepointMapFunction(2L))
                .addSink((SinkFunction) new DiscardingSink())
                .setParallelism(sinkParallelism);
        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        cluster.before();
        try {
            final ClusterClient client = cluster.getClusterClient();
            client.submitJob(jobGraph).get();
            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());
            cluster.getMiniCluster().triggerCheckpoint(jobGraph.getJobID()).get();

            final CompletableFuture savepointCompleted =
                    client.stopWithSavepoint(
                            jobGraph.getJobID(),
                            true,
                            savepointDir.getAbsolutePath(),
                            SavepointFormatType.CANONICAL);
            final ExecutionException stopFailure =
                    Assert.assertThrows(ExecutionException.class, savepointCompleted::get);
            final Throwable savepointException = stopFailure.getCause();
            ExceptionUtils.assertThrowable(
                    savepointException,
                    throwable ->
                            throwable instanceof StopWithSavepointStoppingException
                                    && throwable.getMessage().startsWith("A savepoint has been created at: "));

            Assert.assertThat(
                    client.getJobStatus(jobGraph.getJobID()).get(),
                    (Matcher) CoreMatchers.either((Matcher) CoreMatchers.is((Object) JobStatus.FAILED))
                            .or(CoreMatchers.is((Object) JobStatus.FAILING)));
        } finally {
            cluster.after();
        }
    }

    /**
     * Takes a savepoint with file-based checkpoint configuration, verifies the files written for
     * it, then restores a job from that savepoint and verifies the restore.
     */
    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 4;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        verifySavepoint(parallelism, savepointPath);
        restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
    }

    /**
     * Takes a savepoint, restores from it in {@code CLAIM} mode and asserts that the savepoint
     * location no longer exists once the cluster has been shut down.
     */
    @Test
    public void testTriggerSavepointAndResumeWithClaim() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 4;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        verifySavepoint(parallelism, savepointPath);

        final SavepointRestoreSettings claimSettings =
                SavepointRestoreSettings.forPath(savepointPath, false, RestoreMode.CLAIM);
        restoreJobAndVerifyState(
                clusterFactory,
                parallelism,
                claimSettings,
                cluster -> {
                    // The cluster is shut down first; only then is the savepoint location checked.
                    cluster.after();
                    final File savepointLocation = new File(new URI(savepointPath));
                    Assert.assertFalse("Savepoint not properly cleaned up.", savepointLocation.exists());
                });
    }

    /**
     * Takes a savepoint, restores from it in {@code LEGACY} mode and asserts that the savepoint
     * location still exists once the cluster has been shut down.
     */
    @Test
    public void testTriggerSavepointAndResumeWithLegacyMode() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 4;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        verifySavepoint(parallelism, savepointPath);

        final SavepointRestoreSettings legacySettings =
                SavepointRestoreSettings.forPath(savepointPath, false, RestoreMode.LEGACY);
        restoreJobAndVerifyState(
                clusterFactory,
                parallelism,
                legacySettings,
                cluster -> {
                    // The cluster is shut down first; only then is the savepoint location checked.
                    cluster.after();
                    final File savepointLocation = new File(new URI(savepointPath));
                    Assert.assertTrue("Savepoint unexpectedly cleaned up.", savepointLocation.exists());
                });
    }

    /**
     * Currently ignored (see FLINK-25427). Submits the same job graph three times under fresh job
     * IDs on one mini cluster:
     *
     * <ol>
     *   <li>first run: waits until the sink has seen 10000 records, triggers a checkpoint and
     *       cancels the job;
     *   <li>second run: restores from the first checkpoint in {@code NO_CLAIM} mode, triggers a
     *       second checkpoint and cancels the job;
     *   <li>third run: after deleting the parent directory of the first checkpoint, restores from
     *       the second checkpoint in {@code NO_CLAIM} mode and waits for all tasks to run.
     * </ol>
     *
     * <p>NOTE(review): deleting the first checkpoint before the third run presumably demonstrates
     * that the second checkpoint does not depend on files of the first one - confirm.
     */
    @Test
    @Ignore(value="Disabling this test because it regularly fails on AZP. See FLINK-25427.")
    public void testTriggerSavepointAndResumeWithNoClaim() throws Exception {
        // The three locals below are unused; the literal values are passed directly further down.
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend)new EmbeddedRocksDBStateBackend(true));
        // Keep checkpoints on cancellation so that they can be used as restore points below.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(this.folder.newFolder().toURI());
        env.setParallelism(4);
        // Latch shared with the sink; it opens once the sink has been invoked 10000 times.
        final SharedReference counter = this.sharedObjects.add((Object)new CountDownLatch(10000));
        env.fromSequence(1L, Long.MAX_VALUE).keyBy((KeySelector & Serializable)i -> i % 4L).process((KeyedProcessFunction)new KeyedProcessFunction<Long, Long, Long>(){
            // Keyed list state to which every processed value is appended.
            private ListState<Long> last;

            public void open(Configuration parameters) {
                this.last = this.getRuntimeContext().getListState(new ListStateDescriptor("last", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
            }

            public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Long> out) throws Exception {
                // Record the value in state and forward it unchanged.
                this.last.add((Object)value);
                out.collect((Object)value);
            }
        }).addSink((SinkFunction)new SinkFunction<Long>(){

            public void invoke(Long value) {
                // Count every record that reaches the sink.
                counter.consumeSync(CountDownLatch::countDown);
            }
        }).setParallelism(1);
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        cluster.before();
        try {
            // First run: produce some state, checkpoint it, cancel.
            JobID jobID1 = new JobID();
            jobGraph.setJobID(jobID1);
            cluster.getClusterClient().submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning((MiniCluster)cluster.getMiniCluster(), (JobID)jobID1, (boolean)false);
            ((CountDownLatch)counter.get()).await();
            String firstCheckpoint = (String)cluster.getMiniCluster().triggerCheckpoint(jobID1).get();
            cluster.getClusterClient().cancel(jobID1).get();
            // Second run: restore from the first checkpoint without claiming it, checkpoint again, cancel.
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)firstCheckpoint, (boolean)false, (RestoreMode)RestoreMode.NO_CLAIM));
            JobID jobID2 = new JobID();
            jobGraph.setJobID(jobID2);
            cluster.getClusterClient().submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning((MiniCluster)cluster.getMiniCluster(), (JobID)jobID2, (boolean)false);
            String secondCheckpoint = (String)cluster.getMiniCluster().triggerCheckpoint(jobID2).get();
            cluster.getClusterClient().cancel(jobID2).get();
            // Remove the directory containing the first checkpoint before restoring from the second one.
            FileUtils.deleteDirectory((File)Paths.get(new URI(firstCheckpoint)).getParent().toFile());
            // Third run: restore from the second checkpoint and wait until all tasks are running.
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)secondCheckpoint, (boolean)false, (RestoreMode)RestoreMode.NO_CLAIM));
            JobID jobID3 = new JobID();
            jobGraph.setJobID(jobID3);
            cluster.getClusterClient().submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning((MiniCluster)cluster.getMiniCluster(), (JobID)jobID3, (boolean)false);
        }
        finally {
            cluster.after();
        }
    }

    /**
     * Takes a savepoint, moves the savepoint directory to a new location and then verifies and
     * restores from the relocated savepoint.
     */
    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 4;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);

        final Path oldPath = new Path(savepointPath);
        final Path newPath = new Path(folder.newFolder().toURI().toString());

        // rename() reports a failed move through its return value; check it here so that a failed
        // relocation is reported as such instead of surfacing later as a verification failure.
        final boolean relocated = oldPath.getFileSystem().rename(oldPath, newPath);
        Assert.assertTrue(
                "Failed to relocate savepoint from " + oldPath + " to " + newPath, relocated);

        final String relocatedSavepointPath = newPath.toUri().toString();
        verifySavepoint(parallelism, relocatedSavepointPath);
        restoreJobAndVerifyState(relocatedSavepointPath, clusterFactory, parallelism);
    }

    /**
     * Takes a savepoint using the entropy-injecting configuration, checks the savepoint directory
     * with {@code hasEntropyInFileStateHandlePaths()}, restores from the savepoint and finally
     * verifies that disposing the savepoint removes it from the local file system.
     */
    @Test
    public void testShouldAddEntropyToSavepointPath() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 4;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getCheckpointingWithEntropyConfig());

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        Assert.assertThat(savepointDir, hasEntropyInFileStateHandlePaths());

        restoreJobAndVerifyState(
                clusterFactory,
                parallelism,
                SavepointRestoreSettings.forPath(savepointPath),
                cluster -> {
                    // Map the test-entropy scheme back to a local file URI to inspect the disk.
                    final String localPath = savepointPath.replace("test-entropy:/", "file:/");
                    final URI localURI = new URI(localPath);
                    Assert.assertTrue("Savepoint has not been created", new File(localURI).exists());
                    cluster.getClusterClient().disposeSavepoint(savepointPath).get();
                    Assert.assertFalse("Savepoint not properly cleaned up.", new File(localURI).exists());
                });
    }

    /**
     * Builds the file-based checkpoint configuration for a {@code test-entropy://} savepoint
     * location containing the {@code _entropy_} placeholder, and registers that placeholder under
     * {@code s3.entropy.key}.
     */
    private Configuration getCheckpointingWithEntropyConfig() {
        final File placeholderDir = new File(savepointDir, "_entropy_");
        final String savepointLocation = "test-entropy://" + placeholderDir.getPath();

        final Configuration config = getFileBasedCheckpointsConfig(savepointLocation);
        config.setString("s3.entropy.key", "_entropy_");
        return config;
    }

    /**
     * Starts a cluster from the given factory, runs the job built by {@code createJobGraph}, waits
     * for the {@code StatefulCounter} progress latch and cancels the job with a savepoint.
     *
     * @param clusterFactory supplies the cluster to run on; it is shut down before returning
     * @param parallelism parallelism passed to {@code createJobGraph} and {@code StatefulCounter}
     * @return the value returned by {@code cancelWithSavepoint}
     */
    private String submitJobAndTakeSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000L);
        final JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);

        final MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        final ClusterClient client = cluster.getClusterClient();
        try {
            client.submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
            StatefulCounter.getProgressLatch().await();

            return (String) client.cancelWithSavepoint(jobId, null, SavepointFormatType.CANONICAL).get();
        } finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    /**
     * Asserts that the savepoint path points to an existing directory that contains exactly
     * {@code 1 + parallelism} entries.
     *
     * @param parallelism parallelism of the job that wrote the savepoint
     * @param savepointPath URI string of the savepoint directory
     * @throws URISyntaxException if {@code savepointPath} is not a valid URI
     */
    private void verifySavepoint(int parallelism, String savepointPath) throws URISyntaxException {
        final File directory = new File(new URI(savepointPath));
        Assert.assertTrue("Savepoint directory does not exist.", directory.exists());
        Assert.assertTrue("Savepoint did not create self-contained directory.", directory.isDirectory());

        final File[] entries = directory.listFiles();
        if (entries == null) {
            Assert.fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
            return;
        }

        final String errMsg =
                "Did not write expected number of savepoint/checkpoint files to directory: "
                        + Arrays.toString(entries);
        Assert.assertEquals(errMsg, 1 + parallelism, entries.length);
    }

    /**
     * Restores from the given savepoint path (second {@code forPath} argument {@code false}) and,
     * after the job has been cancelled, disposes the savepoint and asserts that its location no
     * longer exists.
     */
    private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        final SavepointRestoreSettings restoreSettings =
                SavepointRestoreSettings.forPath(savepointPath, false);

        restoreJobAndVerifyState(
                clusterFactory,
                parallelism,
                restoreSettings,
                cluster -> {
                    cluster.getClusterClient().disposeSavepoint(savepointPath).get();
                    final File savepointLocation = new File(new URI(savepointPath));
                    Assert.assertFalse("Savepoint not properly cleaned up.", savepointLocation.exists());
                });
    }

    /**
     * Submits the job built by {@code createJobGraph} with the given restore settings, waits for
     * the {@code StatefulCounter} restore and progress latches, cancels the job, waits until the
     * job status is {@code CANCELED} and finally runs the supplied checks.
     *
     * @param clusterFactory supplies the cluster to run on; it is shut down before returning
     * @param parallelism parallelism passed to {@code createJobGraph} and {@code StatefulCounter}
     * @param savepointRestoreSettings restore settings applied to the job graph
     * @param postCancelChecks checks executed against the still-running cluster after cancellation
     * @throws Exception if any step fails, including the job not reaching {@code CANCELED} before
     *     the 30 second deadline
     */
    private void restoreJobAndVerifyState(MiniClusterResourceFactory clusterFactory, int parallelism, SavepointRestoreSettings savepointRestoreSettings, PostCancelChecker postCancelChecks) throws Exception {
        final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000L);
        jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
        final JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);

        final MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        final ClusterClient client = cluster.getClusterClient();
        try {
            client.submitJob(jobGraph).get();
            StatefulCounter.getRestoreLatch().await();
            StatefulCounter.getProgressLatch().await();
            client.cancel(jobId).get();

            // Block until the polling future completes. Without the trailing get() the returned
            // future was dropped and the post-cancel checks could run before the job had
            // actually reached CANCELED.
            FutureUtils.retrySuccessfulWithDelay(
                            () -> client.getJobStatus(jobId),
                            Duration.ofMillis(50L),
                            Deadline.now().plus(Duration.ofSeconds(30L)),
                            status -> status == JobStatus.CANCELED,
                            new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
                    .get();

            postCancelChecks.check(cluster);
        } finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    /**
     * Triggers a savepoint for a job ID that was never submitted and expects the request to fail
     * with a {@code FlinkJobNotFoundException} whose message chain mentions that job ID.
     */
    @Test
    public void testTriggerSavepointForNonExistingJob() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;

        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());
        cluster.before();
        try {
            // Obtained inside the try block so that the cluster is torn down even if this fails.
            final ClusterClient client = cluster.getClusterClient();
            final JobID jobID = new JobID();

            final ExecutionException e =
                    Assert.assertThrows(
                            "Triggering a savepoint for an unknown job must fail.",
                            ExecutionException.class,
                            () -> client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL).get());

            ExceptionUtils.assertThrowable(e, FlinkJobNotFoundException.class);
            ExceptionUtils.assertThrowableWithMessage(e, jobID.toString());
        } finally {
            cluster.after();
        }
    }

    /**
     * Runs a single blocking vertex on a cluster with default configuration and expects a
     * savepoint request for it to fail with an {@code IllegalStateException} that mentions the job
     * ID and {@code "is not a streaming job"}.
     */
    @Test
    public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;

        final Configuration config = new Configuration();
        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());

        final JobVertex vertex = new JobVertex("Blocking vertex");
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        vertex.setParallelism(1);
        final JobGraph graph = JobGraphTestUtils.streamingJobGraph(new JobVertex[] {vertex});

        cluster.before();
        try {
            // Obtained inside the try block so that the cluster is torn down even if this fails.
            final ClusterClient client = cluster.getClusterClient();

            // Submission and start-up are outside the expected-failure scope: an
            // ExecutionException raised here is a setup problem, not the behavior under test.
            client.submitJob(graph).get();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), graph.getJobID(), false);

            final ExecutionException e =
                    Assert.assertThrows(
                            "Triggering a savepoint must fail for this job.",
                            ExecutionException.class,
                            () -> client.triggerSavepoint(graph.getJobID(), null, SavepointFormatType.CANONICAL).get());

            ExceptionUtils.assertThrowable(e, IllegalStateException.class);
            ExceptionUtils.assertThrowableWithMessage(e, graph.getJobID().toString());
            ExceptionUtils.assertThrowableWithMessage(e, "is not a streaming job");
        } finally {
            cluster.after();
        }
    }

    /**
     * Runs a job with checkpointing disabled, triggers a savepoint and asserts that a savepoint
     * path is returned while the configured checkpoint directory stays empty.
     *
     * <p>Cleanup always runs: the savepoint (if one was created) is disposed and the cluster is
     * shut down, whether or not the test body succeeded.
     */
    @Test
    public void testTriggerSavepointWithoutCheckpointBaseLocations() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().disableCheckpointing();
        env.setParallelism(1);
        env.addSource((SourceFunction) new IntegerStreamSource()).addSink((SinkFunction) new DiscardingSink());
        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        final Configuration config = getFileBasedCheckpointsConfig();
        config.addAll(jobGraph.getJobConfiguration());

        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
                                .build());
        cluster.before();
        final ClusterClient client = cluster.getClusterClient();

        String savepointPath = null;
        try {
            client.submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);

            savepointPath = (String) client.triggerSavepoint(jobGraph.getJobID(), null, SavepointFormatType.CANONICAL).get();
            Assert.assertNotNull(savepointPath);

            client.cancel(jobGraph.getJobID()).get();

            // The checkpoint directory must not have been populated.
            Assert.assertEquals(0, Objects.requireNonNull(checkpointDir.listFiles()).length);
        } finally {
            try {
                if (savepointPath != null) {
                    client.disposeSavepoint(savepointPath);
                }
            } finally {
                // Nested so that the cluster is shut down even if disposing the savepoint throws.
                cluster.after();
            }
        }
    }

    /**
     * For every {@code ChainingStrategy} value: runs an infinite source through a {@code
     * BoundedPassThroughOperator}, stops the job with a non-draining savepoint and asserts that
     * the operator's {@code inputEnded} flag is not set.
     */
    @Test
    public void testStopSavepointWithBoundedInput() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;

        for (final ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
            final MiniClusterResourceFactory clusterFactory =
                    new MiniClusterResourceFactory(
                            numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            final BoundedPassThroughOperator operator = new BoundedPassThroughOperator(chainingStrategy);
            final SingleOutputStreamOperator stream =
                    env.addSource((SourceFunction) new InfiniteTestSource())
                            .transform("pass-through", (TypeInformation) BasicTypeInfo.INT_TYPE_INFO, operator);
            stream.addSink((SinkFunction) new DiscardingSink());

            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            final JobID jobId = jobGraph.getJobID();

            final MiniClusterWithClientResource cluster = clusterFactory.get();
            cluster.before();
            final ClusterClient client = cluster.getClusterClient();
            try {
                BoundedPassThroughOperator.resetForTest(1, true);
                client.submitJob(jobGraph).get();

                // Wait for progress and for all tasks to run before requesting the stop.
                BoundedPassThroughOperator.getProgressLatch().await();
                CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);

                client.stopWithSavepoint(jobId, false, null, SavepointFormatType.CANONICAL).get();

                final String failureMessage = "input ended with chainingStrategy " + chainingStrategy;
                Assert.assertFalse(failureMessage, BoundedPassThroughOperator.inputEnded);
            } finally {
                cluster.after();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a job whose restore settings point at a savepoint path that does not exist and
     * expects the submission to fail with a {@link JobExecutionException} whose cause chain
     * contains a {@link FileNotFoundException}.
     *
     * <p>Any other exception is rethrown. A submission that returns normally fails the test
     * explicitly, so the test cannot pass without the expected failure being observed.
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;

        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();

        try {
            // High number of retries with a long restart delay (one hour).
            final int numberOfRetries = 1000;
            final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000L);

            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
            Assert.assertEquals(
                    "unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());

            LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");

            try {
                TestUtils.submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
                // Assert.fail throws an AssertionError, which the catch (Exception) below does
                // not intercept.
                Assert.fail("Submitting a job with an unknown savepoint path should have failed.");
            } catch (Exception e) {
                final Optional<JobExecutionException> expectedJobExecutionException =
                        ExceptionUtils.findThrowable(e, JobExecutionException.class);
                final Optional<FileNotFoundException> expectedFileNotFoundException =
                        ExceptionUtils.findThrowable(e, FileNotFoundException.class);
                // Both exceptions must be part of the failure; anything else is unexpected.
                if (!expectedJobExecutionException.isPresent()
                        || !expectedFileNotFoundException.isPresent()) {
                    throw e;
                }
            }
        } finally {
            cluster.after();
        }
    }

    /**
     * Runs the shared failing-source scenario with a source that throws from
     * {@code snapshotState}, allowing up to two restarts and expecting the job to be running
     * again afterwards.
     */
    @Test
    public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
        final InfiniteTestSource failingSource = new SnapshotFailingInfiniteTestSource();
        final File targetDirectory = folder.newFolder();
        final BiFunction<JobID, ExecutionException, Boolean> failureCheck =
                assertInSnapshotCreationFailure();

        testStopWithFailingSourceInOnePipeline(
                failingSource, targetDirectory, 2, failureCheck, true);
    }

    /**
     * Runs the shared failing-source scenario with a source that throws on its first
     * {@code notifyCheckpointComplete} call. No restarts are allowed, and the failure must
     * contain a {@link StopWithSavepointStoppingException} whose message starts with
     * "A savepoint has been created at:".
     */
    @Test
    public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
        // Re-arm the one-shot failure in case an earlier test already triggered it.
        CancelFailingInfiniteTestSource.checkpointCompleteTriggered = false;

        testStopWithFailingSourceInOnePipeline(
                new CancelFailingInfiniteTestSource(),
                folder.newFolder(),
                0,
                (jobId, actualException) -> {
                    // Typed Optional so the lambda below sees the exception type, not Object.
                    final Optional<StopWithSavepointStoppingException> stoppingException =
                            ExceptionUtils.findThrowable(
                                    actualException, StopWithSavepointStoppingException.class);
                    return stoppingException
                            .map(e -> e.getMessage().startsWith("A savepoint has been created at:"))
                            .orElse(false);
                },
                false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops a job with a savepoint targeting the {@code failPath://} file system, which is
     * configured to fail on the savepoint's {@code _metadata} file. The stop request must fail
     * with an "Expected IO exception", after which all tasks must be running again.
     */
    @Test
    public void testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted() throws Exception {
        final int parallelism = 2;

        PathFailingFileSystem.resetFailingPath(savepointDir.getAbsolutePath() + ".*/_metadata");

        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberSlotsPerTaskManager(parallelism)
                                .build());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig()
                .setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        env.addSource(new InfiniteTestSource())
                .name("Infinite test source")
                .addSink(new DiscardingSink<>());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        cluster.before();
        try {
            final ClusterClient<?> client = cluster.getClusterClient();
            client.submitJob(jobGraph).get();
            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());

            try {
                client.stopWithSavepoint(
                                jobGraph.getJobID(),
                                true,
                                "failPath://" + savepointDir.getAbsolutePath(),
                                SavepointFormatType.CANONICAL)
                        .get();
                Assert.fail("The future should fail exceptionally.");
            } catch (ExecutionException ex) {
                // Only the injected IO failure is expected; rethrow everything else.
                final boolean injectedFailure =
                        ExceptionUtils.findThrowableWithMessage(ex, "Expected IO exception")
                                .isPresent();
                if (!injectedFailure) {
                    throw ex;
                }
            }

            // The job must have all tasks running again after the failed savepoint.
            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());
        } finally {
            cluster.after();
        }
    }

    /**
     * Builds the check for a failure raised during snapshot creation.
     *
     * @return a function that ignores the job id and reports whether the given exception's cause
     *     chain contains a {@link FlinkException} (when the adaptive scheduler is enabled for a
     *     fresh {@link Configuration}) or a {@link CheckpointException} (otherwise)
     */
    private static BiFunction<JobID, ExecutionException, Boolean> assertInSnapshotCreationFailure() {
        return (ignored, actualException) -> {
            final Class<? extends Throwable> expectedType =
                    ClusterOptions.isAdaptiveSchedulerEnabled(new Configuration())
                            ? FlinkException.class
                            : CheckpointException.class;
            return ExceptionUtils.findThrowable(actualException, expectedType).isPresent();
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a job with two independent pipelines, one fed by {@code failingSource} and one by a
     * plain {@code InfiniteTestSource}, and issues a stop-with-savepoint that is expected to fail.
     *
     * @param failingSource source of the pipeline that is expected to fail
     * @param savepointDir directory passed as the savepoint target
     * @param expectedMaximumNumberOfRestarts restart attempts of the fixed-delay restart strategy
     * @param exceptionAssertion predicate applied to the job id and the caught
     *     {@link ExecutionException}
     * @param shouldRestart whether to wait for all tasks to be running again after the failure
     */
    private static void testStopWithFailingSourceInOnePipeline(InfiniteTestSource failingSource, File savepointDir, int expectedMaximumNumberOfRestarts, BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion, boolean shouldRestart) throws Exception {
        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder().build());

        // Fresh latches so that records from an earlier run cannot satisfy this one.
        failingPipelineLatch = new OneShotLatch();
        succeedingPipelineLatch = new OneShotLatch();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig()
                .setRestartStrategy(
                        RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0L));

        env.addSource(failingSource)
                .name("Failing Source")
                .map(
                        value -> {
                            failingPipelineLatch.trigger();
                            return value;
                        })
                .addSink(new DiscardingSink<>());
        env.addSource(new InfiniteTestSource())
                .name("Succeeding Source")
                .map(
                        value -> {
                            succeedingPipelineLatch.trigger();
                            return value;
                        })
                .addSink(new DiscardingSink<>());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        cluster.before();
        try {
            final ClusterClient<?> client = cluster.getClusterClient();
            final JobID jobID = client.submitJob(jobGraph).get();

            // Both pipelines must have processed a record and all tasks must be running.
            failingPipelineLatch.await();
            succeedingPipelineLatch.await();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobID, false);

            try {
                client.stopWithSavepoint(
                                jobGraph.getJobID(),
                                false,
                                savepointDir.getAbsolutePath(),
                                SavepointFormatType.CANONICAL)
                        .get();
                Assert.fail("The future should fail exceptionally.");
            } catch (ExecutionException e) {
                // The predicate ignores its argument and always evaluates the caught exception.
                ExceptionUtils.assertThrowable(
                        e, ignored -> exceptionAssertion.apply(jobGraph.getJobID(), e));
            }

            if (shouldRestart) {
                waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());
            }
        } finally {
            cluster.after();
        }
    }

    /**
     * Polls the job-details REST endpoint until {@code allVerticesRunning} accepts the
     * per-state vertex counts reported for the job.
     *
     * @param restClusterClient client used to send the job-details request
     * @param jobId id of the job to inspect
     * @throws Exception if waiting for the condition or sending the request fails
     */
    public static void waitUntilAllTasksAreRunning(RestClusterClient<?> restClusterClient, JobID jobId) throws Exception {
        final JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
        final JobMessageParameters parameters = headers.getUnresolvedMessageParameters();
        parameters.jobPathParameter.resolve(jobId);

        CommonTestUtils.waitUntilCondition(
                () ->
                        restClusterClient
                                .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
                                .thenApply(
                                        details ->
                                                allVerticesRunning(
                                                        details.getJobVerticesPerState()))
                                .get());
    }

    /**
     * Checks the per-state vertex counts of a job.
     *
     * @param states number of vertices per execution state
     * @return {@code true} if every {@code RUNNING} entry has a positive count and every other
     *     entry has a count of zero; an empty map yields {@code true}
     */
    private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
        for (Map.Entry<ExecutionState, Integer> stateCount : states.entrySet()) {
            final int vertices = stateCount.getValue();
            final boolean acceptable =
                    stateCount.getKey() == ExecutionState.RUNNING ? vertices > 0 : vertices == 0;
            if (!acceptable) {
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes a savepoint of a job containing a stateful operator (uid {@code statefulCounter})
     * surrounded by stateless map operators, then restores a modified job from that savepoint in
     * a fresh cluster. The modified job has a different set of stateless operators but the same
     * stateful operator uid.
     *
     * <p>Both phases share a five-minute deadline for their latch waits.
     */
    @Test
    public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 2;

        // Overall budget for the latch waits in both phases.
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));

        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

        final String savepointPath;

        LOG.info("Flink configuration: " + config + ".");

        MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());

        // The cluster is started here; this message previously claimed a shutdown.
        LOG.info("Starting Flink cluster.");
        cluster.before();
        ClusterClient<?> client = cluster.getClusterClient();

        // Phase 1: run the original job and take a savepoint.
        try {
            final StatefulCounter statefulCounter = new StatefulCounter();
            StatefulCounter.resetForTest(parallelism);

            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource(new InfiniteTestSource())
                    .shuffle()
                    .map(value -> 4 * value)
                    .shuffle()
                    .map(statefulCounter)
                    .uid("statefulCounter")
                    .shuffle()
                    .map(value -> 2 * value)
                    .addSink(new DiscardingSink<>());

            final JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();

            final JobID jobID = client.submitJob(originalJobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobID, false);

            // Wait until every subtask of the stateful operator has made progress.
            Assert.assertTrue(
                    StatefulCounter.getProgressLatch()
                            .await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            savepointPath =
                    client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL).get();
            LOG.info("Retrieved savepoint: " + savepointPath + ".");
        } finally {
            LOG.info("Shutting down Flink cluster.");
            cluster.after();
        }

        // Phase 2: restore the modified job from the savepoint in a new cluster.
        cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());

        LOG.info("Restarting Flink cluster.");
        cluster.before();
        client = cluster.getClusterClient();

        try {
            // Reset the latches for the restored run.
            StatefulCounter.resetForTest(parallelism);

            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource(new InfiniteTestSource())
                    .shuffle()
                    .map(new StatefulCounter())
                    .uid("statefulCounter")
                    .shuffle()
                    .map(value -> value)
                    .addSink(new DiscardingSink<>());

            final JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
            modifiedJobGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath(savepointPath));

            LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with savepoint path " + savepointPath + " in detached mode.");

            client.submitJob(modifiedJobGraph).get();

            // State must be restored on every subtask, and processing must continue afterwards.
            Assert.assertTrue(
                    StatefulCounter.getRestoreLatch()
                            .await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue(
                    StatefulCounter.getProgressLatch()
                            .await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        } finally {
            cluster.after();
        }
    }

    /**
     * Builds the job graph of a simple unchained pipeline: infinite source, shuffle, stateful
     * counter, discarding sink.
     *
     * @param parallelism parallelism of the job
     * @param numberOfRetries restart attempts of the fixed-delay restart strategy
     * @param restartDelay delay between restart attempts, passed to the restart strategy
     * @return the job graph of the assembled pipeline
     */
    private JobGraph createJobGraph(int parallelism, int numberOfRetries, long restartDelay) {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(parallelism);
        environment.disableOperatorChaining();
        environment
                .getConfig()
                .setRestartStrategy(
                        RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));

        environment
                .addSource(new InfiniteTestSource())
                .shuffle()
                .map(new StatefulCounter())
                .addSink(new DiscardingSink<>());

        return environment.getStreamGraph().getJobGraph();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Takes a savepoint of a job containing an iteration, cancels the job, resubmits it from the
     * savepoint and waits until the source reports that its state was restored.
     *
     * <p>The savepoint (if one was taken) is disposed and the cluster is shut down regardless of
     * the outcome.
     */
    @Test
    public void testSavepointForJobWithIteration() throws Exception {
        // Reset the shared latches and the verification slot for the single subtask.
        for (int i = 0; i < 1; i++) {
            iterTestSnapshotWait[i] = new OneShotLatch();
            iterTestRestoreWait[i] = new OneShotLatch();
            iterTestCheckpointVerify[i] = 0;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final IntegerStreamSource source = new IntegerStreamSource();

        final IterativeStream<Integer> iteration =
                env.addSource(source)
                        .flatMap(
                                new RichFlatMapFunction<Integer, Integer>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public void flatMap(Integer in, Collector<Integer> clctr)
                                            throws Exception {
                                        clctr.collect(in);
                                    }
                                })
                        .setParallelism(1)
                        .keyBy(
                                new KeySelector<Integer, Object>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Object getKey(Integer value) throws Exception {
                                        return value;
                                    }
                                })
                        .flatMap(new DuplicateFilter())
                        .setParallelism(1)
                        .iterate();

        final SingleOutputStreamOperator<Integer> iterationBody =
                iteration
                        .map(
                                new MapFunction<Integer, Integer>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Integer map(Integer value) throws Exception {
                                        return value;
                                    }
                                })
                        .setParallelism(1);
        iteration.closeWith(iterationBody);

        final StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();

        final Configuration config = getFileBasedCheckpointsConfig();
        config.addAll(jobGraph.getJobConfiguration());
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ZERO);

        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
                                .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();

        String savepointPath = null;
        try {
            client.submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(
                    cluster.getMiniCluster(), jobGraph.getJobID(), false);

            // Wait until the job has progressed far enough to take a savepoint.
            for (OneShotLatch latch : iterTestSnapshotWait) {
                latch.await();
            }

            savepointPath =
                    client.triggerSavepoint(jobGraph.getJobID(), null, SavepointFormatType.CANONICAL)
                            .get();

            client.cancel(jobGraph.getJobID()).get();
            while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
                Thread.sleep(100L);
            }

            // Build a new job graph from the same stream graph and restore it from the savepoint.
            jobGraph = streamGraph.getJobGraph();
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));

            client.submitJob(jobGraph).get();
            for (OneShotLatch latch : iterTestRestoreWait) {
                latch.await();
            }

            client.cancel(jobGraph.getJobID()).get();
            while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
                Thread.sleep(100L);
            }
        } finally {
            if (savepointPath != null) {
                client.disposeSavepoint(savepointPath);
            }
            cluster.after();
        }
    }

    /**
     * Creates a configuration that uses the "filesystem" state backend, stores checkpoints under
     * this test's checkpoint directory, sets the small-file threshold to zero and uses the given
     * savepoint directory.
     *
     * @param savepointDir value for the savepoint directory option
     * @return the assembled configuration
     */
    private Configuration getFileBasedCheckpointsConfig(String savepointDir) {
        final String checkpointUri = checkpointDir.toURI().toString();

        final Configuration fileBasedConfig = new Configuration();
        fileBasedConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
        fileBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);
        fileBasedConfig.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
        fileBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        return fileBasedConfig;
    }

    /**
     * Creates the file-based checkpoint configuration, using this test's savepoint directory
     * (as a URI string) as the savepoint target.
     */
    private Configuration getFileBasedCheckpointsConfig() {
        final String defaultSavepointUri = savepointDir.toURI().toString();
        return getFileBasedCheckpointsConfig(defaultSavepointUri);
    }

    /**
     * Creates a matcher for a savepoint directory. It accepts the directory when the
     * {@code _entropy_} sub-directory contains no regular files and the {@code _resolved_}
     * sub-directory exists and contains at least one regular file.
     */
    private static Matcher<File> hasEntropyInFileStateHandlePaths() {
        return new TypeSafeDiagnosingMatcher<File>() {

            @Override
            protected boolean matchesSafely(File savepointDir, Description mismatchDescription) {
                if (savepointDir == null) {
                    mismatchDescription.appendText("savepoint dir must not be null");
                    return false;
                }

                final java.nio.file.Path root = savepointDir.toPath();
                final List<java.nio.file.Path> unresolvedFiles =
                        SavepointITCase.listRecursively(root.resolve("_entropy_"));
                final java.nio.file.Path resolvedDir = root.resolve("_resolved_");
                final List<java.nio.file.Path> resolvedFiles =
                        SavepointITCase.listRecursively(resolvedDir);

                // Files under the placeholder directory mean entropy was never injected.
                if (!unresolvedFiles.isEmpty()) {
                    mismatchDescription.appendText(
                            "there are savepoint files with unresolved entropy placeholders");
                    return false;
                }

                final boolean hasResolvedFiles =
                        Files.exists(resolvedDir) && !resolvedFiles.isEmpty();
                if (!hasResolvedFiles) {
                    mismatchDescription.appendText(
                            "there are no savepoint files with added entropy");
                    return false;
                }
                return true;
            }

            @Override
            public void describeTo(Description description) {
                description.appendText("all savepoint files should have added entropy");
            }
        };
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Collects all regular files below {@code dir}, following symbolic links.
     *
     * @param dir root of the traversal
     * @return the regular files found, or an empty list if {@code dir} does not exist
     * @throws RuntimeException wrapping the {@link IOException} if the traversal cannot be started
     */
    private static List<java.nio.file.Path> listRecursively(java.nio.file.Path dir) {
        if (!Files.exists(dir)) {
            return Collections.emptyList();
        }

        // The stream returned by Files.walk must be closed, hence try-with-resources.
        try (Stream<java.nio.file.Path> walked = Files.walk(dir, FileVisitOption.FOLLOW_LINKS)) {
            return walked.filter(Files::isRegularFile).collect(Collectors.toList());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static {
        iterTestSnapshotWait = new OneShotLatch[1];
        iterTestRestoreWait = new OneShotLatch[1];
        iterTestCheckpointVerify = new int[1];
    }

    public static final class PathFailingFileSystemFactory
    implements FileSystemFactory {
        public String getScheme() {
            return "failPath";
        }

        public FileSystem create(URI fsUri) throws IOException {
            return new PathFailingFileSystem();
        }
    }

    public static class PathFailingFileSystem
    extends LocalFileSystem {
        public static final String SCHEME = "failPath";
        private static String failingPathRegex;

        public static void resetFailingPath(String regex) {
            failingPathRegex = regex;
        }

        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            this.failPath(f);
            return super.open(f, bufferSize);
        }

        public FSDataInputStream open(Path f) throws IOException {
            this.failPath(f);
            return super.open(f);
        }

        public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite) throws IOException {
            this.failPath(filePath);
            return super.create(filePath, overwrite);
        }

        public LocalRecoverableWriter createRecoverableWriter() throws IOException {
            throw new UnsupportedOperationException("This file system does not support recoverable writers.");
        }

        private void failPath(Path filePath) throws IOException {
            if (filePath.getPath().matches(failingPathRegex)) {
                throw new IOException("Expected IO exception for path: " + failingPathRegex);
            }
        }

        public URI getUri() {
            return URI.create("failPath:///");
        }
    }

    /**
     * Creates {@link MiniClusterWithClientResource} instances that all share the same task
     * manager count, slot count and configuration.
     */
    private static class MiniClusterResourceFactory {
        private final int numTaskManagers;
        private final int numSlotsPerTaskManager;
        private final Configuration config;

        private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
            this.numTaskManagers = numTaskManagers;
            this.numSlotsPerTaskManager = numSlotsPerTaskManager;
            this.config = config;
        }

        /** Builds a new, not yet started cluster resource from the stored settings. */
        MiniClusterWithClientResource get() {
            final MiniClusterResourceConfiguration resourceConfiguration =
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(config)
                            .setNumberTaskManagers(numTaskManagers)
                            .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                            .build();
            return new MiniClusterWithClientResource(resourceConfiguration);
        }
    }

    private static class DuplicateFilter
    extends RichFlatMapFunction<Integer, Integer> {
        static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor("seen", Boolean.class, (Object)false);
        private static final long serialVersionUID = 1L;
        private ValueState<Boolean> operatorState;

        private DuplicateFilter() {
        }

        public void open(Configuration configuration) {
            this.operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
        }

        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (!((Boolean)this.operatorState.value()).booleanValue()) {
                out.collect((Object)value);
                this.operatorState.update((Object)true);
            }
            if (30 == value) {
                iterTestSnapshotWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
            }
        }
    }

    private static final class IntegerStreamSource
    extends RichSourceFunction<Integer>
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;
        private volatile boolean isRestored = false;
        private int emittedCount = 0;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.emittedCount);
                }
                this.emittedCount = this.emittedCount < 100 ? ++this.emittedCount : 0;
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()] = this.emittedCount;
            return Collections.singletonList(this.emittedCount);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (!state.isEmpty()) {
                this.emittedCount = state.get(0);
            }
            Assert.assertEquals((long)iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()], (long)this.emittedCount);
            iterTestRestoreWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
        }
    }

    private static class StatefulCounter
    extends RichMapFunction<Integer, Integer>
    implements ListCheckpointed<byte[]> {
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
        private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
        private int numCollectedElements = 0;
        private static final long serialVersionUID = 7317800376639115920L;
        private byte[] data;

        private StatefulCounter() {
        }

        public void open(Configuration parameters) throws Exception {
            if (this.data == null) {
                Random rand = new Random(this.getRuntimeContext().getIndexOfThisSubtask());
                this.data = new byte[(int)((MemorySize)CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()).getBytes() + 1];
                rand.nextBytes(this.data);
            }
        }

        public Integer map(Integer value) throws Exception {
            int i = 0;
            while (i < this.data.length) {
                int n = i++;
                this.data[n] = (byte)(this.data[n] + 1);
            }
            if (this.numCollectedElements++ > 10) {
                progressLatch.countDown();
            }
            return value;
        }

        public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.data);
        }

        public void restoreState(List<byte[]> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.data = state.get(0);
            restoreLatch.countDown();
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static CountDownLatch getRestoreLatch() {
            return restoreLatch;
        }

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
            restoreLatch = new CountDownLatch(parallelism);
        }
    }

    private static class SnapshotFailingInfiniteTestSource
    extends InfiniteTestSource
    implements CheckpointedFunction {
        private SnapshotFailingInfiniteTestSource() {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            throw new Exception("Expected Exception happened during snapshot creation within test source");
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }
    }

    private static class CancelFailingInfiniteTestSource
    extends InfiniteTestSource
    implements CheckpointListener {
        private static volatile boolean checkpointCompleteTriggered = false;

        private CancelFailingInfiniteTestSource() {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (!checkpointCompleteTriggered) {
                checkpointCompleteTriggered = true;
                throw new RuntimeException("Expected RuntimeException after snapshot creation.");
            }
            super.cancel();
        }
    }

    private static class InfiniteTestSource
    implements ParallelSourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;
        private volatile boolean suspended = false;
        private static final Collection<InfiniteTestSource> createdSources = new CopyOnWriteArrayList<InfiniteTestSource>();
        private volatile transient CompletableFuture<Void> completeFuture;

        private InfiniteTestSource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            this.completeFuture = new CompletableFuture();
            createdSources.add(this);
            try {
                while (this.running) {
                    if (!this.suspended) {
                        Object object = ctx.getCheckpointLock();
                        synchronized (object) {
                            ctx.collect((Object)1);
                        }
                    }
                    Thread.sleep(1L);
                }
                this.completeFuture.complete(null);
            }
            catch (Exception e) {
                this.completeFuture.completeExceptionally(e);
                throw e;
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void suspend() {
            this.suspended = true;
        }

        public static void resetForTest() {
            createdSources.clear();
        }

        public CompletableFuture<Void> getCompleteFuture() {
            return this.completeFuture;
        }

        public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
            createdSources.forEach(InfiniteTestSource::cancel);
            CompletableFuture.allOf((CompletableFuture[])createdSources.stream().map(InfiniteTestSource::getCompleteFuture).toArray(CompletableFuture[]::new)).get();
        }

        public static void suspendAll() {
            createdSources.forEach(InfiniteTestSource::suspend);
        }
    }

    /**
     * Operator that forwards every record unchanged and exposes static latches through which
     * a test can observe progress and gate snapshots.
     *
     * <p>All coordination state is static and has no initializer, so
     * {@link #resetForTest(int, boolean)} has to be called before the operator is used.
     *
     * @param <T> type of the records passed through
     */
    static class BoundedPassThroughOperator<T>
            extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T>, BoundedOneInput {

        /** Counted down once per operator instance, when that instance sees its first record. */
        static volatile CountDownLatch progressLatch;

        /** Snapshots block on this latch until it reaches zero. */
        static volatile CountDownLatch snapshotAllowedLatch;

        /** Counted down each time {@code snapshotState} is entered. */
        static volatile CountDownLatch snapshotStartedLatch;

        /** Set to {@code true} by {@code endInput}. */
        static volatile boolean inputEnded;

        /** Whether this instance has already reported its first record. */
        private transient boolean processed;

        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
            this.chainingStrategy = chainingStrategy;
        }

        /**
         * Re-creates all latches and clears the end-of-input flag.
         *
         * @param parallelism initial count for the progress and snapshot-started latches
         * @param allowSnapshots if {@code true}, snapshots proceed immediately; otherwise they
         *     block until the snapshot-allowed latch is counted down
         */
        static void resetForTest(int parallelism, boolean allowSnapshots) {
            final int snapshotGateCount;
            if (allowSnapshots) {
                snapshotGateCount = 0;
            } else {
                snapshotGateCount = 1;
            }
            progressLatch = new CountDownLatch(parallelism);
            snapshotAllowedLatch = new CountDownLatch(snapshotGateCount);
            snapshotStartedLatch = new CountDownLatch(parallelism);
            inputEnded = false;
        }

        /** Returns the latch that tracks which instances have processed a first record. */
        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        /** Counts down the gate that blocked snapshots are waiting on. */
        private static void allowSnapshots() {
            snapshotAllowedLatch.countDown();
        }

        /** Blocks until the snapshot-started latch has reached zero. */
        public static void awaitSnapshotStarted() throws InterruptedException {
            snapshotStartedLatch.await();
        }

        /** Forwards the record and, on the first record seen by this instance, signals progress. */
        public void processElement(StreamRecord<T> element) throws Exception {
            output.collect(element);
            if (processed) {
                return;
            }
            processed = true;
            progressLatch.countDown();
        }

        /** Records that the input has ended. */
        public void endInput() throws Exception {
            inputEnded = true;
        }

        /**
         * Signals that a snapshot has started, waits until snapshots are allowed and then
         * delegates to the superclass.
         */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            snapshotStartedLatch.countDown();
            snapshotAllowedLatch.await();
            super.snapshotState(context);
        }
    }

    @FunctionalInterface
    static interface PostCancelChecker {
        public void check(MiniClusterWithClientResource var1) throws Exception;
    }

    /**
     * Identity map function that throws an {@code ExpectedTestException} when it is notified
     * that the checkpoint with the configured id has completed.
     */
    private static final class FailingOnCompletedSavepointMapFunction
            extends RichMapFunction<Integer, Integer>
            implements CheckpointListener {

        /** Id of the checkpoint whose completion notification triggers the failure. */
        private final long savepointId;

        private FailingOnCompletedSavepointMapFunction(long savepointId) {
            this.savepointId = savepointId;
        }

        /** Returns the input unchanged. */
        public Integer map(Integer value) throws Exception {
            return value;
        }

        /** Fails only for the configured id; every other completion notification is ignored. */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (checkpointId != savepointId) {
                return;
            }
            throw new ExpectedTestException();
        }
    }

    /**
     * Sink that discards all records and fails the test if a snapshot is taken before
     * {@code finish()} has been called.
     *
     * @param <T> type of the records consumed
     */
    private static class FinishingSink<T> implements SinkFunction<T>, CheckpointedFunction {

        /** Becomes {@code true} once {@code finish()} has run. */
        private boolean finishCalled;

        private FinishingSink() {
        }

        /** No state is restored. */
        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        /** Records are dropped. */
        public void invoke(T value) throws Exception {
        }

        /** Remembers that the sink was finished. */
        public void finish() throws Exception {
            finishCalled = true;
        }

        /** Fails the test when the snapshot arrives before {@code finish()}. */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (finishCalled) {
                return;
            }
            Assert.fail("Finish is expected to be called before taking the savepoint with drain");
        }
    }
}

