/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.junit.Assert;

public class StreamCheckpointingITCase
extends StreamFaultToleranceTestBase {
    static final long NUM_STRINGS = 10000000L;

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        DataStreamSource stream = env.addSource((SourceFunction)new StringGeneratingSourceFunction(10000000L));
        stream.filter((FilterFunction)new StringRichFilterFunction()).shuffle().map((MapFunction)new StringPrefixCountRichMapFunction()).startNewChain().map((MapFunction)new StatefulCounterFunction()).keyBy(new String[]{"prefix"}).map((MapFunction)new OnceFailingPrefixCounter(10000000L)).addSink((SinkFunction)new SinkFunction<StreamFaultToleranceTestBase.PrefixCount>(){

            public void invoke(StreamFaultToleranceTestBase.PrefixCount value) throws Exception {
            }
        });
    }

    @Override
    public void postSubmit() {
        long filterSum = 0L;
        for (long l : StringRichFilterFunction.counts) {
            filterSum += l;
        }
        long mapSum = 0L;
        for (long l : StringPrefixCountRichMapFunction.counts) {
            mapSum += l;
        }
        long countSum = 0L;
        for (long l : StatefulCounterFunction.counts) {
            countSum += l;
        }
        long reduceInputCount = 0L;
        for (long l : OnceFailingPrefixCounter.counts) {
            reduceInputCount += l;
        }
        Assert.assertEquals((long)10000000L, (long)filterSum);
        Assert.assertEquals((long)10000000L, (long)mapSum);
        Assert.assertEquals((long)10000000L, (long)countSum);
        Assert.assertEquals((long)10000000L, (long)reduceInputCount);
        Object object = OnceFailingPrefixCounter.prefixCounts.values().iterator();
        while (object.hasNext()) {
            Long count = (Long)object.next();
            Assert.assertEquals((Object)new Long(250000L), (Object)count);
        }
    }

    private static class StringPrefixCountRichMapFunction
    extends RichMapFunction<String, StreamFaultToleranceTestBase.PrefixCount>
    implements ListCheckpointed<Long> {
        static long[] counts = new long[12];
        private long count;

        private StringPrefixCountRichMapFunction() {
        }

        public StreamFaultToleranceTestBase.PrefixCount map(String value) throws IOException {
            ++this.count;
            return new StreamFaultToleranceTestBase.PrefixCount(value.substring(0, 1), value, 1L);
        }

        public void close() throws IOException {
            StringPrefixCountRichMapFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.count = state.get(0);
        }
    }

    private static class StringRichFilterFunction
    extends RichFilterFunction<String>
    implements ListCheckpointed<Long> {
        static long[] counts = new long[12];
        private long count;

        private StringRichFilterFunction() {
        }

        public boolean filter(String value) {
            ++this.count;
            return value.length() < 100;
        }

        public void close() {
            StringRichFilterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.count = state.get(0);
        }
    }

    private static class OnceFailingPrefixCounter
    extends RichMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
    implements ListCheckpointed<Long> {
        private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
        static long[] counts = new long[12];
        private static volatile boolean hasFailed = false;
        private final long numElements;
        private long failurePos;
        private long count;
        private ValueState<Long> pCount;
        private long inputCount;

        OnceFailingPrefixCounter(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) throws IOException {
            long failurePosMin = (long)(0.4 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            long failurePosMax = (long)(0.7 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = new Random().nextLong() % (failurePosMax - failurePosMin) + failurePosMin;
            this.count = 0L;
            this.pCount = this.getRuntimeContext().getState(new ValueStateDescriptor("pCount", Long.class, (Object)0L));
        }

        public void close() throws IOException {
            OnceFailingPrefixCounter.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.inputCount;
        }

        public StreamFaultToleranceTestBase.PrefixCount map(StreamFaultToleranceTestBase.PrefixCount value) throws Exception {
            ++this.count;
            if (!hasFailed && this.count >= this.failurePos) {
                hasFailed = true;
                throw new Exception("Test Failure");
            }
            ++this.inputCount;
            long currentPrefixCount = (Long)this.pCount.value() + value.count;
            this.pCount.update((Object)currentPrefixCount);
            prefixCounts.put(value.prefix, currentPrefixCount);
            value.count = currentPrefixCount;
            return value;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.inputCount);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.inputCount = state.get(0);
        }
    }

    private static class StatefulCounterFunction
    extends RichMapFunction<StreamFaultToleranceTestBase.PrefixCount, StreamFaultToleranceTestBase.PrefixCount>
    implements ListCheckpointed<Long> {
        private long count;
        static long[] counts = new long[12];

        private StatefulCounterFunction() {
        }

        public StreamFaultToleranceTestBase.PrefixCount map(StreamFaultToleranceTestBase.PrefixCount value) throws Exception {
            ++this.count;
            return value;
        }

        public void close() throws IOException {
            StatefulCounterFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.count;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.count = state.get(0);
        }
    }

    private static class StringGeneratingSourceFunction
    extends RichSourceFunction<String>
    implements ParallelSourceFunction<String>,
    ListCheckpointed<Integer> {
        private final long numElements;
        private final Random rnd = new Random();
        private final StringBuilder stringBuilder = new StringBuilder();
        private int index;
        private int step;
        private volatile boolean isRunning = true;
        static long[] counts = new long[12];

        public void close() throws IOException {
            StringGeneratingSourceFunction.counts[this.getRuntimeContext().getIndexOfThisSubtask()] = this.index;
        }

        StringGeneratingSourceFunction(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) throws IOException {
            this.step = this.getRuntimeContext().getNumberOfParallelSubtasks();
            if (this.index == 0) {
                this.index = this.getRuntimeContext().getIndexOfThisSubtask();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object lockingObject = ctx.getCheckpointLock();
            while (this.isRunning && (long)this.index < this.numElements) {
                char first = (char)(this.index % 40 + 40);
                this.stringBuilder.setLength(0);
                this.stringBuilder.append(first);
                String result = StringGeneratingSourceFunction.randomString(this.stringBuilder, this.rnd);
                Object object = lockingObject;
                synchronized (object) {
                    this.index += this.step;
                    ctx.collect((Object)result);
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        private static String randomString(StringBuilder bld, Random rnd) {
            int len = rnd.nextInt(10) + 5;
            for (int i = 0; i < len; ++i) {
                char next = (char)(rnd.nextInt(20000) + 33);
                bld.append(next);
            }
            return bld.toString();
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.index);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.index = state.get(0);
        }
    }
}

