package org.apache.flink.table.runtime.operators.python.scalar;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.class */
public abstract class AbstractPythonScalarFunctionOperator extends AbstractStatelessFunctionOperator<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1;
    private static final String SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1";
    protected final PythonFunctionInfo[] scalarFunctions;
    protected final int[] forwardedFields;
    protected transient StreamRecordRowDataWrappingCollector rowDataWrapper;
    private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
    private transient Projection<RowData, BinaryRowData> udfInputProjection;
    protected transient JoinedRowData reuseJoinedRow;

    public AbstractPythonScalarFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int[] iArr, int[] iArr2) {
        super(configuration, rowType, rowType2, iArr);
        this.scalarFunctions = (PythonFunctionInfo[]) Preconditions.checkNotNull(pythonFunctionInfoArr);
        this.forwardedFields = (int[]) Preconditions.checkNotNull(iArr2);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        this.reuseJoinedRow = new JoinedRowData();
        this.udfInputProjection = createUdfInputProjection();
        this.forwardedFieldProjection = createForwardedFieldProjection();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.scalarFunctions[0].getPythonFunction().getPythonEnv();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedFunctions.Builder newBuilder = FlinkFnApi.UserDefinedFunctions.newBuilder();
        for (PythonFunctionInfo pythonFunctionInfo : this.scalarFunctions) {
            newBuilder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
        }
        newBuilder.setMetricEnabled(this.pythonConfig.isMetricEnabled());
        newBuilder.setProfileEnabled(this.pythonConfig.isProfileEnabled());
        return newBuilder.build();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public String getFunctionUrn() {
        return SCALAR_FUNCTION_URN;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void bufferInput(RowData rowData) {
        BinaryRowData copy = this.forwardedFieldProjection.apply(rowData).copy();
        copy.setRowKind(rowData.getRowKind());
        this.forwardedInputQueue.add(copy);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowData getFunctionInput(RowData rowData) {
        return this.udfInputProjection.apply(rowData);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowType createUserDefinedFunctionOutputType() {
        return new RowType(this.outputType.getFields().subList(this.forwardedFields.length, this.outputType.getFieldCount()));
    }

    private Projection<RowData, BinaryRowData> createUdfInputProjection() {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), "UdfInputProjection", this.inputType, this.userDefinedFunctionInputType, this.userDefinedFunctionInputOffsets).newInstance(Thread.currentThread().getContextClassLoader());
    }

    private Projection<RowData, BinaryRowData> createForwardedFieldProjection() {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), "ForwardedFieldProjection", this.inputType, new RowType((List) Arrays.stream(this.forwardedFields).mapToObj(i -> {
            return (RowType.RowField) this.inputType.getFields().get(i);
        }).collect(Collectors.toList())), this.forwardedFields).newInstance(Thread.currentThread().getContextClassLoader());
    }
}
