package org.apache.flink.iteration.operator.allround;

import java.io.IOException;
import java.util.Collections;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.AbstractWrapperOperator;
import org.apache.flink.iteration.operator.OperatorStateUtils;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.class */
public abstract class AbstractAllRoundWrapperOperator<T, S extends StreamOperator<T>> extends AbstractWrapperOperator<T> {
    protected final S wrappedOperator;
    private int latestEpochWatermark;
    private ListState<Integer> parallelismState;
    private ListState<Integer> latestEpochWatermarkState;

    /* loaded from: input_file:org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator$RecordingStreamTaskStateInitializer.class */
    private static class RecordingStreamTaskStateInitializer implements StreamTaskStateInitializer {
        private final StreamTaskStateInitializer wrapped;
        StreamOperatorStateContext lastCreated;

        public RecordingStreamTaskStateInitializer(StreamTaskStateInitializer streamTaskStateInitializer) {
            this.wrapped = streamTaskStateInitializer;
        }

        public StreamOperatorStateContext streamOperatorStateContext(@Nonnull OperatorID operatorID, @Nonnull String str, @Nonnull ProcessingTimeService processingTimeService, @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> typeSerializer, @Nonnull CloseableRegistry closeableRegistry, @Nonnull MetricGroup metricGroup, double d, boolean z) throws Exception {
            this.lastCreated = this.wrapped.streamOperatorStateContext(operatorID, str, processingTimeService, keyContext, typeSerializer, closeableRegistry, metricGroup, d, z);
            return this.lastCreated;
        }
    }

    public AbstractAllRoundWrapperOperator(StreamOperatorParameters<IterationRecord<T>> streamOperatorParameters, StreamOperatorFactory<T> streamOperatorFactory) {
        super(streamOperatorParameters, streamOperatorFactory);
        this.latestEpochWatermark = -1;
        this.wrappedOperator = (S) StreamOperatorFactoryUtil.createOperator(streamOperatorFactory, streamOperatorParameters.getContainingTask(), OperatorUtils.createWrappedOperatorConfig(streamOperatorParameters.getStreamConfig()), this.proxyOutput, streamOperatorParameters.getOperatorEventDispatcher()).f0;
        OperatorUtils.processOperatorOrUdfIfSatisfy(this.wrappedOperator, EpochAware.class, epochAware -> {
            epochAware.setEpochSupplier(this.epochWatermarkSupplier);
        });
    }

    @Override // org.apache.flink.iteration.operator.AbstractWrapperOperator, org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerListener
    public void onEpochWatermarkIncrement(int i) throws IOException {
        if (i > this.latestEpochWatermark) {
            this.latestEpochWatermark = i;
            setIterationContextRound(Integer.valueOf(i));
            OperatorUtils.processOperatorOrUdfIfSatisfy(this.wrappedOperator, IterationListener.class, iterationListener -> {
                notifyEpochWatermarkIncrement(iterationListener, i);
            });
            clearIterationContextRound();
        }
        super.onEpochWatermarkIncrement(i);
    }

    public void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        RecordingStreamTaskStateInitializer recordingStreamTaskStateInitializer = new RecordingStreamTaskStateInitializer(streamTaskStateInitializer);
        this.wrappedOperator.initializeState(recordingStreamTaskStateInitializer);
        Preconditions.checkState(recordingStreamTaskStateInitializer.lastCreated != null);
        OperatorStateBackend operatorStateBackend = recordingStreamTaskStateInitializer.lastCreated.operatorStateBackend();
        this.parallelismState = operatorStateBackend.getUnionListState(new ListStateDescriptor("parallelism", IntSerializer.INSTANCE));
        OperatorStateUtils.getUniqueElement(this.parallelismState, "parallelism").ifPresent(num -> {
            Preconditions.checkState(num.intValue() == this.containingTask.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(), "The all-round wrapper operator is recovered with parallelism changed from " + num + " to " + this.containingTask.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks());
        });
        this.latestEpochWatermarkState = operatorStateBackend.getListState(new ListStateDescriptor("latestEpoch", IntSerializer.INSTANCE));
        OperatorStateUtils.getUniqueElement(this.latestEpochWatermarkState, "latestEpoch").ifPresent(num2 -> {
            this.latestEpochWatermark = num2.intValue();
        });
    }

    public OperatorSnapshotFutures snapshotState(long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        this.parallelismState.clear();
        if (this.containingTask.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
            this.parallelismState.update(Collections.singletonList(Integer.valueOf(this.containingTask.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks())));
        }
        this.latestEpochWatermarkState.update(Collections.singletonList(Integer.valueOf(this.latestEpochWatermark)));
        return this.wrappedOperator.snapshotState(j, j2, checkpointOptions, checkpointStreamFactory);
    }

    public void open() throws Exception {
        this.wrappedOperator.open();
    }

    public void finish() throws Exception {
        this.wrappedOperator.finish();
    }

    public void close() throws Exception {
        this.wrappedOperator.close();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        this.wrappedOperator.prepareSnapshotPreBarrier(j);
    }

    public void setKeyContextElement1(StreamRecord<?> streamRecord) throws Exception {
        this.wrappedOperator.setKeyContextElement1(streamRecord);
    }

    public void setKeyContextElement2(StreamRecord<?> streamRecord) throws Exception {
        this.wrappedOperator.setKeyContextElement2(streamRecord);
    }

    public OperatorMetricGroup getMetricGroup() {
        return this.wrappedOperator.getMetricGroup();
    }

    public OperatorID getOperatorID() {
        return this.wrappedOperator.getOperatorID();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.wrappedOperator.notifyCheckpointComplete(j);
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        this.wrappedOperator.notifyCheckpointAborted(j);
    }

    public void setCurrentKey(Object obj) {
        this.wrappedOperator.setCurrentKey(obj);
    }

    public Object getCurrentKey() {
        return this.wrappedOperator.getCurrentKey();
    }

    @VisibleForTesting
    int getLatestEpochWatermark() {
        return this.latestEpochWatermark;
    }
}
