/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;

public class DynamicPartitionPruningUtils {
    public static boolean supportDynamicPartitionPruning(Join join) {
        return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true) || DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false);
    }

    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
        if (!((Boolean)ShortcutUtils.unwrapContext(join).getTableConfig().get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)).booleanValue()) {
            return false;
        }
        if (join.getJoinType() == JoinRelType.LEFT ? factInLeft : (join.getJoinType() == JoinRelType.RIGHT ? !factInLeft : join.getJoinType() != JoinRelType.INNER && join.getJoinType() != JoinRelType.SEMI)) {
            return false;
        }
        JoinInfo joinInfo = join.analyzeCondition();
        if (joinInfo.leftKeys.isEmpty()) {
            return false;
        }
        RelNode left = join.getLeft();
        RelNode right = join.getRight();
        return factInLeft ? DynamicPartitionPruningUtils.isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys) : DynamicPartitionPruningUtils.isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
    }

    private static boolean isDynamicPartitionPruningPattern(RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
        return DynamicPartitionPruningUtils.isDimSide(dimSide) && DynamicPartitionPruningUtils.isFactSide(factSide, factSideJoinKey);
    }

    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
        DppFactSideFactors factSideFactors = new DppFactSideFactors();
        DynamicPartitionPruningUtils.visitFactSide(rel, factSideFactors, joinKeys);
        return factSideFactors.isFactSide();
    }

    private static boolean isDimSide(RelNode rel) {
        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
        DynamicPartitionPruningUtils.visitDimSide(rel, dimSideFactors);
        return dimSideFactors.isDimSide();
    }

    private static void visitFactSide(RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
        if (rel instanceof TableScan) {
            TableScan scan = (TableScan)rel;
            if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
            if (tableSourceTable == null) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            CatalogTable catalogTable = (CatalogTable)tableSourceTable.contextResolvedTable().getTable();
            List partitionKeys = catalogTable.getPartitionKeys();
            if (partitionKeys.isEmpty()) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            DynamicTableSource tableSource = tableSourceTable.tableSource();
            if (!(tableSource instanceof SupportsDynamicFiltering) || !(tableSource instanceof ScanTableSource)) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            if (!DynamicPartitionPruningUtils.isNewSource((ScanTableSource)tableSource)) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            List<String> candidateFields = joinKeys.stream().map(i -> scan.getRowType().getFieldNames().get((int)i)).collect(Collectors.toList());
            if (candidateFields.isEmpty()) {
                factSideFactors.isSuitableFactScanSource = false;
                return;
            }
            factSideFactors.isSuitableFactScanSource = !DynamicPartitionPruningUtils.getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields).isEmpty();
        } else if (rel instanceof HepRelVertex) {
            DynamicPartitionPruningUtils.visitFactSide(((HepRelVertex)rel).getCurrentRel(), factSideFactors, joinKeys);
        } else if (rel instanceof Exchange || rel instanceof Filter) {
            DynamicPartitionPruningUtils.visitFactSide(rel.getInput(0), factSideFactors, joinKeys);
        } else if (rel instanceof Project) {
            List<RexNode> projects = ((Project)rel).getProjects();
            ImmutableIntList inputJoinKeys = DynamicPartitionPruningUtils.getInputIndices(projects, joinKeys);
            if (inputJoinKeys.isEmpty()) {
                factSideFactors.isSuitableJoinKey = false;
                return;
            }
            DynamicPartitionPruningUtils.visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
        } else if (rel instanceof Calc) {
            Calc calc = (Calc)rel;
            RexProgram program = calc.getProgram();
            List<RexNode> projects = program.getProjectList().stream().map(program::expandLocalRef).collect(Collectors.toList());
            ImmutableIntList inputJoinKeys = DynamicPartitionPruningUtils.getInputIndices(projects, joinKeys);
            if (inputJoinKeys.isEmpty()) {
                factSideFactors.isSuitableJoinKey = false;
                return;
            }
            DynamicPartitionPruningUtils.visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
        }
    }

    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(DynamicTableSource tableSource, List<String> candidateFields) {
        List acceptedFilterFields = ((SupportsDynamicFiltering)tableSource).listAcceptedFilterFields();
        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
            return new ArrayList<String>();
        }
        ArrayList<String> suitableFields = new ArrayList<String>();
        for (String candidateField : candidateFields) {
            if (!acceptedFilterFields.contains(candidateField)) continue;
            suitableFields.add(candidateField);
        }
        return suitableFields;
    }

    private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) {
        if (rel instanceof TableScan) {
            CatalogTable catalogTable;
            TableScan scan = (TableScan)rel;
            TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
            if (table == null) {
                return;
            }
            if (!dimSideFactors.hasFilter && table.abilitySpecs() != null && table.abilitySpecs().length != 0) {
                for (SourceAbilitySpec spec : table.abilitySpecs()) {
                    if (!(spec instanceof FilterPushDownSpec)) continue;
                    List<RexNode> predicates = ((FilterPushDownSpec)spec).getPredicates();
                    for (RexNode predicate : predicates) {
                        if (!DynamicPartitionPruningUtils.isSuitableFilter(predicate)) continue;
                        dimSideFactors.hasFilter = true;
                    }
                }
            }
            dimSideFactors.hasNonPartitionedScan = !(catalogTable = (CatalogTable)table.contextResolvedTable().getTable()).isPartitioned();
        } else if (rel instanceof HepRelVertex) {
            DynamicPartitionPruningUtils.visitDimSide(((HepRelVertex)rel).getCurrentRel(), dimSideFactors);
        } else if (rel instanceof Exchange || rel instanceof Project) {
            DynamicPartitionPruningUtils.visitDimSide(rel.getInput(0), dimSideFactors);
        } else if (rel instanceof Calc) {
            RexProgram origProgram = ((Calc)rel).getProgram();
            if (origProgram.getCondition() != null && DynamicPartitionPruningUtils.isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
                dimSideFactors.hasFilter = true;
            }
            DynamicPartitionPruningUtils.visitDimSide(rel.getInput(0), dimSideFactors);
        } else if (rel instanceof Filter) {
            if (DynamicPartitionPruningUtils.isSuitableFilter(((Filter)rel).getCondition())) {
                dimSideFactors.hasFilter = true;
            }
            DynamicPartitionPruningUtils.visitDimSide(rel.getInput(0), dimSideFactors);
        }
    }

    private static boolean isSuitableFilter(RexNode filterCondition) {
        switch (filterCondition.getKind()) {
            case AND: {
                List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
                return DynamicPartitionPruningUtils.isSuitableFilter(conjunctions.get(0)) || DynamicPartitionPruningUtils.isSuitableFilter(conjunctions.get(1));
            }
            case OR: {
                List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
                return DynamicPartitionPruningUtils.isSuitableFilter(disjunctions.get(0)) && DynamicPartitionPruningUtils.isSuitableFilter(disjunctions.get(1));
            }
            case NOT: {
                return DynamicPartitionPruningUtils.isSuitableFilter((RexNode)((RexCall)filterCondition).operands.get(0));
            }
            case EQUALS: 
            case GREATER_THAN: 
            case GREATER_THAN_OR_EQUAL: 
            case LESS_THAN: 
            case LESS_THAN_OR_EQUAL: 
            case NOT_EQUALS: 
            case IN: 
            case LIKE: 
            case CONTAINS: 
            case SEARCH: 
            case IS_FALSE: 
            case IS_NOT_FALSE: 
            case IS_NOT_TRUE: 
            case IS_TRUE: {
                return true;
            }
        }
        return false;
    }

    private static boolean isNewSource(ScanTableSource scanTableSource) {
        ScanTableSource.ScanRuntimeProvider provider = scanTableSource.getScanRuntimeProvider((ScanTableSource.ScanContext)ScanRuntimeProviderContext.INSTANCE);
        if (provider instanceof SourceProvider) {
            return true;
        }
        if (provider instanceof TransformationScanProvider) {
            Transformation<RowData> transformation = ((TransformationScanProvider)provider).createTransformation(name -> Optional.empty());
            return transformation instanceof SourceTransformation;
        }
        return provider instanceof DataStreamScanProvider;
    }

    private static ImmutableIntList getInputIndices(List<RexNode> projects, ImmutableIntList joinKeys) {
        ArrayList<Integer> indices = new ArrayList<Integer>();
        for (int k : joinKeys) {
            RexNode rexNode = projects.get(k);
            if (!(rexNode instanceof RexInputRef)) continue;
            indices.add(((RexInputRef)rexNode).getIndex());
        }
        return ImmutableIntList.copyOf(indices);
    }

    private static class DppFactSideFactors {
        private boolean isSuitableFactScanSource;
        private boolean isSuitableJoinKey = true;

        private DppFactSideFactors() {
        }

        public boolean isFactSide() {
            return this.isSuitableFactScanSource && this.isSuitableJoinKey;
        }
    }

    private static class DppDimSideFactors {
        private boolean hasFilter;
        private boolean hasNonPartitionedScan;

        private DppDimSideFactors() {
        }

        public boolean isDimSide() {
            return this.hasFilter && this.hasNonPartitionedScan;
        }
    }
}

