/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class BatchExecHashJoin
extends ExecNodeBase<RowData>
implements BatchExecNode<RowData>,
SingleTransformationTranslator<RowData> {
    private final JoinSpec joinSpec;
    private final boolean leftIsBuild;
    private final int estimatedLeftAvgRowSize;
    private final int estimatedRightAvgRowSize;
    private final long estimatedLeftRowCount;
    private final long estimatedRightRowCount;
    private final boolean tryDistinctBuildRow;

    public BatchExecHashJoin(ReadableConfig tableConfig, JoinSpec joinSpec, int estimatedLeftAvgRowSize, int estimatedRightAvgRowSize, long estimatedLeftRowCount, long estimatedRightRowCount, boolean leftIsBuild, boolean tryDistinctBuildRow, InputProperty leftInputProperty, InputProperty rightInputProperty, RowType outputType, String description) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecHashJoin.class), ExecNodeContext.newPersistedConfig(BatchExecHashJoin.class, tableConfig), Arrays.asList(leftInputProperty, rightInputProperty), (LogicalType)outputType, description);
        this.joinSpec = joinSpec;
        this.leftIsBuild = leftIsBuild;
        this.estimatedLeftAvgRowSize = estimatedLeftAvgRowSize;
        this.estimatedRightAvgRowSize = estimatedRightAvgRowSize;
        this.estimatedLeftRowCount = estimatedLeftRowCount;
        this.estimatedRightRowCount = estimatedRightRowCount;
        this.tryDistinctBuildRow = tryDistinctBuildRow;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        int[] probeKeys;
        long probeRowCount;
        RowType probeType;
        GeneratedProjection probeProj;
        Transformation<?> probeTransform;
        int[] buildKeys;
        long buildRowCount;
        int buildRowSize;
        RowType buildType;
        GeneratedProjection buildProj;
        Transformation<?> buildTransform;
        boolean reverseJoin;
        ExecEdge leftInputEdge = this.getInputEdges().get(0);
        ExecEdge rightInputEdge = this.getInputEdges().get(1);
        Transformation<?> leftInputTransform = leftInputEdge.translateToPlan(planner);
        Transformation<?> rightInputTransform = rightInputEdge.translateToPlan(planner);
        RowType leftType = (RowType)leftInputEdge.getOutputType();
        RowType rightType = (RowType)rightInputEdge.getOutputType();
        JoinUtil.validateJoinSpec(this.joinSpec, leftType, rightType, false);
        int[] leftKeys = this.joinSpec.getLeftKeys();
        int[] rightKeys = this.joinSpec.getRightKeys();
        LogicalType[] keyFieldTypes = (LogicalType[])IntStream.of(leftKeys).mapToObj(arg_0 -> ((RowType)leftType).getTypeAt(arg_0)).toArray(LogicalType[]::new);
        RowType keyType = RowType.of((LogicalType[])keyFieldTypes);
        GeneratedJoinCondition condFunc = JoinUtil.generateConditionFunction((ReadableConfig)config, planner.getFlinkContext().getClassLoader(), this.joinSpec.getNonEquiCondition().orElse(null), (LogicalType)leftType, (LogicalType)rightType);
        GeneratedProjection leftProj = ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(config, planner.getFlinkContext().getClassLoader()), "HashJoinLeftProjection", leftType, keyType, leftKeys);
        GeneratedProjection rightProj = ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(config, planner.getFlinkContext().getClassLoader()), "HashJoinRightProjection", rightType, keyType, rightKeys);
        boolean bl = reverseJoin = !this.leftIsBuild;
        if (this.leftIsBuild) {
            buildTransform = leftInputTransform;
            buildProj = leftProj;
            buildType = leftType;
            buildRowSize = this.estimatedLeftAvgRowSize;
            buildRowCount = this.estimatedLeftRowCount;
            buildKeys = leftKeys;
            probeTransform = rightInputTransform;
            probeProj = rightProj;
            probeType = rightType;
            probeRowCount = this.estimatedRightRowCount;
            probeKeys = rightKeys;
        } else {
            buildTransform = rightInputTransform;
            buildProj = rightProj;
            buildType = rightType;
            buildRowSize = this.estimatedRightAvgRowSize;
            buildRowCount = this.estimatedRightRowCount;
            buildKeys = rightKeys;
            probeTransform = leftInputTransform;
            probeProj = leftProj;
            probeType = leftType;
            probeRowCount = this.estimatedLeftRowCount;
            probeKeys = leftKeys;
        }
        FlinkJoinType joinType = this.joinSpec.getJoinType();
        HashJoinType hashJoinType = HashJoinType.of((boolean)this.leftIsBuild, (boolean)joinType.isLeftOuter(), (boolean)joinType.isRightOuter(), (joinType == FlinkJoinType.SEMI ? 1 : 0) != 0, (joinType == FlinkJoinType.ANTI ? 1 : 0) != 0);
        long externalBufferMemory = ((MemorySize)config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)).getBytes();
        long managedMemory = this.getLargeManagedMemory(joinType, config);
        SortMergeJoinFunction sortMergeJoinFunction = SorMergeJoinOperatorUtil.getSortMergeJoinFunction(planner.getFlinkContext().getClassLoader(), config, joinType, leftType, rightType, leftKeys, rightKeys, keyType, this.leftIsBuild, this.joinSpec.getFilterNulls(), condFunc, 1.0 * (double)externalBufferMemory / (double)managedMemory);
        SimpleOperatorFactory operator = LongHashJoinGenerator.support(hashJoinType, keyType, this.joinSpec.getFilterNulls()) ? LongHashJoinGenerator.gen(config, planner.getFlinkContext().getClassLoader(), hashJoinType, keyType, buildType, probeType, buildKeys, probeKeys, buildRowSize, buildRowCount, reverseJoin, condFunc, this.leftIsBuild, sortMergeJoinFunction) : SimpleOperatorFactory.of((StreamOperator)HashJoinOperator.newHashJoinOperator((HashJoinType)hashJoinType, (boolean)this.leftIsBuild, (GeneratedJoinCondition)condFunc, (boolean)reverseJoin, (boolean[])this.joinSpec.getFilterNulls(), (GeneratedProjection)buildProj, (GeneratedProjection)probeProj, (boolean)this.tryDistinctBuildRow, (int)buildRowSize, (long)buildRowCount, (long)probeRowCount, (RowType)keyType, (SortMergeJoinFunction)sortMergeJoinFunction));
        return ExecNodeUtil.createTwoInputTransformation(buildTransform, probeTransform, this.createTransformationName(config), this.createTransformationDescription(config), operator, InternalTypeInfo.of((LogicalType)this.getOutputType()), probeTransform.getParallelism(), managedMemory);
    }

    private long getLargeManagedMemory(FlinkJoinType joinType, ExecNodeConfig config) {
        long hashJoinManagedMemory = ((MemorySize)config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY)).getBytes();
        long externalBufferMemory = ((MemorySize)config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)).getBytes();
        long sortMemory = ((MemorySize)config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY)).getBytes();
        int externalBufferNum = 1;
        if (joinType == FlinkJoinType.FULL) {
            externalBufferNum = 2;
        }
        long sortMergeJoinManagedMemory = externalBufferMemory * (long)externalBufferNum + sortMemory * 2L;
        return Math.max(hashJoinManagedMemory, sortMergeJoinManagedMemory);
    }
}

