/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(value={TestLoggerExtension.class})
class StreamingJobGraphGeneratorTest {
    StreamingJobGraphGeneratorTest() {
    }

    @Test
    void testParallelismOneNotChained() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new String[]{"a", "b", "c", "d", "e", "f"}).map((MapFunction)new MapFunction<String, Tuple2<String, String>>(){

            public Tuple2<String, String> map(String value) {
                return new Tuple2((Object)value, (Object)value);
            }
        });
        SingleOutputStreamOperator result = input.keyBy(new int[]{0}).map((MapFunction)new MapFunction<Tuple2<String, String>, Tuple2<String, String>>(){

            public Tuple2<String, String> map(Tuple2<String, String> value) {
                return value;
            }
        });
        result.addSink((SinkFunction)new SinkFunction<Tuple2<String, String>>(){

            public void invoke(Tuple2<String, String> value) {
            }
        });
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(2);
        Assertions.assertThat((int)((JobVertex)verticesSorted.get(0)).getParallelism()).isEqualTo(1);
        Assertions.assertThat((int)((JobVertex)verticesSorted.get(1)).getParallelism()).isEqualTo(1);
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapSinkVertex = (JobVertex)verticesSorted.get(1);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((JobEdge)mapSinkVertex.getInputs().get(0)).getSource().getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled()).withFailMessage("Checkpointing enabled", new Object[0])).isFalse();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
        Assertions.assertThat((long)snapshottingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval()).isEqualTo(Long.MAX_VALUE);
        Assertions.assertThat((boolean)snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce()).isFalse();
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assertions.assertThat((Comparable)streamConfig.getCheckpointMode()).isEqualTo((Object)CheckpointingMode.AT_LEAST_ONCE);
    }

    @Test
    void testEnabledUnalignedCheckAndDisabledCheckpointing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled()).withFailMessage("Checkpointing enabled", new Object[0])).isFalse();
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assertions.assertThat((Comparable)streamConfig.getCheckpointMode()).isEqualTo((Object)CheckpointingMode.AT_LEAST_ONCE);
        Assertions.assertThat((boolean)streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
    }

    @Test
    public void testTransformationSetParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(1L, 3L).map((MapFunction & Serializable)i -> i).setParallelism(10).print().setParallelism(20);
        StreamGraph streamGraph = env.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(0)).isParallelismConfigured()).isFalse();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(1)).isParallelismConfigured()).isTrue();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(2)).isParallelismConfigured()).isTrue();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(3);
        Assertions.assertThat((boolean)((JobVertex)vertices.get(0)).isParallelismConfigured()).isFalse();
        Assertions.assertThat((boolean)((JobVertex)vertices.get(1)).isParallelismConfigured()).isTrue();
        Assertions.assertThat((boolean)((JobVertex)vertices.get(2)).isParallelismConfigured()).isTrue();
    }

    @Test
    public void testChainNodeSetParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(1L, 3L).map((MapFunction & Serializable)value -> value).print().setParallelism(env.getParallelism());
        StreamGraph streamGraph = env.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(0)).isParallelismConfigured()).isFalse();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(1)).isParallelismConfigured()).isFalse();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(2)).isParallelismConfigured()).isTrue();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(1);
        Assertions.assertThat((boolean)((JobVertex)vertices.get(0)).isParallelismConfigured()).isTrue();
    }

    @Test
    public void testChainedSourcesSetParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleInputTransformation transform = new MultipleInputTransformation("mit", (StreamOperatorFactory)new UnusedOperatorFactory(), Types.LONG, env.getParallelism(), false);
        DataStreamSource source1 = env.fromSource((Source)new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), "source1");
        DataStreamSource source2 = env.fromSource((Source)new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), "source2");
        transform.addInput(source1.getTransformation());
        transform.addInput(source2.getTransformation());
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        source1.setParallelism(env.getParallelism());
        env.addOperator((Transformation)transform);
        StreamGraph streamGraph = env.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(0)).isParallelismConfigured()).isFalse();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(1)).isParallelismConfigured()).isTrue();
        Assertions.assertThat((boolean)((StreamNode)streamNodes.get(2)).isParallelismConfigured()).isFalse();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(1);
        Assertions.assertThat((boolean)((JobVertex)vertices.get(0)).isParallelismConfigured()).isTrue();
    }

    @Test
    public void testDynamicGraphVertexParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int defaultParallelism = 20;
        env.setParallelism(defaultParallelism);
        env.fromSequence(1L, 3L).map((MapFunction & Serializable)value -> value).print();
        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
            Assertions.assertThat((int)streamNode.getParallelism()).isEqualTo(defaultParallelism);
        }
        streamGraph.setDynamic(false);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        for (JobVertex vertex : vertices) {
            Assertions.assertThat((int)vertex.getParallelism()).isEqualTo(defaultParallelism);
        }
        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
            Assertions.assertThat((int)streamNode.getParallelism()).isEqualTo(defaultParallelism);
        }
        streamGraph.setDynamic(true);
        jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        for (JobVertex vertex : vertices) {
            Assertions.assertThat((int)vertex.getParallelism()).isEqualTo(-1);
        }
    }

    @Test
    void testUnalignedCheckAndAtLeastOnce() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{0}).print();
        StreamGraph streamGraph = env.getStreamGraph();
        env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        StreamConfig streamConfig = new StreamConfig(((JobVertex)verticesSorted.get(0)).getConfiguration());
        Assertions.assertThat((Comparable)streamConfig.getCheckpointMode()).isEqualTo((Object)CheckpointingMode.AT_LEAST_ONCE);
        Assertions.assertThat((boolean)streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
    }

    @Test
    void generatorForwardsSavepointRestoreSettings() {
        StreamGraph streamGraph = new StreamGraph(new ExecutionConfig(), new CheckpointConfig(), SavepointRestoreSettings.forPath((String)"hello"));
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
        Assertions.assertThat((String)savepointRestoreSettings.getRestorePath()).isEqualTo("hello");
    }

    @Test
    void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction)new MapFunction<Integer, Integer>(){

            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapPrintVertex = (JobVertex)verticesSorted.get(1);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((JobEdge)mapPrintVertex.getInputs().get(0)).getSource().getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
        StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
        Map chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(this.getClass().getClassLoader());
        StreamConfig printConfig = (StreamConfig)chainedConfigs.values().iterator().next();
        Assertions.assertThat((boolean)sourceConfig.isChainStart()).isTrue();
        Assertions.assertThat((boolean)sourceConfig.isChainEnd()).isTrue();
        Assertions.assertThat((boolean)mapConfig.isChainStart()).isTrue();
        Assertions.assertThat((boolean)mapConfig.isChainEnd()).isFalse();
        Assertions.assertThat((boolean)printConfig.isChainStart()).isFalse();
        Assertions.assertThat((boolean)printConfig.isChainEnd()).isTrue();
    }

    @Test
    void testOperatorCoordinatorAddedToJobVertex() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource stream = env.fromSource((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestingSource");
        OneInputTransformation resultTransform = new OneInputTransformation(stream.getTransformation(), "AnyName", (StreamOperatorFactory)new CoordinatedTransformOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, env.getParallelism());
        new TestingSingleOutputStreamOperator(env, resultTransform).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assertions.assertThat((List)jobGraph.getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
    }

    @Test
    void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method sinkMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new ParallelSourceFunction<Tuple2<Integer, Integer>>(){

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            }

            public void cancel() {
            }
        });
        opMethod.invoke((Object)source, resource1);
        SingleOutputStreamOperator map = source.map((MapFunction)new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
                return value;
            }
        });
        opMethod.invoke((Object)map, resource2);
        SingleOutputStreamOperator filter = map.filter((FilterFunction)new FilterFunction<Tuple2<Integer, Integer>>(){

            public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
                return false;
            }
        });
        opMethod.invoke((Object)filter, resource3);
        SingleOutputStreamOperator reduce = filter.keyBy(new int[]{0}).reduce((ReduceFunction)new ReduceFunction<Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        });
        opMethod.invoke((Object)reduce, resource4);
        DataStreamSink sink = reduce.addSink((SinkFunction)new SinkFunction<Tuple2<Integer, Integer>>(){

            public void invoke(Tuple2<Integer, Integer> value) throws Exception {
            }
        });
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        JobVertex sourceMapFilterVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex reduceSinkVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assertions.assertThat((Object)sourceMapFilterVertex.getMinResources()).isEqualTo((Object)resource3.merge(resource2).merge(resource1));
        Assertions.assertThat((Object)reduceSinkVertex.getPreferredResources()).isEqualTo((Object)resource4.merge(resource5));
    }

    @Test
    void testResourcesForIteration() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method sinkMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.addSource((SourceFunction)new ParallelSourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            public void cancel() {
            }
        }).name("test_source");
        opMethod.invoke((Object)source, resource1);
        IterativeStream iteration = source.iterate(3000L);
        opMethod.invoke((Object)iteration, resource2);
        SingleOutputStreamOperator flatMap = iteration.flatMap((FlatMapFunction)new FlatMapFunction<Integer, Integer>(){

            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                out.collect((Object)value);
            }
        }).name("test_flatMap");
        opMethod.invoke((Object)flatMap, resource3);
        SingleOutputStreamOperator increment = flatMap.filter((FilterFunction)new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return false;
            }
        }).name("test_filter");
        opMethod.invoke((Object)increment, resource4);
        DataStreamSink sink = iteration.closeWith((DataStream)increment).addSink((SinkFunction)new SinkFunction<Integer>(){

            public void invoke(Integer value) throws Exception {
            }
        }).disableChaining().name("test_sink");
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("test_source")) {
                Assertions.assertThat((Object)jobVertex.getMinResources()).isEqualTo((Object)resource1);
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Source")) {
                Assertions.assertThat((Object)jobVertex.getPreferredResources()).isEqualTo((Object)resource2);
                continue;
            }
            if (jobVertex.getName().contains("test_flatMap")) {
                Assertions.assertThat((Object)jobVertex.getMinResources()).isEqualTo((Object)resource3.merge(resource4));
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Tail")) {
                Assertions.assertThat((Object)jobVertex.getPreferredResources()).isEqualTo((Object)ResourceSpec.DEFAULT);
                continue;
            }
            if (!jobVertex.getName().contains("test_sink")) continue;
            Assertions.assertThat((Object)jobVertex.getMinResources()).isEqualTo((Object)resource5);
        }
    }

    @Test
    void testInputOutputFormat() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.addSource((SourceFunction)new InputFormatSourceFunction((InputFormat)new TypeSerializerInputFormat(TypeInformation.of(Long.class)), TypeInformation.of(Long.class)), TypeInformation.of(Long.class)).name("source");
        source.writeUsingOutputFormat((OutputFormat)new DiscardingOutputFormat()).name("sink1");
        source.writeUsingOutputFormat((OutputFormat)new DiscardingOutputFormat()).name("sink2");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(1);
        JobVertex jobVertex = (JobVertex)jobGraph.getVertices().iterator().next();
        Assertions.assertThat((Object)jobVertex).isInstanceOf(InputOutputFormatVertex.class);
        InputOutputFormatContainer formatContainer = new InputOutputFormatContainer(new TaskConfig(jobVertex.getConfiguration()), Thread.currentThread().getContextClassLoader());
        Map inputFormats = formatContainer.getInputFormats();
        Map outputFormats = formatContainer.getOutputFormats();
        Assertions.assertThat((Map)inputFormats).hasSize(1);
        Assertions.assertThat((Map)outputFormats).hasSize(2);
        HashMap<String, OperatorID> nameToOperatorIds = new HashMap<String, OperatorID>();
        StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
        nameToOperatorIds.put(headConfig.getOperatorName(), headConfig.getOperatorID());
        Map chainedConfigs = headConfig.getTransitiveChainedTaskConfigs(Thread.currentThread().getContextClassLoader());
        for (StreamConfig config : chainedConfigs.values()) {
            nameToOperatorIds.put(config.getOperatorName(), config.getOperatorID());
        }
        InputFormat sourceFormat = (InputFormat)((UserCodeWrapper)inputFormats.get(nameToOperatorIds.get("Source: source"))).getUserCodeObject();
        Assertions.assertThat((Object)sourceFormat).isInstanceOf(TypeSerializerInputFormat.class);
        OutputFormat sinkFormat1 = (OutputFormat)((UserCodeWrapper)outputFormats.get(nameToOperatorIds.get("Sink: sink1"))).getUserCodeObject();
        Assertions.assertThat((Object)sinkFormat1).isInstanceOf(DiscardingOutputFormat.class);
        OutputFormat sinkFormat2 = (OutputFormat)((UserCodeWrapper)outputFormats.get(nameToOperatorIds.get("Sink: sink2"))).getUserCodeObject();
        Assertions.assertThat((Object)sinkFormat2).isInstanceOf(DiscardingOutputFormat.class);
    }

    @Test
    void testCoordinatedOperator() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromSource((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestSource");
        source.addSink((SinkFunction)new DiscardingSink());
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(1);
        JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
        List coordinatorProviders = jobVertex.getOperatorCoordinators();
        Assertions.assertThat((List)coordinatorProviders).hasSize(1);
        ClassLoader classLoader = this.getClass().getClassLoader();
        Assertions.assertThat((Class)jobVertex.getInvokableClass(classLoader)).isEqualTo(SourceOperatorStreamTask.class);
        StreamOperatorFactory operatorFactory = new StreamConfig(jobVertex.getConfiguration()).getStreamOperatorFactory(classLoader);
        Assertions.assertThat((Object)operatorFactory).isInstanceOf(SourceOperatorFactory.class);
    }

    @Test
    void testExchangeModePipelined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.PIPELINED));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.PIPELINED));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(2);
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testExchangeModeBatch() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setBufferTimeout(-1L);
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.BATCH));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.BATCH));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(3);
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapVertex = (JobVertex)verticesSorted.get(1);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
        Assertions.assertThat((Comparable)((IntermediateDataSet)mapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
    }

    @Test
    void testExchangeModeUndefined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.UNDEFINED));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.UNDEFINED));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(2);
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testExchangeModeHybridFull() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.HYBRID_FULL));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.HYBRID_FULL));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(2);
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.HYBRID_FULL);
    }

    @Test
    void testExchangeModeHybridSelective() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.HYBRID_SELECTIVE));
        SingleOutputStreamOperator mapDataStream = partitionAfterSourceDataStream.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream partitionAfterMapDataStream = new DataStream(env, (Transformation)new PartitionTransformation(mapDataStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.HYBRID_SELECTIVE));
        partitionAfterMapDataStream.print().setParallelism(2);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(2);
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.HYBRID_SELECTIVE);
    }

    @Test
    void testStreamingJobTypeByDefault() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new String[]{"test"}).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assertions.assertThat((Comparable)jobGraph.getJobType()).isEqualTo((Object)JobType.STREAMING);
    }

    @Test
    void testBatchJobType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements((Object[])new String[]{"test"}).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        Assertions.assertThat((Comparable)jobGraph.getJobType()).isEqualTo((Object)JobType.BATCH);
    }

    @Test
    void testPartitionTypesInBatchMode() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(4);
        env.disableOperatorChaining();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1});
        source.map((MapFunction & Serializable)value -> value).setParallelism(1).rescale().map((MapFunction & Serializable)value -> value).rebalance().map((MapFunction & Serializable)value -> value).keyBy((KeySelector & Serializable)value -> value).map((MapFunction & Serializable)value -> value).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        this.assertHasOutputPartitionType((JobVertex)verticesSorted.get(0), ResultPartitionType.BLOCKING);
        this.assertHasOutputPartitionType((JobVertex)verticesSorted.get(1), ResultPartitionType.BLOCKING);
        this.assertHasOutputPartitionType((JobVertex)verticesSorted.get(2), ResultPartitionType.BLOCKING);
        this.assertHasOutputPartitionType((JobVertex)verticesSorted.get(3), ResultPartitionType.BLOCKING);
        this.assertHasOutputPartitionType((JobVertex)verticesSorted.get(4), ResultPartitionType.BLOCKING);
    }

    private void assertHasOutputPartitionType(JobVertex jobVertex, ResultPartitionType partitionType) {
        Assertions.assertThat((Comparable)((IntermediateDataSet)jobVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)partitionType);
    }

    @Test
    void testNormalExchangeModeWithBufferTimeout() {
        this.testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
    }

    private void testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode exchangeMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(100L);
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        PartitionTransformation transformation = new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), exchangeMode);
        DataStream partitionStream = new DataStream(env, (Transformation)transformation);
        partitionStream.map((MapFunction & Serializable)value -> value).print();
        StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    @Test
    void testDisablingBufferTimeoutWithPipelinedExchanges() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setBufferTimeout(-1L);
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction & Serializable)value -> value).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        for (JobVertex vertex : jobGraph.getVertices()) {
            StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
            for (NonChainedOutput output : streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
                Assertions.assertThat((long)output.getBufferTimeout()).isEqualTo(-1L);
            }
        }
    }

    @Test
    void testIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        IterativeStream iteration = source.iterate(3000L);
        iteration.name("iteration").setParallelism(2);
        SingleOutputStreamOperator map = iteration.map((MapFunction & Serializable)x -> x + 1).name("map").setParallelism(2);
        SingleOutputStreamOperator filter = map.filter((FilterFunction & Serializable)x -> false).name("filter").setParallelism(2);
        iteration.closeWith((DataStream)filter).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
        Assertions.assertThat((Object)slotSharingGroup).isNotNull();
        CoLocationGroup iterationSourceCoLocationGroup = null;
        CoLocationGroup iterationSinkCoLocationGroup = null;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            Assertions.assertThat((Object)jobVertex.getSlotSharingGroup()).isEqualTo((Object)slotSharingGroup);
            if (jobVertex.getName().startsWith("IterationSource")) {
                iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
                Assertions.assertThat((List)iterationSourceCoLocationGroup.getVertexIds()).contains((Object[])new JobVertexID[]{jobVertex.getID()});
                continue;
            }
            if (jobVertex.getName().startsWith("IterationSink")) {
                iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
                Assertions.assertThat((List)iterationSinkCoLocationGroup.getVertexIds()).contains((Object[])new JobVertexID[]{jobVertex.getID()});
                continue;
            }
            Assertions.assertThat((Object)jobVertex.getCoLocationGroup()).isNull();
        }
        Assertions.assertThat(iterationSourceCoLocationGroup).isNotNull();
        Assertions.assertThat(iterationSinkCoLocationGroup).isNotNull();
        Assertions.assertThat(iterationSinkCoLocationGroup).isEqualTo(iterationSourceCoLocationGroup);
    }

    @Test
    void testDefaultJobType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph streamGraph = new StreamGraphGenerator(Collections.emptyList(), env.getConfig(), env.getCheckpointConfig()).generate();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((Comparable)jobGraph.getJobType()).isEqualTo((Object)JobType.STREAMING);
    }

    @Test
    void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = chainEnv.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assertions.assertThat((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(0)), (StreamNode)((StreamNode)streamNodes.get(1)), (StreamGraph)streamGraph)).isTrue();
        Assertions.assertThat((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(1)), (StreamNode)((StreamNode)streamNodes.get(2)), (StreamGraph)streamGraph)).isFalse();
    }

    @Test
    void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).disableChaining().map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = chainEnv.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        Assertions.assertThat((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(0)), (StreamNode)((StreamNode)streamNodes.get(1)), (StreamGraph)streamGraph)).isFalse();
        Assertions.assertThat((boolean)StreamingJobGraphGenerator.areOperatorsChainable((StreamNode)((StreamNode)streamNodes.get(1)), (StreamNode)((StreamNode)streamNodes.get(2)), (StreamGraph)streamGraph)).isTrue();
    }

    @Test
    void testYieldingOperatorProperlyChainedOnLegacySources() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromElements((Object[])new Integer[]{1}).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map((MapFunction & Serializable)x -> x).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)vertices).hasSize(2);
        Assertions.assertThat((List)((JobVertex)vertices.get(0)).getOperatorIDs()).hasSize(2);
        Assertions.assertThat((List)((JobVertex)vertices.get(1)).getOperatorIDs()).hasSize(5);
    }

    @Test
    void testYieldingOperatorProperlyChainedOnNewSources() {
        LocalStreamEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        chainEnv.fromSource((Source)new NumberSequenceSource(0L, 10L), WatermarkStrategy.noWatermarks(), "input").map((MapFunction & Serializable)x -> x).transform("test", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory()).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)vertices).hasSize(1);
        Assertions.assertThat((List)((JobVertex)vertices.get(0)).getOperatorIDs()).hasSize(4);
    }

    @Test
    void testDeterministicUnionOrder() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        JobGraph jobGraph = this.getUnionJobGraph((StreamExecutionEnvironment)env);
        JobVertex jobSink = (JobVertex)Iterables.getLast((Iterable)jobGraph.getVerticesSortedTopologicallyFromSources());
        List expectedSourceOrder = jobSink.getInputs().stream().map(edge -> edge.getSource().getProducer().getName()).collect(Collectors.toList());
        for (int i = 0; i < 100; ++i) {
            JobGraph jobGraph2 = this.getUnionJobGraph((StreamExecutionEnvironment)env);
            JobVertex jobSink2 = (JobVertex)Iterables.getLast((Iterable)jobGraph2.getVerticesSortedTopologicallyFromSources());
            ((ObjectAssert)Assertions.assertThat((Object)jobSink).withFailMessage("Different runs should yield different vertexes", new Object[0])).isNotEqualTo((Object)jobSink2);
            List actualSourceOrder = jobSink2.getInputs().stream().map(edge -> edge.getSource().getProducer().getName()).collect(Collectors.toList());
            ((ListAssert)Assertions.assertThat(actualSourceOrder).withFailMessage("Union inputs reordered", new Object[0])).isEqualTo(expectedSourceOrder);
        }
    }

    private JobGraph getUnionJobGraph(StreamExecutionEnvironment env) {
        this.createSource(env, 1).union(new DataStream[]{this.createSource(env, 2)}).union(new DataStream[]{this.createSource(env, 3)}).union(new DataStream[]{this.createSource(env, 4)}).addSink((SinkFunction)new DiscardingSink());
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private DataStream<Integer> createSource(StreamExecutionEnvironment env, int index) {
        return env.fromElements((Object[])new Integer[]{index}).name("source" + index).map((MapFunction & Serializable)i -> i).name("map" + index);
    }

    @Test
    void testNotSupportInputSelectableOperatorIfCheckpointing() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000L);
        DataStreamSource source1 = env.fromElements((Object[])new String[]{"1"});
        DataStreamSource source2 = env.fromElements((Object[])new Integer[]{1});
        source1.connect((DataStream)source2).transform("test", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TwoInputStreamOperator)new TestAnyModeReadingStreamOperator("test operator")).print();
        Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph())).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
        ResourceSpec resource = ResourceSpec.UNKNOWN;
        List<ResourceSpec> resourceSpecs = Arrays.asList(resource, resource, resource, resource);
        Configuration taskManagerConfig = new Configuration(){
            {
                this.set(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS, new HashMap<String, String>(){
                    {
                        this.put("DATAPROC", "6");
                        this.put("PYTHON", "4");
                    }
                });
            }
        };
        ArrayList<Map<ManagedMemoryUseCase, Integer>> operatorScopeManagedMemoryUseCaseWeights = new ArrayList<Map<ManagedMemoryUseCase, Integer>>();
        ArrayList<Set<ManagedMemoryUseCase>> slotScopeManagedMemoryUseCases = new ArrayList<Set<ManagedMemoryUseCase>>();
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.emptySet());
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.emptyMap());
        slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        operatorScopeManagedMemoryUseCaseWeights.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        slotScopeManagedMemoryUseCases.add(Collections.emptySet());
        JobGraph jobGraph = this.createJobGraphForManagedMemoryFractionTest(resourceSpecs, operatorScopeManagedMemoryUseCaseWeights, slotScopeManagedMemoryUseCases);
        JobVertex vertex1 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex vertex2 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex vertex3 = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
        StreamConfig sourceConfig = new StreamConfig(vertex1.getConfiguration());
        this.verifyFractions(sourceConfig, 0.3, 0.0, 0.0, taskManagerConfig);
        StreamConfig map1Config = (StreamConfig)Iterables.getOnlyElement(sourceConfig.getTransitiveChainedTaskConfigs(StreamingJobGraphGeneratorTest.class.getClassLoader()).values());
        this.verifyFractions(map1Config, 0.3, 0.4, 0.0, taskManagerConfig);
        StreamConfig map2Config = new StreamConfig(vertex2.getConfiguration());
        this.verifyFractions(map2Config, 0.0, 0.4, 0.0, taskManagerConfig);
        StreamConfig map3Config = new StreamConfig(vertex3.getConfiguration());
        this.verifyFractions(map3Config, 1.0, 0.0, 0.0, taskManagerConfig);
    }

    private JobGraph createJobGraphForManagedMemoryFractionTest(List<ResourceSpec> resourceSpecs, List<Map<ManagedMemoryUseCase, Integer>> operatorScopeUseCaseWeights, List<Set<ManagedMemoryUseCase>> slotScopeUseCases) throws Exception {
        Method opMethod = StreamingJobGraphGeneratorTest.getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new ParallelSourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) {
            }

            public void cancel() {
            }
        });
        opMethod.invoke((Object)source, resourceSpecs.get(0));
        SingleOutputStreamOperator map1 = source.map((MapFunction & Serializable)value -> value);
        opMethod.invoke((Object)map1, resourceSpecs.get(1));
        SingleOutputStreamOperator map2 = map1.rebalance().map((MapFunction & Serializable)value -> value);
        opMethod.invoke((Object)map2, resourceSpecs.get(2));
        SingleOutputStreamOperator map3 = map2.rebalance().map((MapFunction & Serializable)value -> value).slotSharingGroup("test");
        opMethod.invoke((Object)map3, resourceSpecs.get(3));
        this.declareManagedMemoryUseCaseForTranformation(source.getTransformation(), operatorScopeUseCaseWeights.get(0), slotScopeUseCases.get(0));
        this.declareManagedMemoryUseCaseForTranformation(map1.getTransformation(), operatorScopeUseCaseWeights.get(1), slotScopeUseCases.get(1));
        this.declareManagedMemoryUseCaseForTranformation(map2.getTransformation(), operatorScopeUseCaseWeights.get(2), slotScopeUseCases.get(2));
        this.declareManagedMemoryUseCaseForTranformation(map3.getTransformation(), operatorScopeUseCaseWeights.get(3), slotScopeUseCases.get(3));
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private void declareManagedMemoryUseCaseForTranformation(Transformation<?> transformation, Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights, Set<ManagedMemoryUseCase> slotScopeUseCases) {
        for (Map.Entry<ManagedMemoryUseCase, Integer> entry : operatorScopeUseCaseWeights.entrySet()) {
            transformation.declareManagedMemoryUseCaseAtOperatorScope(entry.getKey(), entry.getValue().intValue());
        }
        for (ManagedMemoryUseCase useCase : slotScopeUseCases) {
            transformation.declareManagedMemoryUseCaseAtSlotScope(useCase);
        }
    }

    private void verifyFractions(StreamConfig streamConfig, double expectedBatchFrac, double expectedPythonFrac, double expectedStateBackendFrac, Configuration tmConfig) {
        double delta = 1.0E-6;
        Assertions.assertThat((double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND, tmConfig, ClassLoader.getSystemClassLoader())).isCloseTo(expectedStateBackendFrac, Offset.offset((Number)1.0E-6));
        Assertions.assertThat((double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, tmConfig, ClassLoader.getSystemClassLoader())).isCloseTo(expectedPythonFrac, Offset.offset((Number)1.0E-6));
        Assertions.assertThat((double)streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, tmConfig, ClassLoader.getSystemClassLoader())).isCloseTo(expectedBatchFrac, Offset.offset((Number)1.0E-6));
    }

    @Test
    void testHybridShuffleModeInNonBatchMode() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, (Object)BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.STREAMING);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        env.disableOperatorChaining();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitioned = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.HYBRID_FULL));
        partitioned.addSink((SinkFunction)new DiscardingSink());
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testSetNonDefaultSlotSharingInHybridMode() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, (Object)BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        StreamGraph streamGraph = this.createStreamGraphForSlotSharingTest(configuration);
        streamGraph.getStreamNodes().stream().filter(n -> "map1".equals(n.getOperatorName())).findFirst().get().setSlotSharingGroup("testSlotSharingGroup");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph)).isInstanceOf(IllegalStateException.class)).hasMessage("hybrid shuffle mode currently does not support setting non-default slot sharing group.");
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, (Object)BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
        StreamGraph streamGraph2 = this.createStreamGraphForSlotSharingTest(configuration);
        streamGraph2.getStreamNodes().stream().filter(n -> "map1".equals(n.getOperatorName())).findFirst().get().setSlotSharingGroup("testSlotSharingGroup");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph2)).isInstanceOf(IllegalStateException.class)).hasMessage("hybrid shuffle mode currently does not support setting non-default slot sharing group.");
    }

    @Test
    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
        StreamGraph streamGraph = this.createStreamGraphForSlotSharingTest(new Configuration());
        streamGraph.getStreamNodes().stream().filter(n -> "map1".equals(n.getOperatorName())).findFirst().get().setSlotSharingGroup("testSlotSharingGroup");
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(4);
        List<JobVertex> verticesMatched = StreamingJobGraphGeneratorTest.getExpectedVerticesList(verticesSorted);
        JobVertex source1Vertex = verticesMatched.get(0);
        JobVertex source2Vertex = verticesMatched.get(1);
        JobVertex map1Vertex = verticesMatched.get(2);
        JobVertex map2Vertex = verticesMatched.get(3);
        this.assertSameSlotSharingGroup(source1Vertex, source2Vertex, map2Vertex);
        this.assertDistinctSharingGroups(source1Vertex, map1Vertex);
    }

    @Test
    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
        StreamGraph streamGraph = this.createStreamGraphForSlotSharingTest(new Configuration());
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(4);
        List<JobVertex> verticesMatched = StreamingJobGraphGeneratorTest.getExpectedVerticesList(verticesSorted);
        JobVertex source1Vertex = verticesMatched.get(0);
        JobVertex source2Vertex = verticesMatched.get(1);
        JobVertex map1Vertex = verticesMatched.get(2);
        JobVertex map2Vertex = verticesMatched.get(3);
        this.assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex, map1Vertex);
    }

    @Test
    void testSlotSharingResourceConfiguration() {
        String slotSharingGroup1 = "slot-a";
        String slotSharingGroup2 = "slot-b";
        ResourceProfile resourceProfile1 = ResourceProfile.fromResources((double)1.0, (int)10);
        ResourceProfile resourceProfile2 = ResourceProfile.fromResources((double)2.0, (int)20);
        ResourceProfile resourceProfile3 = ResourceProfile.fromResources((double)3.0, (int)30);
        HashMap<String, ResourceProfile> slotSharingGroupResource = new HashMap<String, ResourceProfile>();
        slotSharingGroupResource.put("slot-a", resourceProfile1);
        slotSharingGroupResource.put("slot-b", resourceProfile2);
        slotSharingGroupResource.put("default", resourceProfile3);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{1, 2, 3}).name("slot-a").slotSharingGroup("slot-a").map((MapFunction & Serializable)x -> x + 1).name("slot-b").slotSharingGroup("slot-b").map((MapFunction & Serializable)x -> x * x).name("default").slotSharingGroup("default");
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        int numVertex = 0;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            ++numVertex;
            if (jobVertex.getName().contains("slot-a")) {
                Assertions.assertThat((Object)jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo((Object)resourceProfile1);
                continue;
            }
            if (jobVertex.getName().contains("slot-b")) {
                Assertions.assertThat((Object)jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo((Object)resourceProfile2);
                continue;
            }
            if (jobVertex.getName().contains("default")) {
                Assertions.assertThat((Object)jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo((Object)resourceProfile3);
                continue;
            }
            Assertions.fail((String)"");
        }
        Assertions.assertThat((int)numVertex).isEqualTo(3);
    }

    @Test
    void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
        ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)10);
        HashMap<String, ResourceProfile> slotSharingGroupResource = new HashMap<String, ResourceProfile>();
        slotSharingGroupResource.put("default", resourceProfile);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction & Serializable)x -> x + 1);
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResource);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        int numVertex = 0;
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            ++numVertex;
            Assertions.assertThat((Object)jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo((Object)resourceProfile);
        }
        Assertions.assertThat((int)numVertex).isEqualTo(2);
    }

    @Test
    void testNamingOfChainedMultipleInputs() {
        Object[] sources = new String[]{"source-1", "source-2", "source-3"};
        JobGraph graph = this.createGraphWithMultipleInputs(true, (String[])sources);
        JobVertex head = (JobVertex)graph.getVerticesSortedTopologicallyFromSources().iterator().next();
        Assertions.assertThat((Object[])sources).allMatch(source -> head.getOperatorPrettyName().contains((CharSequence)source));
    }

    @Test
    void testNamingOfNonChainedMultipleInputs() {
        String[] sources = new String[]{"source-1", "source-2", "source-3"};
        JobGraph graph = this.createGraphWithMultipleInputs(false, sources);
        JobVertex head = (JobVertex)Iterables.find((Iterable)graph.getVertices(), vertex -> vertex.getInvokableClassName().equals(MultipleInputStreamTask.class.getName()));
        ((AbstractStringAssert)Assertions.assertThat((String)head.getName()).withFailMessage(head.getName(), new Object[0])).doesNotContain(new CharSequence[]{"source-1"});
        ((AbstractStringAssert)Assertions.assertThat((String)head.getOperatorPrettyName()).withFailMessage(head.getOperatorPrettyName(), new Object[0])).doesNotContain(new CharSequence[]{"source-1"});
    }

    public JobGraph createGraphWithMultipleInputs(boolean chain, String ... inputNames) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleInputTransformation transform = new MultipleInputTransformation("mit", (StreamOperatorFactory)new UnusedOperatorFactory(), Types.LONG, env.getParallelism());
        Arrays.stream(inputNames).map(name -> env.fromSource((Source)new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), name).getTransformation()).forEach(arg_0 -> ((MultipleInputTransformation)transform).addInput(arg_0));
        transform.setChainingStrategy(chain ? ChainingStrategy.HEAD_WITH_SOURCES : ChainingStrategy.NEVER);
        env.addOperator((Transformation)transform);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    @Test
    void testTreeDescription() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        JobGraph job = this.createJobGraphWithDescription(env, "test source");
        Object[] allVertices = job.getVerticesAsArray();
        Assertions.assertThat((Object[])allVertices).hasSize(1);
        Assertions.assertThat((String)allVertices[0].getOperatorPrettyName()).isEqualTo("test source\n:- x + 1\n:  :- first print of map1\n:  +- second print of map1\n+- x + 2\n   :- first print of map2\n   +- second print of map2\n");
    }

    @Test
    void testTreeDescriptionWithChainedSource() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        JobGraph job = this.createJobGraphWithDescription(env, "test source 1", "test source 2");
        Object[] allVertices = job.getVerticesAsArray();
        Assertions.assertThat((Object[])allVertices).hasSize(1);
        Assertions.assertThat((String)allVertices[0].getOperatorPrettyName()).isEqualTo("operator chained with source [test source 1, test source 2]\n:- x + 1\n:  :- first print of map1\n:  +- second print of map1\n+- x + 2\n   :- first print of map2\n   +- second print of map2\n");
    }

    @Test
    void testCascadingDescription() {
        Configuration config = new Configuration();
        config.set(PipelineOptions.VERTEX_DESCRIPTION_MODE, (Object)PipelineOptions.VertexDescriptionMode.CASCADING);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)config);
        JobGraph job = this.createJobGraphWithDescription(env, "test source");
        Object[] allVertices = job.getVerticesAsArray();
        Assertions.assertThat((Object[])allVertices).hasSize(1);
        Assertions.assertThat((String)allVertices[0].getOperatorPrettyName()).isEqualTo("test source -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
    }

    @Test
    void testCascadingDescriptionWithChainedSource() {
        Configuration config = new Configuration();
        config.set(PipelineOptions.VERTEX_DESCRIPTION_MODE, (Object)PipelineOptions.VertexDescriptionMode.CASCADING);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)config);
        JobGraph job = this.createJobGraphWithDescription(env, "test source 1", "test source 2");
        Object[] allVertices = job.getVerticesAsArray();
        Assertions.assertThat((Object[])allVertices).hasSize(1);
        Assertions.assertThat((String)allVertices[0].getOperatorPrettyName()).isEqualTo("operator chained with source [test source 1, test source 2] -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
    }

    @Test
    void testNamingWithoutIndex() {
        JobGraph job = this.createStreamGraphForSlotSharingTest(new Configuration()).getJobGraph();
        List allVertices = job.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)allVertices).hasSize(4);
        Assertions.assertThat((String)((JobVertex)allVertices.get(0)).getName()).isEqualTo("Source: source1");
        Assertions.assertThat((String)((JobVertex)allVertices.get(1)).getName()).isEqualTo("Source: source2");
        Assertions.assertThat((String)((JobVertex)allVertices.get(2)).getName()).isEqualTo("map1");
        Assertions.assertThat((String)((JobVertex)allVertices.get(3)).getName()).isEqualTo("map2");
    }

    @Test
    void testNamingWithIndex() {
        Configuration config = new Configuration();
        config.setBoolean(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX, true);
        JobGraph job = this.createStreamGraphForSlotSharingTest(config).getJobGraph();
        List allVertices = job.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)allVertices).hasSize(4);
        Assertions.assertThat((String)((JobVertex)allVertices.get(0)).getName()).isEqualTo("[vertex-0]Source: source1");
        Assertions.assertThat((String)((JobVertex)allVertices.get(1)).getName()).isEqualTo("[vertex-1]Source: source2");
        Assertions.assertThat((String)((JobVertex)allVertices.get(2)).getName()).isEqualTo("[vertex-2]map1");
        Assertions.assertThat((String)((JobVertex)allVertices.get(3)).getName()).isEqualTo("[vertex-3]map2");
    }

    @Test
    void testCacheJobGraph() throws Throwable {
        TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
        env.setParallelism(2);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source = env.fromElements(new Integer[]{1, 2, 3}).name("source");
        CachedDataStream cachedStream = source.map((MapFunction & Serializable)i -> i + 1).name("map-1").map((MapFunction & Serializable)i -> i + 1).name("map-2").cache();
        Assertions.assertThat((Object)cachedStream.getTransformation()).isInstanceOf(CacheTransformation.class);
        CacheTransformation cacheTransformation = (CacheTransformation)cachedStream.getTransformation();
        cachedStream.print().name("print");
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        List allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)allVertices).hasSize(3);
        JobVertex cacheWriteVertex = allVertices.stream().filter(jobVertex -> "CacheWrite".equals(jobVertex.getName())).findFirst().orElseThrow(() -> new RuntimeException("CacheWrite job vertex not found"));
        List inputs = cacheWriteVertex.getInputs();
        Assertions.assertThat((List)inputs).hasSize(1);
        Assertions.assertThat((Comparable)((JobEdge)inputs.get(0)).getDistributionPattern()).isEqualTo((Object)DistributionPattern.POINTWISE);
        Assertions.assertThat((Comparable)((JobEdge)inputs.get(0)).getSource().getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING_PERSISTENT);
        Assertions.assertThat((Comparable)new AbstractID((AbstractID)((JobEdge)inputs.get(0)).getSourceId())).isEqualTo((Object)cacheTransformation.getDatasetId());
        Assertions.assertThat((String)((JobEdge)inputs.get(0)).getSource().getProducer().getName()).isEqualTo("map-1 -> map-2 -> Sink: print");
        env.addCompletedClusterDataset(cacheTransformation.getDatasetId());
        cachedStream.print().name("print");
        jobGraph = env.getStreamGraph().getJobGraph();
        allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)allVertices).hasSize(1);
        Assertions.assertThat((String)((JobVertex)allVertices.get(0)).getName()).isEqualTo("CacheRead -> Sink: print");
        Assertions.assertThat((List)((JobVertex)allVertices.get(0)).getIntermediateDataSetIdsToConsume()).hasSize(1);
        Assertions.assertThat((Comparable)new AbstractID((AbstractID)((JobVertex)allVertices.get(0)).getIntermediateDataSetIdsToConsume().get(0))).isEqualTo((Object)cacheTransformation.getDatasetId());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testHybridBroadcastEdgeAlwaysUseFullResultPartition(boolean isSelective) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.disableOperatorChaining();
        DataStreamSource sourceDataStream = env.fromElements((Object[])new Integer[]{1, 2, 3});
        DataStream partitionAfterSourceDataStream = new DataStream(env, (Transformation)new PartitionTransformation(sourceDataStream.getTransformation(), (StreamPartitioner)new BroadcastPartitioner(), isSelective ? StreamExchangeMode.HYBRID_SELECTIVE : StreamExchangeMode.HYBRID_FULL));
        partitionAfterSourceDataStream.addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)verticesSorted).hasSize(2);
        JobVertex sourceAndMapVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceAndMapVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.HYBRID_FULL);
    }

    @Test
    void testHybridPartitionReuse() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource source = env.fromElements((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        DataStream partition1 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.HYBRID_FULL));
        DataStream partition2 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.HYBRID_SELECTIVE));
        DataStream partition3 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.HYBRID_SELECTIVE));
        DataStream partition4 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.HYBRID_FULL));
        DataStream partition5 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.BATCH));
        DataStream partition7 = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.HYBRID_FULL));
        partition1.addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink1");
        partition2.addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink2");
        partition3.addSink((SinkFunction)new DiscardingSink()).setParallelism(3);
        partition4.addSink((SinkFunction)new DiscardingSink()).setParallelism(2);
        partition5.addSink((SinkFunction)new DiscardingSink()).setParallelism(2);
        SingleOutputStreamOperator mapStream = partition7.map((MapFunction & Serializable)value -> value).setParallelism(1);
        DataStream mapPartition = new DataStream(env, (Transformation)new PartitionTransformation(mapStream.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.HYBRID_FULL));
        mapPartition.addSink((SinkFunction)new DiscardingSink()).name("sink3");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)vertices).hasSize(7);
        JobVertex sourceVertex = (JobVertex)vertices.get(0);
        List producedDataSet = sourceVertex.getProducedDataSets().stream().map(IntermediateDataSet::getId).collect(Collectors.toList());
        Assertions.assertThat(producedDataSet).hasSize(5);
        JobVertex sinkVertex1 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink1"));
        JobVertex sinkVertex2 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink2"));
        JobVertex sinkVertex3 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink3"));
        Assertions.assertThat((Comparable)((JobEdge)sinkVertex2.getInputs().get(0)).getSource().getId()).isEqualTo((Object)((JobEdge)sinkVertex1.getInputs().get(0)).getSource().getId());
        Assertions.assertThat((Comparable)((JobEdge)sinkVertex3.getInputs().get(0)).getSource().getId()).isNotEqualTo((Object)((JobEdge)sinkVertex1.getInputs().get(0)).getSource().getId());
        StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
        List nonChainedOutputs = streamConfig.getOperatorNonChainedOutputs(this.getClass().getClassLoader()).stream().map(NonChainedOutput::getDataSetId).collect(Collectors.toList());
        Assertions.assertThat(nonChainedOutputs).hasSize(4);
        List streamOutputsInOrder = streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader()).stream().map(NonChainedOutput::getDataSetId).collect(Collectors.toList());
        Assertions.assertThat(streamOutputsInOrder).hasSize(5);
        Assertions.assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
    }

    @Test
    void testIntermediateDataSetReuse() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(-1L);
        DataStreamSource source = env.fromElements((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        source.rebalance().addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink1");
        source.rebalance().addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink2");
        source.rebalance().addSink((SinkFunction)new DiscardingSink()).setParallelism(3);
        source.broadcast().addSink((SinkFunction)new DiscardingSink()).setParallelism(2);
        source.forward().addSink((SinkFunction)new DiscardingSink()).setParallelism(1).disableChaining();
        source.forward().addSink((SinkFunction)new DiscardingSink()).setParallelism(1).disableChaining();
        SingleOutputStreamOperator mapStream = source.forward().map((MapFunction & Serializable)value -> value).setParallelism(1);
        mapStream.broadcast().addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink3");
        mapStream.broadcast().addSink((SinkFunction)new DiscardingSink()).setParallelism(2).name("sink4");
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat((List)vertices).hasSize(9);
        JobVertex sourceVertex = (JobVertex)vertices.get(0);
        List producedDataSet = sourceVertex.getProducedDataSets().stream().map(IntermediateDataSet::getId).collect(Collectors.toList());
        Assertions.assertThat(producedDataSet).hasSize(6);
        JobVertex sinkVertex1 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink1"));
        JobVertex sinkVertex2 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink2"));
        JobVertex sinkVertex3 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink3"));
        JobVertex sinkVertex4 = (JobVertex)Preconditions.checkNotNull((Object)StreamingJobGraphGeneratorTest.findJobVertexWithName(vertices, "sink4"));
        Assertions.assertThat((Comparable)((JobEdge)sinkVertex2.getInputs().get(0)).getSource().getId()).isEqualTo((Object)((JobEdge)sinkVertex1.getInputs().get(0)).getSource().getId());
        Assertions.assertThat((Comparable)((JobEdge)sinkVertex4.getInputs().get(0)).getSource().getId()).isEqualTo((Object)((JobEdge)sinkVertex3.getInputs().get(0)).getSource().getId());
        Assertions.assertThat((Comparable)((JobEdge)sinkVertex3.getInputs().get(0)).getSource().getId()).isNotEqualTo((Object)((JobEdge)sinkVertex1.getInputs().get(0)).getSource().getId());
        StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
        List nonChainedOutputs = streamConfig.getOperatorNonChainedOutputs(this.getClass().getClassLoader()).stream().map(NonChainedOutput::getDataSetId).collect(Collectors.toList());
        Assertions.assertThat(nonChainedOutputs).hasSize(5);
        Assertions.assertThat(nonChainedOutputs).doesNotContain((Object[])new IntermediateDataSetID[]{((JobEdge)sinkVertex3.getInputs().get(0)).getSource().getId()});
        List streamOutputsInOrder = streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader()).stream().map(NonChainedOutput::getDataSetId).collect(Collectors.toList());
        Assertions.assertThat(streamOutputsInOrder).hasSize(6);
        Assertions.assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
    }

    @Test
    void testStreamConfigSerializationException() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1, 2, 3});
        env.addOperator((Transformation)new OneInputTransformation(source.getTransformation(), "serializationTestOperator", (StreamOperatorFactory)new SerializationTestOperatorFactory(false), Types.INT, 1));
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph)).hasRootCauseInstanceOf(IOException.class).hasRootCauseMessage("This operator factory is not serializable.");
    }

    @Test
    public void testCoordinatedSerializationException() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1, 2, 3});
        env.addOperator((Transformation)new OneInputTransformation(source.getTransformation(), "serializationTestOperator", (StreamOperatorFactory)new SerializationTestOperatorFactory(true), Types.INT, 1));
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph)).hasRootCauseInstanceOf(IOException.class).hasRootCauseMessage("This provider is not serializable.");
    }

    @Test
    void testSupportConcurrentExecutionAttempts() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        source.rebalance().map((MapFunction & Serializable)v -> v).name("map1").map((MapFunction & Serializable)v -> v).name("map2").rebalance().sinkTo((Sink)new PrintSink()).name("sink");
        StreamGraph streamGraph = env.getStreamGraph();
        List streamNodes = streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt(StreamNode::getId)).collect(Collectors.toList());
        StreamNode sourceNode = (StreamNode)streamNodes.get(0);
        StreamNode map1Node = (StreamNode)streamNodes.get(1);
        StreamNode map2Node = (StreamNode)streamNodes.get(2);
        StreamNode sinkNode = (StreamNode)streamNodes.get(3);
        streamGraph.setSupportsConcurrentExecutionAttempts(Integer.valueOf(sourceNode.getId()), true);
        streamGraph.setSupportsConcurrentExecutionAttempts(Integer.valueOf(map1Node.getId()), true);
        streamGraph.setSupportsConcurrentExecutionAttempts(Integer.valueOf(map2Node.getId()), false);
        streamGraph.setSupportsConcurrentExecutionAttempts(Integer.valueOf(sinkNode.getId()), false);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(3);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("source")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("map")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            if (jobVertex.getName().contains("sink")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            Assertions.fail((String)("Unexpected job vertex " + jobVertex.getName()));
        }
    }

    @Test
    void testSinkSupportConcurrentExecutionAttempts() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        source.rebalance().sinkTo((Sink)new TestSinkWithSupportsConcurrentExecutionAttempts()).name("sink");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(6);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("source")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("pre-writer")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("Writer")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("pre-committer")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            if (jobVertex.getName().contains("post-committer")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            if (jobVertex.getName().contains("Committer")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            Assertions.fail((String)("Unexpected job vertex " + jobVertex.getName()));
        }
    }

    @Test
    void testSinkFunctionNotSupportConcurrentExecutionAttempts() {
        StreamingJobGraphGeneratorTest.testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(new TestingSinkFunctionNotSupportConcurrentExecutionAttempts<Integer>(), false);
    }

    @Test
    void testSinkFunctionSupportConcurrentExecutionAttempts() {
        StreamingJobGraphGeneratorTest.testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(new TestingSinkFunctionSupportConcurrentExecutionAttempts<Integer>(), true);
    }

    @Test
    void testOutputFormatNotSupportConcurrentExecutionAttempts() {
        StreamingJobGraphGeneratorTest.testWhetherOutputFormatSupportsConcurrentExecutionAttempts(new TestingOutputFormatNotSupportConcurrentExecutionAttempts<Integer>(), false);
    }

    @Test
    void testOutputFormatSupportConcurrentExecutionAttempts() {
        StreamingJobGraphGeneratorTest.testWhetherOutputFormatSupportsConcurrentExecutionAttempts(new TestingOutputFormatSupportConcurrentExecutionAttempts<Integer>(), true);
    }

    private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(OutputFormat<Integer> outputFormat, boolean isSupported) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        source.rebalance().writeUsingOutputFormat(outputFormat).name("sink");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(2);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("source")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("sink")) {
                if (isSupported) {
                    Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                    continue;
                }
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            Assertions.fail((String)("Unexpected job vertex " + jobVertex.getName()));
        }
    }

    private static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(SinkFunction<Integer> function, boolean isSupported) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source");
        source.rebalance().addSink(function).name("sink");
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(2);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("source")) {
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                continue;
            }
            if (jobVertex.getName().contains("sink")) {
                if (isSupported) {
                    Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
                    continue;
                }
                Assertions.assertThat((boolean)jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
                continue;
            }
            Assertions.fail((String)("Unexpected job vertex " + jobVertex.getName()));
        }
    }

    private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
        for (JobVertex jobVertex : vertices) {
            if (!jobVertex.getName().contains(name)) continue;
            return jobVertex;
        }
        return null;
    }

    private JobGraph createJobGraphWithDescription(StreamExecutionEnvironment env, String ... inputNames) {
        SingleOutputStreamOperator source;
        env.setParallelism(1);
        if (inputNames.length == 1) {
            source = env.fromElements((Object[])new Long[]{1L, 2L, 3L}).setDescription(inputNames[0]);
        } else {
            MultipleInputTransformation transform = new MultipleInputTransformation("mit", (StreamOperatorFactory)new UnusedOperatorFactory(), Types.LONG, env.getParallelism());
            transform.setDescription("operator chained with source");
            transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
            Arrays.stream(inputNames).map(name -> env.fromSource((Source)new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), name).setDescription(name).getTransformation()).forEach(arg_0 -> ((MultipleInputTransformation)transform).addInput(arg_0));
            source = new DataStream(env, (Transformation)transform);
        }
        SingleOutputStreamOperator map1 = source.map((MapFunction & Serializable)x -> x + 1L).setDescription("x + 1");
        SingleOutputStreamOperator map2 = source.map((MapFunction & Serializable)x -> x + 2L).setDescription("x + 2");
        map1.print().setDescription("first print of map1");
        map1.print().setDescription("second print of map1");
        map2.print().setDescription("first print of map2");
        map2.print().setDescription("second print of map2");
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
    }

    private static List<JobVertex> getExpectedVerticesList(List<JobVertex> vertices) {
        ArrayList<JobVertex> verticesMatched = new ArrayList<JobVertex>();
        List<String> expectedOrder = Arrays.asList("source1", "source2", "map1", "map2");
        for (int i = 0; i < expectedOrder.size(); ++i) {
            for (JobVertex vertex : vertices) {
                if (!vertex.getName().contains(expectedOrder.get(i))) continue;
                verticesMatched.add(vertex);
            }
        }
        return verticesMatched;
    }

    private StreamGraph createStreamGraphForSlotSharingTest(Configuration config) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)config);
        env.setBufferTimeout(-1L);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator source1 = env.fromElements((Object[])new Integer[]{1, 2, 3}).name("source1");
        source1.rebalance().map((MapFunction & Serializable)v -> v).name("map1");
        SingleOutputStreamOperator source2 = env.fromElements((Object[])new Integer[]{4, 5, 6}).name("source2");
        DataStream partitioned = new DataStream(env, (Transformation)new PartitionTransformation(source2.getTransformation(), (StreamPartitioner)new RebalancePartitioner(), StreamExchangeMode.BATCH));
        partitioned.map((MapFunction & Serializable)v -> v).name("map2");
        return env.getStreamGraph();
    }

    private void assertSameSlotSharingGroup(JobVertex ... vertices) {
        for (int i = 0; i < vertices.length - 1; ++i) {
            Assertions.assertThat((Object)vertices[i + 1].getSlotSharingGroup()).isEqualTo((Object)vertices[i].getSlotSharingGroup());
        }
    }

    private void assertDistinctSharingGroups(JobVertex ... vertices) {
        for (int i = 0; i < vertices.length - 1; ++i) {
            for (int j = i + 1; j < vertices.length; ++j) {
                Assertions.assertThat((Object)vertices[i].getSlotSharingGroup()).isNotEqualTo((Object)vertices[j].getSlotSharingGroup());
            }
        }
    }

    private static Method getSetResourcesMethodAndSetAccessible(Class<?> clazz) throws NoSuchMethodException {
        Method setResourcesMethod = clazz.getDeclaredMethod("setResources", ResourceSpec.class);
        setResourcesMethod.setAccessible(true);
        return setResourcesMethod;
    }

    private static class TestingStreamExecutionEnvironment
    extends StreamExecutionEnvironment {
        Set<AbstractID> completedClusterDatasetIds = new HashSet<AbstractID>();

        private TestingStreamExecutionEnvironment() {
        }

        public void addCompletedClusterDataset(AbstractID id) {
            this.completedClusterDatasetIds.add(id);
        }

        public Set<AbstractID> listCompletedClusterDatasets() {
            return new HashSet<AbstractID>(this.completedClusterDatasetIds);
        }
    }

    private static class TestingSingleOutputStreamOperator<OUT>
    extends SingleOutputStreamOperator<OUT> {
        public TestingSingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<OUT> transformation) {
            super(environment, transformation);
        }
    }

    private static class CoordinatedTransformOperatorFactory
    extends AbstractStreamOperatorFactory<Integer>
    implements CoordinatedOperatorFactory<Integer>,
    OneInputStreamOperatorFactory<Integer, Integer> {
        private CoordinatedTransformOperatorFactory() {
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
            return new OperatorCoordinator.Provider(){

                public OperatorID getOperatorId() {
                    return null;
                }

                public OperatorCoordinator create(OperatorCoordinator.Context context) {
                    return null;
                }
            };
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            return null;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return null;
        }
    }

    private static class YieldingTestOperatorFactory<T>
    extends SimpleOperatorFactory<T>
    implements YieldingOperatorFactory<T>,
    OneInputStreamOperatorFactory<T, T> {
        private YieldingTestOperatorFactory() {
            super((StreamOperator)new StreamMap((MapFunction & Serializable)x -> x));
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        }
    }

    static final class UnusedOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        UnusedOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            throw new UnsupportedOperationException();
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }

    private static class NonSerializableCoordinatorProvider
    implements OperatorCoordinator.Provider {
        private NonSerializableCoordinatorProvider() {
        }

        public OperatorID getOperatorId() {
            throw new UnsupportedOperationException();
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
            throw new UnsupportedOperationException();
        }

        private void writeObject(ObjectOutputStream oos) throws IOException {
            throw new IOException("This provider is not serializable.");
        }
    }

    private static class SerializationTestOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        private SerializationTestOperator() {
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    private static class SerializationTestOperatorFactory
    extends AbstractStreamOperatorFactory<Integer>
    implements CoordinatedOperatorFactory<Integer> {
        private final boolean isOperatorFactorySerializable;

        SerializationTestOperatorFactory(boolean isOperatorFactorySerializable) {
            this.isOperatorFactorySerializable = isOperatorFactorySerializable;
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
            return new NonSerializableCoordinatorProvider();
        }

        private void writeObject(ObjectOutputStream oos) throws IOException {
            if (!this.isOperatorFactorySerializable) {
                throw new IOException("This operator factory is not serializable.");
            }
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            SerializationTestOperator castedOperator = new SerializationTestOperator();
            return (T)((Object)castedOperator);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SerializationTestOperator.class;
        }
    }

    private static class TestSinkWithSupportsConcurrentExecutionAttempts
    implements SupportsConcurrentExecutionAttempts,
    TwoPhaseCommittingSink<Integer, Void>,
    WithPreWriteTopology<Integer>,
    WithPreCommitTopology<Integer, Void>,
    WithPostCommitTopology<Integer, Void> {
        private TestSinkWithSupportsConcurrentExecutionAttempts() {
        }

        public TwoPhaseCommittingSink.PrecommittingSinkWriter<Integer, Void> createWriter(Sink.InitContext context) throws IOException {
            return new TwoPhaseCommittingSink.PrecommittingSinkWriter<Integer, Void>(){

                public Collection<Void> prepareCommit() throws IOException, InterruptedException {
                    return null;
                }

                public void write(Integer element, SinkWriter.Context context) throws IOException, InterruptedException {
                }

                public void flush(boolean endOfInput) throws IOException, InterruptedException {
                }

                public void close() throws Exception {
                }
            };
        }

        public Committer<Void> createCommitter() throws IOException {
            return new Committer<Void>(){

                public void commit(Collection<Committer.CommitRequest<Void>> committables) throws IOException, InterruptedException {
                }

                public void close() throws Exception {
                }
            };
        }

        public SimpleVersionedSerializer<Void> getCommittableSerializer() {
            return new SimpleVersionedSerializer<Void>(){

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Void obj) throws IOException {
                    return new byte[0];
                }

                public Void deserialize(int version, byte[] serialized) throws IOException {
                    return null;
                }
            };
        }

        public void addPostCommitTopology(DataStream<CommittableMessage<Void>> committables) {
            committables.map((MapFunction & Serializable)v -> v).name("post-committer").returns(CommittableMessageTypeInfo.noOutput()).rebalance();
        }

        public DataStream<CommittableMessage<Void>> addPreCommitTopology(DataStream<CommittableMessage<Void>> committables) {
            return committables.map((MapFunction & Serializable)v -> v).name("pre-committer").returns(CommittableMessageTypeInfo.noOutput()).rebalance();
        }

        public DataStream<Integer> addPreWriteTopology(DataStream<Integer> inputDataStream) {
            return inputDataStream.map((MapFunction & Serializable)v -> v).name("pre-writer").rebalance();
        }
    }

    private static class TestingSinkFunctionSupportConcurrentExecutionAttempts<T>
    implements SinkFunction<T>,
    SupportsConcurrentExecutionAttempts {
        private TestingSinkFunctionSupportConcurrentExecutionAttempts() {
        }

        public void invoke(T value, SinkFunction.Context context) throws Exception {
        }
    }

    private static class TestingSinkFunctionNotSupportConcurrentExecutionAttempts<T>
    implements SinkFunction<T> {
        private TestingSinkFunctionNotSupportConcurrentExecutionAttempts() {
        }

        public void invoke(T value, SinkFunction.Context context) throws Exception {
        }
    }

    private static class TestingOutputFormatSupportConcurrentExecutionAttempts<T>
    implements OutputFormat<T>,
    SupportsConcurrentExecutionAttempts {
        private TestingOutputFormatSupportConcurrentExecutionAttempts() {
        }

        public void configure(Configuration parameters) {
        }

        public void writeRecord(T record) throws IOException {
        }

        public void close() throws IOException {
        }
    }

    private static class TestingOutputFormatNotSupportConcurrentExecutionAttempts<T>
    implements OutputFormat<T> {
        private TestingOutputFormatNotSupportConcurrentExecutionAttempts() {
        }

        public void configure(Configuration parameters) {
        }

        public void writeRecord(T record) throws IOException {
        }

        public void close() throws IOException {
        }
    }
}

