/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

public class WindowTableFunctionJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        String srcTable1Ddl = "CREATE TABLE MyTable (\n a INT,\n b BIGINT,\n c VARCHAR,\n `rowtime` AS TO_TIMESTAMP(c),\n proctime as PROCTIME(),\n WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(srcTable1Ddl);
        String srcTable2Ddl = "CREATE TABLE MyTable2 (\n a INT,\n b BIGINT,\n c VARCHAR,\n `rowtime` AS TO_TIMESTAMP(c),\n proctime as PROCTIME(),\n WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(srcTable2Ddl);
    }

    @Test
    public void testIndividualWindowTVF() {
        String sinkTableDdl = "CREATE TABLE MySink (\n window_start TIMESTAMP(3),\n window_end TIMESTAMP(3),\n a INT,\n b BIGINT,\n c VARCHAR\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))");
    }

    @Test
    public void testIndividualWindowTVFProcessingTime() {
        String sinkTableDdl = "CREATE TABLE MySink (\n window_start TIMESTAMP(3),\n window_end TIMESTAMP(3),\n a INT,\n b BIGINT,\n c VARCHAR\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))");
    }

    @Test
    public void testFollowedByWindowJoin() {
        String sinkTableDdl = "CREATE TABLE MySink (\n window_start TIMESTAMP(3) NOT NULL,\n window_end TIMESTAMP(3) NOT NULL,\n l_a INT,\n l_b BIGINT,\n l_c VARCHAR,\n r_a INT,\n r_b BIGINT,\n r_c VARCHAR\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  L.window_start,\n  L.window_end,\n  L.a,\n  L.b,\n  L.c,\n  R.a,\n  R.b,\n  R.c\nFROM (\n  SELECT\n    window_start,\n    window_end,\n    a,\n    b,\n    c\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  WHERE b > 10\n) L\nJOIN (\n  SELECT\n    window_start,\n    window_end,\n    a,\n    b,\n    c\n  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  WHERE b > 10\n) R\nON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a");
    }

    @Test
    public void testFollowedByWindowRank() {
        String sinkTableDdl = "CREATE TABLE MySink (\n window_start TIMESTAMP(3),\n window_end TIMESTAMP(3),\n a INT,\n b BIGINT,\n c VARCHAR\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM (\n  SELECT\n    *,\n   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)))\nWHERE rownum <= 3");
    }

    @Test
    public void testFollowedByWindowDeduplicate() {
        String sinkTableDdl = "CREATE TABLE MySink (\n window_start TIMESTAMP(3),\n window_end TIMESTAMP(3),\n a INT,\n b BIGINT,\n c VARCHAR\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  window_start,\n  window_end,\n  a,\n  b,\n  c\nFROM (\n  SELECT\n    *,\n   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY rowtime DESC) as rownum\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)))\nWHERE rownum <= 1");
    }
}

