package org.apache.dubbo.registry.kubernetes;

import com.alibaba.fastjson.JSONObject;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
import org.apache.dubbo.registry.kubernetes.util.KubernetesConfigUtils;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

/* loaded from: input_file:WEB-INF/lib/dubbo-3.0.4.jar:org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.class */
public class KubernetesServiceDiscovery extends AbstractServiceDiscovery {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private KubernetesClient kubernetesClient;
    private String currentHostname;
    private URL registryURL;
    private String namespace;
    private boolean enableRegister;
    public static final String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
    private static final ConcurrentHashMap<String, Watch> SERVICE_WATCHER = new ConcurrentHashMap<>(64);
    private static final ConcurrentHashMap<String, Watch> PODS_WATCHER = new ConcurrentHashMap<>(64);
    private static final ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER = new ConcurrentHashMap<>(64);
    private static final ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);

    @Override // org.apache.dubbo.registry.client.AbstractServiceDiscovery
    public void doInitialize(URL url) throws Exception {
        boolean z;
        Config createKubernetesConfig = KubernetesConfigUtils.createKubernetesConfig(url);
        this.kubernetesClient = new DefaultKubernetesClient(createKubernetesConfig);
        this.currentHostname = System.getenv("HOSTNAME");
        this.registryURL = url;
        this.namespace = createKubernetesConfig.getNamespace();
        this.enableRegister = url.getParameter(KubernetesClientConst.ENABLE_REGISTER, true);
        try {
            z = ((PodResource) this.kubernetesClient.pods().withName(this.currentHostname)).get() != null;
        } catch (Throwable th) {
            z = false;
        }
        if (z) {
            KubernetesMeshEnvListener.injectKubernetesEnv(this.kubernetesClient, this.namespace);
        } else {
            this.logger.error("Unable to access api server. Please check your url config. Master URL: " + createKubernetesConfig.getMasterUrl() + " Hostname: " + this.currentHostname);
        }
    }

    @Override // org.apache.dubbo.registry.client.AbstractServiceDiscovery
    public void doDestroy() throws Exception {
        SERVICE_WATCHER.forEach((str, watch) -> {
            watch.close();
        });
        SERVICE_WATCHER.clear();
        PODS_WATCHER.forEach((str2, watch2) -> {
            watch2.close();
        });
        PODS_WATCHER.clear();
        ENDPOINTS_WATCHER.forEach((str3, watch3) -> {
            watch3.close();
        });
        ENDPOINTS_WATCHER.clear();
        this.kubernetesClient.close();
    }

    @Override // org.apache.dubbo.registry.client.AbstractServiceDiscovery
    public void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
        if (this.enableRegister) {
            ((PodResource) ((NonNamespaceOperation) this.kubernetesClient.pods().inNamespace(this.namespace)).withName(this.currentHostname)).edit(pod -> {
                return ((PodBuilder) new PodBuilder(pod).editOrNewMetadata().addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata())).endMetadata()).build();
            });
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Write Current Service Instance Metadata to Kubernetes pod. Current pod name: " + this.currentHostname);
            }
        }
    }

    @Override // org.apache.dubbo.registry.client.AbstractServiceDiscovery
    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
        register(serviceInstance);
    }

    @Override // org.apache.dubbo.registry.client.AbstractServiceDiscovery
    public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
        if (this.enableRegister) {
            ((PodResource) ((NonNamespaceOperation) this.kubernetesClient.pods().inNamespace(this.namespace)).withName(this.currentHostname)).edit(pod -> {
                return ((PodBuilder) new PodBuilder(pod).editOrNewMetadata().removeFromAnnotations(KUBERNETES_PROPERTIES_KEY).endMetadata()).build();
            });
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + this.currentHostname);
            }
        }
    }

    @Override // org.apache.dubbo.registry.client.ServiceDiscovery
    public Set<String> getServices() {
        return (Set) ((ServiceList) ((NonNamespaceOperation) this.kubernetesClient.services().inNamespace(this.namespace)).list()).getItems().stream().map(service -> {
            return service.getMetadata().getName();
        }).collect(Collectors.toSet());
    }

    @Override // org.apache.dubbo.registry.client.ServiceDiscovery
    public List<ServiceInstance> getInstances(String str) throws NullPointerException {
        return toServiceInstance((Endpoints) ((Resource) ((NonNamespaceOperation) this.kubernetesClient.endpoints().inNamespace(this.namespace)).withName(str)).get(), str);
    }

    @Override // org.apache.dubbo.registry.client.ServiceDiscovery
    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener serviceInstancesChangedListener) throws NullPointerException, IllegalArgumentException {
        serviceInstancesChangedListener.getServiceNames().forEach(str -> {
            SERVICE_UPDATE_TIME.put(str, new AtomicLong(0L));
            watchEndpoints(serviceInstancesChangedListener, str);
            watchPods(serviceInstancesChangedListener, str);
            watchService(serviceInstancesChangedListener, str);
        });
    }

    private void watchEndpoints(final ServiceInstancesChangedListener serviceInstancesChangedListener, final String str) {
        ENDPOINTS_WATCHER.put(str, ((Resource) ((NonNamespaceOperation) this.kubernetesClient.endpoints().inNamespace(this.namespace)).withName(str)).watch(new Watcher<Endpoints>() { // from class: org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscovery.1
            public void eventReceived(Watcher.Action action, Endpoints endpoints) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Endpoint Event. Event type: " + action.name() + ". Current pod name: " + KubernetesServiceDiscovery.this.currentHostname);
                }
                KubernetesServiceDiscovery.this.notifyServiceChanged(str, serviceInstancesChangedListener);
            }

            public void onClose(WatcherException watcherException) {
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void watchPods(final ServiceInstancesChangedListener serviceInstancesChangedListener, final String str) {
        Map<String, String> serviceSelector = getServiceSelector(str);
        if (serviceSelector == null) {
            return;
        }
        PODS_WATCHER.put(str, ((FilterWatchListDeletable) ((NonNamespaceOperation) this.kubernetesClient.pods().inNamespace(this.namespace)).withLabels(serviceSelector)).watch(new Watcher<Pod>() { // from class: org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscovery.2
            public void eventReceived(Watcher.Action action, Pod pod) {
                if (Watcher.Action.MODIFIED.equals(action)) {
                    if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                        KubernetesServiceDiscovery.this.logger.debug("Received Pods Update Event. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname);
                    }
                    KubernetesServiceDiscovery.this.notifyServiceChanged(str, serviceInstancesChangedListener);
                }
            }

            public void onClose(WatcherException watcherException) {
            }
        }));
    }

    private void watchService(final ServiceInstancesChangedListener serviceInstancesChangedListener, final String str) {
        SERVICE_WATCHER.put(str, ((ServiceResource) ((NonNamespaceOperation) this.kubernetesClient.services().inNamespace(this.namespace)).withName(str)).watch(new Watcher<Service>() { // from class: org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscovery.3
            public void eventReceived(Watcher.Action action, Service service) {
                if (Watcher.Action.MODIFIED.equals(action)) {
                    if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                        KubernetesServiceDiscovery.this.logger.debug("Received Service Update Event. Update Pods Watcher. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname);
                    }
                    if (KubernetesServiceDiscovery.PODS_WATCHER.containsKey(str)) {
                        ((Watch) KubernetesServiceDiscovery.PODS_WATCHER.get(str)).close();
                        KubernetesServiceDiscovery.PODS_WATCHER.remove(str);
                    }
                    KubernetesServiceDiscovery.this.watchPods(serviceInstancesChangedListener, str);
                }
            }

            public void onClose(WatcherException watcherException) {
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyServiceChanged(String str, ServiceInstancesChangedListener serviceInstancesChangedListener) {
        long nanoTime = System.nanoTime();
        ServiceInstancesChangedEvent serviceInstancesChangedEvent = new ServiceInstancesChangedEvent(str, getInstances(str));
        AtomicLong atomicLong = SERVICE_UPDATE_TIME.get(str);
        long j = atomicLong.get();
        if (j <= nanoTime && atomicLong.compareAndSet(j, nanoTime)) {
            serviceInstancesChangedListener.onEvent(serviceInstancesChangedEvent);
        } else if (this.logger.isInfoEnabled()) {
            this.logger.info("Discard Service Instance Data. Possible Cause: Newer message has been processed or Failed to update time record by CAS. Current Data received time: " + nanoTime + ". Newer Data received time: " + j + ".");
        }
    }

    @Override // org.apache.dubbo.registry.client.ServiceDiscovery
    public URL getUrl() {
        return this.registryURL;
    }

    private Map<String, String> getServiceSelector(String str) {
        Service service = (Service) ((ServiceResource) ((NonNamespaceOperation) this.kubernetesClient.services().inNamespace(this.namespace)).withName(str)).get();
        if (service == null) {
            return null;
        }
        return service.getSpec().getSelector();
    }

    private List<ServiceInstance> toServiceInstance(Endpoints endpoints, String str) {
        Map<String, String> serviceSelector = getServiceSelector(str);
        if (serviceSelector == null) {
            return new LinkedList();
        }
        Map map = (Map) ((PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.kubernetesClient.pods().inNamespace(this.namespace)).withLabels(serviceSelector)).list()).getItems().stream().collect(Collectors.toMap(pod -> {
            return pod.getMetadata().getName();
        }, pod2 -> {
            return pod2;
        }));
        LinkedList linkedList = new LinkedList();
        HashSet hashSet = new HashSet();
        Iterator it = endpoints.getSubsets().iterator();
        while (it.hasNext()) {
            hashSet.addAll((Collection) ((EndpointSubset) it.next()).getPorts().stream().map((v0) -> {
                return v0.getPort();
            }).collect(Collectors.toSet()));
        }
        Iterator it2 = endpoints.getSubsets().iterator();
        while (it2.hasNext()) {
            for (EndpointAddress endpointAddress : ((EndpointSubset) it2.next()).getAddresses()) {
                Pod pod3 = (Pod) map.get(endpointAddress.getTargetRef().getName());
                String ip = endpointAddress.getIp();
                if (pod3 == null) {
                    this.logger.warn("Unable to match Kubernetes Endpoint address with Pod. EndpointAddress Hostname: " + endpointAddress.getTargetRef().getName());
                } else {
                    hashSet.forEach(num -> {
                        DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance(str, ip, num, ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
                        String str2 = (String) pod3.getMetadata().getAnnotations().get(KUBERNETES_PROPERTIES_KEY);
                        if (!StringUtils.isNotEmpty(str2)) {
                            this.logger.warn("Unable to find Service Instance metadata in Pod Annotations. Possibly cause: provider has not been initialized successfully. EndpointAddress Hostname: " + endpointAddress.getTargetRef().getName());
                        } else {
                            defaultServiceInstance.getMetadata().putAll((Map) JSONObject.parseObject(str2, Map.class));
                            linkedList.add(defaultServiceInstance);
                        }
                    });
                }
            }
        }
        return linkedList;
    }

    @Deprecated
    public void setCurrentHostname(String str) {
        this.currentHostname = str;
    }

    @Deprecated
    public void setKubernetesClient(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }
}
