package org.apache.flink.test.recovery;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DispatcherProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.class */
public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
    private static ZooKeeperTestEnvironment zooKeeper;
    private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5);

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    protected static final String READY_MARKER_FILE_PREFIX = "ready_";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final int PARALLELISM = 4;
    private final ExecutionMode executionMode;

    @BeforeClass
    public static void setup() {
        zooKeeper = new ZooKeeperTestEnvironment(1);
    }

    @Before
    public void cleanUp() throws Exception {
        zooKeeper.deleteAll();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zooKeeper != null) {
            zooKeeper.shutdown();
        }
    }

    public JobManagerHAProcessFailureRecoveryITCase(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    @Parameterized.Parameters(name = "ExecutionMode {0}")
    public static Collection<Object[]> executionMode() {
        return Arrays.asList(new Object[]{ExecutionMode.PIPELINED}, new Object[]{ExecutionMode.BATCH});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void testJobManagerFailure(String str, final File file, File file2) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, str);
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, file2.getAbsolutePath());
        ExecutionEnvironment createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment("leader", 1, configuration, new String[0]);
        createRemoteEnvironment.setParallelism(PARALLELISM);
        createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        createRemoteEnvironment.getConfig().setExecutionMode(this.executionMode);
        createRemoteEnvironment.generateSequence(1L, 100000L).rebalance().map(new RichMapFunction<Long, Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.3
            private final File proceedFile;
            private boolean markerCreated = false;
            private boolean checkForProceedFile = true;

            {
                this.proceedFile = new File(file, JobManagerHAProcessFailureRecoveryITCase.PROCEED_MARKER_FILE);
            }

            public Long map(Long l) throws Exception {
                if (!this.markerCreated) {
                    AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, JobManagerHAProcessFailureRecoveryITCase.READY_MARKER_FILE_PREFIX + getRuntimeContext().getIndexOfThisSubtask()));
                    this.markerCreated = true;
                }
                if (this.checkForProceedFile) {
                    if (this.proceedFile.exists()) {
                        this.checkForProceedFile = false;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                return l;
            }
        }).reduce(new ReduceFunction<Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.2
            public Long reduce(Long l, Long l2) {
                return Long.valueOf(l.longValue() + l2.longValue());
            }
        }).flatMap(new RichFlatMapFunction<Long, Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.1
            public void flatMap(Long l, Collector<Long> collector) throws Exception {
                Assert.assertEquals(5000050000L, l.longValue());
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, JobManagerHAProcessFailureRecoveryITCase.FINISH_MARKER_FILE_PREFIX + getRuntimeContext().getIndexOfThisSubtask()));
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Long) obj, (Collector<Long>) collector);
            }
        }).output(new DiscardingOutputFormat());
        createRemoteEnvironment.execute();
    }

    @Test
    public void testDispatcherProcessFailure() throws Exception {
        Time seconds = Time.seconds(30L);
        final File newFolder = this.temporaryFolder.newFolder();
        Assert.assertEquals(4L, 4L);
        DispatcherProcess[] dispatcherProcessArr = new DispatcherProcess[2];
        TaskManagerRunner[] taskManagerRunnerArr = new TaskManagerRunner[2];
        HighAvailabilityServices highAvailabilityServices = null;
        LeaderRetrievalService leaderRetrievalService = null;
        final File file = null;
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zooKeeper.getConnectString(), newFolder.getPath());
        createZooKeeperHAConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
        createZooKeeperHAConfig.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
        createZooKeeperHAConfig.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
        createZooKeeperHAConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        createZooKeeperHAConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
        createZooKeeperHAConfig.set(TaskManagerOptions.CPU_CORES, Double.valueOf(1.0d));
        AkkaRpcService createAndStart = AkkaRpcServiceUtils.remoteServiceBuilder(createZooKeeperHAConfig, "localhost", 0).createAndStart();
        try {
            try {
                Deadline fromNow = Deadline.fromNow(TEST_TIMEOUT);
                file = this.temporaryFolder.newFolder();
                dispatcherProcessArr[0] = new DispatcherProcess(0, createZooKeeperHAConfig);
                dispatcherProcessArr[0].startProcess();
                highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(createZooKeeperHAConfig, TestingUtils.defaultExecutor());
                PluginManager createPluginManagerFromRootFolder = PluginUtils.createPluginManagerFromRootFolder(createZooKeeperHAConfig);
                for (int i = 0; i < 2; i++) {
                    taskManagerRunnerArr[i] = new TaskManagerRunner(createZooKeeperHAConfig, createPluginManagerFromRootFolder, TaskManagerRunner::createTaskExecutorService);
                    taskManagerRunnerArr[i].start();
                }
                TestingListener testingListener = new TestingListener();
                leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
                leaderRetrievalService.start(testingListener);
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                waitForTaskManagers(2, (DispatcherGateway) createAndStart.connect(testingListener.getAddress(), DispatcherId.fromUuid(testingListener.getLeaderSessionID()), DispatcherGateway.class).get(), fromNow.timeLeft());
                final Throwable[] thArr = new Throwable[1];
                Thread thread = new Thread("Program Trigger") { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.4
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            JobManagerHAProcessFailureRecoveryITCase.this.testJobManagerFailure(JobManagerHAProcessFailureRecoveryITCase.zooKeeper.getConnectString(), file, newFolder);
                        } catch (Throwable th) {
                            th.printStackTrace();
                            thArr[0] = th;
                        }
                    }
                };
                thread.start();
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(file, READY_MARKER_FILE_PREFIX, PARALLELISM, fromNow.timeLeft().toMillis());
                dispatcherProcessArr[0].destroy();
                dispatcherProcessArr[1] = new DispatcherProcess(1, createZooKeeperHAConfig);
                dispatcherProcessArr[1].startProcess();
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, PROCEED_MARKER_FILE));
                thread.join(fromNow.timeLeft().toMillis());
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(file, FINISH_MARKER_FILE_PREFIX, 1, fromNow.timeLeft().toMillis());
                Assert.assertFalse("The program did not finish in time", thread.isAlive());
                if (thArr[0] != null) {
                    Throwable th = thArr[0];
                    th.printStackTrace();
                    Assert.fail("The program encountered a " + th.getClass().getSimpleName() + " : " + th.getMessage());
                }
                for (int i2 = 0; i2 < 2; i2++) {
                    if (taskManagerRunnerArr[i2] != null) {
                        taskManagerRunnerArr[i2].close();
                    }
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                for (DispatcherProcess dispatcherProcess : dispatcherProcessArr) {
                    if (dispatcherProcess != null) {
                        dispatcherProcess.destroy();
                    }
                }
                if (highAvailabilityServices != null) {
                    highAvailabilityServices.closeAndCleanupAllData();
                }
                RpcUtils.terminateRpcService(createAndStart, seconds);
                if (file != null) {
                    try {
                        FileUtils.deleteDirectory(file);
                    } catch (Throwable th2) {
                    }
                }
            } catch (Throwable th3) {
                th3.printStackTrace();
                for (DispatcherProcess dispatcherProcess2 : dispatcherProcessArr) {
                    if (dispatcherProcess2 != null) {
                        dispatcherProcess2.printProcessLog();
                    }
                }
                throw th3;
            }
        } catch (Throwable th4) {
            for (int i3 = 0; i3 < 2; i3++) {
                if (taskManagerRunnerArr[i3] != null) {
                    taskManagerRunnerArr[i3].close();
                }
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (DispatcherProcess dispatcherProcess3 : dispatcherProcessArr) {
                if (dispatcherProcess3 != null) {
                    dispatcherProcess3.destroy();
                }
            }
            if (highAvailabilityServices != null) {
                highAvailabilityServices.closeAndCleanupAllData();
            }
            RpcUtils.terminateRpcService(createAndStart, seconds);
            if (file != null) {
                try {
                    FileUtils.deleteDirectory(file);
                } catch (Throwable th5) {
                }
            }
            throw th4;
        }
    }

    private void waitForTaskManagers(int i, DispatcherGateway dispatcherGateway, Duration duration) throws ExecutionException, InterruptedException {
        FutureUtils.retrySuccessfulWithDelay(() -> {
            return dispatcherGateway.requestClusterOverview(Time.milliseconds(duration.toMillis()));
        }, Time.milliseconds(50L), Deadline.fromNow(Duration.ofMillis(duration.toMillis())), clusterOverview -> {
            return clusterOverview.getNumTaskManagersConnected() >= i;
        }, new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor())).get();
    }
}
