/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.io.File;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SnapshotMigrationTestBase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class);

    /**
     * Default parallelism constant exposed to subclasses; it is also the number of slots
     * configured on the single task manager of the test cluster.
     */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** Class-level temporary folder under which the checkpoint and savepoint directories are created. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /**
     * Mini cluster with one task manager, started as a JUnit rule and configured via
     * {@code getConfiguration()}.
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .build());

    /**
     * Looks up a resource through this class's class loader and returns the file component of
     * its URL.
     *
     * @param filename name of the resource, as understood by {@link ClassLoader#getResource(String)}
     * @return the result of {@link URL#getFile()} for the located resource
     * @throws NullPointerException if the class loader cannot find the resource
     */
    protected static String getResourceFilename(String filename) {
        URL resource = SnapshotMigrationTestBase.class.getClassLoader().getResource(filename);
        if (resource == null) {
            // Name the resource that was requested so a failing test shows which snapshot is absent.
            throw new NullPointerException("Missing snapshot resource: " + filename);
        }
        return resource.getFile();
    }

    /**
     * Creates the test base.
     *
     * @throws Exception declared because the {@code miniClusterResource} field initializer calls
     *     {@code getConfiguration()}, which itself declares {@code Exception}
     */
    protected SnapshotMigrationTestBase() throws Exception {
        super();
    }

    /**
     * Builds the configuration handed to the test mini cluster.
     *
     * <p>Creates one checkpoint directory and one savepoint directory below {@link #TEMP_FOLDER}
     * and registers both (as URIs) in the returned configuration, together with the state backend
     * name, the task-manager/slot counts and the heartbeat interval.
     *
     * @return the populated configuration
     * @throws Exception if the temporary directories cannot be created or do not exist afterwards
     */
    private Configuration getConfiguration() throws Exception {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 1);
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

        // A random suffix gives each invocation its own pair of directories in the shared folder.
        UUID id = UUID.randomUUID();
        File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
        File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();
        if (!checkpointDir.exists() || !savepointDir.exists()) {
            throw new Exception("Test setup failed: failed to create (temporary) directories.");
        }
        LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
        LOG.info("Created savepoint directory: {}.", savepointDir);

        config.setString(StateBackendOptions.STATE_BACKEND, "memory");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        // No cast on the value: set(ConfigOption<T>, T) must infer T from the option's type.
        // NOTE(review): a zero threshold presumably forces state into files instead of inlining it
        // in the metadata - confirm against the CheckpointingOptions documentation.
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);
        return config;
    }

    /**
     * Submits the job defined by {@code env}, waits until all expected accumulators have reached
     * their expected values, triggers a snapshot of the requested type and moves the resulting
     * snapshot to {@code snapshotPath}.
     *
     * <p>The whole procedure shares a single five-minute deadline; the test fails if the
     * accumulators have not reached the expected values before it expires.
     *
     * @param env environment whose stream graph is submitted as the job
     * @param snapshotPath destination the snapshot file or directory is moved to
     * @param snapshotType kind of snapshot to trigger
     * @param expectedAccumulators pairs of accumulator name and the value that must be reached
     *     before the snapshot is triggered
     * @throws UnsupportedOperationException if {@code snapshotType} is not handled
     * @throws Exception if submitting, polling, snapshotting or moving the snapshot fails
     */
    @SafeVarargs
    protected final void executeAndSnapshot(StreamExecutionEnvironment env, String snapshotPath, SnapshotType snapshotType, Tuple2<String, Integer>... expectedAccumulators) throws Exception {
        Deadline deadLine = Deadline.fromNow(Duration.ofMinutes(5L));
        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        if (snapshotType == SnapshotType.SAVEPOINT_NATIVE) {
            // Native savepoints are taken with the changelog state backend explicitly disabled.
            env.enableChangelogStateBackend(false);
        }
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobID jobID = client.submitJob(jobGraph).get();
        LOG.info("Submitted job {} and waiting...", jobID);

        // Poll the accumulators every 100 ms until all of them match or the deadline expires.
        boolean done = false;
        while (!done && deadLine.hasTimeLeft()) {
            Thread.sleep(100L);
            Map<String, Object> accumulators = client.getAccumulators(jobID).get();
            boolean allDone = true;
            for (Tuple2<String, Integer> acc : expectedAccumulators) {
                Object accumOpt = accumulators.get(acc.f0);
                // A missing accumulator means it has not been reported yet; present values are
                // expected to be Integers (the cast fails fast otherwise).
                if (accumOpt == null || !((Integer) accumOpt).equals(acc.f1)) {
                    allDone = false;
                    break;
                }
            }
            done = allDone;
        }
        if (!done) {
            Assert.fail("Did not see the expected accumulator results within time limit.");
        }

        LOG.info("Triggering snapshot.");
        CompletableFuture<String> snapshotPathFuture;
        switch (snapshotType) {
            case SAVEPOINT_CANONICAL: {
                snapshotPathFuture = client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
                break;
            }
            case SAVEPOINT_NATIVE: {
                snapshotPathFuture = client.triggerSavepoint(jobID, null, SavepointFormatType.NATIVE);
                break;
            }
            case CHECKPOINT: {
                snapshotPathFuture = this.miniClusterResource.getMiniCluster().triggerCheckpoint(jobID);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Snapshot type not supported/implemented.");
            }
        }

        // Only the time remaining on the shared deadline is granted to the snapshot itself.
        String jobmanagerSnapshotPath = snapshotPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        // The reported location is a URI string; the snapshot may be a directory or a single file.
        File jobManagerSnapshot = new File(new URI(jobmanagerSnapshotPath).getPath());
        File target = new File(snapshotPath);
        if (jobManagerSnapshot.isDirectory()) {
            FileUtils.moveDirectory(jobManagerSnapshot, target);
        } else {
            FileUtils.moveFile(jobManagerSnapshot, target);
        }
    }

    /**
     * Submits the job defined by {@code env} with restore settings pointing at
     * {@code snapshotPath} and waits until all expected accumulators have reached their expected
     * values.
     *
     * <p>While waiting, the job status is checked on every iteration; the test fails as soon as
     * the job is reported as {@link JobStatus#FAILED}, if the status cannot be obtained, or if the
     * accumulators have not matched within five minutes.
     *
     * @param env environment whose stream graph is submitted as the job
     * @param snapshotPath path of the snapshot to restore from
     * @param expectedAccumulators pairs of accumulator name and the value that must be reached
     * @throws Exception if submitting the job or reading the accumulators fails
     */
    @SafeVarargs
    protected final void restoreAndExecute(StreamExecutionEnvironment env, String snapshotPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception {
        Deadline deadLine = Deadline.fromNow(Duration.ofMinutes(5L));
        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(snapshotPath));
        JobID jobID = client.submitJob(jobGraph).get();

        boolean done = false;
        while (!done && deadLine.hasTimeLeft()) {
            try {
                JobStatus jobStatus = client.getJobStatus(jobID).get(5L, TimeUnit.SECONDS);
                if (jobStatus == JobStatus.FAILED) {
                    // Log the job's failure cause before the assertion below aborts the test.
                    LOG.warn("Job reached status failed", client.requestJobResult(jobID).get().getSerializedThrowable().get().deserializeError(ClassLoader.getSystemClassLoader()));
                }
                // Throws AssertionError, which is not an Exception and so bypasses the catch below.
                Assert.assertNotEquals(JobStatus.FAILED, jobStatus);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller / test runner.
                    Thread.currentThread().interrupt();
                }
                // Same failure type and message as Assert.fail, but with the cause attached so
                // the original stack trace is not lost.
                throw new AssertionError("Could not connect to job: " + e, e);
            }

            Thread.sleep(100L);
            Map<String, Object> accumulators = client.getAccumulators(jobID).get();
            boolean allDone = true;
            for (Tuple2<String, Integer> acc : expectedAccumulators) {
                Object numFinished = accumulators.get(acc.f0);
                // A missing accumulator means it has not been reported yet.
                if (numFinished == null || !numFinished.equals(acc.f1)) {
                    allDone = false;
                    break;
                }
            }
            done = allDone;
        }
        if (!done) {
            Assert.fail("Did not see the expected accumulator results within time limit.");
        }
    }

    /**
     * Immutable description of a snapshot: the Flink version, the state backend type and the
     * snapshot type. Its {@link #toString()} renders these three parts as a dash-separated name.
     */
    public static class SnapshotSpec implements Serializable {
        private final FlinkVersion flinkVersion;
        private final String stateBackendType;
        private final SnapshotType snapshotType;

        /**
         * @param flinkVersion Flink version of the snapshot
         * @param stateBackendType state backend name; {@link #toString()} accepts
         *     {@code "rocksdb"}, {@code "jobmanager"} and {@code "hashmap"}
         * @param snapshotType kind of snapshot
         */
        public SnapshotSpec(FlinkVersion flinkVersion, String stateBackendType, SnapshotType snapshotType) {
            this.flinkVersion = flinkVersion;
            this.stateBackendType = stateBackendType;
            this.snapshotType = snapshotType;
        }

        /** @return the Flink version of this spec */
        public FlinkVersion getFlinkVersion() {
            return this.flinkVersion;
        }

        /** @return the state backend name of this spec */
        public String getStateBackendType() {
            return this.stateBackendType;
        }

        /** @return the snapshot type of this spec */
        public SnapshotType getSnapshotType() {
            return this.snapshotType;
        }

        /**
         * Creates one spec per given Flink version, all sharing the same state backend and
         * snapshot type.
         *
         * @param stateBackendType state backend name used for every spec
         * @param snapshotType snapshot type used for every spec
         * @param flinkVersions versions to create specs for
         * @return the specs, in the iteration order of {@code flinkVersions}
         */
        public static Collection<SnapshotSpec> withVersions(String stateBackendType, SnapshotType snapshotType, Collection<FlinkVersion> flinkVersions) {
            LinkedList<SnapshotSpec> snapshotSpecCollection = new LinkedList<>();
            for (FlinkVersion version : flinkVersions) {
                snapshotSpecCollection.add(new SnapshotSpec(version, stateBackendType, snapshotType));
            }
            return snapshotSpecCollection;
        }

        /**
         * Renders the spec as {@code flink<version>[-<backend>]-<snapshot type>}.
         *
         * @return the rendered name
         * @throws UnsupportedOperationException if the state backend type or the snapshot type is
         *     not one of the handled values
         */
        @Override
        public String toString() {
            StringBuilder str = new StringBuilder("flink" + this.flinkVersion);
            switch (this.stateBackendType) {
                case "rocksdb": {
                    str.append("-rocksdb");
                    break;
                }
                case "jobmanager": {
                    // Deliberately no suffix for this backend.
                    // NOTE(review): presumably to match existing snapshot artifact names - confirm.
                    break;
                }
                case "hashmap": {
                    str.append("-hashmap");
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("State backend type not supported.");
                }
            }
            switch (this.snapshotType) {
                case SAVEPOINT_CANONICAL: {
                    str.append("-savepoint");
                    break;
                }
                case SAVEPOINT_NATIVE: {
                    str.append("-savepoint-native");
                    break;
                }
                case CHECKPOINT: {
                    str.append("-checkpoint");
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Snapshot type not supported.");
                }
            }
            return str.toString();
        }
    }

    /** Kinds of snapshot that {@code executeAndSnapshot} can trigger. */
    public enum SnapshotType {
        /** Savepoint triggered with the canonical savepoint format. */
        SAVEPOINT_CANONICAL,
        /** Savepoint triggered with the native savepoint format. */
        SAVEPOINT_NATIVE,
        /** Checkpoint triggered directly on the mini cluster. */
        CHECKPOINT
    }

    /**
     * Mode a migration test runs in.
     *
     * <p>NOTE(review): not referenced in this file; presumably subclasses use it to choose between
     * writing a new snapshot and verifying an existing one - confirm against the subclasses.
     */
    public enum ExecutionMode {
        CREATE_SNAPSHOT,
        VERIFY_SNAPSHOT
    }
}

