/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.connectors;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.connectors.ExternalDynamicSource;
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;

@Internal
public final class DynamicSourceUtils {
    public static RelNode convertDataStreamToRel(boolean isBatchMode, ReadableConfig config, FlinkRelBuilder relBuilder, ContextResolvedTable contextResolvedTable, DataStream<?> dataStream, DataType physicalDataType, boolean isTopLevelRecord, ChangelogMode changelogMode) {
        ExternalDynamicSource tableSource = new ExternalDynamicSource(contextResolvedTable.getIdentifier(), dataStream, physicalDataType, isTopLevelRecord, changelogMode);
        FlinkStatistic statistic = FlinkStatistic.unknown(contextResolvedTable.getResolvedSchema()).build();
        return DynamicSourceUtils.convertSourceToRel(isBatchMode, config, relBuilder, contextResolvedTable, statistic, Collections.emptyList(), tableSource);
    }

    public static RelNode convertSourceToRel(boolean isBatchMode, ReadableConfig config, FlinkRelBuilder relBuilder, ContextResolvedTable contextResolvedTable, FlinkStatistic statistic, List<RelHint> hints, DynamicTableSource tableSource) {
        String tableDebugName = contextResolvedTable.getIdentifier().asSummaryString();
        ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable)contextResolvedTable.getResolvedTable();
        DynamicSourceUtils.prepareDynamicSource(tableDebugName, resolvedCatalogTable, tableSource, isBatchMode, config);
        DynamicSourceUtils.pushTableScan(isBatchMode, relBuilder, contextResolvedTable, statistic, hints, tableSource);
        ResolvedSchema schema = contextResolvedTable.getResolvedSchema();
        if (!schema.getColumns().stream().allMatch(Column::isPhysical)) {
            DynamicSourceUtils.pushMetadataProjection(relBuilder, schema);
            DynamicSourceUtils.pushGeneratedProjection(relBuilder, schema);
        }
        if (!isBatchMode && !schema.getWatermarkSpecs().isEmpty()) {
            DynamicSourceUtils.pushWatermarkAssigner(relBuilder, schema);
        }
        return relBuilder.build();
    }

    public static void prepareDynamicSource(String tableDebugName, ResolvedCatalogTable table, DynamicTableSource source, boolean isBatchMode, ReadableConfig config) {
        ResolvedSchema schema = table.getResolvedSchema();
        DynamicSourceUtils.validateAndApplyMetadata(tableDebugName, schema, source);
        if (source instanceof ScanTableSource) {
            DynamicSourceUtils.validateScanSource(tableDebugName, schema, (ScanTableSource)source, isBatchMode, config);
        }
    }

    public static List<Column.MetadataColumn> createRequiredMetadataColumns(ResolvedSchema schema, DynamicTableSource source) {
        List<Column.MetadataColumn> metadataColumns = DynamicSourceUtils.extractMetadataColumns(schema);
        HashMap<String, Column.MetadataColumn> metadataKeysToMetadataColumns = new HashMap<String, Column.MetadataColumn>();
        for (Column.MetadataColumn column : metadataColumns) {
            String metadataKey = column.getMetadataKey().orElse(column.getName());
            metadataKeysToMetadataColumns.put(metadataKey, column);
        }
        Map<String, DataType> metadataMap = DynamicSourceUtils.extractMetadataMap(source);
        return metadataMap.keySet().stream().filter(metadataKeysToMetadataColumns::containsKey).map(metadataKeysToMetadataColumns::get).collect(Collectors.toList());
    }

    public static RowType createProducedType(ResolvedSchema schema, DynamicTableSource source) {
        Map<String, DataType> metadataMap = DynamicSourceUtils.extractMetadataMap(source);
        Stream physicalFields = ((RowType)schema.toPhysicalRowDataType().getLogicalType()).getFields().stream();
        Stream<RowType.RowField> metadataFields = DynamicSourceUtils.createRequiredMetadataColumns(schema, source).stream().map(k -> new RowType.RowField(k.getName(), ((DataType)metadataMap.get(k.getMetadataKey().orElse(k.getName()))).getLogicalType()));
        List rowFields = Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
        return new RowType(false, rowFields);
    }

    public static boolean isUpsertSource(ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource)tableSource).getChangelogMode();
        boolean isUpsertMode = mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
        boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
        return isUpsertMode && hasPrimaryKey;
    }

    public static boolean isSourceChangeEventsDuplicate(ResolvedSchema resolvedSchema, DynamicTableSource tableSource, TableConfig tableConfig) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource)tableSource).getChangelogMode();
        boolean isCDCSource = !mode.containsOnly(RowKind.INSERT) && !DynamicSourceUtils.isUpsertSource(resolvedSchema, tableSource);
        boolean changeEventsDuplicate = (Boolean)tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
        boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
        return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
    }

    private static void pushWatermarkAssigner(FlinkRelBuilder relBuilder, ResolvedSchema schema) {
        ExpressionConverter converter = new ExpressionConverter(relBuilder);
        RelDataType inputRelDataType = relBuilder.peek().getRowType();
        WatermarkSpec watermarkSpec = (WatermarkSpec)schema.getWatermarkSpecs().get(0);
        String rowtimeColumn = watermarkSpec.getRowtimeAttribute();
        int rowtimeColumnIdx = inputRelDataType.getFieldNames().indexOf(rowtimeColumn);
        RexNode watermarkRexNode = (RexNode)watermarkSpec.getWatermarkExpression().accept((ExpressionVisitor)converter);
        relBuilder.watermark(rowtimeColumnIdx, watermarkRexNode);
    }

    private static void pushGeneratedProjection(FlinkRelBuilder relBuilder, ResolvedSchema schema) {
        ExpressionConverter converter = new ExpressionConverter(relBuilder);
        List projection = schema.getColumns().stream().map(c -> {
            if (c instanceof Column.ComputedColumn) {
                Column.ComputedColumn computedColumn = (Column.ComputedColumn)c;
                return (RexNode)computedColumn.getExpression().accept((ExpressionVisitor)converter);
            }
            return relBuilder.field(c.getName());
        }).collect(Collectors.toList());
        relBuilder.projectNamed(projection, schema.getColumns().stream().map(Column::getName).collect(Collectors.toList()), true);
    }

    private static void pushMetadataProjection(FlinkRelBuilder relBuilder, ResolvedSchema schema) {
        RexBuilder rexBuilder = relBuilder.getRexBuilder();
        List<String> fieldNames = schema.getColumns().stream().filter(c -> !(c instanceof Column.ComputedColumn)).map(Column::getName).collect(Collectors.toList());
        List fieldNodes = schema.getColumns().stream().filter(c -> !(c instanceof Column.ComputedColumn)).map(c -> {
            RelDataType relDataType = relBuilder.getTypeFactory().createFieldTypeFromLogicalType(c.getDataType().getLogicalType());
            if (c instanceof Column.MetadataColumn) {
                Column.MetadataColumn metadataColumn = (Column.MetadataColumn)c;
                String columnName = metadataColumn.getName();
                return rexBuilder.makeAbstractCast(relDataType, relBuilder.field(columnName));
            }
            return relBuilder.field(c.getName());
        }).collect(Collectors.toList());
        relBuilder.projectNamed(fieldNodes, fieldNames, true);
    }

    private static void pushTableScan(boolean isBatchMode, FlinkRelBuilder relBuilder, ContextResolvedTable contextResolvedTable, FlinkStatistic statistic, List<RelHint> hints, DynamicTableSource tableSource) {
        RowType producedType = DynamicSourceUtils.createProducedType(contextResolvedTable.getResolvedSchema(), tableSource);
        RelDataType producedRelDataType = relBuilder.getTypeFactory().buildRelNodeRowType(producedType);
        TableSourceTable tableSourceTable = new TableSourceTable(relBuilder.getRelOptSchema(), producedRelDataType, statistic, tableSource, !isBatchMode, contextResolvedTable, ShortcutUtils.unwrapContext(relBuilder), new SourceAbilitySpec[0]);
        LogicalTableScan scan = LogicalTableScan.create(relBuilder.getCluster(), tableSourceTable, hints);
        relBuilder.push(scan);
    }

    private static Map<String, DataType> extractMetadataMap(DynamicTableSource source) {
        if (source instanceof SupportsReadingMetadata) {
            return ((SupportsReadingMetadata)source).listReadableMetadata();
        }
        return Collections.emptyMap();
    }

    private static List<Column.MetadataColumn> extractMetadataColumns(ResolvedSchema schema) {
        return schema.getColumns().stream().filter(Column.MetadataColumn.class::isInstance).map(Column.MetadataColumn.class::cast).collect(Collectors.toList());
    }

    private static void validateAndApplyMetadata(String tableDebugName, ResolvedSchema schema, DynamicTableSource source) {
        List<Column.MetadataColumn> metadataColumns = DynamicSourceUtils.extractMetadataColumns(schema);
        if (metadataColumns.isEmpty()) {
            return;
        }
        if (!(source instanceof SupportsReadingMetadata)) {
            throw new ValidationException(String.format("Table '%s' declares metadata columns, but the underlying %s doesn't implement the %s interface. Therefore, metadata cannot be read from the given source.", source.asSummaryString(), DynamicTableSource.class.getSimpleName(), SupportsReadingMetadata.class.getSimpleName()));
        }
        SupportsReadingMetadata metadataSource = (SupportsReadingMetadata)source;
        Map metadataMap = metadataSource.listReadableMetadata();
        metadataColumns.forEach(c -> {
            String metadataKey = c.getMetadataKey().orElse(c.getName());
            LogicalType metadataType = c.getDataType().getLogicalType();
            DataType expectedMetadataDataType = (DataType)metadataMap.get(metadataKey);
            if (expectedMetadataDataType == null) {
                throw new ValidationException(String.format("Invalid metadata key '%s' in column '%s' of table '%s'. The %s class '%s' supports the following metadata keys for reading:\n%s", metadataKey, c.getName(), tableDebugName, DynamicTableSource.class.getSimpleName(), source.getClass().getName(), String.join((CharSequence)"\n", metadataMap.keySet())));
            }
            if (!LogicalTypeCasts.supportsExplicitCast((LogicalType)expectedMetadataDataType.getLogicalType(), (LogicalType)metadataType)) {
                if (metadataKey.equals(c.getName())) {
                    throw new ValidationException(String.format("Invalid data type for metadata column '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", c.getName(), tableDebugName, expectedMetadataDataType.getLogicalType(), metadataType));
                }
                throw new ValidationException(String.format("Invalid data type for metadata column '%s' with metadata key '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", c.getName(), metadataKey, tableDebugName, expectedMetadataDataType.getLogicalType(), metadataType));
            }
        });
        metadataSource.applyReadableMetadata(DynamicSourceUtils.createRequiredMetadataColumns(schema, source).stream().map(column -> column.getMetadataKey().orElse(column.getName())).collect(Collectors.toList()), TypeConversions.fromLogicalToDataType((LogicalType)DynamicSourceUtils.createProducedType(schema, source)));
    }

    private static void validateScanSource(String tableDebugName, ResolvedSchema schema, ScanTableSource scanSource, boolean isBatchMode, ReadableConfig config) {
        ScanTableSource.ScanRuntimeProvider provider = scanSource.getScanRuntimeProvider((ScanTableSource.ScanContext)ScanRuntimeProviderContext.INSTANCE);
        ChangelogMode changelogMode = scanSource.getChangelogMode();
        DynamicSourceUtils.validateWatermarks(tableDebugName, schema);
        if (isBatchMode) {
            DynamicSourceUtils.validateScanSourceForBatch(tableDebugName, changelogMode, provider);
        } else {
            DynamicSourceUtils.validateScanSourceForStreaming(tableDebugName, schema, scanSource, changelogMode, config);
        }
    }

    private static void validateScanSourceForStreaming(String tableDebugName, ResolvedSchema schema, ScanTableSource scanSource, ChangelogMode changelogMode, ReadableConfig config) {
        boolean hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE);
        boolean hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER);
        if (!hasUpdateBefore && hasUpdateAfter) {
            if (!schema.getPrimaryKey().isPresent()) {
                throw new TableException(String.format("Table '%s' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. This requires defining a primary key constraint on the table.", tableDebugName));
            }
        } else {
            boolean changeEventsDuplicate;
            if (hasUpdateBefore && !hasUpdateAfter) {
                throw new ValidationException(String.format("Invalid source for table '%s'. A %s doesn't support a changelog stream that contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class '%s'.", tableDebugName, ScanTableSource.class.getSimpleName(), scanSource.getClass().getName()));
            }
            if (!changelogMode.containsOnly(RowKind.INSERT) && (changeEventsDuplicate = ((Boolean)config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE)).booleanValue()) && !schema.getPrimaryKey().isPresent()) {
                throw new TableException(String.format("Configuration '%s' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table '%s' doesn't have a primary key.", ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE.key(), tableDebugName));
            }
        }
    }

    private static void validateScanSourceForBatch(String tableDebugName, ChangelogMode changelogMode, ScanTableSource.ScanRuntimeProvider provider) {
        if (!provider.isBounded()) {
            throw new ValidationException(String.format("Querying an unbounded table '%s' in batch mode is not allowed. The table source is unbounded.", tableDebugName));
        }
        if (!changelogMode.containsOnly(RowKind.INSERT)) {
            throw new TableException(String.format("Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table '%s' produces other changelog messages than just INSERT.", tableDebugName));
        }
    }

    private static void validateWatermarks(String tableDebugName, ResolvedSchema schema) {
        if (schema.getWatermarkSpecs().isEmpty()) {
            return;
        }
        if (schema.getWatermarkSpecs().size() > 1) {
            throw new TableException(String.format("Currently only at most one WATERMARK declaration is supported for table '%s'.", tableDebugName));
        }
        String rowtimeAttribute = ((WatermarkSpec)schema.getWatermarkSpecs().get(0)).getRowtimeAttribute();
        if (rowtimeAttribute.contains(".")) {
            throw new TableException(String.format("A nested field '%s' cannot be declared as rowtime attribute for table '%s' right now.", rowtimeAttribute, tableDebugName));
        }
    }

    private DynamicSourceUtils() {
    }
}

