/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.batch.sql.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.batch.sql.join.JoinITCaseHelper;
import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType;
import org.apache.flink.table.planner.utils.TestingTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Integration tests that run inner joins between a large table with heavily duplicated keys
 * ({@code t1}) and a small table ({@code t2}) on a shared mini cluster configured with only
 * {@code 6m} of managed memory.
 *
 * <p>Each test inserts the join result into the {@code sink} table and checks only the number of
 * produced rows. The tests cover both join orders ({@code t1 JOIN t2} and {@code t2 JOIN t1}) and
 * both an INT and a VARCHAR join key.
 *
 * <p>NOTE(review): the small managed memory budget is presumably what pushes the hash join onto
 * its adaptive path for the duplicated keys - confirm against the hash join operator.
 */
public class AdaptiveHashJoinITCase extends TestLogger {
    /** Number of task slots offered by the single task manager of the shared mini cluster. */
    public static final int DEFAULT_PARALLELISM = 3;

    /** Rows generated per key for the large table {@code t1} (keys 2, 5 and 10). */
    private static final int LARGE_TABLE_ROWS_PER_KEY = 100000;

    /** Rows generated per key for the small table {@code t2} (keys 5, 10 and 20). */
    private static final int SMALL_TABLE_ROWS_PER_KEY = 10;

    /** Keys present in both tables: 5 and 10. Key 2 exists only in t1, key 20 only in t2. */
    private static final int MATCHING_KEYS = 2;

    /** Every matching key contributes (rows in t1) x (rows in t2) joined rows: 2,000,000. */
    private static final int EXPECTED_JOIN_ROWS =
            MATCHING_KEYS * LARGE_TABLE_ROWS_PER_KEY * SMALL_TABLE_ROWS_PER_KEY;

    /** Upper bound, in seconds, for a single INSERT job to finish. */
    private static final long JOB_TIMEOUT_SECONDS = 60L;

    /** Name of the table that collects the join output. */
    private static final String SINK_TABLE = "sink";

    /** Mini cluster shared by all tests: one task manager with {@link #DEFAULT_PARALLELISM} slots. */
    @ClassRule
    public static MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .build());

    /** Batch-mode table environment; a fresh one is created for every test instance. */
    private final TableEnvironment tEnv =
            TestingTableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build(),
                    null,
                    TableConfig.getDefault());

    /**
     * Builds the cluster configuration used by {@link #miniClusterResource}.
     *
     * @return a configuration whose task manager managed memory size is {@code 6m}
     */
    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("6m"));
        return config;
    }

    /**
     * Configures the table environment and registers the source tables {@code t1} and {@code t2}
     * as well as the {@code sink} table.
     *
     * @throws Exception declared for the JUnit lifecycle; propagated from the setup calls
     */
    @Before
    public void before() throws Exception {
        // The job runs with parallelism 1 even though the cluster offers DEFAULT_PARALLELISM
        // slots. NOTE(review): presumably intentional so that one join task sees all the
        // duplicated keys - confirm before changing.
        tEnv.getConfig()
                .getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        // Ask the helper to disable every join operator other than HashJoin.
        JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, JoinType.HashJoin());

        // Large side: three keys, each repeated LARGE_TABLE_ROWS_PER_KEY times.
        List<Row> data1 = new ArrayList<>();
        data1.addAll(getRepeatedRow(2, LARGE_TABLE_ROWS_PER_KEY));
        data1.addAll(getRepeatedRow(5, LARGE_TABLE_ROWS_PER_KEY));
        data1.addAll(getRepeatedRow(10, LARGE_TABLE_ROWS_PER_KEY));
        String dataId1 = TestValuesTableFactory.registerData(data1);

        // Small side: three keys, each repeated SMALL_TABLE_ROWS_PER_KEY times.
        List<Row> data2 = new ArrayList<>();
        data2.addAll(getRepeatedRow(5, SMALL_TABLE_ROWS_PER_KEY));
        data2.addAll(getRepeatedRow(10, SMALL_TABLE_ROWS_PER_KEY));
        data2.addAll(getRepeatedRow(20, SMALL_TABLE_ROWS_PER_KEY));
        String dataId2 = TestValuesTableFactory.registerData(data2);

        tEnv.executeSql(
                String.format(
                        "CREATE TABLE t1 (\n"
                                + "  x INT,\n"
                                + "  y BIGINT,\n"
                                + "  z VARCHAR\n"
                                + ")  WITH (\n"
                                + " 'connector' = 'values',\n"
                                + " 'data-id' = '%s',\n"
                                + " 'bounded' = 'true'\n"
                                + ")",
                        dataId1));
        tEnv.executeSql(
                String.format(
                        "CREATE TABLE t2 (\n"
                                + "  a INT,\n"
                                + "  b BIGINT,\n"
                                + "  c VARCHAR\n"
                                + ")  WITH (\n"
                                + " 'connector' = 'values',\n"
                                + " 'data-id' = '%s',\n"
                                + " 'bounded' = 'true'\n"
                                + ")",
                        dataId2));
        tEnv.executeSql(
                "CREATE TABLE sink (\n"
                        + "  x INT,\n"
                        + "  z VARCHAR,\n"
                        + "  a INT,\n"
                        + "  b BIGINT,\n"
                        + "  c VARCHAR\n"
                        + ")  WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'true'\n"
                        + ")");
    }

    /** Clears all data held by {@link TestValuesTableFactory} after each test. */
    @After
    public void after() {
        TestValuesTableFactory.clearAllData();
    }

    /** Joins {@code t1 JOIN t2} on the INT columns {@code x} and {@code a}. */
    @Test
    public void testBuildLeftIntKeyAdaptiveHashJoin() throws Exception {
        runInsertAndVerify("INSERT INTO sink SELECT x, z, a, b, c FROM t1 JOIN t2 ON t1.x=t2.a");
    }

    /** Joins {@code t2 JOIN t1} on the INT columns {@code x} and {@code a}. */
    @Test
    public void testBuildRightIntKeyAdaptiveHashJoin() throws Exception {
        runInsertAndVerify("INSERT INTO sink SELECT x, z, a, b, c FROM t2 JOIN t1 ON t1.x=t2.a");
    }

    /** Joins {@code t1 JOIN t2} on the VARCHAR columns {@code z} and {@code c}. */
    @Test
    public void testBuildLeftStringKeyAdaptiveHashJoin() throws Exception {
        runInsertAndVerify("INSERT INTO sink SELECT x, z, a, b, c FROM t1 JOIN t2 ON t1.z=t2.c");
    }

    /** Joins {@code t2 JOIN t1} on the VARCHAR columns {@code z} and {@code c}. */
    @Test
    public void testBuildRightStringKeyAdaptiveHashJoin() throws Exception {
        runInsertAndVerify("INSERT INTO sink SELECT x, z, a, b, c FROM t2 JOIN t1 ON t1.z=t2.c");
    }

    /**
     * Executes the given INSERT statement, waits for the job to finish and checks that the sink
     * received {@link #EXPECTED_JOIN_ROWS} rows.
     *
     * @param insertSql the INSERT statement writing the join result into the sink table
     * @throws Exception if the job fails, is interrupted, or does not finish within {@link
     *     #JOB_TIMEOUT_SECONDS} seconds
     */
    private void runInsertAndVerify(String insertSql) throws Exception {
        tEnv.executeSql(insertSql).await(JOB_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertResult(SINK_TABLE, EXPECTED_JOIN_ROWS);
    }

    /**
     * Asserts the number of rows collected by a sink table.
     *
     * @param sinkTableName name of the sink table to read results from
     * @param resultSize expected number of rows
     */
    private void assertResult(String sinkTableName, int resultSize) {
        List<String> result = TestValuesTableFactory.getResults(sinkTableName);
        // Compare sizes rather than the list itself; only the row count is of interest here.
        Assertions.assertThat(result.size()).isEqualTo(resultSize);
    }

    /**
     * Creates {@code nums} distinct {@link Row} instances that all carry the same key.
     *
     * @param key value used for all three fields: as an int, as a long and as its decimal string
     * @param nums number of rows to create; values {@code <= 0} yield an empty list
     * @return a new mutable list holding the generated rows
     */
    private static List<Row> getRepeatedRow(int key, int nums) {
        List<Row> rows = new ArrayList<>(Math.max(nums, 0));
        for (int i = 0; i < nums; i++) {
            rows.add(Row.of(key, (long) key, String.valueOf(key)));
        }
        return rows;
    }
}

