/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that run small batch jobs on a local Flink environment configured with
 * adaptive auto-parallelism options (see {@link #createConfiguration()}).
 *
 * <p>Every job routes its records through {@link NumberCounter}, which counts how often each
 * number was seen per map-function instance and publishes the counts to the static
 * {@link #numberCountResults} queue when the function is closed. The scheduling tests merge those
 * per-instance counts and compare them with {@link #expectedResult}.
 */
class AdaptiveBatchSchedulerITCase {
    /** Upper bound passed to {@code ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM}. */
    private static final int DEFAULT_MAX_PARALLELISM = 4;

    /** Parallelism of the first source in the scheduling job. */
    private static final int SOURCE_PARALLELISM_1 = 2;

    /** Parallelism of the second source in the scheduling job. */
    private static final int SOURCE_PARALLELISM_2 = 8;

    /** Each source emits the sequence {@code 0 .. NUMBERS_TO_PRODUCE - 1}. */
    private static final int NUMBERS_TO_PRODUCE = 10000;

    /**
     * Per-instance counts published by {@link NumberCounter#close()}. Static because the map
     * function instances only reach the test through static state; reset before every test.
     */
    private static ConcurrentLinkedQueue<Map<Long, Long>> numberCountResults;

    /** Expected merged counts for the scheduling job: every number is seen exactly twice. */
    private Map<Long, Long> expectedResult;

    AdaptiveBatchSchedulerITCase() {
    }

    /** Rebuilds the expected counts and clears the shared result queue. */
    @BeforeEach
    void setUp() {
        // The scheduling job unions two sources that each emit the full sequence, hence 2 per key.
        this.expectedResult =
                LongStream.range(0L, NUMBERS_TO_PRODUCE)
                        .boxed()
                        .collect(Collectors.toMap(Function.identity(), i -> 2L));
        numberCountResults = new ConcurrentLinkedQueue<>();
    }

    @Test
    void testSchedulingWithUnknownResource() throws Exception {
        this.testScheduling(false);
    }

    @Test
    void testSchedulingWithFineGrainedResource() throws Exception {
        this.testScheduling(true);
    }

    /**
     * Runs a forward-connected source/map pair whose explicit parallelism (8) is larger than the
     * configured maximum parallelism ({@link #DEFAULT_MAX_PARALLELISM}). The test passes when the
     * job executes without throwing.
     */
    @Test
    void testParallelismOfForwardGroupLargerThanGlobalMaxParallelism() throws Exception {
        Configuration configuration = createConfiguration();
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(8);

        SingleOutputStreamOperator<Long> source =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source")
                        .slotSharingGroup("group1");

        source.forward().map(new NumberCounter()).name("map").slotSharingGroup("group2");
        env.execute();
    }

    /**
     * Runs a job in which {@code source2} feeds two different consumers: {@code map1} (via a union
     * with the forwarded {@code source1}) and {@code map2} (directly). The test passes when the
     * job executes without throwing.
     */
    @Test
    void testDifferentConsumerParallelism() throws Exception {
        Configuration configuration = createConfiguration();
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(8);

        SingleOutputStreamOperator<Long> source2 =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source2")
                        .slotSharingGroup("group2");
        SingleOutputStreamOperator<Long> source1 =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source1")
                        .slotSharingGroup("group1");

        source1.forward()
                .union(source2)
                .map(new NumberCounter())
                .name("map1")
                .slotSharingGroup("group3");
        source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");
        env.execute();
    }

    /**
     * Executes the scheduling job and verifies the merged per-number counts.
     *
     * @param isFineGrained whether to enable fine-grained resource management for the job
     * @throws Exception if job execution fails
     */
    private void testScheduling(boolean isFineGrained) throws Exception {
        this.executeJob(isFineGrained);

        // Sum the counts reported by every NumberCounter instance.
        Map<Long, Long> numberCountResultMap =
                numberCountResults.stream()
                        .flatMap(map -> map.entrySet().stream())
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum));

        // Print the individual mismatches to make a failing assertion easier to diagnose.
        // The index must be a long: an int would be boxed to Integer and never match a Long key.
        for (long number = 0L; number < NUMBERS_TO_PRODUCE; ++number) {
            Long expected = this.expectedResult.get(number);
            Long actual = numberCountResultMap.get(number);
            // Value comparison; '==' on boxed Longs would compare references.
            if (!expected.equals(actual)) {
                System.out.println(number + ": " + actual);
            }
        }
        Assertions.assertThat(numberCountResultMap).isEqualTo(this.expectedResult);
    }

    /**
     * Builds and runs a job with two sources (parallelism {@link #SOURCE_PARALLELISM_1} and
     * {@link #SOURCE_PARALLELISM_2}) that are unioned, rescaled and passed through a
     * {@link NumberCounter}. Each operator is placed in its own slot sharing group.
     *
     * @param isFineGrained when {@code true}, fine-grained resource management is enabled and the
     *     slot sharing groups declare CPU and task heap requirements
     * @throws Exception if job execution fails
     */
    private void executeJob(boolean isFineGrained) throws Exception {
        Configuration configuration = createConfiguration();
        if (isFineGrained) {
            configuration.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, true);
            configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
        }

        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        ArrayList<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            SlotSharingGroup group;
            if (isFineGrained) {
                group =
                        SlotSharingGroup.newBuilder("group" + i)
                                .setCpuCores(1.0)
                                .setTaskHeapMemory(MemorySize.parse("100m"))
                                .build();
            } else {
                group = SlotSharingGroup.newBuilder("group" + i).build();
            }
            slotSharingGroups.add(group);
        }

        SingleOutputStreamOperator<Long> source1 =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(SOURCE_PARALLELISM_1)
                        .name("source1")
                        .slotSharingGroup(slotSharingGroups.get(0));
        SingleOutputStreamOperator<Long> source2 =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(SOURCE_PARALLELISM_2)
                        .name("source2")
                        .slotSharingGroup(slotSharingGroups.get(1));

        source1.union(source2)
                .rescale()
                .map(new NumberCounter())
                .name("map")
                .slotSharingGroup(slotSharingGroups.get(2));
        env.execute();
    }

    /**
     * Creates the configuration shared by all tests: REST bind port "0", a 5000 slot request
     * timeout, adaptive auto-parallelism capped at {@link #DEFAULT_MAX_PARALLELISM} with an
     * average data volume of 150kb per task, 4kb memory segments and one slot per task manager.
     */
    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "0");
        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
        configuration.setInteger(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
                DEFAULT_MAX_PARALLELISM);
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
                MemorySize.parse("150kb"));
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        return configuration;
    }

    /**
     * Identity map function that counts how many times each value passed through this instance
     * and publishes the counts to {@link #numberCountResults} on {@link #close()}.
     */
    private static class NumberCounter
    extends RichMapFunction<Long, Long> {
        private final Map<Long, Long> numberCountResult = new HashMap<>();

        private NumberCounter() {
        }

        @Override
        public Long map(Long value) throws Exception {
            this.numberCountResult.merge(value, 1L, Long::sum);
            return value;
        }

        @Override
        public void close() throws Exception {
            numberCountResults.add(this.numberCountResult);
        }
    }
}

