/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for the {@code filter} transformation of the DataSet API.
 *
 * <p>Each test builds a small data set via {@link CollectionDataSets}, applies one of the
 * nested filter functions, collects the result and compares it against an expected
 * newline-separated string. The class is parameterized over
 * {@link MultipleProgramsTestBase.TestExecutionMode}, so every test runs once per mode
 * supplied by the base class.
 */
@RunWith(Parameterized.class)
public class FilterITCase
extends MultipleProgramsTestBase {
    /**
     * Creates the test case for the given execution mode.
     *
     * @param mode execution mode handed through to {@link MultipleProgramsTestBase}
     */
    public FilterITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    /** A filter that always returns {@code false}; the expected output is a single newline. */
    @Test
    public void testAllRejectingFilter() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs = ds.filter(new Filter1());
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "\n";
        compareResultAsTuples(result, expected);
    }

    /** A filter that always returns {@code true}; all 21 expected tuples are listed. */
    @Test
    public void testAllPassingFilter() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs = ds.filter(new Filter2());
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n6,3,Luke Skywalker\n7,4,Comment#1\n8,4,Comment#2\n9,4,Comment#3\n10,4,Comment#4\n11,5,Comment#5\n12,5,Comment#6\n13,5,Comment#7\n14,5,Comment#8\n15,5,Comment#9\n16,6,Comment#10\n17,6,Comment#11\n18,6,Comment#12\n19,6,Comment#13\n20,6,Comment#14\n21,6,Comment#15\n";
        compareResultAsTuples(result, expected);
    }

    /** Filters on the String field {@code f2}: keeps tuples whose text contains "world". */
    @Test
    public void testFilterOnStringTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs = ds.filter(new Filter3());
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "3,2,Hello world\n4,3,Hello world, how are you?\n";
        compareResultAsTuples(result, expected);
    }

    /** Filters on the Integer field {@code f0}: keeps tuples whose first field is even. */
    @Test
    public void testFilterOnIntegerTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs = ds.filter(new Filter4());
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "2,2,Hello\n4,3,Hello world, how are you?\n6,3,Luke Skywalker\n8,4,Comment#2\n10,4,Comment#4\n12,5,Comment#6\n14,5,Comment#8\n16,6,Comment#10\n18,6,Comment#12\n20,6,Comment#14\n";
        compareResultAsTuples(result, expected);
    }

    /** Filters a data set of plain Strings: keeps values starting with "H". */
    @Test
    public void testFilterBasicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
        DataSet<String> filterDs = ds.filter(new Filter5());
        List<String> result = filterDs.collect();
        String expected = "Hi\nHello\nHello world\nHello world, how are you?\n";
        compareResultAsText(result, expected);
    }

    /** Filters a data set of {@code CustomType}: keeps values whose {@code myString} contains "a". */
    @Test
    public void testFilterOnCustomType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> filterDs = ds.filter(new Filter6());
        List<CollectionDataSets.CustomType> result = filterDs.collect();
        String expected = "3,3,Hello world, how are you?\n3,4,I am fine.\n3,5,Luke Skywalker\n";
        compareResultAsText(result, expected);
    }

    /**
     * Applies a rich filter that reads the broadcast set "ints" in {@code open} and keeps
     * tuples whose {@code f0} is smaller than the largest broadcast value.
     */
    @Test
    public void testRichFilterOnStringTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs =
                ds.filter(new RichFilter1()).withBroadcastSet(ints, "ints");
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n";
        compareResultAsTuples(result, expected);
    }

    /**
     * Applies a rich filter that sums the broadcast set "ints" in {@code open} and keeps
     * tuples whose {@code f1} equals that sum divided by 11.
     */
    @Test
    public void testFilterWithBroadcastVariables() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> filterDs =
                ds.filter(new RichFilter2()).withBroadcastSet(intDs, "ints");
        List<Tuple3<Integer, Long, String>> result = filterDs.collect();
        String expected = "11,5,Comment#5\n12,5,Comment#6\n13,5,Comment#7\n14,5,Comment#8\n15,5,Comment#9\n";
        compareResultAsTuples(result, expected);
    }

    /** Keeps tuples whose {@code f1} equals (sum of broadcast "ints") / 11, using integer division. */
    private static class RichFilter2
    extends RichFilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private int broadcastSum = 0;

        /** Reads the broadcast variable "ints" and stores the sum of its elements. */
        @Override
        public void open(Configuration config) {
            List<Integer> ints = getRuntimeContext().getBroadcastVariable("ints");
            // Sum into a local first so that a repeated open() on the same instance
            // replaces the previous sum instead of adding to it.
            int sum = 0;
            for (Integer i : ints) {
                sum += i;
            }
            broadcastSum = sum;
        }

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            // Long == int: the Long is unboxed and the int widened, so this is a numeric comparison.
            return value.f1 == broadcastSum / 11;
        }
    }

    /** Keeps tuples whose {@code f0} is strictly less than the maximum of broadcast "ints". */
    private static class RichFilter1
    extends RichFilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        /** Largest broadcast value seen so far; stays at -1 if the broadcast set is empty. */
        private int literal = -1;

        /** Reads the broadcast variable "ints" and raises {@code literal} to its maximum. */
        @Override
        public void open(Configuration config) {
            List<Integer> ints = getRuntimeContext().getBroadcastVariable("ints");
            for (int i : ints) {
                literal = Math.max(literal, i);
            }
        }

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return value.f0 < literal;
        }
    }

    /** Keeps custom-type records whose {@code myString} contains the letter "a". */
    private static class Filter6
    implements FilterFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(CollectionDataSets.CustomType value) throws Exception {
            return value.myString.contains("a");
        }
    }

    /** Keeps strings that start with "H". */
    private static class Filter5
    implements FilterFunction<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(String value) throws Exception {
            return value.startsWith("H");
        }
    }

    /** Keeps tuples whose {@code f0} is even. */
    private static class Filter4
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return value.f0 % 2 == 0;
        }
    }

    /** Keeps tuples whose {@code f2} contains "world". */
    private static class Filter3
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return value.f2.contains("world");
        }
    }

    /** Accepts every tuple. */
    private static class Filter2
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return true;
        }
    }

    /** Rejects every tuple. */
    private static class Filter1
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return false;
        }
    }
}

