/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u000193A!\u0001\u0002\u0001+\t!t+\u0019;fe6\f'o[!tg&<g.\u001a:DQ\u0006tw-\u001a7pO:{'/\\1mSj,GK]1ogB|7/\u001a*vY\u0016$Vm\u001d;\u000b\u0005\r!\u0011AB:ue\u0016\fWN\u0003\u0002\u0006\r\u0005A\u0001\u000f[=tS\u000e\fGN\u0003\u0002\b\u0011\u0005)!/\u001e7fg*\u0011\u0011BC\u0001\u0005a2\fgN\u0003\u0002\f\u0019\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0007\u000f\u0003\u0015!\u0018M\u00197f\u0015\ty\u0001#A\u0003gY&t7N\u0003\u0002\u0012%\u00051\u0011\r]1dQ\u0016T\u0011aE\u0001\u0004_J<7\u0001A\n\u0003\u0001Y\u0001\"a\u0006\u000e\u000e\u0003aQ!!\u0007\u0006\u0002\u000bU$\u0018\u000e\\:\n\u0005mA\"!\u0004+bE2,G+Z:u\u0005\u0006\u001cX\rC\u0003\u001e\u0001\u0011\u0005a$\u0001\u0004=S:LGO\u0010\u000b\u0002?A\u0011\u0001\u0005A\u0007\u0002\u0005!9!\u0005\u0001b\u0001\n\u0013\u0019\u0013\u0001B;uS2,\u0012\u0001\n\t\u0003/\u0015J!A\n\r\u0003'M#(/Z1n)\u0006\u0014G.\u001a+fgR,F/\u001b7\t\r!\u0002\u0001\u0015!\u0003%\u0003\u0015)H/\u001b7!\u0011\u0015Q\u0003\u0001\"\u0001,\u0003\u0015\u0019X\r^;q)\u0005a\u0003CA\u00171\u001b\u0005q#\"A\u0018\u0002\u000bM\u001c\u0017\r\\1\n\u0005Er#\u0001B+oSRD#!K\u001a\u0011\u0005Q:T\"A\u001b\u000b\u0005Y\u0012\u0012!\u00026v]&$\u0018B\u0001\u001d6\u0005\u0019\u0011UMZ8sK\")!\b\u0001C\u0001W\u0005\u0001C/Z:u!V\u001c\b\u000eZ8x]^\u000bG/\u001a:nCJ\\w+\u001b;i_V$8)\u00197dQ\tID\b\u0005\u00025{%\u0011a(\u000e\u0002\u0005)\u0016\u001cH\u000fC\u0003A\u0001\u0011\u00051&\u0001\u0017uKN$\b+^:iI><hnQ1mG\u0006sGmV1uKJl\u0017M]6BgNLwM\\3s/&$\bnQ1mG\"\u0012q\b\u0010\u0005\u0006\u0007\u0002!\taK\u0001&i\u0016\u001cH\u000fU;tQ\u0012|wO\\,bi\u0016\u0014X.\u0019:l\u0003N\u001c\u0018n\u001a8fe^KG\u000f[\"bY\u000eD#A\u0011\u001f\t\u000b\u0019\u0003A\u0011A\u0016\u0002_Q,7\u000f\u001e)vg\"$wn\u001e8OK^\u001c\u0015\r\\2B]\u0012<\u0016\r^3s[\u0006\u00148.Q:tS\u001etWM],ji\"\u001c\u0015\r\\2)\u0005\u0015c\u0004\"B%\u0001\t\u0003Y\u0013\u0001\b;fgR<%o\\;q\u0017\u0016L\u0018j]\"p[B,H/\u001a3D_2,XN\u001c\u0015\u0003\u0011rBQ\u0001\u0014\u0001\u0005\u0002-\na\u0006^3tiB+8\u000f\u001b3po:\u001c\u0015\r\\2O_R\feMZ3di\u000eC\u0017M\\4fY><gj\u001c:nC2L'0Z&fs\"\u00121\n\u0010")
public class WatermarkAssignerChangelogNormalizeTransposeRuleTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Before
    public void setup() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE simple_src (\n                     |  currency STRING,\n                     |  currency_no STRING,\n                     |  rate  BIGINT,\n                     |  currency_time TIMESTAMP(3),\n                     |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     |  PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = 'UA,D',\n                     |  'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE src_with_computed_column (\n                     |  currency STRING,\n                     |  currency_no STRING,\n                     |  rate  BIGINT,\n                     |  c STRING,\n                     |  currency_time as to_timestamp(c),\n                     |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     |  PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = 'UA,D',\n                     |  'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE src_with_computed_column2 (\n                     | currency int,\n                     | currency2 as currency + 2,\n                     | currency_no STRING,\n                     | rate BIGINT,\n                     | c STRING,\n                     | currency_time as to_timestamp(c),\n                     | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     | PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     | 'connector' = 'values',\n                     | 'changelog-mode' = 'UA,D',\n                     | 'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
    }

    @Test
    public void testPushdownWatermarkWithoutCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM simple_src\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownCalcAndWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM src_with_computed_column\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |  MAX(rate) AS max_rate\n        |FROM simple_src\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownNewCalcAndWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |  MAX(rate) AS max_rate\n        |FROM src_with_computed_column\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testGroupKeyIsComputedColumn() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency2,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM src_with_computed_column2\n        |GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownCalcNotAffectChangelogNormalizeKey() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE t1 (\n                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',\n                    |  a VARCHAR NOT NULL,\n                    |  b VARCHAR NOT NULL,\n                    |  WATERMARK FOR ingestion_time AS ingestion_time\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'readable-metadata' = 'ts:TIMESTAMP(3)'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE t2 (\n                    |  k VARBINARY,\n                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',\n                    |  a VARCHAR NOT NULL,\n                    |  f BOOLEAN NOT NULL,\n                    |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,\n                    |  PRIMARY KEY (`a`) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'readable-metadata' = 'ts:TIMESTAMP(3)',\n                    | 'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.a, t1.b, t2.f\n        |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time\n        | ON t1.a = t2.a WHERE t2.f = true\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }
}

