package org.apache.flink.iteration.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.iteration.IterationID;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.broadcast.BroadcastOutput;
import org.apache.flink.iteration.broadcast.BroadcastOutputFactory;
import org.apache.flink.iteration.checkpoint.Checkpoints;
import org.apache.flink.iteration.checkpoint.CheckpointsBroker;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheSnapshot;
import org.apache.flink.iteration.operator.event.CoordinatorCheckpointEvent;
import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
import org.apache.flink.iteration.operator.event.SubtaskAlignedEvent;
import org.apache.flink.iteration.operator.event.TerminatingOnInitializeEvent;
import org.apache.flink.iteration.operator.headprocessor.HeadOperatorRecordProcessor;
import org.apache.flink.iteration.operator.headprocessor.HeadOperatorState;
import org.apache.flink.iteration.operator.headprocessor.RegularHeadOperatorRecordProcessor;
import org.apache.flink.iteration.operator.headprocessor.TerminatingHeadOperatorRecordProcessor;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.iteration.utils.ReflectionUtils;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumerWithPartialRecordLength;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.statefun.flink.core.feedback.FeedbackChannelBroker;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/iteration/operator/HeadOperator.class */
public class HeadOperator extends AbstractStreamOperator<IterationRecord<?>> implements OneInputStreamOperator<IterationRecord<?>, IterationRecord<?>>, FeedbackConsumer<StreamRecord<IterationRecord<?>>>, OperatorEventHandler, BoundedOneInput {
    /** Side-output tag with id {@code "aligned"} typed as {@code IterationRecord<Void>}; declared here but not referenced inside this class. */
    public static final OutputTag<IterationRecord<Void>> ALIGN_NOTIFY_OUTPUT_TAG = new OutputTag<>("aligned", new IterationRecordTypeInfo(BasicTypeInfo.VOID_TYPE_INFO));
    /** Iteration this head belongs to; together with {@link #feedbackIndex} it forms the feedback key. */
    private final IterationID iterationId;
    /** Index used with {@link #iterationId} when building the feedback key. */
    private final int feedbackIndex;
    /** Flag forwarded to the coordinator inside every {@code SubtaskAlignedEvent}. */
    private final boolean isCriteriaStream;
    /** Gateway used to send events to the operator coordinator. */
    private final OperatorEventGateway operatorEventGateway;
    /** Mailbox executor that runs feedback runnables and is yielded to while waiting. */
    private final MailboxExecutor mailboxExecutor;
    /** Broadcast output created in {@code setup}; used by the processor context to broadcast records. */
    private transient BroadcastOutput<?> eventBroadcastOutput;
    /** Context handed to the record processors; created in {@code initializeState}. */
    private transient ContextImpl processorContext;
    /** Current lifecycle status; persisted by ordinal in {@code snapshotState}. */
    private HeadOperatorStatus status;
    /** Active record processor: the regular one while RUNNING, the terminating one otherwise. */
    private HeadOperatorRecordProcessor recordProcessor;
    /** Helper consulted around checkpoints and when coordinator events arrive. */
    private HeadOperatorCheckpointAligner checkpointAligner;
    /** Union list state holding the parallelism; written by subtask 0 only and checked on restore. */
    private ListState<Integer> parallelismState;
    /** List state holding the ordinal of {@link #status}. */
    private ListState<Integer> statusState;
    /** List state holding the snapshot returned by the record processor. */
    private ListState<HeadOperatorState> processorState;
    /** Feedback-record checkpoints; registered with {@code CheckpointsBroker} in {@code initializeState} and closed in {@code close}. */
    private Checkpoints<IterationRecord<?>> checkpoints;

    /* loaded from: input_file:org/apache/flink/iteration/operator/HeadOperator$ContextImpl.class */
    /**
     * Context handed to the {@link HeadOperatorRecordProcessor} implementations. Every method
     * simply forwards to state owned by the enclosing {@link HeadOperator}.
     */
    private class ContextImpl implements HeadOperatorRecordProcessor.Context {
        private ContextImpl() {
        }

        /** Returns the stream configuration of the enclosing operator. */
        @Override
        public StreamConfig getStreamConfig() {
            return HeadOperator.this.config;
        }

        /** Returns the task info taken from the environment of the containing task. */
        @Override
        public TaskInfo getTaskInfo() {
            StreamTask<?, ?> containingTask = HeadOperator.this.getContainingTask();
            return containingTask.getEnvironment().getTaskInfo();
        }

        /** Emits the record on the operator's main output. */
        @Override
        public void output(StreamRecord<IterationRecord<?>> streamRecord) {
            HeadOperator.this.output.collect(streamRecord);
        }

        /** Emits the record on the side output identified by {@code outputTag}. */
        @Override
        public void output(OutputTag<IterationRecord<?>> outputTag, StreamRecord<IterationRecord<?>> streamRecord) {
            HeadOperator.this.output.collect(outputTag, streamRecord);
        }

        /**
         * Emits the record through the broadcast output.
         *
         * @throws FlinkRuntimeException wrapping any {@link IOException} raised while broadcasting
         */
        @Override
        public void broadcastOutput(StreamRecord<IterationRecord<?>> streamRecord) {
            try {
                HeadOperator.this.eventBroadcastOutput.broadcastEmit(streamRecord);
            } catch (IOException ioFailure) {
                throw new FlinkRuntimeException("Failed to broadcast event", ioFailure);
            }
        }

        /**
         * Sends a {@link SubtaskAlignedEvent} built from the two arguments and the operator's
         * criteria-stream flag to the coordinator.
         * NOTE(review): {@code i} and {@code j} are presumably the epoch and a record count;
         * confirm against {@code HeadOperatorRecordProcessor.Context}.
         */
        @Override
        public void updateEpochToCoordinator(int i, long j) {
            SubtaskAlignedEvent alignedEvent = new SubtaskAlignedEvent(i, j, HeadOperator.this.isCriteriaStream);
            HeadOperator.this.operatorEventGateway.sendEventToCoordinator(alignedEvent);
        }

        /** Sends the singleton {@link TerminatingOnInitializeEvent} to the coordinator. */
        @Override
        public void notifyTerminatingOnInitialize() {
            HeadOperator.this.operatorEventGateway.sendEventToCoordinator(TerminatingOnInitializeEvent.INSTANCE);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lifecycle status of the head operator.
     *
     * <p>The declaration order is significant: {@code snapshotState} stores the constant's
     * ordinal in the "status" state and {@code initializeState} restores it by indexing
     * {@code values()}, so reordering or inserting constants changes how old state is read.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/iteration/operator/HeadOperator$HeadOperatorStatus.class */
    public enum HeadOperatorStatus {
        /** Ordinal 0; also the status assumed when no "status" state was restored. */
        RUNNING,
        /** Entered when the record processor's {@code onGloballyAligned} returns true. */
        TERMINATING,
        /** Entered when {@code processFeedbackElement} returns true; {@code processFeedback} asserts the status was TERMINATING. */
        TERMINATED
    }

    /**
     * Creates the head operator.
     *
     * @param iterationID identifier of the iteration; must not be null
     * @param i feedback index, combined with the iteration id to build the feedback key
     * @param z whether this head belongs to the criteria stream; forwarded in aligned events
     * @param mailboxExecutor executor used to run feedback runnables and to yield; must not be null
     * @param operatorEventGateway gateway used to send events to the coordinator; must not be null
     * @param processingTimeService stored in the inherited {@code processingTimeService} field
     * @throws NullPointerException if any of the arguments marked non-null is null
     */
    public HeadOperator(IterationID iterationID, int i, boolean z, MailboxExecutor mailboxExecutor, OperatorEventGateway operatorEventGateway, ProcessingTimeService processingTimeService) {
        this.iterationId = Objects.requireNonNull(iterationID);
        this.feedbackIndex = i;
        this.isCriteriaStream = z;
        this.mailboxExecutor = Objects.requireNonNull(mailboxExecutor);
        this.operatorEventGateway = Objects.requireNonNull(operatorEventGateway);
        // Assign the inherited protected field through `this`; going through a raw
        // AbstractStreamOperator-typed expression is not permitted from a subclass in
        // another package.
        this.processingTimeService = processingTimeService;
    }

    /**
     * Runs the base-class setup and then creates the broadcast output on top of {@code output},
     * passing it the operator's records-out counter.
     */
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<IterationRecord<?>>> output) {
        super.setup(streamTask, streamConfig, output);
        eventBroadcastOutput =
                BroadcastOutputFactory.createBroadcastOutput(
                        output,
                        metrics.getIOMetricGroup().getNumRecordsOutCounter());
    }

    /**
     * Restores the operator state and wires up the feedback channel.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>verifies that a restored parallelism equals the current parallelism;</li>
     *   <li>restores the status (defaulting to ordinal 0) and picks the matching record
     *       processor;</li>
     *   <li>hands a restored processor state, if any, to that processor;</li>
     *   <li>creates the checkpoint aligner and the feedback {@code Checkpoints}, and registers
     *       the latter with the {@code CheckpointsBroker};</li>
     *   <li>replays the records found in the raw operator state into the record processor;</li>
     *   <li>registers this operator as the feedback consumer.</li>
     * </ol>
     *
     * @param stateInitializationContext context giving access to managed and raw operator state
     * @throws IllegalStateException if the restored parallelism differs from the current one
     * @throws FlinkRuntimeException if replaying the raw-state records fails
     * @throws Exception if accessing the state fails
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);

        this.parallelismState = stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("parallelism", IntSerializer.INSTANCE));
        OperatorStateUtils.getUniqueElement(this.parallelismState, "parallelism").ifPresent(restoredParallelism -> {
            Preconditions.checkState(restoredParallelism.intValue() == getRuntimeContext().getNumberOfParallelSubtasks(), "The head operator is recovered with parallelism changed from " + restoredParallelism + " to " + getRuntimeContext().getNumberOfParallelSubtasks());
        });

        this.processorContext = new ContextImpl();

        // The status is stored as the enum ordinal; a missing entry means a fresh start (ordinal 0).
        this.statusState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<Integer>("status", Integer.class));
        int statusOrdinal = OperatorStateUtils.getUniqueElement(this.statusState, "status").orElse(0);
        this.status = HeadOperatorStatus.values()[statusOrdinal];
        if (this.status == HeadOperatorStatus.RUNNING) {
            this.recordProcessor = new RegularHeadOperatorRecordProcessor(this.processorContext);
        } else {
            this.recordProcessor = new TerminatingHeadOperatorRecordProcessor(this.processorContext);
        }

        this.processorState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<HeadOperatorState>("processorState", HeadOperatorState.class));
        OperatorStateUtils.getUniqueElement(this.processorState, "processorState").ifPresent(headOperatorState -> {
            this.recordProcessor.initializeState(headOperatorState, stateInitializationContext.getRawOperatorStateInputs());
        });

        this.checkpointAligner = new HeadOperatorCheckpointAligner();

        Path dataCachePath = OperatorUtils.getDataCachePath(getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration(), getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.checkpoints = new Checkpoints<>(this.config.getTypeSerializerOut(getClass().getClassLoader()), dataCachePath.getFileSystem(), OperatorUtils.createDataCacheFileGenerator(dataCachePath, "header-cp", getOperatorConfig().getOperatorID()));
        CheckpointsBroker.get().setCheckpoints(OperatorUtils.createFeedbackKey(this.iterationId, this.feedbackIndex).withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()), this.checkpoints);

        // Only the replay is wrapped, so the error message stays accurate.
        try {
            for (StatePartitionStreamProvider rawStateInput : stateInitializationContext.getRawOperatorStateInputs()) {
                DataCacheSnapshot.replay(rawStateInput.getStream(), this.checkpoints.getTypeSerializer(), this.checkpoints.getFileSystem(), iterationRecord -> {
                    this.recordProcessor.processFeedbackElement(new StreamRecord<>(iterationRecord));
                });
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to replay the records", e);
        }

        // Feedback runnables are handed to the mailbox; once TERMINATED they are dropped.
        registerFeedbackConsumer(runnable -> {
            if (this.status != HeadOperatorStatus.TERMINATED) {
                this.mailboxExecutor.execute(runnable::run, "Head feedback");
            }
        });
    }

    /**
     * Runs the base-class hook and then calls the checkpoint aligner's
     * {@code waitTillCoordinatorNotified} with the current status, the checkpoint id and the
     * mailbox executor's {@code yield} as the callback.
     *
     * @param j id of the checkpoint about to be taken
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        this.checkpointAligner.waitTillCoordinatorNotified(this.status, j, this.mailboxExecutor::yield);
    }

    /**
     * Writes the operator state for a checkpoint.
     *
     * <p>The parallelism is stored by subtask 0 only, the status is stored as its ordinal, and
     * the record processor contributes its own snapshot. While RUNNING, feedback logging is
     * started for this checkpoint on the raw operator state output. Finally, every event
     * returned by the aligner's {@code onStateSnapshot} is processed.
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        parallelismState.clear();
        boolean isFirstSubtask = getRuntimeContext().getIndexOfThisSubtask() == 0;
        if (isFirstSubtask) {
            int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            parallelismState.update(Collections.singletonList(parallelism));
        }

        statusState.update(Collections.singletonList(status.ordinal()));
        processorState.update(Collections.singletonList(recordProcessor.snapshotState()));

        long checkpointId = stateSnapshotContext.getCheckpointId();
        if (status == HeadOperatorStatus.RUNNING) {
            checkpoints.startLogging(checkpointId, stateSnapshotContext.getRawOperatorStateOutput());
        }

        checkpointAligner.onStateSnapshot(checkpointId).forEach(this::processGloballyAlignedEvent);
    }

    /**
     * Forwards the abort to the base class and then processes every event the checkpoint aligner
     * returns for the aborted checkpoint.
     *
     * @param j id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        super.notifyCheckpointAborted(j);
        checkpointAligner
                .onCheckpointAborted(j)
                .forEach(this::processGloballyAlignedEvent);
    }

    /** Hands a record from the regular input to the current record processor. */
    public void processElement(StreamRecord<IterationRecord<?>> streamRecord) throws Exception {
        recordProcessor.processElement(streamRecord);
    }

    /**
     * Handles one record arriving on the feedback channel.
     *
     * <p>A BARRIER record only commits the checkpoints up to its checkpoint id. Any other record
     * is appended to the checkpoints and passed to the record processor; when the processor
     * returns true the status must be TERMINATING and becomes TERMINATED.
     *
     * @throws IllegalStateException if the processor returns true while the status is not TERMINATING
     */
    public void processFeedback(StreamRecord<IterationRecord<?>> streamRecord) throws Exception {
        IterationRecord<?> feedback = streamRecord.getValue();
        if (feedback.getType() == IterationRecord.Type.BARRIER) {
            checkpoints.commitCheckpointsUntil(feedback.getCheckpointId());
            return;
        }

        checkpoints.append(feedback);
        boolean finished = recordProcessor.processFeedbackElement(streamRecord);
        if (finished) {
            Preconditions.checkState(status == HeadOperatorStatus.TERMINATING);
            status = HeadOperatorStatus.TERMINATED;
        }
    }

    /**
     * Dispatches an event received from the operator coordinator.
     *
     * <p>A {@link GloballyAlignedEvent} goes through the aligner's
     * {@code checkHoldingGloballyAlignedEvent} and is processed only if the aligner returns it; a
     * {@link CoordinatorCheckpointEvent} is passed to the aligner's {@code coordinatorNotify}.
     *
     * @throws FlinkRuntimeException for any other event type
     */
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        if (operatorEvent instanceof GloballyAlignedEvent) {
            GloballyAlignedEvent alignedEvent = (GloballyAlignedEvent) operatorEvent;
            checkpointAligner.checkHoldingGloballyAlignedEvent(alignedEvent).ifPresent(this::processGloballyAlignedEvent);
            return;
        }
        if (operatorEvent instanceof CoordinatorCheckpointEvent) {
            checkpointAligner.coordinatorNotify((CoordinatorCheckpointEvent) operatorEvent);
            return;
        }
        throw new FlinkRuntimeException("Unsupported operator event: " + operatorEvent);
    }

    /**
     * Passes the event to the record processor; when the processor returns true the operator
     * moves to TERMINATING and swaps in a terminating record processor.
     */
    private void processGloballyAlignedEvent(GloballyAlignedEvent globallyAlignedEvent) {
        boolean shouldTerminate = recordProcessor.onGloballyAligned(globallyAlignedEvent);
        if (!shouldTerminate) {
            return;
        }
        status = HeadOperatorStatus.TERMINATING;
        recordProcessor = new TerminatingHeadOperatorRecordProcessor(processorContext);
    }

    /**
     * Called when the regular input has ended; returns only once the status is TERMINATED.
     *
     * <p>If the operator is still RUNNING, a synthetic record built by
     * {@code IterationRecord.newEpochWatermark(0, "fake")} is first fed to the record processor.
     * The method then polls the single input channel roughly every 200 ms: for each
     * {@link CheckpointBarrier} with an id greater than the last one triggered it triggers that
     * checkpoint on the containing task, and it stops polling once an
     * {@link EndOfPartitionEvent} is seen or the status becomes TERMINATED. After that it keeps
     * yielding to the mailbox until the status is TERMINATED.
     *
     * @throws IllegalStateException if the task does not have exactly one input gate with exactly
     *     one input channel
     */
    public void endInput() throws Exception {
        if (this.status == HeadOperatorStatus.RUNNING) {
            this.recordProcessor.processElement(new StreamRecord<>(IterationRecord.newEpochWatermark(0, "fake")));
        }

        Preconditions.checkState(getContainingTask().getEnvironment().getAllInputGates().length == 1);
        Preconditions.checkState(getContainingTask().getEnvironment().getAllInputGates()[0].getNumberOfInputChannels() == 1);
        InputChannel channel = getContainingTask().getEnvironment().getAllInputGates()[0].getChannel(0);

        boolean endOfPartitionSeen = false;
        long lastTriggeredCheckpointId = 0;
        while (!endOfPartitionSeen && this.status != HeadOperatorStatus.TERMINATED) {
            // Let pending mails (e.g. feedback records) run before inspecting the channel again.
            this.mailboxExecutor.tryYield();
            Thread.sleep(200L);

            List<AbstractEvent> pendingEvents = parseInputChannelEvents(channel);
            for (AbstractEvent event : pendingEvents) {
                if (event instanceof CheckpointBarrier) {
                    CheckpointBarrier barrier = (CheckpointBarrier) event;
                    // The same barrier may be observed on several polls; trigger each id once.
                    if (barrier.getId() > lastTriggeredCheckpointId) {
                        getContainingTask().triggerCheckpointAsync(new CheckpointMetaData(barrier.getId(), barrier.getTimestamp()), barrier.getCheckpointOptions());
                        lastTriggeredCheckpointId = barrier.getId();
                    }
                } else if (event instanceof EndOfPartitionEvent) {
                    endOfPartitionSeen = true;
                }
            }
        }

        while (this.status != HeadOperatorStatus.TERMINATED) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Closes the feedback checkpoints, if they were created, and then delegates to the base
     * class so that its own cleanup is not skipped. Every other lifecycle override in this class
     * already calls {@code super}; the {@code finally} keeps that true for {@code close} even
     * when closing the checkpoints throws.
     *
     * @throws Exception if closing the checkpoints or the base-class cleanup fails
     */
    public void close() throws Exception {
        try {
            if (this.checkpoints != null) {
                this.checkpoints.close();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Registers this operator as the consumer of the feedback channel identified by the
     * iteration id, the feedback index, this subtask's index and its attempt number.
     *
     * @param executor executor passed along to {@code OperatorUtils.registerFeedbackConsumer}
     */
    private void registerFeedbackConsumer(Executor executor) {
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int attemptNumber = getRuntimeContext().getAttemptNumber();
        OperatorUtils.registerFeedbackConsumer(
                FeedbackChannelBroker.get().getChannel(OperatorUtils.createFeedbackKey(this.iterationId, this.feedbackIndex).withSubTaskIndex(subtaskIndex, attemptNumber)),
                this,
                executor);
    }

    /**
     * Collects the events currently queued for the given input channel without consuming them.
     *
     * <p>The queues are private to the channel implementations, so they are read reflectively:
     * {@code receivedBuffers} for a {@link RemoteInputChannel}, and
     * {@code subpartitionView -> parent -> buffers} for a {@link LocalInputChannel}. Entries
     * whose {@code isBuffer()} is true are skipped; the others are deserialized with
     * {@link EventSerializer}. Any other channel type only produces a warning.
     *
     * @param inputChannel channel to inspect
     * @return the deserialized events, in queue order; empty for unknown channel types
     * @throws Exception if reflection or deserialization fails
     */
    private List<AbstractEvent> parseInputChannelEvents(InputChannel inputChannel) throws Exception {
        List<AbstractEvent> events = new ArrayList<>();
        ClassLoader classLoader;

        if (inputChannel instanceof RemoteInputChannel) {
            Class<?> sequenceBufferClass = Class.forName("org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$SequenceBuffer");
            PrioritizedDeque<?> receivedBuffers = (PrioritizedDeque<?>) ReflectionUtils.getFieldValue(inputChannel, RemoteInputChannel.class, "receivedBuffers");
            for (Object sequenceBuffer : receivedBuffers) {
                Buffer buffer = (Buffer) ReflectionUtils.getFieldValue(sequenceBuffer, sequenceBufferClass, "buffer");
                if (buffer.isBuffer()) {
                    continue;
                }
                classLoader = getClass().getClassLoader();
                events.add(EventSerializer.fromBuffer(buffer, classLoader));
            }
        } else if (inputChannel instanceof LocalInputChannel) {
            PipelinedSubpartitionView subpartitionView = (PipelinedSubpartitionView) ReflectionUtils.getFieldValue(inputChannel, LocalInputChannel.class, "subpartitionView");
            PipelinedSubpartition subpartition = (PipelinedSubpartition) ReflectionUtils.getFieldValue(subpartitionView, PipelinedSubpartitionView.class, "parent");
            PrioritizedDeque<?> queuedConsumers = (PrioritizedDeque<?>) ReflectionUtils.getFieldValue(subpartition, PipelinedSubpartition.class, "buffers");
            for (Object entry : queuedConsumers) {
                BufferConsumerWithPartialRecordLength queued = (BufferConsumerWithPartialRecordLength) entry;
                if (queued.getBufferConsumer().isBuffer()) {
                    continue;
                }
                // Work on a copy of the consumer rather than on the queued one.
                events.add(EventSerializer.fromBuffer(queued.getBufferConsumer().copy().build(), getClass().getClassLoader()));
            }
        } else {
            LOG.warn("Unknown input channel type: " + inputChannel);
        }

        return events;
    }

    /** Exposes the coordinator event gateway for tests. */
    @VisibleForTesting
    public OperatorEventGateway getOperatorEventGateway() {
        return operatorEventGateway;
    }

    /** Exposes the mailbox executor for tests. */
    @VisibleForTesting
    MailboxExecutor getMailboxExecutor() {
        return mailboxExecutor;
    }

    /** Exposes the currently active record processor for tests. */
    @VisibleForTesting
    HeadOperatorRecordProcessor getRecordProcessor() {
        return recordProcessor;
    }

    /** Exposes the current lifecycle status for tests. */
    @VisibleForTesting
    public HeadOperatorStatus getStatus() {
        return status;
    }
}
