package org.apache.dubbo.registry.zookeeper;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.function.ThrowableConsumer;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;

/* loaded from: input_file:WEB-INF/lib/dubbo-3.1.7.jar:org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.class */
/**
 * Service discovery implementation backed by Curator's {@code x-discovery} recipe on ZooKeeper.
 *
 * <p>The instance owns a {@link CuratorFramework} client and a Curator {@link ServiceDiscovery};
 * both are created in the constructor and closed in {@link #doDestroy()}. One
 * {@link ZookeeperServiceDiscoveryChangeWatcher} (and its Curator {@link ServiceCache}) is kept per
 * subscribed service name in {@link #watcherCaches}.
 */
public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
    /** Default group path constant. */
    public static final String DEFAULT_GROUP = "/services";

    private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());
    private final CuratorFramework curatorFramework;
    private final String rootPath;
    private final ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
    /** Service name -> watcher holding the Curator cache and the listeners subscribed to that name. */
    private final Map<String, ZookeeperServiceDiscoveryChangeWatcher> watcherCaches = new ConcurrentHashMap<>();

    /**
     * Builds the Curator client and the Curator service discovery for the given registry URL and
     * starts the discovery.
     *
     * @param applicationModel application model forwarded to the superclass
     * @param url              registry URL used to build the client and to derive the root path
     * @throws IllegalStateException if any step of the setup fails; the original failure is the cause
     */
    public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL url) {
        super(applicationModel, url);
        // Tracked in a local so the client can be released if a later setup step fails.
        CuratorFramework framework = null;
        try {
            framework = CuratorFrameworkUtils.buildCuratorFramework(url, this);
            this.curatorFramework = framework;
            this.rootPath = CuratorFrameworkUtils.getRootPath(url);
            this.serviceDiscovery = CuratorFrameworkUtils.buildServiceDiscovery(framework, this.rootPath);
            this.serviceDiscovery.start();
        } catch (Exception e) {
            if (framework != null) {
                // Do not leak the already-built client when construction is aborted.
                try {
                    framework.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new IllegalStateException("Create zookeeper service discovery failed.", e);
        }
    }

    /**
     * Closes the Curator service discovery and the Curator client, then forgets all watchers.
     *
     * <p>The client is closed and the watcher map is cleared even when closing the service discovery
     * throws; the exception still propagates to the caller.
     *
     * @throws Exception if closing the service discovery or the client fails
     */
    @Override
    public void doDestroy() throws Exception {
        try {
            this.serviceDiscovery.close();
        } finally {
            try {
                this.curatorFramework.close();
            } finally {
                this.watcherCaches.clear();
            }
        }
    }

    /**
     * Registers the given instance with the Curator service discovery.
     *
     * @param serviceInstance instance to register
     * @throws RpcException (code 9) wrapping any failure raised while registering
     */
    @Override
    public void doRegister(ServiceInstance serviceInstance) {
        try {
            this.serviceDiscovery.registerService(CuratorFrameworkUtils.build(serviceInstance));
        } catch (Exception e) {
            throw new RpcException(9, "Failed register instance " + serviceInstance, e);
        }
    }

    /**
     * Unregisters the given instance from the Curator service discovery; a {@code null} instance is
     * ignored.
     *
     * @param serviceInstance instance to unregister, may be {@code null}
     */
    @Override
    public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
        if (serviceInstance == null) {
            return;
        }
        doInServiceRegistry(discovery -> discovery.unregisterService(CuratorFrameworkUtils.build(serviceInstance)));
    }

    /**
     * Updates the registered instance.
     *
     * <p>Delegates to the superclass implementation when the new instance's exported-services
     * revision is {@code "0"}, or when the Curator name or id differs between the old and the new
     * instance. Otherwise the new instance is stored, its metadata is reported and the Curator
     * service discovery entry is updated in place.
     *
     * @param serviceInstance  the previously registered instance
     * @param serviceInstance2 the new instance that replaces it
     * @throws RpcException (code 9) wrapping any failure raised during the in-place update
     */
    @Override
    public void doUpdate(ServiceInstance serviceInstance, ServiceInstance serviceInstance2) throws RuntimeException {
        if ("0".equals(ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance2))) {
            super.doUpdate(serviceInstance, serviceInstance2);
            return;
        }

        org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> previous =
                CuratorFrameworkUtils.build(serviceInstance);
        org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> updated =
                CuratorFrameworkUtils.build(serviceInstance2);

        boolean sameEntry = Objects.equals(updated.getName(), previous.getName())
                && Objects.equals(updated.getId(), previous.getId());
        if (!sameEntry) {
            // An in-place update needs the same name and id; let the superclass handle the change.
            super.doUpdate(serviceInstance, serviceInstance2);
            return;
        }

        try {
            this.serviceInstance = serviceInstance2;
            reportMetadata(serviceInstance2.getServiceMetadata());
            this.serviceDiscovery.updateService(updated);
        } catch (Exception e) {
            throw new RpcException(9, "Failed update instance " + serviceInstance2, e);
        }
    }

    /**
     * Returns the service names known to the Curator service discovery.
     *
     * @return a new {@link LinkedHashSet} holding the names returned by the query
     */
    @Override
    public Set<String> getServices() {
        return doInServiceDiscovery(discovery -> new LinkedHashSet<>(discovery.queryForNames()));
    }

    /**
     * Queries the Curator service discovery for the instances of a service and converts them to
     * Dubbo service instances.
     *
     * @param str service name to query
     * @return the converted instances
     */
    @Override
    public List<ServiceInstance> getInstances(String str) throws NullPointerException {
        return doInServiceDiscovery(
                discovery -> CuratorFrameworkUtils.build(this.registryURL, discovery.queryForInstances(str)));
    }

    /**
     * Adds a listener and, if it was not already present, registers a watcher for every service
     * name the listener is interested in.
     *
     * @param serviceInstancesChangedListener listener to add
     */
    @Override
    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener serviceInstancesChangedListener) throws NullPointerException, IllegalArgumentException {
        if (!this.instanceListeners.add(serviceInstancesChangedListener)) {
            return;
        }
        serviceInstancesChangedListener.getServiceNames()
                .forEach(serviceName -> registerServiceWatcher(serviceName, serviceInstancesChangedListener));
    }

    /**
     * Removes a listener and detaches it from the watcher of every service name it was interested
     * in. A watcher left without listeners is dropped from the cache and its Curator cache is closed.
     *
     * @param serviceInstancesChangedListener listener to remove
     */
    @Override
    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener serviceInstancesChangedListener) throws IllegalArgumentException {
        if (!this.instanceListeners.remove(serviceInstancesChangedListener)) {
            return;
        }
        serviceInstancesChangedListener.getServiceNames().forEach(serviceName -> {
            ZookeeperServiceDiscoveryChangeWatcher watcher = this.watcherCaches.get(serviceName);
            if (watcher == null) {
                return;
            }
            watcher.getListeners().remove(serviceInstancesChangedListener);
            if (!watcher.getListeners().isEmpty()) {
                return;
            }
            this.watcherCaches.remove(serviceName);
            try {
                watcher.getCacheInstance().close();
            } catch (IOException e) {
                // Best effort: keep going, but log the cause so the failure can be diagnosed.
                this.logger.error(LoggerCodeConstants.REGISTRY_ZOOKEEPER_EXCEPTION, "curator stop watch failed", "", "Curator Stop service discovery watch failed. Service Name: " + serviceName, e);
            }
        });
    }

    /** Runs a side-effecting action against the Curator service discovery via {@link ThrowableConsumer#execute}. */
    private void doInServiceRegistry(ThrowableConsumer<ServiceDiscovery<ZookeeperInstance>> throwableConsumer) {
        ThrowableConsumer.execute(this.serviceDiscovery, throwableConsumer);
    }

    /** Runs a query against the Curator service discovery via {@link ThrowableFunction#execute} and returns its result. */
    private <R> R doInServiceDiscovery(ThrowableFunction<ServiceDiscovery<ZookeeperInstance>, R> throwableFunction) {
        return ThrowableFunction.execute(this.serviceDiscovery, throwableFunction);
    }

    /**
     * Subscribes a listener to changes of one service.
     *
     * <p>If no watcher exists for the service name, a Curator {@link ServiceCache} is built and
     * started and a new watcher is attached to it. The listener is then added to the watcher and
     * immediately notified with the current instances of the service.
     *
     * @param str                             service name to watch
     * @param serviceInstancesChangedListener listener to attach and notify
     * @throws RpcException (code 9) if the Curator cache for a new watcher cannot be started
     */
    protected void registerServiceWatcher(String str, ServiceInstancesChangedListener serviceInstancesChangedListener) {
        // Handed to a newly created watcher; presumably the watcher waits on it before dispatching
        // cache events so the initial notification below goes first - confirm in the watcher class.
        CountDownLatch initialNotification = new CountDownLatch(1);
        ZookeeperServiceDiscoveryChangeWatcher watcher = this.watcherCaches.computeIfAbsent(str, serviceName -> {
            ServiceCache<ZookeeperInstance> cache =
                    this.serviceDiscovery.serviceCacheBuilder().name(serviceName).build();
            ZookeeperServiceDiscoveryChangeWatcher created =
                    new ZookeeperServiceDiscoveryChangeWatcher(this, cache, serviceName, initialNotification);
            cache.addListener(created);
            try {
                cache.start();
            } catch (Exception e) {
                throw new RpcException(9, "Failed subscribe service: " + serviceName, e);
            }
            return created;
        });
        try {
            watcher.addListener(serviceInstancesChangedListener);
            serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(str, getInstances(str)));
        } finally {
            // Always release the latch: a failure above must not leave the cached watcher
            // holding a latch that is never counted down.
            initialNotification.countDown();
        }
    }
}
