/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TimersSavepointITCase {
    private static final int PARALLELISM = 4;
    private static final OneShotLatch savepointLatch = new OneShotLatch();
    private static final OneShotLatch resultLatch = new OneShotLatch();
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    public static final String SAVEPOINT_FILE_NAME = "legacy-raw-state-heap-timers-rocks-db-1.12";
    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test(timeout=60000L)
    public void testSavepointWithTimers() throws Exception {
        block14: {
            try (ClusterClient client = this.miniClusterResource.getClusterClient();){
                if (this.executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
                    this.takeSavepoint("src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12", client);
                    break block14;
                }
                if (this.executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
                    this.verifySavepoint(TimersSavepointITCase.getResourceFilename(SAVEPOINT_FILE_NAME), client);
                    break block14;
                }
                throw new IllegalStateException("Unknown ExecutionMode " + (Object)((Object)this.executionMode));
            }
        }
    }

    private void verifySavepoint(String savepointPath, ClusterClient<?> client) throws IOException, InterruptedException, ExecutionException {
        JobGraph jobGraph = this.getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
        client.submitJob(jobGraph).get();
        resultLatch.await();
    }

    private void takeSavepoint(String savepointPath, ClusterClient<?> client) throws Exception {
        JobGraph jobGraph = this.getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
        client.submitJob(jobGraph).get();
        savepointLatch.await();
        CompletableFuture savepointPathFuture = client.triggerSavepoint(jobGraph.getJobID(), null);
        String jobmanagerSavepointPath = (String)savepointPathFuture.get(2L, TimeUnit.SECONDS);
        File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
        FileUtils.moveDirectory((File)jobManagerSavepoint, (File)new File(savepointPath));
    }

    public JobGraph getJobGraph(EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner & Serializable)(i, p) -> i.intValue())).keyBy((KeySelector & Serializable)i -> i).process((KeyedProcessFunction)new TimersProcessFunction()).addSink((SinkFunction)new DiscardingSink());
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, (Object)"rocksdb");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)TMP_FOLDER.newFolder().toURI().toString());
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)TMP_FOLDER.newFolder().toURI().toString());
        config.set(RocksDBOptions.TIMER_SERVICE_FACTORY, (Object)priorityQueueStateType);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        return env.getStreamGraph("Test", false).getJobGraph();
    }

    private static String getResourceFilename(String filename) {
        ClassLoader cl = TimersSavepointITCase.class.getClassLoader();
        URL resource = cl.getResource(filename);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    private static class TimersProcessFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
        private TimersProcessFunction() {
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            if (value == 0) {
                ctx.timerService().registerEventTimeTimer(2L);
                savepointLatch.trigger();
            }
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            out.collect((Object)1);
            resultLatch.trigger();
        }
    }

    private static class Source
    implements SourceFunction<Integer>,
    CheckpointedFunction {
        private volatile boolean running = true;
        private int emittedCount;
        private ListState<Integer> state;

        private Source() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    if (this.emittedCount == 0) {
                        ctx.collect((Object)0);
                        this.emittedCount = 1;
                    } else if (this.emittedCount == 1) {
                        ctx.collect((Object)this.emittedCount);
                    } else {
                        ctx.collect((Object)this.emittedCount++);
                    }
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.state.add((Object)this.emittedCount);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.state = context.getOperatorStateStore().getListState(new ListStateDescriptor("emittedCount", (TypeSerializer)IntSerializer.INSTANCE));
            if (context.isRestored()) {
                this.emittedCount = 2;
            }
        }
    }

    public static enum ExecutionMode {
        PERFORM_SAVEPOINT,
        VERIFY_SAVEPOINT;

    }
}

