/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.CancelableInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriFunction;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TaskExecutorOperatorEventHandlingTest
extends TestLogger {
    private MetricRegistryImpl metricRegistry;
    private TestingRpcService rpcService;

    @Before
    public void setup() {
        this.rpcService = new TestingRpcService();
        this.metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
        this.metricRegistry.startQueryService((RpcService)this.rpcService, new ResourceID("mqs"));
    }

    @After
    public void teardown() throws ExecutionException, InterruptedException {
        if (this.rpcService != null) {
            this.rpcService.stopService().get();
        }
        if (this.metricRegistry != null) {
            this.metricRegistry.shutdown().get();
        }
    }

    @Test
    public void eventHandlingInTaskFailureFailsTask() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID eid = new ExecutionAttemptID();
        try (TaskSubmissionTestEnvironment env = this.createExecutorWithRunningTask(jobId, eid, OperatorEventFailingInvokable.class);){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            CompletableFuture resultFuture = tmGateway.sendOperatorEventToTask(eid, new OperatorID(), new SerializedValue((Object)new TestOperatorEvent()));
            Assert.assertThat((Object)resultFuture, (Matcher)FlinkMatchers.futureWillCompleteExceptionally(FlinkException.class, (Duration)Duration.ofSeconds(10L)));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)((Task)env.getTaskSlotTable().getTask(eid)).getExecutionState());
        }
    }

    @Test
    public void eventToCoordinatorDeliveryFailureFailsTask() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID eid = new ExecutionAttemptID();
        try (TaskSubmissionTestEnvironment env = this.createExecutorWithRunningTask(jobId, eid, OperatorEventSendingInvokable.class);){
            Task task = (Task)env.getTaskSlotTable().getTask(eid);
            task.getExecutingThread().join(10000L);
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        }
    }

    private TaskSubmissionTestEnvironment createExecutorWithRunningTask(JobID jobId, ExecutionAttemptID executionAttemptId, Class<? extends AbstractInvokable> invokableClass) throws Exception {
        TaskDeploymentDescriptor tdd = TaskExecutorOperatorEventHandlingTest.createTaskDeploymentDescriptor(jobId, executionAttemptId, invokableClass);
        CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
        JobMasterId token = JobMasterId.generate();
        TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId).setJobMasterId(token).setSlotSize(1).addTaskManagerActionListener(executionAttemptId, ExecutionState.RUNNING, taskRunningFuture).setMetricQueryServiceAddress(this.metricRegistry.getMetricQueryServiceGatewayRpcAddress()).setJobMasterGateway(new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> token).setOperatorEventSender((TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>>)((TriFunction)(eio, oid, value) -> {
            throw new RuntimeException();
        })).build()).build();
        env.getTaskSlotTable().allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds((long)60L));
        TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
        tmGateway.submitTask(tdd, env.getJobMasterId(), Time.seconds((long)10L)).get();
        taskRunningFuture.get();
        return env;
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId, ExecutionAttemptID executionAttemptId, Class<? extends AbstractInvokable> invokableClass) throws IOException {
        return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(jobId, "test job", executionAttemptId, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "test task", 64, 3, 17, 0, new Configuration(), new Configuration(), invokableClass.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
    }

    public static final class OperatorEventSendingInvokable
    extends CancelableInvokable {
        public OperatorEventSendingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void doInvoke() throws Exception {
            this.getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(new OperatorID(), new SerializedValue((Object)new TestOperatorEvent()));
            this.waitUntilCancelled();
        }
    }

    public static final class OperatorEventFailingInvokable
    extends CancelableInvokable {
        public OperatorEventFailingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void doInvoke() throws InterruptedException {
            this.waitUntilCancelled();
        }

        public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
            throw new FlinkException("test exception");
        }
    }
}

