/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.processor.ExecNodeGraphProcessor;
import org.apache.flink.table.planner.plan.nodes.exec.processor.ProcessorContext;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.types.logical.RowType;

/**
 * An {@link ExecNodeGraphProcessor} that post-processes a batch exec graph in two passes:
 *
 * <ol>
 *   <li>flags every {@link BatchExecTableSourceScan} that has inputs of its own as needing a
 *       dynamic filtering dependency, unless its only consumer is a
 *       {@link BatchExecMultipleInput};
 *   <li>unless the batch shuffle mode is already {@link BatchShuffleMode#ALL_EXCHANGES_BLOCKING},
 *       forces a {@link StreamExchangeMode#BATCH} exchange on every edge that leaves the set of
 *       nodes feeding a {@link BatchExecDynamicFilteringDataCollector}.
 * </ol>
 *
 * <p>Both passes mutate the nodes of the given graph in place and return the same graph instance.
 */
public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcessor {

    /**
     * Runs the fact-side dependency check followed by the dim-side blocking-exchange enforcement.
     *
     * @param execGraph the graph to process; its nodes are modified in place
     * @param context gives access to the planner and, through it, the table configuration
     * @return the processed graph (the same instance that was passed in)
     */
    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        ExecNodeGraph factSideProcessedGraph = checkIfFactSourceNeedEnforceDependency(execGraph);
        return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
    }

    /**
     * Collects, for each table source scan that itself has at least one input edge, the nodes
     * that consume it, and marks the scan as needing a dynamic filtering dependency unless it is
     * consumed by exactly one node and that node is a {@link BatchExecMultipleInput}.
     *
     * @param execGraph the graph to inspect
     * @return {@code execGraph}, with the affected scans flagged
     */
    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph execGraph) {
        final Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants =
                new HashMap<>();

        AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        for (ExecEdge edge : node.getInputEdges()) {
                            ExecNode<?> input = edge.getSource();
                            // A source scan with inputs of its own is what this pass treats as a
                            // dynamic filtering scan; record "node" as one of its consumers.
                            if (input instanceof BatchExecTableSourceScan
                                    && !input.getInputEdges().isEmpty()) {
                                dynamicFilteringScanDescendants
                                        .computeIfAbsent(
                                                (BatchExecTableSourceScan) input,
                                                ignored -> new ArrayList<>())
                                        .add(node);
                            }
                        }
                        visitInputs(node);
                    }
                };
        execGraph.getRootNodes().forEach(node -> node.accept(dynamicFilteringScanCollector));

        for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
                dynamicFilteringScanDescendants.entrySet()) {
            List<ExecNode<?>> consumers = entry.getValue();
            boolean onlyConsumedByMultipleInput =
                    consumers.size() == 1 && consumers.get(0) instanceof BatchExecMultipleInput;
            // A scan whose single consumer is a multiple-input node is left untouched; every
            // other scan gets the dependency flag.
            if (!onlyConsumedByMultipleInput) {
                entry.getKey().setNeedDynamicFilteringDependency(true);
            }
        }
        return execGraph;
    }

    /**
     * Ensures that data leaving the sub-graph which feeds a
     * {@link BatchExecDynamicFilteringDataCollector} crosses a {@link StreamExchangeMode#BATCH}
     * exchange. Nothing is changed when the configured batch shuffle mode is
     * {@link BatchShuffleMode#ALL_EXCHANGES_BLOCKING}.
     *
     * @param execGraph the graph to rewrite in place
     * @param context gives access to the planner's table configuration
     * @return {@code execGraph}
     */
    private ExecNodeGraph enforceDimSideBlockingExchange(
            ExecNodeGraph execGraph, final ProcessorContext context) {
        if (context.getPlanner()
                        .getTableConfig()
                        .getConfiguration()
                        .get(ExecutionOptions.BATCH_SHUFFLE_MODE)
                == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
            return execGraph;
        }

        // Every data collector plus the inputs reached from it while traversing.
        final Set<ExecNode<?>> nodesRequiredBlockingOutputs = new HashSet<>();
        AbstractExecNodeExactlyOnceVisitor nodesRequiredBlockingOutputsCollector =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        if (node instanceof BatchExecDynamicFilteringDataCollector) {
                            nodesRequiredBlockingOutputs.add(node);
                        }
                        // Inputs are marked before they are visited, so the mark is carried
                        // further down when visitInputs reaches them.
                        if (nodesRequiredBlockingOutputs.contains(node)) {
                            for (ExecEdge edge : node.getInputEdges()) {
                                nodesRequiredBlockingOutputs.add(edge.getSource());
                            }
                        }
                        visitInputs(node);
                    }
                };
        execGraph
                .getRootNodes()
                .forEach(node -> node.accept(nodesRequiredBlockingOutputsCollector));

        AbstractExecNodeExactlyOnceVisitor blockingEnforcerVisitor =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        // Inputs are handled first, then this node's own input edges.
                        visitInputs(node);
                        // Edges between two marked nodes are left as they are; only edges from a
                        // marked source into an unmarked node are made blocking.
                        if (nodesRequiredBlockingOutputs.contains(node)) {
                            return;
                        }
                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
                            ExecNode<?> source = node.getInputEdges().get(i).getSource();
                            if (!nodesRequiredBlockingOutputs.contains(source)) {
                                continue;
                            }
                            if (source instanceof BatchExecExchange) {
                                // Reuse the exchange on the producing side.
                                ((BatchExecExchange) source)
                                        .setRequiredExchangeMode(StreamExchangeMode.BATCH);
                            } else if (node instanceof BatchExecExchange) {
                                // Reuse the exchange on the consuming side.
                                ((BatchExecExchange) node)
                                        .setRequiredExchangeMode(StreamExchangeMode.BATCH);
                            } else {
                                // No exchange on either side: splice a new one into the edge.
                                BatchExecExchange exchange =
                                        DynamicFilteringDependencyProcessor.this.createExchange(
                                                source,
                                                node.getInputProperties().get(i),
                                                context.getPlanner().getTableConfig());
                                ExecEdge newEdge =
                                        ExecEdge.builder().source(exchange).target(node).build();
                                node.replaceInputEdge(i, newEdge);
                            }
                        }
                    }
                };
        execGraph.getRootNodes().forEach(node -> node.accept(blockingEnforcerVisitor));
        return execGraph;
    }

    /**
     * Builds a {@link BatchExecExchange} in {@link StreamExchangeMode#BATCH} mode whose single
     * input is {@code source}.
     *
     * @param source the node that will feed the new exchange
     * @param inputProperty the input property of the edge being replaced; its dam behavior and
     *     priority are copied, and its required distribution is copied unless it is
     *     {@link InputProperty#UNKNOWN_DISTRIBUTION}, in which case
     *     {@link InputProperty#ANY_DISTRIBUTION} is used
     * @param tableConfig the configuration handed to the new exchange
     * @return the new exchange, already wired to {@code source}
     */
    private BatchExecExchange createExchange(
            ExecNode<?> source, InputProperty inputProperty, TableConfig tableConfig) {
        InputProperty.RequiredDistribution requiredDistribution =
                inputProperty.getRequiredDistribution();
        if (requiredDistribution == InputProperty.UNKNOWN_DISTRIBUTION) {
            requiredDistribution = InputProperty.ANY_DISTRIBUTION;
        }
        InputProperty newProperty =
                InputProperty.builder()
                        .requiredDistribution(requiredDistribution)
                        .damBehavior(inputProperty.getDamBehavior())
                        .priority(inputProperty.getPriority())
                        .build();

        BatchExecExchange exchange =
                new BatchExecExchange(
                        tableConfig, newProperty, (RowType) source.getOutputType(), "Exchange");
        exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
        ExecEdge execEdge = ExecEdge.builder().source(source).target(exchange).build();
        exchange.setInputEdges(Collections.singletonList(execEdge));
        return exchange;
    }
}

