/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * Plan tests for window aggregations written with the TUMBLE, HOP and CUMULATE
 * table-valued functions, over both the event-time ({@code rowtime}) and the
 * processing-time ({@code proctime}) attribute of {@code MyTable}.
 *
 * <p>Every test registers a {@code MySink} table whose schema matches the query's
 * select list and then hands an {@code INSERT INTO MySink ...} statement to
 * {@code StreamTableTestUtil.verifyJsonPlan}.
 */
public class WindowAggregateJsonPlanTest extends TableTestBase {

    /** Name under which the user-defined aggregate function is registered. */
    private static final String CONCAT_DISTINCT_AGG = "concat_distinct_agg";

    /** Sink for the event-time TUMBLE tests (with and without offset). */
    private static final String TUMBLE_EVENT_TIME_SINK_DDL =
            "CREATE TABLE MySink (\n"
                    + " b BIGINT,\n"
                    + " window_start TIMESTAMP(3),\n"
                    + " window_end TIMESTAMP(3),\n"
                    + " cnt BIGINT,\n"
                    + " sum_a INT,\n"
                    + " distinct_cnt BIGINT,\n"
                    + " concat_distinct STRING\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values')\n";

    /** Sink for the event-time HOP tests (with and without offset). */
    private static final String HOP_EVENT_TIME_SINK_DDL =
            "CREATE TABLE MySink (\n"
                    + " b BIGINT,\n"
                    + " cnt BIGINT,\n"
                    + " sum_a INT\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values')\n";

    /** Sink for the event-time CUMULATE tests (with and without offset). */
    private static final String CUMULATE_EVENT_TIME_SINK_DDL =
            "CREATE TABLE MySink (\n"
                    + " b BIGINT,\n"
                    + " window_end TIMESTAMP(3),\n"
                    + " cnt BIGINT,\n"
                    + " sum_a INT\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values')\n";

    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /**
     * Creates a fresh stream test util with the default table config and registers the
     * {@code MyTable} source, which exposes a computed {@code rowtime} column with a
     * watermark as well as a {@code proctime} column.
     */
    @Before
    public void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();

        String srcTableDdl =
                "CREATE TABLE MyTable (\n"
                        + " a INT,\n"
                        + " b BIGINT,\n"
                        + " c VARCHAR,\n"
                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
                        + " proctime as PROCTIME(),\n"
                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n";
        tEnv.executeSql(srcTableDdl);
    }

    /** Event-time TUMBLE window with COUNT, SUM, COUNT(DISTINCT) and a user-defined aggregate. */
    @Test
    public void testEventTimeTumbleWindow() {
        tEnv.createFunction(
                CONCAT_DISTINCT_AGG, JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
        tEnv.executeSql(TUMBLE_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  window_start,\n"
                        + "  window_end,\n"
                        + "  COUNT(*),\n"
                        + "  SUM(a),\n"
                        + "  COUNT(DISTINCT c),\n"
                        + "  concat_distinct_agg(c)\n"
                        + "FROM TABLE(\n"
                        + "   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Same query as {@link #testEventTimeTumbleWindow()} with an extra offset argument. */
    @Test
    public void testEventTimeTumbleWindowWithOffset() {
        tEnv.createFunction(
                CONCAT_DISTINCT_AGG, JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
        tEnv.executeSql(TUMBLE_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  window_start,\n"
                        + "  window_end,\n"
                        + "  COUNT(*),\n"
                        + "  SUM(a),\n"
                        + "  COUNT(DISTINCT c),\n"
                        + "  concat_distinct_agg(c)\n"
                        + "FROM TABLE(\n"
                        + "   TUMBLE(\n"
                        + "     TABLE MyTable,\n"
                        + "     DESCRIPTOR(rowtime),\n"
                        + "     INTERVAL '5' SECOND,\n"
                        + "     INTERVAL '5' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Processing-time TUMBLE window with a plain COUNT(*). */
    @Test
    public void testProcTimeTumbleWindow() {
        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + " b BIGINT,\n"
                        + " window_end TIMESTAMP(3),\n"
                        + " cnt BIGINT\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n";
        tEnv.executeSql(sinkTableDdl);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  window_end,\n"
                        + "  COUNT(*)\n"
                        + "FROM TABLE(\n"
                        + "   TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Event-time HOP window with COUNT and SUM. */
    @Test
    public void testEventTimeHopWindow() {
        tEnv.executeSql(HOP_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  COUNT(c),\n"
                        + "  SUM(a)\n"
                        + "FROM TABLE(\n"
                        + "   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Same query as {@link #testEventTimeHopWindow()} with an extra offset argument. */
    @Test
    public void testEventTimeHopWindowWithOffset() {
        tEnv.executeSql(HOP_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  COUNT(c),\n"
                        + "  SUM(a)\n"
                        + "FROM TABLE(\n"
                        + "   HOP(\n"
                        + "     TABLE MyTable,\n"
                        + "     DESCRIPTOR(rowtime),\n"
                        + "     INTERVAL '5' SECOND,\n"
                        + "     INTERVAL '10' SECOND,\n"
                        + "     INTERVAL '5' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Processing-time HOP window with a single SUM. */
    @Test
    public void testProcTimeHopWindow() {
        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + " b BIGINT,\n"
                        + " sum_a INT\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n";
        tEnv.executeSql(sinkTableDdl);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  SUM(a)\n"
                        + "FROM TABLE(\n"
                        + "   HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Event-time CUMULATE window with COUNT and SUM. */
    @Test
    public void testEventTimeCumulateWindow() {
        tEnv.executeSql(CUMULATE_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  window_end,\n"
                        + "  COUNT(c),\n"
                        + "  SUM(a)\n"
                        + "FROM TABLE(\n"
                        + "   CUMULATE(\n"
                        + "     TABLE MyTable,\n"
                        + "     DESCRIPTOR(rowtime),\n"
                        + "     INTERVAL '5' SECOND,\n"
                        + "     INTERVAL '15' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Same query as {@link #testEventTimeCumulateWindow()} with an extra offset argument. */
    @Test
    public void testEventTimeCumulateWindowWithOffset() {
        tEnv.executeSql(CUMULATE_EVENT_TIME_SINK_DDL);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  window_end,\n"
                        + "  COUNT(c),\n"
                        + "  SUM(a)\n"
                        + "FROM TABLE(\n"
                        + "   CUMULATE(\n"
                        + "     TABLE MyTable,\n"
                        + "     DESCRIPTOR(rowtime),\n"
                        + "     INTERVAL '5' SECOND,\n"
                        + "     INTERVAL '15' SECOND,\n"
                        + "     INTERVAL '15' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /** Processing-time CUMULATE window with a single COUNT. */
    @Test
    public void testProcTimeCumulateWindow() {
        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + " b BIGINT,\n"
                        + " cnt BIGINT\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n";
        tEnv.executeSql(sinkTableDdl);
        util.verifyJsonPlan(
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  COUNT(c)\n"
                        + "FROM TABLE(\n"
                        + "   CUMULATE(\n"
                        + "     TABLE MyTable,\n"
                        + "     DESCRIPTOR(proctime),\n"
                        + "     INTERVAL '5' SECOND,\n"
                        + "     INTERVAL '15' SECOND))\n"
                        + "GROUP BY b, window_start, window_end");
    }

    /**
     * Event-time CUMULATE window containing a COUNT(DISTINCT), planned with the
     * distinct-aggregate split optimizer option switched on.
     */
    @Test
    public void testDistinctSplitEnabled() {
        // Pass a plain boolean: the decompiler's "(Object) true" does not type-check against
        // the generic set(ConfigOption<T>, T) signature, where T is fixed by the option.
        tEnv.getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);

        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + "  a bigint,\n"
                        + "  window_start timestamp(3),\n"
                        + "  window_end timestamp(3),\n"
                        + "  cnt_star bigint,\n"
                        + "  sum_b bigint,\n"
                        + "  cnt_distinct_c bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'sink-insert-only' = 'false',\n"
                        + "  'table-sink-class' = 'DEFAULT')";
        tEnv.executeSql(sinkTableDdl);
        util.verifyJsonPlan(
                "insert into MySink select a, "
                        + "   window_start, "
                        + "   window_end, "
                        + "   count(*), "
                        + "   sum(b), "
                        + "   count(distinct c) AS uv "
                        + "FROM TABLE ("
                        + "   CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) "
                        + "GROUP BY a, window_start, window_end");
    }
}

