/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.batch.sql;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class MatchRecognizeITCase {
    private StreamExecutionEnvironment env;
    private StreamTableEnvironment tEnv;

    @Before
    public void setup() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.inBatchMode());
        this.tEnv.getConfig().set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, (Object)false);
    }

    @Test
    public void testSimplePattern() {
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a"}), Row.of((Object[])new Object[]{2, "z"}), Row.of((Object[])new Object[]{3, "b"}), Row.of((Object[])new Object[]{4, "c"}), Row.of((Object[])new Object[]{5, "d"}), Row.of((Object[])new Object[]{6, "a"}), Row.of((Object[])new Object[]{7, "b"}), Row.of((Object[])new Object[]{8, "c"}), Row.of((Object[])new Object[]{9, "h"})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT T.aid, T.bid, T.cid\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    `A\"`.id AS aid,\n    l.id AS bid,\n    C.id AS cid\n  PATTERN (`A\"` l C)\n  DEFINE\n    `A\"` AS name = 'a',\n    l AS name = 'b',\n    C AS name = 'c'\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{6, 7, 8})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testSimplePatternWithNulls() {
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", null}), Row.of((Object[])new Object[]{2, "b", null}), Row.of((Object[])new Object[]{3, "c", null}), Row.of((Object[])new Object[]{4, "d", null}), Row.of((Object[])new Object[]{5, null, null}), Row.of((Object[])new Object[]{6, "a", null}), Row.of((Object[])new Object[]{7, "b", null}), Row.of((Object[])new Object[]{8, "c", null}), Row.of((Object[])new Object[]{9, null, null})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name", "nullField"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.STRING})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).column("nullField", (AbstractDataType)DataTypes.STRING()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT T.aid, T.bNull, T.cid, T.aNull\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    A.id AS aid,\n    A.nullField AS aNull,\n    LAST(B.nullField) AS bNull,\n    C.id AS cid\n  PATTERN (A B C)\n  DEFINE\n    A AS name = 'a' AND nullField IS NULL,\n    B AS name = 'b' AND LAST(A.nullField) IS NULL,\n    C AS name = 'c'\n) AS T");
        Assertions.assertEquals(Arrays.asList(Row.of((Object[])new Object[]{1, null, 3, null}), Row.of((Object[])new Object[]{6, null, 8, null})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testCodeSplitsAreProperlyGenerated() {
        this.tEnv.getConfig().setMaxGeneratedCodeLength(Integer.valueOf(1));
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", "key1", "second_key3"}), Row.of((Object[])new Object[]{2, "b", "key1", "second_key3"}), Row.of((Object[])new Object[]{3, "c", "key1", "second_key3"}), Row.of((Object[])new Object[]{4, "d", "key", "second_key"}), Row.of((Object[])new Object[]{5, "e", "key", "second_key"}), Row.of((Object[])new Object[]{6, "a", "key2", "second_key4"}), Row.of((Object[])new Object[]{7, "b", "key2", "second_key4"}), Row.of((Object[])new Object[]{8, "c", "key2", "second_key4"}), Row.of((Object[])new Object[]{9, "f", "key", "second_key"})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name", "key1", "key2"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.STRING, Types.STRING})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).column("key1", (AbstractDataType)DataTypes.STRING()).column("key2", (AbstractDataType)DataTypes.STRING()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM MyTable\nMATCH_RECOGNIZE (\n  PARTITION BY key1, key2\n  ORDER BY proctime\n  MEASURES\n    A.id AS aid,\n    A.key1 AS akey1,\n    LAST(B.id) AS bid,\n    C.id AS cid,\n    C.key2 AS ckey2\n  PATTERN (A B C)\n  DEFINE\n    A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n    B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n    C AS name = 'c' AND LAST(A.name) = 'a'\n) AS T");
        List actual = CollectionUtil.iteratorToList((Iterator)tableResult.collect());
        actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0))));
        Assertions.assertEquals(Arrays.asList(Row.of((Object[])new Object[]{"key1", "second_key3", 1, "key1", 2, 3, "second_key3"}), Row.of((Object[])new Object[]{"key2", "second_key4", 6, "key2", 7, 8, "second_key4"})), (Object)actual);
    }

    @Test
    public void testLogicalOffsets() {
        this.tEnv.createTemporaryView("Ticker", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{"ACME", 1L, 19, 1}), Row.of((Object[])new Object[]{"ACME", 2L, 17, 2}), Row.of((Object[])new Object[]{"ACME", 3L, 13, 3}), Row.of((Object[])new Object[]{"ACME", 4L, 20, 4}), Row.of((Object[])new Object[]{"ACME", 5L, 20, 5}), Row.of((Object[])new Object[]{"ACME", 6L, 26, 6}), Row.of((Object[])new Object[]{"ACME", 7L, 20, 7}), Row.of((Object[])new Object[]{"ACME", 8L, 25, 8})}).returns(Types.ROW_NAMED((String[])new String[]{"symbol", "tstamp", "price", "tax"}, (TypeInformation[])new TypeInformation[]{Types.STRING, Types.LONG, Types.INT, Types.INT})), Schema.newBuilder().column("symbol", (AbstractDataType)DataTypes.STRING()).column("tstamp", (AbstractDataType)DataTypes.BIGINT()).column("price", (AbstractDataType)DataTypes.INT()).column("tax", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM Ticker\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    FIRST(DOWN.tstamp) AS start_tstamp,\n    LAST(DOWN.tstamp) AS bottom_tstamp,\n    UP.tstamp AS end_tstamp,\n    FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n    UP.price + UP.tax AS end_total\n  ONE ROW PER MATCH\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (DOWN{2,} UP)\n  DEFINE\n    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n    UP AS price < FIRST(DOWN.price)\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{6L, 7L, 8L, 33, 33})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testLogicalOffsetsWithStarVariable() {
        this.tEnv.createTemporaryView("Ticker", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "ACME", 1L, 20}), Row.of((Object[])new Object[]{2, "ACME", 2L, 19}), Row.of((Object[])new Object[]{3, "ACME", 3L, 18}), Row.of((Object[])new Object[]{4, "ACME", 4L, 17}), Row.of((Object[])new Object[]{5, "ACME", 5L, 16}), Row.of((Object[])new Object[]{6, "ACME", 6L, 15}), Row.of((Object[])new Object[]{7, "ACME", 7L, 14}), Row.of((Object[])new Object[]{8, "ACME", 8L, 20})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "symbol", "tstamp", "price"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.LONG, Types.INT})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("symbol", (AbstractDataType)DataTypes.STRING()).column("tstamp", (AbstractDataType)DataTypes.BIGINT()).column("price", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM Ticker\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    FIRST(id, 0) as id0,\n    FIRST(id, 1) as id1,\n    FIRST(id, 2) as id2,\n    FIRST(id, 3) as id3,\n    FIRST(id, 4) as id4,\n    FIRST(id, 5) as id5,\n    FIRST(id, 6) as id6,\n    FIRST(id, 7) as id7,\n    LAST(id, 0) as id8,\n    LAST(id, 1) as id9,\n    LAST(id, 2) as id10,\n    LAST(id, 3) as id11,\n    LAST(id, 4) as id12,\n    LAST(id, 5) as id13,\n    LAST(id, 6) as id14,\n    LAST(id, 7) as id15\n  ONE ROW PER MATCH\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (`DOWN\"`{2,} UP)\n  DEFINE\n    `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n    UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testLogicalOffsetOutsideOfRangeInMeasures() {
        this.tEnv.createTemporaryView("Ticker", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{"ACME", 1L, 19, 1}), Row.of((Object[])new Object[]{"ACME", 2L, 17, 2}), Row.of((Object[])new Object[]{"ACME", 3L, 13, 3}), Row.of((Object[])new Object[]{"ACME", 4L, 20, 4})}).returns(Types.ROW_NAMED((String[])new String[]{"symbol", "tstamp", "price", "tax"}, (TypeInformation[])new TypeInformation[]{Types.STRING, Types.LONG, Types.INT, Types.INT})), Schema.newBuilder().column("symbol", (AbstractDataType)DataTypes.STRING()).column("tstamp", (AbstractDataType)DataTypes.BIGINT()).column("price", (AbstractDataType)DataTypes.INT()).column("tax", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM Ticker\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    FIRST(DOWN.price) as first,\n    LAST(DOWN.price) as last,\n    FIRST(DOWN.price, 5) as nullPrice\n  ONE ROW PER MATCH\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (DOWN{2,} UP)\n  DEFINE\n    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n    UP AS price > LAST(DOWN.price)\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{19, 13, null})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testAggregates() {
        this.tEnv.getConfig().setMaxGeneratedCodeLength(Integer.valueOf(1));
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", 1, 0.8, 1}), Row.of((Object[])new Object[]{2, "z", 2, 0.8, 3}), Row.of((Object[])new Object[]{3, "b", 1, 0.8, 2}), Row.of((Object[])new Object[]{4, "c", 1, 0.8, 5}), Row.of((Object[])new Object[]{5, "d", 4, 0.1, 5}), Row.of((Object[])new Object[]{6, "a", 2, 1.5, 2}), Row.of((Object[])new Object[]{7, "b", 2, 0.8, 3}), Row.of((Object[])new Object[]{8, "c", 1, 0.8, 2}), Row.of((Object[])new Object[]{9, "h", 4, 0.8, 3}), Row.of((Object[])new Object[]{10, "h", 4, 0.8, 3}), Row.of((Object[])new Object[]{11, "h", 2, 0.8, 3}), Row.of((Object[])new Object[]{12, "h", 2, 0.8, 3})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name", "price", "rate", "weight"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.INT, Types.DOUBLE, Types.INT})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).column("price", (AbstractDataType)DataTypes.INT()).column("rate", (AbstractDataType)DataTypes.DOUBLE()).column("weight", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        this.tEnv.createTemporarySystemFunction("weightedAvg", (UserDefinedFunction)new JavaUserDefinedAggFunctions.WeightedAvg());
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    FIRST(id) as startId,\n    SUM(A.price) AS sumA,\n    COUNT(D.price) AS countD,\n    SUM(D.price) as sumD,\n    weightedAvg(price, weight) as wAvg,\n    AVG(B.price) AS avgB,\n    SUM(B.price * B.rate) as sumExprB,\n    LAST(id) as endId\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (A+ B+ C D? E)\n  DEFINE\n    A AS SUM(A.price) < 6,\n    B AS SUM(B.price * B.rate) < SUM(A.price) AND\n      SUM(B.price * B.rate) > 0.2 AND\n      SUM(B.price) >= 1 AND\n      AVG(B.price) >= 1 AND\n      weightedAvg(price, weight) > 1\n) AS T");
        Assertions.assertEquals(Arrays.asList(Row.of((Object[])new Object[]{1, 5, 0L, null, 2L, 3, 3.4, 8}), Row.of((Object[])new Object[]{9, 4, 0L, null, 3L, 4, 3.2, 12})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testAggregatesWithNullInputs() {
        this.tEnv.getConfig().setMaxGeneratedCodeLength(Integer.valueOf(1));
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", 10}), Row.of((Object[])new Object[]{2, "z", 10}), Row.of((Object[])new Object[]{3, "b", null}), Row.of((Object[])new Object[]{4, "c", null}), Row.of((Object[])new Object[]{5, "d", 3}), Row.of((Object[])new Object[]{6, "c", 3}), Row.of((Object[])new Object[]{7, "c", 3}), Row.of((Object[])new Object[]{8, "c", 3}), Row.of((Object[])new Object[]{9, "c", 2})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name", "price"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.INT})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).column("price", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        this.tEnv.createTemporarySystemFunction("weightedAvg", (UserDefinedFunction)new JavaUserDefinedAggFunctions.WeightedAvg());
        TableResult tableResult = this.tEnv.executeSql("SELECT *\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    SUM(A.price) as sumA,\n    COUNT(A.id) as countAId,\n    COUNT(A.price) as countAPrice,\n    COUNT(*) as countAll,\n    COUNT(price) as countAllPrice,\n    LAST(id) as endId\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (A+ C)\n  DEFINE\n    A AS SUM(A.price) < 30,\n    C AS C.name = 'c'\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{29, 7L, 5L, 8L, 6L, 8})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testAccessingCurrentTime() {
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a"})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).columnByExpression("proctime", "PROCTIME()").build()));
        TableResult tableResult = this.tEnv.executeSql("SELECT T.aid\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    A.id AS aid,\n    A.proctime AS aProctime,\n    LAST(A.proctime + INTERVAL '1' second) as calculatedField\n  PATTERN (A)\n  DEFINE\n    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n) AS T");
        Assertions.assertEquals(Collections.singletonList(Row.of((Object[])new Object[]{1})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    @Test
    public void testUserDefinedFunctions() {
        this.tEnv.getConfig().setMaxGeneratedCodeLength(Integer.valueOf(1));
        this.tEnv.createTemporaryView("MyTable", this.tEnv.fromDataStream((DataStream)this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", 1}), Row.of((Object[])new Object[]{2, "a", 1}), Row.of((Object[])new Object[]{3, "a", 1}), Row.of((Object[])new Object[]{4, "a", 1}), Row.of((Object[])new Object[]{5, "a", 1}), Row.of((Object[])new Object[]{6, "b", 1}), Row.of((Object[])new Object[]{7, "a", 1}), Row.of((Object[])new Object[]{8, "a", 1}), Row.of((Object[])new Object[]{9, "f", 1})}).returns(Types.ROW_NAMED((String[])new String[]{"id", "name", "price"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING, Types.INT})), Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).column("name", (AbstractDataType)DataTypes.STRING()).column("price", (AbstractDataType)DataTypes.INT()).columnByExpression("proctime", "PROCTIME()").build()));
        this.tEnv.createTemporarySystemFunction("prefix", (UserDefinedFunction)new PrefixingScalarFunc());
        this.tEnv.createTemporarySystemFunction("countFrom", (UserDefinedFunction)new RichAggFunc());
        String prefix = "PREF";
        int startFrom = 4;
        Configuration jobParameters = new Configuration();
        jobParameters.setString("prefix", prefix);
        jobParameters.setString("start", Integer.toString(startFrom));
        this.env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)jobParameters);
        TableResult tableResult = this.tEnv.executeSql(String.format("SELECT *\nFROM MyTable\nMATCH_RECOGNIZE (\n  ORDER BY proctime\n  MEASURES\n    FIRST(id) as firstId,\n    prefix(A.name) as prefixedNameA,\n    countFrom(A.price) as countFromA,\n    LAST(id) as lastId\n  AFTER MATCH SKIP PAST LAST ROW\n  PATTERN (A+ C)\n  DEFINE\n    A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n) AS T", prefix, 8));
        Assertions.assertEquals(Arrays.asList(Row.of((Object[])new Object[]{1, "PREF:a", 8, 5}), Row.of((Object[])new Object[]{7, "PREF:a", 6, 9})), (Object)CollectionUtil.iteratorToList((Iterator)tableResult.collect()));
    }

    public static class RichAggFunc
    extends AggregateFunction<Integer, CountAcc> {
        private Integer start = 0;

        public void open(FunctionContext context) throws Exception {
            this.start = Integer.valueOf(context.getJobParameter("start", "0"));
        }

        public void close() throws Exception {
            this.start = 0;
        }

        public CountAcc createAccumulator() {
            return new CountAcc(this.start);
        }

        public Integer getValue(CountAcc accumulator) {
            return accumulator.count;
        }

        public void accumulate(CountAcc countAcc, Integer value) {
            CountAcc countAcc2 = countAcc;
            countAcc2.count = countAcc2.count + value;
        }
    }

    public static class CountAcc {
        public Integer count;

        public CountAcc(Integer count) {
            this.count = count;
        }
    }

    public static class PrefixingScalarFunc
    extends ScalarFunction {
        private String prefix = "ERROR_VALUE";

        public void open(FunctionContext context) throws Exception {
            this.prefix = context.getJobParameter("prefix", "");
        }

        public String eval(String value) {
            return String.format("%s:%s", this.prefix, value);
        }
    }
}

