/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.SplittableIterator;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class SortingBoundedInputITCase
extends AbstractTestBase {
    /**
     * Punctuated watermark generator used by the timer tests: it emits watermark 5 when it sees an
     * event with timestamp 4 and watermark 15 when it sees an event with timestamp 14. No other
     * event and no periodic callback produces a watermark.
     */
    private static final WatermarkGenerator<Tuple2<Integer, Integer>> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP = new WatermarkGenerator<Tuple2<Integer, Integer>>(){

        public void onEvent(Tuple2<Integer, Integer> event, long eventTimestamp, WatermarkOutput output) {
            boolean isTriggeringTimestamp = eventTimestamp == 4L || eventTimestamp == 14L;
            if (!isTriggeringTimestamp) {
                return;
            }
            // The emitted watermark is always one past the triggering timestamp (4 -> 5, 14 -> 15).
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(eventTimestamp + 1L));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            // Intentionally empty: watermarks are only emitted from onEvent.
        }
    };

    /**
     * Runs a keyed one-input pipeline in BATCH runtime mode over 1,000,000 generated records and
     * checks that the record counts emitted by {@code AssertingOperator} add up to the number of
     * generated records. The operator itself fails the job if a key shows up again after a
     * different key has been processed.
     */
    @Test
    public void testOneInputOperator() {
        long numberOfRecords = 1000000L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration batchConfig = new Configuration();
        batchConfig.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        ClassLoader classLoader = this.getClass().getClassLoader();
        env.configure((ReadableConfig)batchConfig, classLoader);

        DataStreamSource source = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}));
        SingleOutputStreamOperator counts = source
                .keyBy((KeySelector & Serializable)element -> (Integer)element.f0)
                .transform("Asserting operator", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (OneInputStreamOperator)new AssertingOperator());

        // Collect every emitted count and add them up.
        List emittedCounts = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)counts));
        long total = 0L;
        for (Object emitted : emittedCounts) {
            total += (Long)emitted;
        }
        Assert.assertThat((Object)total, (Matcher)CoreMatchers.equalTo((Object)numberOfRecords));
    }

    /**
     * Runs a keyed two-input pipeline in BATCH runtime mode with 500,000 generated records per
     * input and checks that the record counts emitted by {@code AssertingTwoInputOperator} add up
     * to the records of both inputs combined.
     */
    @Test
    public void testTwoInputOperator() {
        long numberOfRecords = 500000L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration batchConfig = new Configuration();
        batchConfig.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        ClassLoader classLoader = this.getClass().getClassLoader();
        env.configure((ReadableConfig)batchConfig, classLoader);

        DataStreamSource firstInput = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}));
        DataStreamSource secondInput = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}));
        SingleOutputStreamOperator counts = firstInput
                .connect((DataStream)secondInput)
                .keyBy((KeySelector & Serializable)element -> (Integer)element.f0, (KeySelector & Serializable)element -> (Integer)element.f0)
                .transform("Asserting operator", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TwoInputStreamOperator)new AssertingTwoInputOperator());

        // Collect every emitted count and add them up.
        List emittedCounts = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)counts));
        long total = 0L;
        for (Object emitted : emittedCounts) {
            total += (Long)emitted;
        }
        long expectedTotal = numberOfRecords * 2L;
        Assert.assertThat((Object)total, (Matcher)CoreMatchers.equalTo((Object)expectedTotal));
    }

    /**
     * Runs a keyed three-input pipeline in BATCH runtime mode with 500,000 generated records per
     * input, wired up through a {@code KeyedMultipleInputTransformation}, and checks that the
     * record counts emitted by the asserting operator add up to the records of all three inputs.
     */
    @Test
    public void testThreeInputOperator() {
        long numberOfRecords = 500000L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration batchConfig = new Configuration();
        batchConfig.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        ClassLoader classLoader = this.getClass().getClassLoader();
        env.configure((ReadableConfig)batchConfig, classLoader);

        KeyedStream firstInput = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}))
                .keyBy((KeySelector & Serializable)el -> (Integer)el.f0);
        KeyedStream secondInput = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}))
                .keyBy((KeySelector & Serializable)el -> (Integer)el.f0);
        KeyedStream thirdInput = env.fromParallelCollection(
                (SplittableIterator)new InputGenerator(numberOfRecords),
                (TypeInformation)new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO}))
                .keyBy((KeySelector & Serializable)el -> (Integer)el.f0);

        KeyedMultipleInputTransformation assertingTransformation = new KeyedMultipleInputTransformation("Asserting operator", (StreamOperatorFactory)new AssertingThreeInputOperatorFactory(), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, -1, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        // Inputs are attached in declaration order: first, second, third.
        for (KeyedStream input : new KeyedStream[]{firstInput, secondInput, thirdInput}) {
            assertingTransformation.addInput(input.getTransformation(), input.getKeySelector());
        }
        env.addOperator((Transformation)assertingTransformation);

        DataStream counts = new DataStream(env, (Transformation)assertingTransformation);
        List emittedCounts = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)counts));
        long total = 0L;
        for (Object emitted : emittedCounts) {
            total += (Long)emitted;
        }
        long expectedTotal = numberOfRecords * 3L;
        Assert.assertThat((Object)total, (Matcher)CoreMatchers.equalTo((Object)expectedTotal));
    }

    /**
     * Exercises event-time timers of a {@code KeyedProcessFunction} in BATCH runtime mode with
     * parallelism 1.
     *
     * <p>The input is ten {@code (key, timestamp)} tuples; the timestamp assigner reads the second
     * tuple field. The function counts records per key in buckets identified by the next multiple
     * of ten after the record timestamp and emits {@code (timerTimestamp, key, count)} from
     * {@code onTimer}. The test expects an empty late-element side output and exactly the four
     * bucket results for keys 1 and 2 at timestamps 10 and 20.
     */
    @Test
    public void testBatchExecutionWithTimersOneInput() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        env.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        // Watermarks come from the punctuated generator (5 after ts 4, 15 after ts 14); the record
        // timestamp is the tuple's second field.
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, previousTimestamp) -> ((Integer)r.f1).intValue());
        // Tuples are (key, timestamp). Timestamps are deliberately not in ascending order per key.
        SingleOutputStreamOperator elements = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)1, (Object)3), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)1), Tuple2.of((Object)1, (Object)4), Tuple2.of((Object)2, (Object)3), Tuple2.of((Object)1, (Object)2), Tuple2.of((Object)1, (Object)13), Tuple2.of((Object)1, (Object)11), Tuple2.of((Object)2, (Object)14), Tuple2.of((Object)1, (Object)11)}).assignTimestampsAndWatermarks(watermarkStrategy);
        final OutputTag lateElements = new OutputTag("late_elements", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        // Only the key survives the map; the timestamp is read back via ctx.timestamp() below.
        SingleOutputStreamOperator sums = elements.map((MapFunction & Serializable)element -> (Integer)element.f0).keyBy((KeySelector & Serializable)element -> element).process((KeyedProcessFunction)new KeyedProcessFunction<Integer, Integer, Tuple3<Long, Integer, Integer>>(){
            // bucket end timestamp -> number of records counted into that bucket (per key)
            private MapState<Long, Integer> countState;
            // timestamp of the last non-late record seen for the current key
            private ValueState<Long> previousTimestampState;

            public void open(Configuration parameters) {
                this.countState = this.getRuntimeContext().getMapState(new MapStateDescriptor("sum", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO));
                this.previousTimestampState = this.getRuntimeContext().getState(new ValueStateDescriptor("previousTimestamp", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
            }

            public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
                Long elementTimestamp = ctx.timestamp();
                // Next multiple of ten strictly above the timestamp for non-negative timestamps
                // (e.g. 3 -> 10, 13 -> 20); used both as timer time and as bucket id.
                long nextTen = (elementTimestamp + 10L) / 10L * 10L;
                // The timer is registered before the lateness check, i.e. also for late records.
                ctx.timerService().registerEventTimeTimer(nextTen);
                if (elementTimestamp < ctx.timerService().currentWatermark()) {
                    // Record is behind the current watermark: route it to the late side output.
                    ctx.output(lateElements, (Object)value);
                } else {
                    // Within one key, timestamps must arrive in non-decreasing order.
                    Long previousTimestamp = (Long)Optional.ofNullable(this.previousTimestampState.value()).orElse(0L);
                    Assert.assertThat((Object)elementTimestamp, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)previousTimestamp));
                    this.previousTimestampState.update((Object)elementTimestamp);
                    Integer currentCount = (Integer)Optional.ofNullable(this.countState.get((Object)nextTen)).orElse(0);
                    this.countState.put((Object)nextTen, (Object)(currentCount + 1));
                }
            }

            public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
                // Emit the bucket that ends at this timer and drop its state.
                out.collect((Object)Tuple3.of((Object)timestamp, (Object)ctx.getCurrentKey(), (Object)this.countState.get((Object)timestamp)));
                this.countState.remove((Object)timestamp);
                // Registers a follow-up timer one tick later. NOTE(review): the expected output
                // below holds only the 10 and 20 buckets, so these follow-up timers presumably
                // never fire in this test - confirm against the batch timer service semantics.
                ctx.timerService().registerEventTimeTimer(timestamp + 1L);
            }
        });
        SideOutputDataStream lateStream = sums.getSideOutput(lateElements);
        List lateRecordsCollected = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)lateStream));
        List sumsCollected = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)sums));
        // No record may be classified as late, despite the out-of-order input timestamps.
        Assert.assertTrue((boolean)lateRecordsCollected.isEmpty());
        // Expected tuples are (bucket end, key, count): key 1 has 4 records below 10 and 3 in
        // [10, 20); key 2 has 2 records below 10 and 1 in [10, 20).
        Assert.assertThat((Object)sumsCollected, (Matcher)CoreMatchers.equalTo(Arrays.asList(Tuple3.of((Object)10L, (Object)1, (Object)4), Tuple3.of((Object)20L, (Object)1, (Object)3), Tuple3.of((Object)10L, (Object)2, (Object)2), Tuple3.of((Object)20L, (Object)2, (Object)1))));
    }

    /**
     * Two-input variant of the timer test: exercises event-time timers of a
     * {@code KeyedCoProcessFunction} in BATCH runtime mode with parallelism 1.
     *
     * <p>Both inputs carry the same ten {@code (key, timestamp)} tuples and are handled by the
     * same counting logic, so every expected bucket count is twice the one-input count. The test
     * expects an empty late-element side output and exactly four bucket results.
     */
    @Test
    public void testBatchExecutionWithTimersTwoInput() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        env.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        // Watermarks come from the punctuated generator (5 after ts 4, 15 after ts 14); the record
        // timestamp is the tuple's second field.
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, previousTimestamp) -> ((Integer)r.f1).intValue());
        // Tuples are (key, timestamp); after timestamp assignment only the key is kept.
        SingleOutputStreamOperator elements1 = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)1, (Object)3), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)1), Tuple2.of((Object)1, (Object)4), Tuple2.of((Object)2, (Object)3), Tuple2.of((Object)1, (Object)2), Tuple2.of((Object)1, (Object)13), Tuple2.of((Object)1, (Object)11), Tuple2.of((Object)2, (Object)14), Tuple2.of((Object)1, (Object)11)}).assignTimestampsAndWatermarks(watermarkStrategy).map((MapFunction & Serializable)element -> (Integer)element.f0);
        // Second input: identical elements and watermark strategy.
        SingleOutputStreamOperator elements2 = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)1, (Object)3), Tuple2.of((Object)1, (Object)1), Tuple2.of((Object)2, (Object)1), Tuple2.of((Object)1, (Object)4), Tuple2.of((Object)2, (Object)3), Tuple2.of((Object)1, (Object)2), Tuple2.of((Object)1, (Object)13), Tuple2.of((Object)1, (Object)11), Tuple2.of((Object)2, (Object)14), Tuple2.of((Object)1, (Object)11)}).assignTimestampsAndWatermarks(watermarkStrategy).map((MapFunction & Serializable)element -> (Integer)element.f0);
        final OutputTag lateElements = new OutputTag("late_elements", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        SingleOutputStreamOperator sums = elements1.connect((DataStream)elements2).keyBy((KeySelector & Serializable)element -> element, (KeySelector & Serializable)element -> element).process((KeyedCoProcessFunction)new KeyedCoProcessFunction<Integer, Integer, Integer, Tuple3<Long, Integer, Integer>>(){
            // bucket end timestamp -> number of records counted into that bucket (per key)
            private MapState<Long, Integer> countState;
            // timestamp of the last non-late record seen for the current key, across both inputs
            private ValueState<Long> previousTimestampState;

            public void open(Configuration parameters) {
                this.countState = this.getRuntimeContext().getMapState(new MapStateDescriptor("sum", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO));
                this.previousTimestampState = this.getRuntimeContext().getState(new ValueStateDescriptor("previousTimestamp", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
            }

            // Both inputs are funnelled into the same counting logic.
            public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
                this.processElement(value, ctx);
            }

            public void processElement2(Integer value, KeyedCoProcessFunction.Context ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
                this.processElement(value, ctx);
            }

            private void processElement(Integer value, KeyedCoProcessFunction.Context ctx) throws Exception {
                Long elementTimestamp = ctx.timestamp();
                // Next multiple of ten strictly above the timestamp for non-negative timestamps
                // (e.g. 3 -> 10, 13 -> 20); used both as timer time and as bucket id.
                long nextTen = (elementTimestamp + 10L) / 10L * 10L;
                // The timer is registered before the lateness check, i.e. also for late records.
                ctx.timerService().registerEventTimeTimer(nextTen);
                if (elementTimestamp < ctx.timerService().currentWatermark()) {
                    // Record is behind the current watermark: route it to the late side output.
                    ctx.output(lateElements, (Object)value);
                } else {
                    // Within one key, timestamps must arrive in non-decreasing order.
                    Long previousTimestamp = (Long)Optional.ofNullable(this.previousTimestampState.value()).orElse(0L);
                    Assert.assertThat((Object)elementTimestamp, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)previousTimestamp));
                    this.previousTimestampState.update((Object)elementTimestamp);
                    Integer currentCount = (Integer)Optional.ofNullable(this.countState.get((Object)nextTen)).orElse(0);
                    this.countState.put((Object)nextTen, (Object)(currentCount + 1));
                }
            }

            public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
                // Emit the bucket that ends at this timer and drop its state.
                out.collect((Object)Tuple3.of((Object)timestamp, (Object)ctx.getCurrentKey(), (Object)this.countState.get((Object)timestamp)));
                this.countState.remove((Object)timestamp);
                // Registers a follow-up timer one tick later. NOTE(review): the expected output
                // below holds only the 10 and 20 buckets, so these follow-up timers presumably
                // never fire in this test - confirm against the batch timer service semantics.
                ctx.timerService().registerEventTimeTimer(timestamp + 1L);
            }
        });
        SideOutputDataStream lateStream = sums.getSideOutput(lateElements);
        List lateRecordsCollected = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)lateStream));
        List sumsCollected = CollectionUtil.iteratorToList((Iterator)DataStreamUtils.collect((DataStream)sums));
        // No record may be classified as late, despite the out-of-order input timestamps.
        Assert.assertTrue((boolean)lateRecordsCollected.isEmpty());
        // Expected tuples are (bucket end, key, count); each count is double the one-input case
        // because both inputs contribute the same records.
        Assert.assertThat((Object)sumsCollected, (Matcher)CoreMatchers.equalTo(Arrays.asList(Tuple3.of((Object)10L, (Object)1, (Object)8), Tuple3.of((Object)20L, (Object)1, (Object)6), Tuple3.of((Object)10L, (Object)2, (Object)4), Tuple3.of((Object)20L, (Object)2, (Object)2))));
    }

    /**
     * Bounded, splittable generator of {@code (key, payload)} records used as test input.
     *
     * <p>Each generated record has a random key in {@code [0, 10)} and a 500-byte payload. The
     * payload array is filled with random bytes once per generator instance and the same array
     * instance is handed out with every record of that generator.
     */
    private static class InputGenerator
    extends SplittableIterator<Tuple2<Integer, byte[]>> {
        /** Total number of records this instance produces. */
        private final long numberOfRecords;
        /** Number of records handed out so far. */
        private long generatedRecords;
        private final Random rnd = new Random();
        private final byte[] bytes = new byte[500];

        private InputGenerator(long numberOfRecords) {
            this.numberOfRecords = numberOfRecords;
            this.rnd.nextBytes(this.bytes);
        }

        /**
         * Splits the record budget into {@code numPartitions} fresh generators. All partitions get
         * {@code numberOfRecords / numPartitions} records; the last one additionally gets the
         * remainder, so the partition sizes always add up to {@code numberOfRecords}.
         *
         * @param numPartitions number of generators to create, must be at least 1
         * @return one independent generator per partition
         * @throws IllegalArgumentException if {@code numPartitions} is smaller than 1
         */
        public Iterator<Tuple2<Integer, byte[]>>[] split(int numPartitions) {
            if (numPartitions < 1) {
                // Fail with a clear message instead of a division by zero or a negative array size.
                throw new IllegalArgumentException("The number of partitions must be at least 1, but was " + numPartitions);
            }
            long numberOfRecordsPerPartition = this.numberOfRecords / (long)numPartitions;
            long remainder = this.numberOfRecords % (long)numPartitions;
            Iterator[] iterators = new Iterator[numPartitions];
            for (int i = 0; i < numPartitions - 1; ++i) {
                iterators[i] = new InputGenerator(numberOfRecordsPerPartition);
            }
            iterators[numPartitions - 1] = new InputGenerator(numberOfRecordsPerPartition + remainder);
            return iterators;
        }

        /**
         * @return the record count, capped at {@code Integer.MAX_VALUE}
         */
        public int getMaximumNumberOfSplits() {
            return (int)Math.min(this.numberOfRecords, Integer.MAX_VALUE);
        }

        public boolean hasNext() {
            return this.generatedRecords < this.numberOfRecords;
        }

        /**
         * Produces the next record.
         *
         * @return a tuple of a random key in {@code [0, 10)} and the shared payload array
         * @throws java.util.NoSuchElementException if all records have already been generated
         */
        public Tuple2<Integer, byte[]> next() {
            if (!this.hasNext()) {
                // The Iterator contract requires an exception here; returning null would hand a
                // bogus "record" to any consumer that does not call hasNext() first.
                throw new java.util.NoSuchElementException("All " + this.numberOfRecords + " records have already been generated");
            }
            ++this.generatedRecords;
            return Tuple2.of((Object)this.rnd.nextInt(10), (Object)this.bytes);
        }
    }

    /**
     * {@code Input} adapter that hands the value of every incoming record to a callback. Watermarks,
     * latency markers, key-context updates and watermark statuses are ignored.
     */
    private static class SingleInput
    implements Input<Tuple2<Integer, byte[]>> {
        /** Callback that receives the value of each processed record. */
        private final Consumer<Tuple2<Integer, byte[]>> valueHandler;

        private SingleInput(Consumer<Tuple2<Integer, byte[]>> recordConsumer) {
            this.valueHandler = recordConsumer;
        }

        public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
            Tuple2<Integer, byte[]> value = (Tuple2<Integer, byte[]>)element.getValue();
            this.valueHandler.accept(value);
        }

        public void processWatermark(Watermark mark) {
            // ignored
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
            // ignored
        }

        public void setKeyContextElement(StreamRecord<Tuple2<Integer, byte[]>> record) {
            // ignored
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            // ignored
        }
    }

    /**
     * Factory for {@code AssertingThreeInputOperator}. It always creates the operator with three
     * inputs, reports {@code ChainingStrategy.NEVER} and ignores any chaining strategy set on it.
     */
    private static class AssertingThreeInputOperatorFactory
    implements StreamOperatorFactory<Long> {
        private AssertingThreeInputOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            AssertingThreeInputOperator operator = new AssertingThreeInputOperator(parameters, 3);
            return (T)((Object)operator);
        }

        public void setChainingStrategy(ChainingStrategy strategy) {
            // The strategy is fixed; see getChainingStrategy().
        }

        public ChainingStrategy getChainingStrategy() {
            return ChainingStrategy.NEVER;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return AssertingThreeInputOperator.class;
        }
    }

    /**
     * Three-input operator that counts all records it receives and verifies that they arrive
     * grouped by key: once the key changes, a previously seen key must never show up again,
     * regardless of which input delivers it. When all three inputs have ended, the total record
     * count is emitted as a single {@code Long}.
     */
    private static class AssertingThreeInputOperator
    extends AbstractStreamOperatorV2<Long>
    implements MultipleInputStreamOperator<Long>,
    BoundedMultiInput {
        /** Every key that has been the current key at some point. */
        private final Set<Integer> seenKeys = new HashSet<Integer>();
        private long seenRecords = 0L;
        /** Key of the most recently processed record, {@code null} before the first record. */
        private Integer currentKey = null;
        private boolean input1Finished = false;
        private boolean input2Finished = false;
        private boolean input3Finished = false;

        /**
         * @param parameters operator parameters passed on to the base class
         * @param numberOfInputs number of inputs the operator is created for, must be 3
         * @throws IllegalArgumentException if {@code numberOfInputs} is not 3
         */
        public AssertingThreeInputOperator(StreamOperatorParameters<Long> parameters, int numberOfInputs) {
            super(parameters, 3);
            // An explicit check instead of `assert`, which is skipped when the JVM runs without -ea.
            if (numberOfInputs != 3) {
                throw new IllegalArgumentException("This operator supports exactly 3 inputs, but was created with " + numberOfInputs);
            }
        }

        /** Counts the record and fails the test if its key was already completed earlier. */
        private void processElement(Tuple2<Integer, byte[]> element) {
            ++this.seenRecords;
            Integer incomingKey = (Integer)element.f0;
            if (!Objects.equals(incomingKey, this.currentKey)) {
                // Key switch: the new key must not have been seen before.
                if (!this.seenKeys.add(incomingKey)) {
                    Assert.fail((String)("Received an out of order key: " + incomingKey));
                }
                this.currentKey = incomingKey;
            }
        }

        /**
         * Marks the given input (1, 2 or 3) as finished and emits the record count once all three
         * inputs are finished.
         */
        public void endInput(int inputId) {
            if (inputId == 1) {
                this.input1Finished = true;
            }
            if (inputId == 2) {
                this.input2Finished = true;
            }
            if (inputId == 3) {
                this.input3Finished = true;
            }
            if (this.input1Finished && this.input2Finished && this.input3Finished) {
                this.output.collect((Object)new StreamRecord((Object)this.seenRecords));
            }
        }

        /** @return three inputs that all feed the same counting and key-order check */
        public List<Input> getInputs() {
            return Arrays.asList(new SingleInput(this::processElement), new SingleInput(this::processElement), new SingleInput(this::processElement));
        }
    }

    /**
     * Two-input operator that counts all records from both inputs and verifies that they arrive
     * grouped by key: once the key changes, a previously seen key must never show up again. When
     * both inputs have ended, the total record count is emitted as a single {@code Long}.
     */
    private static class AssertingTwoInputOperator
    extends AbstractStreamOperator<Long>
    implements TwoInputStreamOperator<Tuple2<Integer, byte[]>, Tuple2<Integer, byte[]>, Long>,
    BoundedMultiInput {
        /** Every key that has been the current key at some point. */
        private final Set<Integer> seenKeys = new HashSet<Integer>();
        private long seenRecords = 0L;
        /** Key of the most recently processed record, {@code null} before the first record. */
        private Integer currentKey = null;
        private boolean input1Finished = false;
        private boolean input2Finished = false;

        private AssertingTwoInputOperator() {
        }

        public void processElement1(StreamRecord<Tuple2<Integer, byte[]>> element) {
            this.processElement(element);
        }

        public void processElement2(StreamRecord<Tuple2<Integer, byte[]>> element) {
            this.processElement(element);
        }

        /** Shared handling for both inputs: count the record and check the key order. */
        private void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) {
            ++this.seenRecords;
            Integer recordKey = (Integer)((Tuple2)element.getValue()).f0;
            if (Objects.equals(recordKey, this.currentKey)) {
                // Still inside the run of the current key.
                return;
            }
            boolean firstOccurrence = this.seenKeys.add(recordKey);
            if (!firstOccurrence) {
                Assert.fail((String)("Received an out of order key: " + recordKey));
            }
            this.currentKey = recordKey;
        }

        /** Marks input 1 or 2 as finished and emits the record count once both are finished. */
        public void endInput(int inputId) {
            switch (inputId) {
                case 1:
                    this.input1Finished = true;
                    break;
                case 2:
                    this.input2Finished = true;
                    break;
                default:
                    break;
            }
            boolean allInputsFinished = this.input1Finished && this.input2Finished;
            if (allInputsFinished) {
                this.output.collect((Object)new StreamRecord((Object)this.seenRecords));
            }
        }
    }

    /**
     * One-input operator that counts all records it receives and verifies that they arrive grouped
     * by key: once the key changes, a previously seen key must never show up again. At the end of
     * the input the total record count is emitted as a single {@code Long}.
     */
    private static class AssertingOperator
    extends AbstractStreamOperator<Long>
    implements OneInputStreamOperator<Tuple2<Integer, byte[]>, Long>,
    BoundedOneInput {
        /** Every key that has been the current key at some point. */
        private final Set<Integer> seenKeys = new HashSet<Integer>();
        private long seenRecords = 0L;
        /** Key of the most recently processed record, {@code null} before the first record. */
        private Integer currentKey = null;

        private AssertingOperator() {
        }

        public void processElement(StreamRecord<Tuple2<Integer, byte[]>> element) throws Exception {
            ++this.seenRecords;
            Integer recordKey = (Integer)((Tuple2)element.getValue()).f0;
            if (Objects.equals(recordKey, this.currentKey)) {
                // Still inside the run of the current key.
                return;
            }
            boolean firstOccurrence = this.seenKeys.add(recordKey);
            if (!firstOccurrence) {
                Assert.fail((String)("Received an out of order key: " + recordKey));
            }
            this.currentKey = recordKey;
        }

        public void endInput() {
            StreamRecord countRecord = new StreamRecord((Object)this.seenRecords);
            this.output.collect((Object)countRecord);
        }
    }
}

