/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.util.Iterator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;

public class CoGroupConnectedComponentsITCase
extends JavaProgramTestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    private static final int MAX_ITERATIONS = 100;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    protected void preSubmit() throws Exception {
        this.verticesPath = this.createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices((int)1000));
        this.edgesPath = this.createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges((int)10000, (int)1000, (long)3287269182979823L));
        this.resultPath = this.getTempFilePath("results");
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader reader : CoGroupConnectedComponentsITCase.getResultReader((String)this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult((BufferedReader)reader);
        }
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Operator initialVertices = env.readCsvFile(this.verticesPath).fieldDelimiter(" ").types(Long.class).name("Vertices");
        Operator edges = env.readCsvFile(this.edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).name("Edges");
        Operator verticesWithId = initialVertices.map((MapFunction)new MapFunction<Tuple1<Long>, Tuple2<Long, Long>>(){

            public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
                return new Tuple2(value.f0, value.f0);
            }
        }).name("Assign Vertex Ids");
        DeltaIteration iteration = verticesWithId.iterateDelta((DataSet)verticesWithId, 100, new int[]{0});
        JoinOperator joinWithNeighbors = (JoinOperator)iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>(){

            public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
                return new Tuple2(second.f1, first.f1);
            }
        }).name("Join Candidate Id With Neighbor");
        CoGroupOperator minAndUpdate = (CoGroupOperator)joinWithNeighbors.coGroup((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new MinIdAndUpdate()).name("min Id and Update");
        iteration.closeWith((DataSet)minAndUpdate, (DataSet)minAndUpdate).writeAsCsv(this.resultPath, "\n", " ").name("Result");
        env.execute("Workset Connected Components");
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"f1->f1"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0->f0"})
    private static final class MinIdAndUpdate
    implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        private MinIdAndUpdate() {
        }

        public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws Exception {
            Iterator<Tuple2<Long, Long>> current = second.iterator();
            if (!current.hasNext()) {
                throw new Exception("Error: Id not encountered before.");
            }
            Tuple2<Long, Long> old = current.next();
            long oldId = (Long)old.f1;
            long minimumComponentID = Long.MAX_VALUE;
            for (Tuple2<Long, Long> candidate : first) {
                long candidateComponentID = (Long)candidate.f1;
                if (candidateComponentID >= minimumComponentID) continue;
                minimumComponentID = candidateComponentID;
            }
            if (minimumComponentID < oldId) {
                out.collect((Object)new Tuple2(old.f0, (Object)minimumComponentID));
            }
        }
    }
}

