package org.apache.flink.iteration.operator;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.broadcast.BroadcastOutput;
import org.apache.flink.iteration.broadcast.BroadcastOutputFactory;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerFactory;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerListener;
import org.apache.flink.iteration.proxy.ProxyOutput;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/iteration/operator/AbstractWrapperOperator.class */
public abstract class AbstractWrapperOperator<T> implements StreamOperator<IterationRecord<T>>, OperatorEpochWatermarkTrackerListener, BoundedMultiInput {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWrapperOperator.class);
    protected final StreamOperatorParameters<IterationRecord<T>> parameters;
    protected final StreamConfig streamConfig;
    protected final StreamTask<?, ?> containingTask;
    protected final Output<StreamRecord<IterationRecord<T>>> output;
    protected final StreamOperatorFactory<T> operatorFactory;
    protected final ProxyOutput<T> proxyOutput;
    protected final InternalOperatorMetricGroup metrics;
    protected final OperatorEpochWatermarkTracker epochWatermarkTracker;
    protected final String uniqueSenderId;
    protected final BroadcastOutput<IterationRecord<T>> eventBroadcastOutput;
    protected final EpochSupplier epochWatermarkSupplier = new EpochSupplier();
    protected final AbstractWrapperOperator<T>.IterationContext iterationContext = new IterationContext();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/iteration/operator/AbstractWrapperOperator$EpochSupplier.class */
    public static class EpochSupplier implements Supplier<Integer> {
        private Integer epoch;

        private EpochSupplier() {
        }

        public void set(Integer num) {
            this.epoch = num;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.Supplier
        public Integer get() {
            return this.epoch;
        }
    }

    /* loaded from: input_file:org/apache/flink/iteration/operator/AbstractWrapperOperator$IterationContext.class */
    private class IterationContext implements IterationListener.Context {
        private IterationContext() {
        }

        @Override // org.apache.flink.iteration.IterationListener.Context
        public <X> void output(OutputTag<X> outputTag, X x) {
            AbstractWrapperOperator.this.proxyOutput.collect(outputTag, new StreamRecord<>(x));
        }
    }

    public AbstractWrapperOperator(StreamOperatorParameters<IterationRecord<T>> streamOperatorParameters, StreamOperatorFactory<T> streamOperatorFactory) {
        this.parameters = (StreamOperatorParameters) Objects.requireNonNull(streamOperatorParameters);
        this.streamConfig = (StreamConfig) Objects.requireNonNull(streamOperatorParameters.getStreamConfig());
        this.containingTask = (StreamTask) Objects.requireNonNull(streamOperatorParameters.getContainingTask());
        this.output = (Output) Objects.requireNonNull(streamOperatorParameters.getOutput());
        this.operatorFactory = (StreamOperatorFactory) Objects.requireNonNull(streamOperatorFactory);
        this.proxyOutput = new ProxyOutput<>(this.output);
        this.metrics = createOperatorMetricGroup(this.containingTask.getEnvironment(), this.streamConfig);
        this.epochWatermarkTracker = OperatorEpochWatermarkTrackerFactory.create(this.streamConfig, this.containingTask, this);
        this.uniqueSenderId = OperatorUtils.getUniqueSenderId(this.streamConfig.getOperatorID(), this.containingTask.getIndexInSubtaskGroup());
        this.eventBroadcastOutput = BroadcastOutputFactory.createBroadcastOutput(this.output, this.metrics.getIOMetricGroup().getNumRecordsOutCounter());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onEpochWatermarkEvent(int i, IterationRecord<?> iterationRecord) throws IOException {
        Preconditions.checkState(iterationRecord.getType() == IterationRecord.Type.EPOCH_WATERMARK, "The record " + iterationRecord + " is not epoch watermark.");
        this.epochWatermarkTracker.onEpochWatermark(i, iterationRecord.getSender(), iterationRecord.getEpoch());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyEpochWatermarkIncrement(IterationListener<?> iterationListener, int i) throws Exception {
        if (i != Integer.MAX_VALUE) {
            iterationListener.onEpochWatermarkIncremented(i, this.iterationContext, new TimestampedCollector(this.proxyOutput));
        } else {
            iterationListener.onIterationTerminated(this.iterationContext, new TimestampedCollector(this.proxyOutput));
        }
    }

    @Override // org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerListener
    public void onEpochWatermarkIncrement(int i) throws IOException {
        this.eventBroadcastOutput.broadcastEmit(new StreamRecord<>(IterationRecord.newEpochWatermark(i, this.uniqueSenderId)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setIterationContextRound(Integer num) {
        this.proxyOutput.setContextRound(num);
        this.epochWatermarkSupplier.set(num);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void clearIterationContextRound() {
        this.proxyOutput.setContextRound(null);
        this.epochWatermarkSupplier.set(null);
    }

    public void endInput(int i) throws Exception {
        this.epochWatermarkTracker.finish(i - 1);
    }

    private InternalOperatorMetricGroup createOperatorMetricGroup(Environment environment, StreamConfig streamConfig) {
        try {
            InternalOperatorMetricGroup orAddOperator = environment.getMetricGroup().getOrAddOperator(streamConfig.getOperatorID(), streamConfig.getOperatorName());
            if (streamConfig.isChainEnd()) {
                orAddOperator.getIOMetricGroup().reuseOutputMetricsForTask();
            }
            return orAddOperator;
        } catch (Exception e) {
            LOG.warn("An error occurred while instantiating task metrics.", e);
            return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }
    }
}
