/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.batch.sql;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.collection.Seq;

@RunWith(value=Parameterized.class)
public class DynamicFilteringTest
extends TableTestBase {
    private final BatchShuffleMode batchShuffleMode;
    private BatchTableTestUtil util;

    @Parameterized.Parameters(name="mode = {0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({BatchShuffleMode.ALL_EXCHANGES_BLOCKING}, {BatchShuffleMode.ALL_EXCHANGES_PIPELINED});
    }

    public DynamicFilteringTest(BatchShuffleMode batchShuffleMode) {
        this.batchShuffleMode = batchShuffleMode;
    }

    @Before
    public void before() {
        this.util = this.batchTestUtil(TableConfig.getDefault());
        this.util.tableEnv().getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)10);
        this.util.tableEnv().getConfig().getConfiguration().set(ExecutionOptions.BATCH_SHUFFLE_MODE, (Object)this.batchShuffleMode);
        this.util.tableEnv().executeSql("CREATE TABLE fact1 (\n  a1 bigint,\n  b1 int,\n  c1 varchar,\n  p1 varchar\n) PARTITIONED BY(p1) WITH (\n 'connector' = 'values',\n 'disable-lookup' = 'true',\n 'runtime-source' = 'NewSource',\n 'partition-list' = 'p1:1;p1:2;p1:3',\n 'dynamic-filtering-fields' = 'p1;b1',\n 'bounded' = 'true'\n)");
        this.util.tableEnv().executeSql("CREATE TABLE fact2 (\n  a2 bigint,\n  b2 int,\n  c2 varchar,\n  p2 varchar\n) PARTITIONED BY(p2) WITH (\n 'connector' = 'values',\n 'disable-lookup' = 'true',\n 'runtime-source' = 'NewSource',\n 'partition-list' = 'p1:1;p1:2;p1:3',\n 'dynamic-filtering-fields' = 'p2',\n 'bounded' = 'true'\n)");
        this.util.tableEnv().executeSql("CREATE TABLE dim (\n  x BIGINT,\n  y BIGINT,\n  z VARCHAR,\n  p VARCHAR\n)  WITH (\n 'connector' = 'values',\n 'disable-lookup' = 'true',\n 'bounded' = 'true'\n)");
    }

    @Test
    public void testLegacySource() {
        this.util.tableEnv().executeSql("CREATE TABLE legacy_source (\n  a1 BIGINT,\n  b1 BIGINT,\n  c1 VARCHAR,\n  d1 BIGINT,\n  p1 VARCHAR\n) PARTITIONED BY (p1)\n  WITH (\n 'connector' = 'values',\n 'runtime-source' = 'SourceFunction',\n 'partition-list' = 'p1:1;p1:2;p1:3',\n 'dynamic-filtering-fields' = 'p1',\n 'disable-lookup' = 'true',\n 'bounded' = 'true'\n)");
        this.util.verifyExplain("SELECT * FROM legacy_source, dim WHERE p1 = p AND x > 10", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }

    @Test
    public void testSimpleDynamicFiltering() {
        this.util.verifyExplain("SELECT * FROM fact1, dim WHERE p1 = p AND x > 10", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }

    @Test
    public void testDynamicFilteringWithMultipleInput() {
        this.util.verifyExplain("SELECT * FROM fact1, dim, fact2 WHERE p1 = p and p1 = p2 AND x > 10", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }

    @Test
    public void testDuplicateFactTables() {
        this.util.verifyExplain("SELECT * FROM (SELECT * FROM fact1, dim WHERE p1 = p AND x > 10) t1 JOIN fact1 t2 ON t1.y = t2.b1", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }

    @Test
    public void testReuseDimSide() {
        this.util.verifyExplain("SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 UNION ALL SELECT * FROM fact2, dim WHERE p2 = p AND x > 10", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }

    @Test
    public void testDynamicFilteringWithStaticPartitionPruning() {
        this.util.verifyExplain("SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 and p1 > 1", (Seq<ExplainDetail>)JavaScalaConversionUtil.toScala(Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
    }
}

