/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.batch.sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test for SQL {@code UPDATE} statements in batch mode.
 *
 * <p>The test class is parameterized over {@link SupportsRowLevelUpdate.RowLevelUpdateMode}; the
 * mode is handed to the {@code test-update-delete} connector through the {@code 'update-mode'}
 * table option in {@link #testUpdate()}.
 */
@RunWith(Parameterized.class)
public class UpdateTableITCase extends BatchTestBase {

    /** Row-level update mode under test, injected by the parameterized runner. */
    private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;

    /**
     * Supplies the update modes every test is run with.
     *
     * @return the two modes {@code UPDATED_ROWS} and {@code ALL_ROWS}
     */
    @Parameterized.Parameters(name = "updateMode = {0}")
    public static Collection<SupportsRowLevelUpdate.RowLevelUpdateMode> data() {
        return Arrays.asList(
                SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS,
                SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS);
    }

    /**
     * Creates a test instance bound to one update mode.
     *
     * @param updateMode the row-level update mode used when creating the test table
     */
    public UpdateTableITCase(SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
        this.updateMode = updateMode;
    }

    /**
     * Runs two {@code UPDATE} statements (one with a plain filter, one with a scalar sub-query in
     * the filter) against a table backed by the registered rows and checks the table content after
     * each statement.
     *
     * @throws Exception if waiting for an update job fails
     */
    @Test
    public void testUpdate() throws Exception {
        String dataId = registerData();
        tEnv().executeSql(
                        String.format(
                                "CREATE TABLE t ("
                                        + " a int PRIMARY KEY NOT ENFORCED,"
                                        + " b string,"
                                        + " c double) WITH"
                                        + " ('connector' = 'test-update-delete', "
                                        + "'data-id' = '%s', "
                                        + "'update-mode' = '%s')",
                                dataId, updateMode));

        // Rows with a >= 1 get b = 'uaa' and c squared; the row with a = 0 stays untouched.
        tEnv().executeSql("UPDATE t SET b = 'uaa', c = c * c WHERE a >= 1").await();
        List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
        Assertions.assertThat(rows.toString())
                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uaa, 16.0]]");

        // The sub-query counts the rows with a > 1 (just one), so only the row with a = 2 changes.
        tEnv().executeSql("UPDATE t SET b = 'uab' WHERE a > (SELECT count(1) FROM t WHERE a > 1)")
                .await();
        rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
        Assertions.assertThat(rows.toString())
                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uab, 16.0]]");
    }

    /**
     * Verifies that executing a {@link StatementSet} holding both an {@code INSERT} and an {@code
     * UPDATE} statement fails with a {@link TableException}.
     */
    @Test
    public void testStatementSetContainUpdateAndInsert() throws Exception {
        tEnv().executeSql(
                        "CREATE TABLE t (a int, b string, c double) WITH"
                                + " ('connector' = 'test-update-delete')");
        StatementSet statementSet = tEnv().createStatementSet();
        statementSet.addInsertSql("INSERT INTO t VALUES (1, 'v1', 1)");
        statementSet.addInsertSql("UPDATE t SET b = 'uaa'");

        Assertions.assertThatThrownBy(statementSet::execute)
                .isInstanceOf(TableException.class)
                .hasMessage(
                        "Unsupported SQL query! Only accept a single SQL statement of type UPDATE.");
    }

    /**
     * Verifies that {@code compilePlanSql} rejects an {@code UPDATE} statement with a {@link
     * TableException}.
     */
    @Test
    public void testCompilePlanSql() throws Exception {
        tEnv().executeSql(
                        "CREATE TABLE t (a int, b string, c double) WITH"
                                + " ('connector' = 'test-update-delete')");

        Assertions.assertThatThrownBy(() -> tEnv().compilePlanSql("UPDATE t SET b = 'uaa'"))
                .isInstanceOf(TableException.class)
                .hasMessage(
                        "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT");
    }

    /**
     * Verifies that an {@code UPDATE} on a table created with the {@code COLLECTION} connector
     * fails with a {@link TableException} whose message points at {@link DynamicTableSink}.
     */
    @Test
    public void testUpdateWithLegacyTableSink() {
        tEnv().executeSql(
                        "CREATE TABLE t (a int, b string, c double) WITH"
                                + " ('connector' = 'COLLECTION')");

        // The double space after the first %s is part of the expected message.
        String expectedMessage =
                String.format(
                        "Can't perform update operation of the table %s "
                                + " because the corresponding table sink is the legacy TableSink,"
                                + " Please implement %s for it.",
                        "`default_catalog`.`default_database`.`t`",
                        DynamicTableSink.class.getName());

        Assertions.assertThatThrownBy(() -> tEnv().executeSql("UPDATE t SET b = 'uaa'"))
                .isInstanceOf(TableException.class)
                .hasMessage(expectedMessage);
    }

    /**
     * Registers the rows from {@link #createValue()} with {@link TestUpdateDeleteTableFactory}.
     *
     * @return the value returned by {@code registerRowData}, used as the {@code 'data-id'} option
     */
    private String registerData() {
        return TestUpdateDeleteTableFactory.registerRowData(createValue());
    }

    /**
     * Builds the three initial rows {@code (i, "b_" + i, i * 2.0)} for {@code i} in {@code 0..2}.
     *
     * @return a new mutable list holding the rows in ascending order of {@code i}
     */
    private List<RowData> createValue() {
        List<RowData> values = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            values.add(GenericRowData.of(i, StringData.fromString("b_" + i), (double) i * 2.0));
        }
        return values;
    }

    /**
     * Collects every row of a result, renders each with {@link Row#toString()} and sorts the
     * strings in natural order so assertions do not depend on the order rows are returned in.
     *
     * @param result the table result to drain
     * @return the sorted string form of all rows
     */
    private List<String> toSortedResults(TableResult result) {
        // No raw Iterator cast: keeping the element type lets Row::toString resolve type-safely.
        List<Row> collected = CollectionUtil.iteratorToList(result.collect());
        return collected.stream().map(Row::toString).sorted().collect(Collectors.toList());
    }
}

