package org.apache.flink.kubernetes;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.util.ResourceManagerUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver.class */
public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> {
    /** Pod name pattern; filled with the cluster id, the attempt id and the pod id, in that order. */
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
    /** Cluster id read from the driver configuration; checked to be non-null in the constructor. */
    private final String clusterId;
    /** Web interface URL read from the driver configuration; the REST port is parsed from it. */
    private final String webInterfaceUrl;
    /** Client used for every pod, watch and service operation of this driver; non-null. */
    private final FlinkKubeClient flinkKubeClient;
    /**
     * Pending resource requests keyed by pod name. An entry is removed when the pod is scheduled
     * or terminated, when creating the pod fails, or when the request future is cancelled.
     */
    private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures;
    /**
     * Attempt id embedded in new pod names. During recovery it is raised to the highest attempt
     * found among existing pods and then incremented by one.
     */
    private long currentMaxAttemptId;
    /** Counter incremented for every pod name generated by this driver instance. */
    private long currentMaxPodId;
    /** Watch on TaskManager pods; assigned during initialization and replaced when it is re-created. */
    private Optional<KubernetesWatch> podsWatchOpt;
    /** Set to true as the last step of initialization and back to false when termination starts. */
    private volatile boolean running;
    /** Template for TaskManager pods: loaded from the template file if it exists, otherwise an empty pod. */
    private FlinkPod taskManagerPodTemplate;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$PodCallbackHandlerImpl.class */
    public class PodCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
        private PodCallbackHandlerImpl() {
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onAdded(List<KubernetesPod> list) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(list, PodEvent.ADDED);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onModified(List<KubernetesPod> list) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(list, PodEvent.MODIFIED);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onDeleted(List<KubernetesPod> list) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(list, PodEvent.DELETED);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onError(List<KubernetesPod> list) {
            KubernetesResourceManagerDriver.this.handlePodEventsInMainThread(list, PodEvent.ERROR);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void handleError(Throwable th) {
            if (th instanceof KubernetesTooOldResourceVersionException) {
                KubernetesResourceManagerDriver.this.getMainThreadExecutor().execute(() -> {
                    if (KubernetesResourceManagerDriver.this.running) {
                        KubernetesResourceManagerDriver.this.podsWatchOpt.ifPresent((v0) -> {
                            v0.close();
                        });
                        KubernetesResourceManagerDriver.this.log.info("Creating a new watch on TaskManager pods.");
                        try {
                            KubernetesResourceManagerDriver.this.podsWatchOpt = KubernetesResourceManagerDriver.this.watchTaskManagerPods();
                        } catch (Exception e) {
                            KubernetesResourceManagerDriver.this.getResourceEventHandler().onError(e);
                        }
                    }
                });
            } else {
                KubernetesResourceManagerDriver.this.getResourceEventHandler().onError(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$PodEvent.class */
    /** Kind of notification received from the TaskManager pod watch. */
    public enum PodEvent {
        /** Passed along by the {@code onAdded} callback. */
        ADDED,

        /** Passed along by the {@code onModified} callback. */
        MODIFIED,

        /** Passed along by the {@code onDeleted} callback; pods in such an event are treated as terminated. */
        DELETED,

        /** Passed along by the {@code onError} callback. */
        ERROR
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$RetryableException.class */
    /**
     * Failure used to complete a pending resource request when its pod terminates before being
     * scheduled. The request's completion handler in {@code requestResource} ignores this type
     * instead of rethrowing it.
     */
    public static class RetryableException extends FlinkException {
        private static final long serialVersionUID = 1L;

        /**
         * @param str human-readable description of the failure
         */
        RetryableException(String str) {
            super(str);
        }
    }

    /**
     * Creates a driver that is not yet running; no Kubernetes calls are made here.
     *
     * @param configuration Flink configuration passed to the base driver, together with the
     *     configuration returned by {@code GlobalConfiguration.loadConfiguration()}
     * @param flinkKubeClient client used for all Kubernetes operations; must not be null
     * @param kubernetesResourceManagerDriverConfiguration supplies the cluster id (must not be
     *     null) and the web interface URL
     * @throws NullPointerException if the client or the cluster id is null
     */
    public KubernetesResourceManagerDriver(Configuration configuration, FlinkKubeClient flinkKubeClient, KubernetesResourceManagerDriverConfiguration kubernetesResourceManagerDriverConfiguration) {
        super(configuration, GlobalConfiguration.loadConfiguration());
        this.currentMaxAttemptId = 0L;
        this.currentMaxPodId = 0L;
        // checkNotNull is generic, so no cast is needed on its result.
        this.clusterId = Preconditions.checkNotNull(kubernetesResourceManagerDriverConfiguration.getClusterId());
        this.webInterfaceUrl = kubernetesResourceManagerDriverConfiguration.getWebInterfaceUrl();
        this.flinkKubeClient = Preconditions.checkNotNull(flinkKubeClient);
        // Diamond instead of a raw HashMap: avoids an unchecked assignment to the typed field.
        this.requestResourceFutures = new HashMap<>();
        this.running = false;
    }

    /**
     * Brings the driver into the running state.
     *
     * <p>Steps, in order: start watching TaskManager pods, resolve the pod template, update the
     * service target ports if required, recover workers from previous attempts, and finally mark
     * the driver as running.
     *
     * @throws Exception if any of the steps fails
     */
    protected void initializeInternal() throws Exception {
        this.podsWatchOpt = watchTaskManagerPods();

        // Use the template file when it is present; otherwise start from an empty pod.
        final File podTemplateFile = KubernetesUtils.getTaskManagerPodTemplateFileInPod();
        this.taskManagerPodTemplate =
                podTemplateFile.exists()
                        ? KubernetesUtils.loadPodFromTemplateFile(this.flinkKubeClient, podTemplateFile, Constants.MAIN_CONTAINER_NAME)
                        : new FlinkPod.Builder().build();

        updateKubernetesServiceTargetPortIfNecessary();
        recoverWorkerNodesFromPreviousAttempts();
        this.running = true;
    }

    /**
     * Stops the driver: closes the pod watch (if any) and then the Kubernetes client. Does nothing
     * when the driver is not running.
     *
     * <p>Both resources are always attempted. If both fail, the failures are combined via
     * {@code ExceptionUtils.firstOrSuppressed} and the result is thrown.
     *
     * @throws Exception the failure(s) encountered while closing
     */
    public void terminate() throws Exception {
        if (!this.running) {
            return;
        }
        this.running = false;

        Exception failure = null;
        try {
            this.podsWatchOpt.ifPresent(KubernetesWatch::close);
        } catch (Exception watchCloseFailure) {
            failure = watchCloseFailure;
        }

        try {
            this.flinkKubeClient.close();
        } catch (Exception clientCloseFailure) {
            failure = ExceptionUtils.firstOrSuppressed(clientCloseFailure, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Logs the deregistration and asks the Kubernetes client to stop and clean up the cluster.
     *
     * @param applicationStatus final status of the application; not used by this implementation
     * @param str optional diagnostics; logged as an empty string when null
     */
    public void deregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) {
        final String diagnostics;
        if (str == null) {
            diagnostics = "";
        } else {
            diagnostics = str;
        }
        this.log.info("Deregistering Flink Kubernetes cluster, clusterId: {}, diagnostics: {}", this.clusterId, diagnostics);
        this.flinkKubeClient.stopAndCleanupCluster(this.clusterId);
    }

    /**
     * Requests a new TaskManager pod for the given process spec.
     *
     * <p>The returned future is registered under the pod name before the pod is created. It is
     * completed elsewhere in this class once the pod is reported as scheduled, or completed
     * exceptionally when the pod terminates first or when creating the pod fails.
     *
     * @param taskExecutorProcessSpec resource specification of the TaskManager to start
     * @return future of the worker node backed by the new pod
     */
    public CompletableFuture<KubernetesWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        final KubernetesTaskManagerParameters parameters =
                createKubernetesTaskManagerParameters(taskExecutorProcessSpec, getBlockedNodeRetriever().getAllBlockedNodeIds());
        final KubernetesPod taskManagerPod =
                KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(this.taskManagerPodTemplate, parameters);
        final String podName = taskManagerPod.getName();

        final CompletableFuture<KubernetesWorkerNode> requestFuture = new CompletableFuture<>();
        this.requestResourceFutures.put(podName, requestFuture);

        this.log.info(
                "Creating new TaskManager pod with name {} and resource <{},{}>.",
                podName,
                parameters.getTaskManagerMemoryMB(),
                parameters.getTaskManagerCPU());

        final CompletableFuture<Void> createPodFuture = this.flinkKubeClient.createTaskManagerPod(taskManagerPod);

        // Outcome of the create call, evaluated on the main thread executor.
        FutureUtils.assertNoException(
                createPodFuture.handleAsync(
                        (ignored, createFailure) -> {
                            if (createFailure == null) {
                                if (requestFuture.isCancelled()) {
                                    // The request was cancelled while the create call was in flight.
                                    stopPod(podName);
                                    this.log.info("pod {} is cancelled before create pod finish, stop it.", podName);
                                } else {
                                    this.log.info("Pod {} is created.", podName);
                                }
                                return null;
                            }
                            this.log.warn("Could not create pod {}, exception: {}", podName, createFailure);
                            final CompletableFuture<KubernetesWorkerNode> pending =
                                    this.requestResourceFutures.remove(taskManagerPod.getName());
                            if (pending != null) {
                                pending.completeExceptionally(createFailure);
                            }
                            return null;
                        },
                        getMainThreadExecutor()));

        // Exceptional completion of the request future itself.
        FutureUtils.assertNoException(
                requestFuture.handle(
                        (ignored, requestFailure) -> {
                            if (requestFailure == null) {
                                return null;
                            }
                            if (requestFailure instanceof CancellationException) {
                                this.requestResourceFutures.remove(taskManagerPod.getName());
                                // Only stop the pod if the create call has already finished; otherwise
                                // the create handler above stops it once creation completes.
                                if (createPodFuture.isDone()) {
                                    this.log.info("pod {} is cancelled before scheduled, stop it.", podName);
                                    stopPod(taskManagerPod.getName());
                                }
                            } else if (!(requestFailure instanceof RetryableException)) {
                                // NOTE(review): a failure forwarded from pod creation above also lands
                                // here and is rethrown - confirm this escalation is intended.
                                this.log.error("Error completing resource request.", requestFailure);
                                ExceptionUtils.rethrow(requestFailure);
                            }
                            return null;
                        }));

        return requestFuture;
    }

    /**
     * Releases a worker by stopping the pod whose name equals the worker's resource id.
     *
     * @param kubernetesWorkerNode worker to release
     */
    public void releaseResource(KubernetesWorkerNode kubernetesWorkerNode) {
        final String podName = kubernetesWorkerNode.getResourceID().toString();
        this.log.info("Stopping TaskManager pod {}.", podName);
        stopPod(podName);
    }

    /**
     * Recovers TaskManager pods that already exist for this cluster.
     *
     * <p>Every pod matching the TaskManager selectors contributes its attempt number to
     * {@code currentMaxAttemptId}. Pods that are terminated or not yet scheduled are stopped; the
     * rest are reported to the resource event handler as recovered workers. Afterwards the attempt
     * id is incremented so that pods created by this driver use a new attempt id.
     *
     * @throws ResourceManagerException declared for callers; not thrown directly in this method
     */
    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        final List<KubernetesPod> existingPods =
                this.flinkKubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerSelectors(this.clusterId));
        // Typed list instead of a raw ArrayList, so the hand-over below is checked by the compiler.
        final List<KubernetesWorkerNode> recoveredWorkers = new ArrayList<>();

        for (KubernetesPod pod : existingPods) {
            final KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
            final long attempt = worker.getAttempt();
            if (attempt > this.currentMaxAttemptId) {
                this.currentMaxAttemptId = attempt;
            }

            if (pod.isTerminated() || !pod.isScheduled()) {
                stopPod(pod.getName());
            } else {
                recoveredWorkers.add(worker);
            }
        }

        // The increment is intentional: the logged value is the attempt id used from now on.
        this.log.info(
                "Recovered {} pods from previous attempts, current attempt id is {}.",
                recoveredWorkers.size(),
                ++this.currentMaxAttemptId);
        getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
    }

    /**
     * Updates service target ports when the cluster uses host networking; otherwise does nothing.
     *
     * <p>The REST port parsed from the web interface URL is always applied to the external
     * service. When high availability is not activated, the blob server port and the JobManager
     * RPC port from the configuration are applied to the internal service as well. Each update is
     * awaited before the next one starts.
     *
     * @throws IllegalArgumentException if no positive REST port can be parsed from the URL
     * @throws Exception if waiting for one of the updates fails
     */
    private void updateKubernetesServiceTargetPortIfNecessary() throws Exception {
        if (!KubernetesUtils.isHostNetwork(this.flinkConfig)) {
            return;
        }

        final int restPort = ResourceManagerUtils.parseRestBindPortFromWebInterfaceUrl(this.webInterfaceUrl);
        Preconditions.checkArgument(restPort > 0, "Failed to parse rest port from " + this.webInterfaceUrl);
        final String externalServiceName = ExternalServiceDecorator.getExternalServiceName(this.clusterId);
        this.flinkKubeClient.updateServiceTargetPort(externalServiceName, Constants.REST_PORT_NAME, restPort).get();

        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(this.flinkConfig)) {
            final String internalServiceName = InternalServiceDecorator.getInternalServiceName(this.clusterId);
            final int blobServerPort = Integer.parseInt(this.flinkConfig.getString(BlobServerOptions.PORT));
            this.flinkKubeClient.updateServiceTargetPort(internalServiceName, Constants.BLOB_SERVER_PORT_NAME, blobServerPort).get();
            this.flinkKubeClient
                    .updateServiceTargetPort(internalServiceName, Constants.JOB_MANAGER_RPC_PORT_NAME, this.flinkConfig.getInteger(JobManagerOptions.PORT))
                    .get();
        }
    }

    /**
     * Builds the parameters for the next TaskManager pod.
     *
     * <p>Increments the pod counter and derives the pod name from the cluster id, the current
     * attempt id and the new counter value. The pod name is also set as the TaskManager resource
     * id in a copy of the Flink configuration, from which the dynamic properties are generated.
     *
     * @param taskExecutorProcessSpec resource specification of the TaskManager
     * @param set ids of the blocked nodes, passed through to the parameters
     * @return parameters describing the pod to create
     */
    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(TaskExecutorProcessSpec taskExecutorProcessSpec, Set<String> set) {
        final String podName =
                String.format(TASK_MANAGER_POD_FORMAT, this.clusterId, this.currentMaxAttemptId, ++this.currentMaxPodId);

        final ContaineredTaskManagerParameters containeredParameters =
                ContaineredTaskManagerParameters.create(this.flinkConfig, taskExecutorProcessSpec);

        // Work on a copy so the resource id does not leak into the driver's own configuration.
        final Configuration taskManagerConfig = new Configuration(this.flinkConfig);
        taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, podName);

        final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(this.flinkClientConfig, taskManagerConfig);
        final String jvmMemoryOptions = ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec);

        return new KubernetesTaskManagerParameters(
                this.flinkConfig,
                podName,
                dynamicProperties,
                jvmMemoryOptions,
                containeredParameters,
                ExternalResourceUtils.getExternalResourceConfigurationKeys(this.flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX),
                set);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a batch of pod notifications on the main thread executor.
     *
     * <p>A pod is handled as terminated when the event is {@link PodEvent#DELETED} or the pod
     * reports itself as terminated; otherwise it is handled as scheduled if it reports so. Pods
     * matching neither condition are skipped.
     *
     * @param list pods the notification refers to
     * @param podEvent kind of notification
     */
    public void handlePodEventsInMainThread(List<KubernetesPod> list, PodEvent podEvent) {
        getMainThreadExecutor().execute(() -> {
            // Typed for-each instead of a raw Iterator with a cast on every element.
            for (KubernetesPod pod : list) {
                if (podEvent == PodEvent.DELETED || pod.isTerminated()) {
                    onPodTerminated(pod);
                } else if (pod.isScheduled()) {
                    onPodScheduled(pod);
                }
            }
        });
    }

    /**
     * Completes the pending resource request of a scheduled pod with a new worker node. Pods
     * without a pending request are ignored.
     *
     * @param kubernetesPod pod reported as scheduled
     */
    private void onPodScheduled(KubernetesPod kubernetesPod) {
        final String podName = kubernetesPod.getName();
        final CompletableFuture<KubernetesWorkerNode> pending = this.requestResourceFutures.remove(podName);
        if (pending != null) {
            this.log.info("Received new TaskManager pod: {}", podName);
            pending.complete(new KubernetesWorkerNode(new ResourceID(podName)));
            return;
        }
        this.log.debug("Ignore TaskManager pod that is already added: {}", podName);
    }

    /**
     * Handles a terminated pod.
     *
     * <p>A still pending resource request for the pod is failed with a {@link RetryableException}.
     * In every case the resource event handler is told that the worker terminated, passing the
     * pod's diagnostics, and the pod is stopped.
     *
     * @param kubernetesPod pod reported as terminated or deleted
     */
    private void onPodTerminated(KubernetesPod kubernetesPod) {
        final String podName = kubernetesPod.getName();
        this.log.debug("TaskManager pod {} is terminated.", podName);

        final CompletableFuture<KubernetesWorkerNode> pending = this.requestResourceFutures.remove(podName);
        if (pending != null) {
            this.log.warn("Pod {} is terminated before being scheduled.", podName);
            pending.completeExceptionally(new RetryableException("Pod is terminated."));
        }

        final ResourceID workerId = new ResourceID(podName);
        getResourceEventHandler().onWorkerTerminated(workerId, kubernetesPod.getTerminatedDiagnostics());
        stopPod(podName);
    }

    /**
     * Asks the Kubernetes client to stop the given pod. A failure is only logged as a warning; it
     * is not propagated to the caller.
     *
     * @param str name of the pod to stop
     */
    private void stopPod(String str) {
        this.flinkKubeClient
                .stopPod(str)
                .whenComplete((ignored, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    this.log.warn("Could not remove TaskManager pod {}, exception: {}", str, failure);
                });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a watch on the pods matching this cluster's TaskManager selectors, using a new
     * {@link PodCallbackHandlerImpl} as callback.
     *
     * @return the created watch, wrapped in an {@link Optional}
     * @throws Exception if the watch cannot be created
     */
    public Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
        final KubernetesWatch watch =
                this.flinkKubeClient.watchPodsAndDoCallback(
                        KubernetesUtils.getTaskManagerSelectors(this.clusterId), new PodCallbackHandlerImpl());
        return Optional.of(watch);
    }
}
