/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.CombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReduceWithCombinerITCase
extends MultipleProgramsTestBase {
    public ReduceWithCombinerITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(MultipleProgramsTestBase.TestExecutionMode.CLUSTER);
    }

    @Test
    public void testReduceOnNonKeyedDataset() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple2<Integer, Boolean>> input = this.createNonKeyedInput(env);
        List actual = input.reduceGroup((GroupReduceFunction)new NonKeyedCombReducer()).collect();
        String expected = "10,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    @Test
    public void testForkingReduceOnNonKeyedDataset() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple2<Integer, Boolean>> input = this.createNonKeyedInput(env);
        GroupReduceOperator r1 = input.reduceGroup((GroupReduceFunction)new NonKeyedCombReducer());
        GroupReduceOperator r2 = input.reduceGroup((GroupReduceFunction)new NonKeyedGroupCombReducer());
        List actual = r1.union((DataSet)r2).collect();
        String expected = "10,true\n10,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    @Test
    public void testReduceOnKeyedDataset() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<String, Integer, Boolean>> input = this.createKeyedInput(env);
        List actual = input.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new KeyedCombReducer()).collect();
        String expected = "k1,6,true\nk2,4,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    @Test
    public void testReduceOnKeyedDatasetWithSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<String, Integer, Boolean>> input = this.createKeyedInput(env);
        List actual = input.groupBy((KeySelector)new KeySelectorX()).reduceGroup((GroupReduceFunction)new KeyedCombReducer()).collect();
        String expected = "k1,6,true\nk2,4,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    @Test
    public void testForkingReduceOnKeyedDataset() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<String, Integer, Boolean>> input = this.createKeyedInput(env);
        UnsortedGrouping counts = input.groupBy(new int[]{0});
        GroupReduceOperator r1 = counts.reduceGroup((GroupReduceFunction)new KeyedCombReducer());
        GroupReduceOperator r2 = counts.reduceGroup((GroupReduceFunction)new KeyedGroupCombReducer());
        List actual = r1.union((DataSet)r2).collect();
        String expected = "k1,6,true\nk2,4,true\nk1,6,true\nk2,4,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    @Test
    public void testForkingReduceOnKeyedDatasetWithSelection() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<String, Integer, Boolean>> input = this.createKeyedInput(env);
        UnsortedGrouping counts = input.groupBy((KeySelector)new KeySelectorX());
        GroupReduceOperator r1 = counts.reduceGroup((GroupReduceFunction)new KeyedCombReducer());
        GroupReduceOperator r2 = counts.reduceGroup((GroupReduceFunction)new KeyedGroupCombReducer());
        List actual = r1.union((DataSet)r2).collect();
        String expected = "k1,6,true\nk2,4,true\nk1,6,true\nk2,4,true\n";
        TestBaseUtils.compareResultAsTuples((List)actual, (String)expected);
    }

    private DataSet<Tuple2<Integer, Boolean>> createNonKeyedInput(ExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false), new Tuple2((Object)1, (Object)false))).rebalance();
    }

    private DataSet<Tuple3<String, Integer, Boolean>> createKeyedInput(ExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k2", (Object)1, (Object)false), new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k2", (Object)1, (Object)false), new Tuple3((Object)"k2", (Object)1, (Object)false), new Tuple3((Object)"k1", (Object)1, (Object)false), new Tuple3((Object)"k2", (Object)1, (Object)false))).rebalance();
    }

    private class KeyedGroupCombReducer
    implements GroupCombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
    GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
        private KeyedGroupCombReducer() {
        }

        public void combine(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
            String key = null;
            int sum = 0;
            boolean flag = true;
            for (Tuple3<String, Integer, Boolean> tuple : values) {
                key = key == null ? (String)tuple.f0 : key;
                sum += ((Integer)tuple.f1).intValue();
                flag &= (Boolean)tuple.f2 == false;
            }
            out.collect((Object)new Tuple3((Object)key, (Object)sum, (Object)flag));
        }

        public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
            String key = null;
            int sum = 0;
            boolean flag = true;
            for (Tuple3<String, Integer, Boolean> tuple : values) {
                key = key == null ? (String)tuple.f0 : key;
                sum += ((Integer)tuple.f1).intValue();
                flag &= ((Boolean)tuple.f2).booleanValue();
            }
            out.collect((Object)new Tuple3((Object)key, (Object)sum, (Object)flag));
        }
    }

    private class KeyedCombReducer
    implements CombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
    GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
        private KeyedCombReducer() {
        }

        public Tuple3<String, Integer, Boolean> combine(Iterable<Tuple3<String, Integer, Boolean>> values) throws Exception {
            String key = null;
            int sum = 0;
            boolean flag = true;
            for (Tuple3<String, Integer, Boolean> tuple : values) {
                key = key == null ? (String)tuple.f0 : key;
                sum += ((Integer)tuple.f1).intValue();
                flag &= (Boolean)tuple.f2 == false;
            }
            return new Tuple3(key, (Object)sum, (Object)flag);
        }

        public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
            String key = null;
            int sum = 0;
            boolean flag = true;
            for (Tuple3<String, Integer, Boolean> tuple : values) {
                key = key == null ? (String)tuple.f0 : key;
                sum += ((Integer)tuple.f1).intValue();
                flag &= ((Boolean)tuple.f2).booleanValue();
            }
            out.collect((Object)new Tuple3((Object)key, (Object)sum, (Object)flag));
        }
    }

    private static class KeySelectorX
    implements KeySelector<Tuple3<String, Integer, Boolean>, String> {
        private static final long serialVersionUID = 1L;

        private KeySelectorX() {
        }

        public String getKey(Tuple3<String, Integer, Boolean> in) {
            return (String)in.f0;
        }
    }

    private static class NonKeyedGroupCombReducer
    implements GroupCombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
    GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
        private NonKeyedGroupCombReducer() {
        }

        public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
            int sum = 0;
            boolean flag = true;
            for (Tuple2<Integer, Boolean> tuple : values) {
                sum += ((Integer)tuple.f0).intValue();
                flag &= ((Boolean)tuple.f1).booleanValue();
            }
            out.collect((Object)new Tuple2((Object)sum, (Object)flag));
        }

        public void combine(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
            int sum = 0;
            boolean flag = true;
            for (Tuple2<Integer, Boolean> tuple : values) {
                sum += ((Integer)tuple.f0).intValue();
                flag &= (Boolean)tuple.f1 == false;
            }
            out.collect((Object)new Tuple2((Object)sum, (Object)flag));
        }
    }

    private static class NonKeyedCombReducer
    implements CombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
    GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
        private NonKeyedCombReducer() {
        }

        public Tuple2<Integer, Boolean> combine(Iterable<Tuple2<Integer, Boolean>> values) throws Exception {
            int sum = 0;
            boolean flag = true;
            for (Tuple2<Integer, Boolean> tuple : values) {
                sum += ((Integer)tuple.f0).intValue();
                flag &= (Boolean)tuple.f1 == false;
            }
            return new Tuple2((Object)sum, (Object)flag);
        }

        public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
            int sum = 0;
            boolean flag = true;
            for (Tuple2<Integer, Boolean> tuple : values) {
                sum += ((Integer)tuple.f0).intValue();
                flag &= ((Boolean)tuple.f1).booleanValue();
            }
            out.collect((Object)new Tuple2((Object)sum, (Object)flag));
        }
    }
}

