/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class RescalingITCase
extends TestLogger {
    /** Class-wide executor; used by the operator-state test to schedule savepoint-trigger retries. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    // Cluster sizing constants.
    // NOTE(review): setup() passes the literal 4 as slots-per-task-manager, which matches
    // numSlots rather than slotsPerTaskManager — confirm which value is intended.
    private static final int numTaskManagers = 2;
    private static final int slotsPerTaskManager = 2;
    private static final int numSlots = 4;
    /** State backend name for this parameterized run (first test parameter). */
    private final String backend;
    /** Value written to NETWORK_BUFFERS_PER_CHANNEL for this run (second test parameter). */
    private final int buffersPerChannel;
    // Backend the cluster was last configured with by this instance. It is an instance field
    // initialised to null, so it starts out unset for every newly constructed test object.
    private String currentBackend = null;
    /** Mini cluster shared through a static field; created in setup() and released in shutDownExistingCluster(). */
    private static MiniClusterWithClientResource cluster;
    /** Source of the checkpoint and savepoint directories; assigned in the static initializer. */
    @ClassRule
    public static TemporaryFolder temporaryFolder;

    @Parameterized.Parameters(name="backend = {0}, buffersPerChannel = {1}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"filesystem", 2}, {"rocksdb", 0}, {"filesystem", 0}, {"rocksdb", 2});
    }

    /**
     * Creates one parameterized test instance.
     *
     * @param backend state backend name written to the cluster configuration in {@code setup()}
     * @param buffersPerChannel network buffers per channel written to the cluster configuration
     */
    public RescalingITCase(String backend, int buffersPerChannel) {
        this.buffersPerChannel = buffersPerChannel;
        this.backend = backend;
    }

    /**
     * (Re)creates the shared mini cluster when the backend of this test instance differs from the
     * backend this instance last configured.
     *
     * <p>The cluster is configured with the selected state backend, fresh checkpoint and savepoint
     * directories, and the parameterized number of network buffers per channel.
     *
     * @throws Exception if the temporary directories cannot be created or the cluster fails to start
     */
    @Before
    public void setup() throws Exception {
        // Compare backend names by value; reference comparison of strings only works when both
        // happen to be the same interned instance.
        boolean backendUnchanged = this.currentBackend == null
                ? this.backend == null
                : this.currentBackend.equals(this.backend);
        if (backendUnchanged) {
            return;
        }

        RescalingITCase.shutDownExistingCluster();
        this.currentBackend = this.backend;

        File checkpointDir = temporaryFolder.newFolder();
        File savepointDir = temporaryFolder.newFolder();

        Configuration config = new Configuration();
        config.setString(StateBackendOptions.STATE_BACKEND, this.currentBackend);
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, this.buffersPerChannel);

        // Same values as before (2 task managers, 4 slots each), expressed through the constants.
        cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlots)
                        .build());
        cluster.before();
    }

    /**
     * Releases the shared mini cluster, if one exists, and clears the static reference.
     * Safe to call when no cluster has been started.
     */
    @AfterClass
    public static void shutDownExistingCluster() {
        if (cluster == null) {
            return;
        }
        MiniClusterWithClientResource running = cluster;
        running.after();
        cluster = null;
    }

    /** Keyed-state rescaling with {@code scaleOut = false} and {@code deriveMaxParallelism = false}. */
    @Test
    public void testSavepointRescalingInKeyedState() throws Exception {
        testSavepointRescalingKeyedState(false, false);
    }

    /** Keyed-state rescaling with {@code scaleOut = true} and {@code deriveMaxParallelism = false}. */
    @Test
    public void testSavepointRescalingOutKeyedState() throws Exception {
        testSavepointRescalingKeyedState(true, false);
    }

    /** Keyed-state rescaling with {@code scaleOut = false} and {@code deriveMaxParallelism = true}. */
    @Test
    public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
        testSavepointRescalingKeyedState(false, true);
    }

    /** Keyed-state rescaling with {@code scaleOut = true} and {@code deriveMaxParallelism = true}. */
    @Test
    public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
        testSavepointRescalingKeyedState(true, true);
    }

    /**
     * Runs a keyed-state job, takes a savepoint, and restores it with a different parallelism.
     *
     * <p>The first job runs with parallelism 2 ({@code scaleOut}) or 4 (otherwise); the restored
     * job uses the other value. After each phase the tuples collected by {@code CollectionSink}
     * are compared against {@code (operator index of the key's key group, elements * key)} for
     * every key.
     *
     * @param scaleOut {@code true} to go from parallelism 2 to 4, {@code false} for 4 to 2
     * @param deriveMaxParallelism when {@code true}, -1 is passed as max parallelism for the
     *     restored job, which {@code createJobGraphWithKeyedState} does not apply to the config
     * @throws Exception on any job submission, savepoint or assertion failure
     */
    public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
        final int numberKeys = 42;
        final int numberElements = 1000;
        final int numberElements2 = 500;
        final int parallelism = scaleOut ? 2 : 4;
        final int parallelism2 = scaleOut ? 4 : 2;
        final int maxParallelism = 13;
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(3L));
        final ClusterClient<?> client = cluster.getClusterClient();

        try {
            // Phase 1: run at the initial parallelism until every key has emitted its sum.
            JobGraph jobGraph = createJobGraphWithKeyedState(parallelism, maxParallelism, numberKeys, numberElements, false, 100);
            final JobID jobID = jobGraph.getJobID();
            client.submitJob(jobGraph).get();
            Assert.assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
            for (int key = 0; key < numberKeys; ++key) {
                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex);
                expectedResult.add(Tuple2.of(operatorIndex, numberElements * key));
            }
            Assert.assertEquals(expectedResult, actualResult);
            CollectionSink.clearElementsSet();

            // Savepoint, then cancel and wait until the cluster reports no running job.
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
            CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
            String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            client.cancel(jobID).get();
            while (!getRunningJobs(client).isEmpty()) {
                Thread.sleep(50L);
            }

            // Phase 2: restore at the other parallelism and emit the remaining elements.
            int restoreMaxParallelism = deriveMaxParallelism ? -1 : maxParallelism;
            JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, restoreMaxParallelism, numberKeys, numberElements2, true, 100);
            scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            TestUtils.submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());

            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
            for (int key = 0; key < numberKeys; ++key) {
                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex);
                expectedResult2.add(Tuple2.of(operatorIndex, key * (numberElements + numberElements2)));
            }
            Assert.assertEquals(expectedResult2, actualResult2);
        } finally {
            // The sink's element set is static, so always reset it for the next test.
            CollectionSink.clearElementsSet();
        }
    }

    /**
     * Takes a savepoint of a job with non-partitioned operator state at parallelism 2 and submits
     * a restore at parallelism 4.
     *
     * <p>A {@link JobExecutionException} whose cause is an {@link IllegalStateException} is
     * tolerated; any other {@code JobExecutionException} is rethrown. The method also returns
     * normally when the restored job completes without throwing.
     */
    @Test
    public void testSavepointRescalingNonPartitionedStateCausesException() throws Exception {
        final int parallelism = 2;
        final int parallelism2 = 4;
        final int maxParallelism = 13;
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(3L));
        final ClusterClient<?> client = cluster.getClusterClient();

        try {
            JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
            // Keep the sources alive until the savepoint has been taken.
            StateSourceBase.canFinishLatch = new CountDownLatch(1);
            final JobID jobID = jobGraph.getJobID();
            client.submitJob(jobGraph).get();
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
            StateSourceBase.workStartedLatch.await();

            CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
            String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

            StateSourceBase.canFinishLatch.countDown();
            client.cancel(jobID).get();
            while (!getRunningJobs(client).isEmpty()) {
                Thread.sleep(50L);
            }

            JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
            scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            TestUtils.submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
        } catch (JobExecutionException exception) {
            if (!(exception.getCause() instanceof IllegalStateException)) {
                throw exception;
            }
            // An IllegalStateException wrapped in the JobExecutionException is accepted here.
        }
    }

    /**
     * Rescales a job whose keyed operator changes parallelism (2 to 4) while its source, which
     * carries non-partitioned state, keeps a fixed parallelism of 2.
     *
     * <p>After each phase the tuples collected by {@code CollectionSink} are compared against
     * {@code (operator index of the key's key group, total elements * key)} for every key.
     */
    @Test
    public void testSavepointRescalingWithKeyedAndNonPartitionedState() throws Exception {
        final int numberKeys = 42;
        final int numberElements = 1000;
        final int numberElements2 = 500;
        final int parallelism = 2;
        final int parallelism2 = 4;
        final int maxParallelism = 13;
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(3L));
        final ClusterClient<?> client = cluster.getClusterClient();

        try {
            // Phase 1: initial run until every key has reported its sum.
            JobGraph jobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
                    parallelism, maxParallelism, parallelism, numberKeys, numberElements, false, 100);
            final JobID jobID = jobGraph.getJobID();
            StateSourceBase.canFinishLatch = new CountDownLatch(1);
            client.submitJob(jobGraph).get();
            Assert.assertTrue(SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
            for (int key = 0; key < numberKeys; ++key) {
                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex);
                expectedResult.add(Tuple2.of(operatorIndex, numberElements * key));
            }
            Assert.assertEquals(expectedResult, actualResult);
            CollectionSink.clearElementsSet();

            // Savepoint, then cancel and wait until the cluster reports no running job.
            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
            CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
            String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            StateSourceBase.canFinishLatch.countDown();
            client.cancel(jobID).get();
            while (!getRunningJobs(client).isEmpty()) {
                Thread.sleep(50L);
            }

            // Phase 2: the source is given the combined element count as its emission limit.
            JobGraph scaledJobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
                    parallelism2, maxParallelism, parallelism, numberKeys, numberElements + numberElements2, true, 100);
            scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            TestUtils.submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());

            Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
            for (int key = 0; key < numberKeys; ++key) {
                int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex);
                expectedResult2.add(Tuple2.of(operatorIndex, key * (numberElements + numberElements2)));
            }
            Assert.assertEquals(expectedResult2, actualResult2);
        } finally {
            // The sink's element set is static, so always reset it for the next test.
            CollectionSink.clearElementsSet();
        }
    }

    /** Operator-state rescaling via {@code CHECKPOINTED_FUNCTION} with {@code scaleOut = false}. */
    @Test
    public void testSavepointRescalingInPartitionedOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION);
    }

    /** Operator-state rescaling via {@code CHECKPOINTED_FUNCTION} with {@code scaleOut = true}. */
    @Test
    public void testSavepointRescalingOutPartitionedOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION);
    }

    /** Operator-state rescaling via {@code CHECKPOINTED_FUNCTION_BROADCAST} with {@code scaleOut = false}. */
    @Test
    public void testSavepointRescalingInBroadcastOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST);
    }

    /** Operator-state rescaling via {@code CHECKPOINTED_FUNCTION_BROADCAST} with {@code scaleOut = true}. */
    @Test
    public void testSavepointRescalingOutBroadcastOperatorState() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST);
    }

    /** Operator-state rescaling via {@code LIST_CHECKPOINTED} with {@code scaleOut = false}. */
    @Test
    public void testSavepointRescalingInPartitionedOperatorStateList() throws Exception {
        testSavepointRescalingPartitionedOperatorState(false, OperatorCheckpointMethod.LIST_CHECKPOINTED);
    }

    /** Operator-state rescaling via {@code LIST_CHECKPOINTED} with {@code scaleOut = true}. */
    @Test
    public void testSavepointRescalingOutPartitionedOperatorStateList() throws Exception {
        testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.LIST_CHECKPOINTED);
    }

    /**
     * Runs a job with operator (non-keyed) state, takes a savepoint, restores it at a different
     * parallelism and checks that the restored counters add up to the snapshotted ones.
     *
     * <p>With {@code scaleOut} the job goes from parallelism 2 to 4, otherwise from 4 to 2; this
     * is the same direction convention as {@code testSavepointRescalingKeyedState}. For
     * {@code CHECKPOINTED_FUNCTION_BROADCAST} the expected sum is multiplied by the restore
     * parallelism, because that source reads its state through {@code getUnionListState}.
     *
     * @param scaleOut {@code true} to restore with more subtasks, {@code false} with fewer
     * @param checkpointMethod which source implementation / state flavour to exercise
     * @throws Exception on any job submission, savepoint or assertion failure
     */
    public void testSavepointRescalingPartitionedOperatorState(boolean scaleOut, OperatorCheckpointMethod checkpointMethod) throws Exception {
        // Previously these two were swapped, so "scale out" actually ran 4 -> 2.
        final int parallelism = scaleOut ? 2 : 4;
        final int parallelism2 = scaleOut ? 4 : 2;
        final int maxParallelism = 13;
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(3L));
        final ClusterClient<?> client = cluster.getClusterClient();

        final boolean broadcast = checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST;
        final boolean usesCheckpointedFunction = broadcast || checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION;

        // The bookkeeping arrays are indexed by subtask, so size them for the larger parallelism.
        final int counterSize = Math.max(parallelism, parallelism2);
        if (usesCheckpointedFunction) {
            PartitionedStateSource.checkCorrectSnapshot = new int[counterSize];
            PartitionedStateSource.checkCorrectRestore = new int[counterSize];
        } else {
            PartitionedStateSourceListCheckpointed.checkCorrectSnapshot = new int[counterSize];
            PartitionedStateSourceListCheckpointed.checkCorrectRestore = new int[counterSize];
        }

        JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
        // Keep the sources alive until the savepoint has been taken.
        StateSourceBase.canFinishLatch = new CountDownLatch(1);
        final JobID jobID = jobGraph.getJobID();
        client.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
        StateSourceBase.workStartedLatch.await();

        // Retry the savepoint trigger every 10 seconds for as long as the deadline allows.
        CompletableFuture<String> savepointPathFuture = FutureUtils.retryWithDelay(
                () -> client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL),
                new FixedRetryStrategy((int) deadline.timeLeft().getSeconds() / 10, Duration.ofSeconds(10L)),
                throwable -> true,
                new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
        String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        StateSourceBase.canFinishLatch.countDown();
        client.cancel(jobID).get();
        while (!getRunningJobs(client).isEmpty()) {
            Thread.sleep(50L);
        }

        JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, checkpointMethod);
        scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
        TestUtils.submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());

        int sumExp;
        int sumAct;
        if (usesCheckpointedFunction) {
            sumExp = sumOf(PartitionedStateSource.checkCorrectSnapshot);
            sumAct = sumOf(PartitionedStateSource.checkCorrectRestore);
            if (broadcast) {
                // Union state is read by every restored subtask, so scale the expected sum.
                sumExp *= parallelism2;
            }
        } else {
            sumExp = sumOf(PartitionedStateSourceListCheckpointed.checkCorrectSnapshot);
            sumAct = sumOf(PartitionedStateSourceListCheckpointed.checkCorrectRestore);
        }
        Assert.assertEquals((long) sumExp, (long) sumAct);
    }

    /** Adds up all entries of {@code values}. */
    private static int sumOf(int[] values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    /**
     * Builds a job consisting of one stateful source feeding a discarding sink.
     *
     * <p>As a side effect, {@code StateSourceBase.workStartedLatch} is replaced with a latch
     * sized to {@code parallelism}.
     *
     * @param parallelism job parallelism
     * @param maxParallelism max parallelism set on the execution config
     * @param checkpointMethod selects the source implementation
     * @return the job graph of the assembled pipeline
     * @throws IllegalArgumentException for a checkpoint method without a matching source
     */
    private static JobGraph createJobGraphWithOperatorState(int parallelism, int maxParallelism, OperatorCheckpointMethod checkpointMethod) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().setMaxParallelism(maxParallelism);
        env.setRestartStrategy(RestartStrategies.noRestart());

        // One count per subtask: each source counts down once it has emitted 10 elements.
        StateSourceBase.workStartedLatch = new CountDownLatch(parallelism);

        final StateSourceBase source;
        switch (checkpointMethod) {
            case CHECKPOINTED_FUNCTION:
                source = new PartitionedStateSource(false);
                break;
            case CHECKPOINTED_FUNCTION_BROADCAST:
                source = new PartitionedStateSource(true);
                break;
            case LIST_CHECKPOINTED:
                source = new PartitionedStateSourceListCheckpointed();
                break;
            case NON_PARTITIONED:
                source = new NonPartitionedStateSource();
                break;
            default:
                throw new IllegalArgumentException();
        }

        DataStreamSource<Integer> stream = env.addSource(source);
        stream.addSink(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Builds the keyed-state job: {@code SubtaskIndexSource} → keyBy(identity) →
     * {@code SubtaskIndexFlatMapper} → {@code CollectionSink}.
     *
     * <p>As a side effect, {@code SubtaskIndexFlatMapper.workCompletedLatch} is replaced with a
     * latch sized to {@code numberKeys}.
     *
     * @param parallelism job parallelism
     * @param maxParallelism max parallelism; only applied when positive
     * @param numberKeys number of distinct keys the source emits
     * @param numberElements emission rounds for the source and reporting interval of the mapper
     * @param terminateAfterEmission whether the source ends after emitting all rounds
     * @param checkpointingInterval checkpoint interval passed to {@code enableCheckpointing}
     * @return the job graph of the assembled pipeline
     */
    private static JobGraph createJobGraphWithKeyedState(int parallelism, int maxParallelism, int numberKeys, int numberElements, boolean terminateAfterEmission, int checkpointingInterval) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        if (maxParallelism > 0) {
            env.getConfig().setMaxParallelism(maxParallelism);
        }
        env.enableCheckpointing((long) checkpointingInterval);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().setUseSnapshotCompression(true);

        SubtaskIndexSource source = new SubtaskIndexSource(numberKeys, numberElements, terminateAfterEmission);
        KeyedStream<Integer, Integer> keyed = env.addSource(source).keyBy(new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = -7952298871120320940L;

            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        });

        SubtaskIndexFlatMapper.workCompletedLatch = new CountDownLatch(numberKeys);

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = keyed.flatMap(new SubtaskIndexFlatMapper(numberElements));
        mapped.addSink(new CollectionSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Builds a job whose source carries non-partitioned state at a fixed parallelism, followed
     * by keyBy(identity) → {@code SubtaskIndexFlatMapper} → {@code CollectionSink}.
     *
     * <p>As a side effect, {@code SubtaskIndexFlatMapper.workCompletedLatch} is replaced with a
     * latch sized to {@code numberKeys}.
     *
     * @param parallelism default job parallelism
     * @param maxParallelism max parallelism set on the execution config
     * @param fixedParallelism parallelism explicitly set on the source
     * @param numberKeys number of distinct keys the source emits
     * @param numberElements emission rounds for the source and reporting interval of the mapper
     * @param terminateAfterEmission whether the source ends after emitting all rounds
     * @param checkpointingInterval checkpoint interval passed to {@code enableCheckpointing}
     * @return the job graph of the assembled pipeline
     */
    private static JobGraph createJobGraphWithKeyedAndNonPartitionedOperatorState(int parallelism, int maxParallelism, int fixedParallelism, int numberKeys, int numberElements, boolean terminateAfterEmission, int checkpointingInterval) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().setMaxParallelism(maxParallelism);
        env.enableCheckpointing((long) checkpointingInterval);
        env.setRestartStrategy(RestartStrategies.noRestart());

        SubtaskIndexNonPartitionedStateSource source = new SubtaskIndexNonPartitionedStateSource(numberKeys, numberElements, terminateAfterEmission);
        KeyedStream<Integer, Integer> keyed = env.addSource(source)
                .setParallelism(fixedParallelism)
                .keyBy(new KeySelector<Integer, Integer>() {
                    private static final long serialVersionUID = -7952298871120320940L;

                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                });

        SubtaskIndexFlatMapper.workCompletedLatch = new CountDownLatch(numberKeys);

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = keyed.flatMap(new SubtaskIndexFlatMapper(numberElements));
        mapped.addSink(new CollectionSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Lists the jobs known to the cluster that have not reached a globally terminal state.
     *
     * @param client client used to query the cluster
     * @return ids of all jobs whose state is not globally terminal; empty if there are none
     * @throws Exception if the job listing cannot be retrieved
     */
    private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
        // The element type must be explicit: with a raw Collection the lambda parameter is
        // Object and neither getJobState() nor JobStatusMessage::getJobId type-checks.
        Collection<JobStatusMessage> statusMessages = client.listJobs().get();
        return statusMessages.stream()
                .filter(status -> !status.getJobState().isGloballyTerminalState())
                .map(JobStatusMessage::getJobId)
                .collect(Collectors.toList());
    }

    // Creates the TemporaryFolder class rule; setup() requests its checkpoint and savepoint
    // directories from it.
    static {
        temporaryFolder = new TemporaryFolder();
    }

    private static class PartitionedStateSource
    extends StateSourceBase
    implements CheckpointedFunction {
        private static final long serialVersionUID = -359715965103593462L;
        private static final int NUM_PARTITIONS = 7;
        private transient ListState<Integer> counterPartitions;
        private boolean broadcast;
        private static int[] checkCorrectSnapshot;
        private static int[] checkCorrectRestore;

        public PartitionedStateSource(boolean broadcast) {
            this.broadcast = broadcast;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.counterPartitions.clear();
            PartitionedStateSource.checkCorrectSnapshot[this.getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            int div = this.counter / 7;
            int mod = this.counter % 7;
            for (int i = 0; i < 7; ++i) {
                int partitionValue = div;
                if (mod > 0) {
                    --mod;
                    ++partitionValue;
                }
                this.counterPartitions.add((Object)partitionValue);
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.counterPartitions = this.broadcast ? context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("counter_partitions", (TypeSerializer)IntSerializer.INSTANCE)) : context.getOperatorStateStore().getListState(new ListStateDescriptor("counter_partitions", (TypeSerializer)IntSerializer.INSTANCE));
            if (context.isRestored()) {
                Iterator iterator = ((Iterable)this.counterPartitions.get()).iterator();
                while (iterator.hasNext()) {
                    int v = (Integer)iterator.next();
                    this.counter += v;
                }
                PartitionedStateSource.checkCorrectRestore[this.getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            }
        }

        static /* synthetic */ int[] access$302(int[] x0) {
            checkCorrectSnapshot = x0;
            return x0;
        }

        static /* synthetic */ int[] access$402(int[] x0) {
            checkCorrectRestore = x0;
            return x0;
        }
    }

    private static class PartitionedStateSourceListCheckpointed
    extends StateSourceBase
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = -4357864582992546L;
        private static final int NUM_PARTITIONS = 7;
        private static int[] checkCorrectSnapshot;
        private static int[] checkCorrectRestore;

        private PartitionedStateSourceListCheckpointed() {
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            PartitionedStateSourceListCheckpointed.checkCorrectSnapshot[this.getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
            int div = this.counter / 7;
            int mod = this.counter % 7;
            ArrayList<Integer> split = new ArrayList<Integer>();
            for (int i = 0; i < 7; ++i) {
                int partitionValue = div;
                if (mod > 0) {
                    --mod;
                    ++partitionValue;
                }
                split.add(partitionValue);
            }
            return split;
        }

        public void restoreState(List<Integer> state) throws Exception {
            for (Integer v : state) {
                this.counter += v.intValue();
            }
            PartitionedStateSourceListCheckpointed.checkCorrectRestore[this.getRuntimeContext().getIndexOfThisSubtask()] = this.counter;
        }

        static /* synthetic */ int[] access$502(int[] x0) {
            checkCorrectSnapshot = x0;
            return x0;
        }

        static /* synthetic */ int[] access$602(int[] x0) {
            checkCorrectRestore = x0;
            return x0;
        }
    }

    private static class NonPartitionedStateSource
    extends StateSourceBase
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = -8108185918123186841L;

        private NonPartitionedStateSource() {
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.counter);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (!state.isEmpty()) {
                this.counter = state.get(0);
            }
        }
    }

    private static class StateSourceBase
    extends RichParallelSourceFunction<Integer> {
        private static final long serialVersionUID = 7512206069681177940L;
        private static CountDownLatch workStartedLatch = new CountDownLatch(1);
        private static CountDownLatch canFinishLatch = new CountDownLatch(0);
        protected volatile int counter = 0;
        protected volatile boolean running = true;

        private StateSourceBase() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Object lock = ctx.getCheckpointLock();
            while (this.running) {
                Object object = lock;
                synchronized (object) {
                    ++this.counter;
                    ctx.collect((Object)1);
                }
                Thread.sleep(2L);
                if (this.counter == 10) {
                    workStartedLatch.countDown();
                }
                if (this.counter < 500) continue;
            }
            canFinishLatch.await();
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class CollectionSink<IN>
    implements SinkFunction<IN> {
        private static Set<Object> elements = Collections.newSetFromMap(new ConcurrentHashMap());
        private static final long serialVersionUID = -1652452958040267745L;

        private CollectionSink() {
        }

        public static <IN> Set<IN> getElementsSet() {
            return elements;
        }

        public static void clearElementsSet() {
            elements.clear();
        }

        public void invoke(IN value) throws Exception {
            elements.add(value);
        }
    }

    private static class SubtaskIndexFlatMapper
    extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 5273172591283191348L;
        private static CountDownLatch workCompletedLatch = new CountDownLatch(1);
        private transient ValueState<Integer> counter;
        private transient ValueState<Integer> sum;
        private final int numberElements;

        SubtaskIndexFlatMapper(int numberElements) {
            this.numberElements = numberElements;
        }

        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            int count = (Integer)this.counter.value() + 1;
            this.counter.update((Object)count);
            int s = (Integer)this.sum.value() + value;
            this.sum.update((Object)s);
            if (count % this.numberElements == 0) {
                out.collect((Object)Tuple2.of((Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)s));
                workCompletedLatch.countDown();
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.counter = context.getKeyedStateStore().getState(new ValueStateDescriptor("counter", Integer.class, (Object)0));
            this.sum = context.getKeyedStateStore().getState(new ValueStateDescriptor("sum", Integer.class, (Object)0));
        }
    }

    private static class SubtaskIndexNonPartitionedStateSource
    extends SubtaskIndexSource
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 8388073059042040203L;

        SubtaskIndexNonPartitionedStateSource(int numberKeys, int numberElements, boolean terminateAfterEmission) {
            super(numberKeys, numberElements, terminateAfterEmission);
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.counter);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.counter = state.get(0);
        }
    }

    private static class SubtaskIndexSource
    extends RichParallelSourceFunction<Integer> {
        private static final long serialVersionUID = -400066323594122516L;
        private final int numberKeys;
        private final int numberElements;
        private final boolean terminateAfterEmission;
        protected int counter = 0;
        private boolean running = true;

        SubtaskIndexSource(int numberKeys, int numberElements, boolean terminateAfterEmission) {
            this.numberKeys = numberKeys;
            this.numberElements = numberElements;
            this.terminateAfterEmission = terminateAfterEmission;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Object lock = ctx.getCheckpointLock();
            int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
            while (this.running) {
                if (this.counter < this.numberElements) {
                    Object object = lock;
                    synchronized (object) {
                        for (int value = subtaskIndex; value < this.numberKeys; value += this.getRuntimeContext().getNumberOfParallelSubtasks()) {
                            ctx.collect((Object)value);
                        }
                        ++this.counter;
                        continue;
                    }
                }
                if (this.terminateAfterEmission) {
                    this.running = false;
                    continue;
                }
                Thread.sleep(100L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /** Selects which stateful source implementation an operator-state test job uses. */
    enum OperatorCheckpointMethod {
        /** Source with a single non-partitioned counter ({@code NonPartitionedStateSource}). */
        NON_PARTITIONED,
        /** {@code PartitionedStateSource} using regular operator list state. */
        CHECKPOINTED_FUNCTION,
        /** {@code PartitionedStateSource} using union operator list state. */
        CHECKPOINTED_FUNCTION_BROADCAST,
        /** {@code PartitionedStateSourceListCheckpointed}. */
        LIST_CHECKPOINTED
    }
}

