/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;

public class MigrationTestUtils {

    public static class AccumulatorCountingSink<T>
    extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1L;
        public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
        int count = 0;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, (Accumulator)new IntCounter());
        }

        public void invoke(T value, SinkFunction.Context context) throws Exception {
            ++this.count;
            this.getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add((Object)1);
        }
    }

    public static class CheckingParallelSourceWithUnionListState
    extends RichParallelSourceFunction<Tuple2<Long, Long>>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
        private volatile boolean isRunning = true;
        private final int numElements;

        public CheckingParallelSourceWithUnionListState(int numElements) {
            this.numElements = numElements;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListState unionListState = context.getOperatorStateStore().getUnionListState(CheckpointingNonParallelSourceWithListState.STATE_DESCRIPTOR);
            if (!context.isRestored()) {
                throw new RuntimeException("This source should always be restored because it's only used when restoring from a savepoint.");
            }
            Assert.assertThat((Object)unionListState.get(), (Matcher)Matchers.containsInAnyOrder((Object[])CheckpointingParallelSourceWithUnionListState.CHECKPOINTED_STRINGS));
            this.getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, (Accumulator)new IntCounter());
            this.getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add((Object)1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            ctx.emitWatermark(new Watermark(1000L));
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                for (long i = 0L; i < (long)this.numElements; ++i) {
                    if (i % (long)this.getRuntimeContext().getNumberOfParallelSubtasks() != (long)this.getRuntimeContext().getIndexOfThisSubtask()) continue;
                    ctx.collect((Object)new Tuple2((Object)i, (Object)i));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class CheckpointingParallelSourceWithUnionListState
    extends RichSourceFunction<Tuple2<Long, Long>>
    implements CheckpointedFunction {
        static final ListStateDescriptor<String> STATE_DESCRIPTOR = new ListStateDescriptor("source-state", (TypeSerializer)StringSerializer.INSTANCE);
        static final String[] CHECKPOINTED_STRINGS = new String[]{"Here be dragons!", "Here be more dragons!", "Here be yet more dragons!", "Here be the mostest dragons!"};
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning = true;
        private final int numElements;
        private transient ListState<String> unionListState;

        public CheckpointingParallelSourceWithUnionListState(int numElements) {
            this.numElements = numElements;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.unionListState.clear();
            for (String s : CHECKPOINTED_STRINGS) {
                if (s.hashCode() % this.getRuntimeContext().getNumberOfParallelSubtasks() != this.getRuntimeContext().getIndexOfThisSubtask()) continue;
                this.unionListState.add((Object)s);
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.unionListState = context.getOperatorStateStore().getUnionListState(STATE_DESCRIPTOR);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            ctx.emitWatermark(new Watermark(0L));
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                for (long i = 0L; i < (long)this.numElements; ++i) {
                    if (i % (long)this.getRuntimeContext().getNumberOfParallelSubtasks() != (long)this.getRuntimeContext().getIndexOfThisSubtask()) continue;
                    ctx.collect((Object)new Tuple2((Object)i, (Object)i));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class CheckingNonParallelSourceWithListState
    extends RichSourceFunction<Tuple2<Long, Long>>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
        private volatile boolean isRunning = true;
        private final int numElements;

        public CheckingNonParallelSourceWithListState(int numElements) {
            this.numElements = numElements;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListState unionListState = context.getOperatorStateStore().getListState(CheckpointingNonParallelSourceWithListState.STATE_DESCRIPTOR);
            if (!context.isRestored()) {
                throw new RuntimeException("This source should always be restored because it's only used when restoring from a savepoint.");
            }
            Assert.assertThat((Object)unionListState.get(), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"Here be dragons!", "Here be more dragons!", "Here be yet more dragons!", "Here be the mostest dragons!"}));
            this.getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, (Accumulator)new IntCounter());
            this.getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add((Object)1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            ctx.emitWatermark(new Watermark(1000L));
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                for (long i = 0L; i < (long)this.numElements; ++i) {
                    ctx.collect((Object)new Tuple2((Object)i, (Object)i));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class CheckpointingNonParallelSourceWithListState
    implements SourceFunction<Tuple2<Long, Long>>,
    CheckpointedFunction {
        static final ListStateDescriptor<String> STATE_DESCRIPTOR = new ListStateDescriptor("source-state", (TypeSerializer)StringSerializer.INSTANCE);
        static final String CHECKPOINTED_STRING = "Here be dragons!";
        static final String CHECKPOINTED_STRING_1 = "Here be more dragons!";
        static final String CHECKPOINTED_STRING_2 = "Here be yet more dragons!";
        static final String CHECKPOINTED_STRING_3 = "Here be the mostest dragons!";
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning = true;
        private final int numElements;
        private transient ListState<String> unionListState;

        public CheckpointingNonParallelSourceWithListState(int numElements) {
            this.numElements = numElements;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.unionListState.clear();
            this.unionListState.add((Object)CHECKPOINTED_STRING);
            this.unionListState.add((Object)CHECKPOINTED_STRING_1);
            this.unionListState.add((Object)CHECKPOINTED_STRING_2);
            this.unionListState.add((Object)CHECKPOINTED_STRING_3);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.unionListState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            ctx.emitWatermark(new Watermark(0L));
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                for (long i = 0L; i < (long)this.numElements; ++i) {
                    ctx.collect((Object)new Tuple2((Object)i, (Object)i));
                }
            }
            while (this.isRunning) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

