/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.discovery;

import com.netflix.config.ConfigurationManager;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.discovery.InstanceDiscovery;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically polls an {@link InstanceDiscovery} for the current list of instances and
 * pushes the resulting "hosts up" / "hosts down" sets to every registered
 * {@link InstanceObserver}.
 *
 * <p>A shared instance is available through {@link #getInstance()}. Polling begins when
 * {@link #start(InstanceDiscovery)} is called and runs on a {@link Timer} thread. The poll
 * period is read once, at start time, from the {@code turbine.discovery.pollDelayMillis}
 * property (default 60000 ms).
 */
public class InstanceObservable {
    private static final Logger logger = LoggerFactory.getLogger(InstanceObservable.class);
    private static final InstanceObservable INSTANCE = new InstanceObservable();

    /** Poll period in milliseconds; sampled when {@link #start(InstanceDiscovery)} schedules the task. */
    private final DynamicIntProperty pollDelayMillis = DynamicPropertyFactory.getInstance().getIntProperty("turbine.discovery.pollDelayMillis", 60000);
    /** Result of the most recent successful poll; replaced wholesale on every poll. */
    private final AtomicReference<CurrentState> currentState = new AtomicReference<CurrentState>(new CurrentState());
    /** Registered observers keyed by {@link InstanceObserver#getName()}. */
    private final ConcurrentHashMap<String, InstanceObserver> observers = new ConcurrentHashMap<String, InstanceObserver>();
    /** Number of "up" hosts per cluster name, as computed by the last poll that saw at least one such host. */
    private final AtomicReference<Map<String, Integer>> hostsUpPerCluster = new AtomicReference<Map<String, Integer>>(new HashMap<String, Integer>());
    private final Timer timer = new Timer();
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Incremented once per successfully completed poll. */
    private final AtomicInteger heartbeat = new AtomicInteger();
    private InstanceDiscovery instanceDiscovery;

    /** Timer task that performs one poll-and-notify cycle per invocation. */
    private final TimerTask producer = new TimerTask() {

        @Override
        public void run() {
            InstanceObservable.this.pollAndNotify();
        }
    };

    /**
     * Returns the shared {@code InstanceObservable}.
     *
     * @return the singleton instance
     */
    public static InstanceObservable getInstance() {
        return INSTANCE;
    }

    private InstanceObservable() {
    }

    /**
     * Returns how many hosts were reported up by the most recent successful poll.
     *
     * @return size of the current "hosts up" set
     */
    @Monitor(name="HostUp", type=DataSourceType.GAUGE)
    public int getCurrentHostUpCount() {
        return this.currentState.get().hostsUp.size();
    }

    /**
     * Returns the hosts reported up by the most recent successful poll.
     *
     * @return a copy of the current "hosts up" set; modifying it does not affect this object
     */
    public HashSet<Instance> getCurrentHostsUp() {
        // Hand out a copy so callers cannot mutate the set that is also passed to observers.
        return new HashSet<Instance>(this.currentState.get().hostsUp);
    }

    /**
     * Returns how many hosts are currently considered down.
     *
     * @return size of the current "hosts down" set
     */
    @Monitor(name="HostDown", type=DataSourceType.GAUGE)
    public int getCurrentHostDownCount() {
        return this.currentState.get().hostsDown.size();
    }

    /**
     * Returns the number of polls that have completed successfully.
     *
     * @return the heartbeat counter value
     */
    @Monitor(name="Heartbeat", type=DataSourceType.COUNTER)
    public int getHeartbeat() {
        return this.heartbeat.get();
    }

    /**
     * Returns the hosts currently considered down: those reported as not up by the most recent
     * successful poll, plus hosts that were up in the previous poll but absent from the latest one.
     *
     * @return a copy of the current "hosts down" set; modifying it does not affect this object
     */
    public HashSet<Instance> getCurrentHostsDown() {
        return new HashSet<Instance>(this.currentState.get().hostsDown);
    }

    /**
     * Returns the names of the registered observers.
     *
     * @return the key set of the observer registry (a live view of the underlying map)
     */
    public Set<String> getObservers() {
        return this.observers.keySet();
    }

    /**
     * Starts polling the given discovery source. The first poll is scheduled immediately and
     * subsequent polls follow at the configured delay.
     *
     * @param iDiscovery source of instance information; must not be {@code null}
     * @throws RuntimeException if polling was already started or {@code iDiscovery} is {@code null}
     */
    public void start(InstanceDiscovery iDiscovery) {
        if (this.started.get()) {
            throw new RuntimeException("InstanceDiscovery already started");
        }
        if (iDiscovery == null) {
            throw new RuntimeException("InstanceDiscovery is null");
        }
        // Claim the "started" flag atomically so that two concurrent callers cannot both
        // pass the check above and schedule the task twice.
        if (!this.started.compareAndSet(false, true)) {
            throw new RuntimeException("InstanceDiscovery already started");
        }
        try {
            this.instanceDiscovery = iDiscovery;
            int delayMillis = this.pollDelayMillis.get();
            logger.info("Starting InstanceObservable at frequency: {} millis", delayMillis);
            this.timer.schedule(this.producer, 0L, (long)delayMillis);
        }
        catch (RuntimeException e) {
            // Scheduling failed, so nothing is running; do not report ourselves as started.
            this.started.set(false);
            throw e;
        }
    }

    /**
     * Cancels the polling timer and removes all registered observers.
     */
    public void stop() {
        logger.info("InstanceObservable shutting down");
        this.timer.cancel();
        this.observers.clear();
    }

    /**
     * Registers an observer under its {@link InstanceObserver#getName() name} unless one with
     * the same name is already registered.
     *
     * @param watcher the observer to register
     * @return the observer registered under that name: the existing one if present, otherwise {@code watcher}
     */
    public InstanceObserver register(InstanceObserver watcher) {
        InstanceObserver existing = this.observers.putIfAbsent(watcher.getName(), watcher);
        return existing != null ? existing : watcher;
    }

    /**
     * Removes whichever observer is registered under the given observer's name.
     *
     * @param watcher the observer whose name should be removed from the registry
     */
    public void deregister(InstanceObserver watcher) {
        this.observers.remove(watcher.getName());
    }

    /**
     * Returns the recorded number of "up" hosts for a cluster.
     *
     * @param cluster the cluster name
     * @return the recorded count, or 0 if no count is recorded for {@code cluster}
     */
    public int getNumHostsUpForCluster(String cluster) {
        Integer hostCount = this.hostsUpPerCluster.get().get(cluster);
        return hostCount == null ? 0 : hostCount.intValue();
    }

    /**
     * Runs one polling cycle: fetches the instance list, derives the new up/down state,
     * publishes it, notifies observers and updates the per-cluster counts and heartbeat.
     * Does nothing (beyond logging) when no observers are registered.
     */
    private void pollAndNotify() {
        if (this.observers.isEmpty()) {
            logger.info("No observers for InstanceObservable, will try again later");
            return;
        }
        try {
            List<Instance> newList = this.getInstanceList();
            logger.info("Retrieved hosts from InstanceDiscovery: {}", newList.size());
            if (newList.size() < 10) {
                logger.debug("Retrieved hosts from InstanceDiscovery: {}", newList);
            }

            // Hosts that were up last time but are missing from the new list are treated as down.
            // Removal is done against a HashSet so the lookup is not linear per element.
            List<Instance> terminated = new ArrayList<Instance>(this.currentState.get().hostsUp);
            terminated.removeAll(new HashSet<Instance>(newList));
            logger.info("Found hosts that have been previously terminated: {}", terminated.size());

            CurrentState newState = new CurrentState();
            for (Instance host : newList) {
                if (host.isUp()) {
                    newState.hostsUp.add(host);
                } else {
                    newState.hostsDown.add(host);
                }
            }
            newState.hostsDown.addAll(terminated);
            logger.info("Hosts up:{}, hosts down: {}", newState.hostsUp.size(), newState.hostsDown.size());

            this.currentState.set(newState);
            this.notifyObservers(newState);
            this.updateHostsCountsPerCluster(newState.hostsUp);
            this.heartbeat.incrementAndGet();
        }
        catch (Throwable t) {
            // Swallowed deliberately: an exception escaping a TimerTask would terminate the
            // Timer thread and stop all future polls.
            logger.info("Failed to fetch instance info, will continue to run and will try again later", t);
        }
    }

    /**
     * Delivers the given state to every registered observer. A failure in one callback is
     * logged and does not prevent the remaining callbacks from being invoked.
     *
     * @param state the state to publish; empty sets are not delivered
     */
    private void notifyObservers(CurrentState state) {
        for (InstanceObserver watcher : this.observers.values()) {
            if (!state.hostsUp.isEmpty()) {
                try {
                    watcher.hostsUp(state.hostsUp);
                }
                catch (Throwable t) {
                    logger.error("Could not call hostUp on watcher: " + watcher.getName(), t);
                }
            }
            if (!state.hostsDown.isEmpty()) {
                try {
                    watcher.hostsDown(state.hostsDown);
                }
                catch (Throwable t) {
                    logger.error("Could not call hostDown on watcher: " + watcher.getName(), t);
                }
            }
        }
    }

    /**
     * Fetches the instance list from the configured discovery source.
     *
     * @return a new list holding the discovered instances; empty if no discovery source is set
     * @throws Exception if the discovery source fails
     */
    private List<Instance> getInstanceList() throws Exception {
        List<Instance> instances = new ArrayList<Instance>();
        if (this.instanceDiscovery != null) {
            instances.addAll(this.instanceDiscovery.getInstanceList());
        }
        return instances;
    }

    /**
     * Recomputes the per-cluster "up" host counts from the given set. Hosts without a cluster
     * name are skipped. If the input is {@code null} or empty, or no host has a cluster name,
     * the previously recorded counts are left unchanged.
     *
     * @param hostsUp the hosts currently up
     */
    private void updateHostsCountsPerCluster(Set<Instance> hostsUp) {
        if (hostsUp == null || hostsUp.isEmpty()) {
            return;
        }
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (Instance host : hostsUp) {
            String cluster = host.getCluster();
            if (cluster == null) {
                continue;
            }
            Integer seen = counts.get(cluster);
            counts.put(cluster, Integer.valueOf(seen == null ? 1 : seen.intValue() + 1));
        }
        if (!counts.isEmpty()) {
            this.hostsUpPerCluster.set(counts);
        }
    }

    /**
     * Unit test that feeds instance lists to an {@code InstanceObservable} through a mocked
     * {@link InstanceDiscovery} and checks the up/down sets delivered to an observer.
     */
    public static class UnitTest {
        private InstanceDiscovery instanceDiscovery;
        /** Each list put here is returned by exactly one mocked {@code getInstanceList()} call. */
        private final LinkedBlockingQueue<List<Instance>> queue = new LinkedBlockingQueue<List<Instance>>();

        @Test
        public void testObservableThreadCorrectlyReportsHostsUpAndDown() throws Exception {
            ConfigurationManager.getConfigInstance().setProperty("turbine.discovery.pollDelayMillis", Integer.valueOf(10));
            this.instanceDiscovery = Mockito.mock(InstanceDiscovery.class);
            // The mocked discovery blocks until the test supplies the next list.
            Mockito.doAnswer(new Answer<List<Instance>>() {

                @Override
                public List<Instance> answer(InvocationOnMock invocation) throws Throwable {
                    System.out.println("waiting ...");
                    List<Instance> list = UnitTest.this.queue.take();
                    System.out.println("unblocked ...");
                    return list;
                }
            }).when(this.instanceDiscovery).getInstanceList();
            final Set<String> hostsUp = new HashSet<String>();
            final Set<String> hostsDown = new HashSet<String>();
            InstanceObserver observer = new InstanceObserver() {

                @Override
                public String getName() {
                    return "TestObserver";
                }

                @Override
                public void hostsUp(Collection<Instance> hosts) {
                    hostsUp.clear();
                    for (Instance host : hosts) {
                        System.out.println("Up: " + host.getHostname());
                        hostsUp.add(host.getHostname());
                    }
                }

                @Override
                public void hostsDown(Collection<Instance> hosts) {
                    hostsDown.clear();
                    for (Instance host : hosts) {
                        System.out.println("Down: " + host.getHostname());
                        hostsDown.add(host.getHostname());
                    }
                }
            };
            InstanceObservable observable = new InstanceObservable();
            observable.register(observer);
            observable.start(this.instanceDiscovery);
            try {
                List<Instance> list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", true));
                list.add(new Instance("b", "cluster", false));
                list.add(new Instance("c", "cluster", true));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsUp, "a", "c");
                this.verifySet(hostsDown, "b");

                list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", false));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsDown, "a", "c");

                list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", false));
                list.add(new Instance("b", "cluster", true));
                list.add(new Instance("c", "cluster", true));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsUp, "b", "c");
                this.verifySet(hostsDown, "a");

                list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", true));
                list.add(new Instance("b", "cluster", true));
                list.add(new Instance("c", "cluster", false));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsUp, "a", "b");
                this.verifySet(hostsDown, "c");

                list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", true));
                list.add(new Instance("c", "cluster", false));
                list.add(new Instance("d", "cluster", true));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsUp, "a", "d");
                this.verifySet(hostsDown, "b", "c");

                list = new ArrayList<Instance>();
                list.add(new Instance("a", "cluster", true));
                list.add(new Instance("c", "cluster", false));
                list.add(new Instance("d", "cluster", false));
                list.add(new Instance("e", "cluster", true));
                this.queue.put(list);
                Thread.sleep(20L);
                this.verifySet(hostsUp, "a", "e");
                this.verifySet(hostsDown, "c", "d");
            }
            finally {
                // Always cancel the timer, even when an assertion above fails.
                observable.stop();
            }
        }

        /**
         * Asserts that {@code set} contains exactly the given values, removing them as it goes
         * (the set is empty afterwards on success).
         */
        private void verifySet(Set<String> set, String ... args) {
            for (String s : args) {
                Assert.assertTrue(set.remove(s));
            }
            Assert.assertTrue(set.isEmpty());
        }
    }

    /**
     * Holder for the up/down host sets produced by a single poll. Static so that it does not
     * carry a reference to the enclosing {@code InstanceObservable}.
     */
    private static final class CurrentState {
        private final HashSet<Instance> hostsUp = new HashSet<Instance>();
        private final HashSet<Instance> hostsDown = new HashSet<Instance>();

        private CurrentState() {
        }
    }

    /**
     * Callback interface for parties interested in host availability changes.
     */
    public static interface InstanceObserver {
        /**
         * Returns the name under which this observer is registered; at most one observer is
         * kept per name.
         *
         * @return the observer name
         */
        public String getName();

        /**
         * Called after a poll with the hosts that are currently up. Not called when that set is empty.
         *
         * @param hosts the hosts currently up
         */
        public void hostsUp(Collection<Instance> hosts);

        /**
         * Called after a poll with the hosts that are currently considered down. Not called
         * when that set is empty.
         *
         * @param hosts the hosts currently down
         */
        public void hostsDown(Collection<Instance> hosts);
    }
}

