/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.streaming;

import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.streaming.RelevanceConfig;
import com.netflix.turbine.streaming.RelevanceKey;
import com.netflix.turbine.streaming.StreamingDataHandler;
import com.netflix.turbine.streaming.servlet.TurbineStreamServlet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.codehaus.jackson.PrettyPrinter;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TurbineStreamingConnection<T extends TurbineData>
implements TurbineDataHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(TurbineStreamingConnection.class);
    private static final Object CurrentTime = "currentTime";
    protected final String name;
    protected final StreamingDataHandler streamHandler;
    private volatile boolean stopMonitoring = false;
    private volatile long lastEvent = -1L;
    protected int streamingDelay = 100;
    private final PerformanceCriteria perfCriteria;
    protected final Set<String> filterPrefixes;
    private final Set<String> statsTypeFilter;
    private final Set<String> dataNames;
    private final Map<String, RelevantMetrics> relevantMetrics;
    protected ConcurrentHashMap<String, Object> streamingConnectionSession = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, String> dataHash = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, AtomicLong> lastOutputForTypeCache = new ConcurrentHashMap();
    private final ObjectWriter objectWriter;

    public TurbineStreamingConnection(StreamingDataHandler sHandler, Collection<TurbineStreamServlet.FilterCriteria> criteria, int delay) throws Exception {
        this.streamHandler = sHandler;
        this.dataNames = this.getNameFilters(criteria);
        this.statsTypeFilter = this.getTypeFilters(criteria);
        this.filterPrefixes = this.getFilterPrefixes(criteria);
        this.name = "StreamingHandler_" + UUID.randomUUID().toString();
        this.streamingDelay = delay;
        this.perfCriteria = new BroweserPerfCriteria();
        this.relevantMetrics = new HashMap<String, RelevantMetrics>();
        Set<RelevanceConfig> relevanceConfig = this.getRelevanceConfig(criteria);
        logger.info("Relevance config: " + relevanceConfig);
        for (RelevanceConfig rConfig : relevanceConfig) {
            this.relevantMetrics.put(rConfig.type, new RelevantMetrics(rConfig));
        }
        logger.info("Relevance metrics config: " + this.relevantMetrics);
        ObjectMapper objectMapper = new ObjectMapper();
        this.objectWriter = objectMapper.prettyPrintingWriter((PrettyPrinter)new MinimalPrettyPrinter());
    }

    @Override
    public String getName() {
        return this.name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitOnConnection() {
        try {
            while (!this.stopMonitoring) {
                if (this.lastEvent > -1L && System.currentTimeMillis() - this.lastEvent > 10000L) {
                    logger.info("ERROR: We haven't heard from the monitor in a while so are killing the handler and will stop streaming to client.");
                    this.stopMonitoring = true;
                }
                try {
                    this.streamHandler.noData();
                    long start = System.currentTimeMillis();
                    for (RelevantMetrics metrics : this.relevantMetrics.values()) {
                        Set deleteSet;
                        while (metrics.state.get() == State.DoNotSort && System.currentTimeMillis() - start < 3000L) {
                            Thread.sleep(10L);
                        }
                        if (metrics.state.get() == State.DoNotSort) continue;
                        boolean deleteData = metrics.sort();
                        if (metrics.state.get() == State.StartSorting) {
                            for (String key : (Set)metrics.topN.get()) {
                                TurbineData json = (TurbineData)metrics.dataMap.get(key);
                                this.streamHandler.writeData(this.objectWriter.writeValueAsString((Object)json));
                            }
                            metrics.state.set(State.Sorted);
                        }
                        if (!deleteData || (deleteSet = (Set)metrics.deletedSet.get()) == null || deleteSet.size() <= 0) continue;
                        this.streamHandler.deleteData(((RelevantMetrics)metrics).config.type, deleteSet);
                    }
                }
                catch (Exception e) {
                    if ("Broken pipe".equals(e.getMessage())) {
                        logger.debug("Broken pipe (most likely client disconnected) when writing to response stream", (Throwable)e);
                    } else {
                        logger.error("Got exception when writing to response stream", (Throwable)e);
                    }
                    this.stopMonitoring = true;
                }
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException e) {
                    this.stopMonitoring = true;
                    logger.info("Got interrupted when waiting on connection", (Throwable)e);
                }
            }
        }
        catch (Throwable t) {
            logger.error("Caught throwable when waiting on connection", t);
        }
    }

    @Override
    public void handleData(Collection<T> data) {
        if (this.stopMonitoring) {
            return;
        }
        this.writeToStream(data);
    }

    @Override
    public void handleHostLost(Instance host) {
    }

    protected void writeToStream(Collection<? extends TurbineData> dataCollection) {
        this.lastEvent = System.currentTimeMillis();
        try {
            for (TurbineData turbineData : dataCollection) {
                try {
                    String dataKey;
                    if (this.statsTypeFilter.size() != 0 && !this.statsTypeFilter.contains(turbineData.getType()) || this.dataNames.size() != 0 && !this.dataNames.contains(turbineData.getName())) continue;
                    if (this.filterPrefixes.size() > 0) {
                        boolean allowData = false;
                        for (String f : this.filterPrefixes) {
                            if (!turbineData.getName().contains(f)) continue;
                            allowData = true;
                            break;
                        }
                        if (!allowData) continue;
                    }
                    if (!this.isDataWithinStreamingWindow(dataKey = this.getDataKey(turbineData))) continue;
                    Map<String, Object> attrs = turbineData.getAttributes();
                    attrs.remove(CurrentTime);
                    HashMap<String, Map<String, ? extends Number>> nestedAttrs = turbineData.getNestedMapAttributes();
                    if (nestedAttrs != null && nestedAttrs.keySet().size() > 0) {
                        for (String nestedMapKey : nestedAttrs.keySet()) {
                            Map<String, ? extends Number> nestedMap = nestedAttrs.get(nestedMapKey);
                            if (nestedMap == null) continue;
                            attrs.put(nestedMapKey, nestedMap);
                        }
                    }
                    String jsonStringForDataHash = this.objectWriter.writeValueAsString(attrs);
                    String lastMessageForDataType = this.dataHash.get(dataKey);
                    if (lastMessageForDataType != null && lastMessageForDataType.equals(jsonStringForDataHash)) {
                        logger.debug("We skipped delivering a message since it hadn't changed: " + dataKey);
                        continue;
                    }
                    this.dataHash.put(dataKey, jsonStringForDataHash);
                    RelevantMetrics metrics = this.relevantMetrics.get(turbineData.getType());
                    if (metrics != null) {
                        String key = turbineData.getName();
                        metrics.put(key, turbineData);
                        if (metrics.state.get() == State.DoNotSort && metrics.dataMap.size() >= ((RelevantMetrics)metrics).config.topN) {
                            metrics.state.compareAndSet(State.DoNotSort, State.StartSorting);
                        }
                        if (metrics.state.get() != State.Sorted || !metrics.isInTopN(turbineData.getName())) continue;
                        this.streamHandler.writeData(jsonStringForDataHash);
                        continue;
                    }
                    this.streamHandler.writeData(jsonStringForDataHash);
                }
                catch (IOException e) {
                    throw e;
                }
                catch (Exception e) {
                    if (turbineData == null) {
                        logger.error("Failed to process data but will continue processing in loop.", (Throwable)e);
                        continue;
                    }
                    logger.error("Failed to process data with type:" + turbineData.getType() + " but will continue processing in loop.", (Throwable)e);
                }
            }
        }
        catch (IOException e) {
            logger.debug("We lost the client connection. Will stop monitoring and streaming.", (Throwable)e);
            this.stopMonitoring = true;
        }
        catch (Throwable t) {
            logger.error("An unknown error occurred trying to write data to client stream. Will stop monitoring and streaming.", t);
            this.stopMonitoring = true;
        }
    }

    private Set<String> getNameFilters(Collection<TurbineStreamServlet.FilterCriteria> criteria) {
        HashSet<String> set = new HashSet<String>();
        for (TurbineStreamServlet.FilterCriteria filter : criteria) {
            if (filter.name == null) continue;
            set.add(filter.name);
        }
        return set;
    }

    private Set<String> getTypeFilters(Collection<TurbineStreamServlet.FilterCriteria> criteria) {
        HashSet<String> set = new HashSet<String>();
        for (TurbineStreamServlet.FilterCriteria filter : criteria) {
            if (filter.type != null) {
                set.add(filter.type);
            }
            if (filter.relevanceConfig == null || filter.relevanceConfig.type == null) continue;
            set.add(filter.relevanceConfig.type);
        }
        return set;
    }

    private Set<String> getFilterPrefixes(Collection<TurbineStreamServlet.FilterCriteria> criteria) {
        HashSet<String> set = new HashSet<String>();
        for (TurbineStreamServlet.FilterCriteria filter : criteria) {
            if (filter.prefix == null) continue;
            set.add(filter.prefix);
        }
        return set;
    }

    private Set<RelevanceConfig> getRelevanceConfig(Collection<TurbineStreamServlet.FilterCriteria> criteria) {
        HashSet<RelevanceConfig> set = new HashSet<RelevanceConfig>();
        for (TurbineStreamServlet.FilterCriteria filter : criteria) {
            if (filter.relevanceConfig == null) continue;
            set.add(filter.relevanceConfig);
        }
        return set;
    }

    private String getDataKey(TurbineData data) {
        String dataKey = data.getClass().getSimpleName() + "_" + data.getKey();
        if (data instanceof DataFromSingleInstance) {
            dataKey = dataKey + "_" + ((DataFromSingleInstance)data).getHost().getHostname();
        }
        return dataKey;
    }

    private boolean isDataWithinStreamingWindow(String dataKey) {
        long currentTime = System.currentTimeMillis();
        AtomicLong lastOutputForType = this.lastOutputForTypeCache.get(dataKey);
        long currentLastOutputForType = -1L;
        if (lastOutputForType == null) {
            AtomicLong previous = this.lastOutputForTypeCache.putIfAbsent(dataKey, new AtomicLong(currentTime));
            return previous == null;
        }
        currentLastOutputForType = lastOutputForType.get();
        if (this.streamingDelay != -1 && currentTime < currentLastOutputForType + (long)this.streamingDelay) {
            return false;
        }
        return lastOutputForType.compareAndSet(currentLastOutputForType, currentTime);
    }

    @Override
    public PerformanceCriteria getCriteria() {
        return this.perfCriteria;
    }

    public static class UnitTest {
        @Test
        public void testFilterStream() throws Exception {
            final AtomicInteger dataCount = new AtomicInteger(0);
            final AtomicInteger pingCount = new AtomicInteger(0);
            final AtomicInteger deleteCount = new AtomicInteger(0);
            final AtomicReference<Object> text = new AtomicReference<Object>(null);
            final HashMap testAttrs = new HashMap();
            final Instance host = new Instance("host", "cluster", true);
            final AtomicBoolean stop = new AtomicBoolean(false);
            StreamingDataHandler handler = new StreamingDataHandler(){

                @Override
                public void writeData(String data) throws Exception {
                    System.out.println("Data: " + data);
                    if (stop.get()) {
                        throw new RuntimeException("stop!");
                    }
                    dataCount.incrementAndGet();
                    text.set(data);
                }

                @Override
                public void deleteData(String type, Set<String> names) throws Exception {
                    if (stop.get()) {
                        throw new RuntimeException("stop!");
                    }
                    deleteCount.incrementAndGet();
                }

                @Override
                public void noData() throws Exception {
                    if (stop.get()) {
                        throw new RuntimeException("stop!");
                    }
                    pingCount.incrementAndGet();
                }
            };
            ArrayList<TurbineStreamServlet.FilterCriteria> filterCriteria = new ArrayList<TurbineStreamServlet.FilterCriteria>();
            filterCriteria.add(TurbineStreamServlet.FilterCriteria.parseCriteria("type:testType|name:testName"));
            final TurbineStreamingConnection connection = new TurbineStreamingConnection(handler, filterCriteria, 10);
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            ArrayList<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 0; i < 3; ++i) {
                futures.add(threadPool.submit(new Callable<Integer>(){
                    final AtomicInteger count = new AtomicInteger(0);
                    final Random random = new Random();

                    @Override
                    public Integer call() throws Exception {
                        while (!stop.get()) {
                            Collection<TurbineData> data = this.getRandomData();
                            connection.handleData(data);
                            this.count.addAndGet(data.size());
                            Thread.sleep(50L);
                        }
                        return this.count.get();
                    }

                    private Collection<TurbineData> getRandomData() {
                        int size = this.random.nextInt(10);
                        ArrayList<TurbineData> list = new ArrayList<TurbineData>();
                        for (int i = 0; i < size; ++i) {
                            list.add(new DataFromSingleInstance(null, "testType", "testName", host, testAttrs, 0L));
                            list.add(new DataFromSingleInstance(null, "foo", "bar", host, testAttrs, 0L));
                        }
                        return list;
                    }
                }));
            }
            final Timer timer = new Timer();
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    stop.set(true);
                    timer.cancel();
                }
            }, 3000L);
            connection.waitOnConnection();
            threadPool.shutdownNow();
            int sum = 0;
            for (Future future : futures) {
                sum += ((Integer)future.get()).intValue();
            }
            Assert.assertTrue((sum > dataCount.get() ? 1 : 0) != 0);
            Assert.assertTrue((1 == dataCount.get() ? 1 : 0) != 0);
            Assert.assertTrue((0 == deleteCount.get() ? 1 : 0) != 0);
            Assert.assertTrue((pingCount.get() >= 1 ? 1 : 0) != 0);
            Assert.assertTrue((String)text.get(), (boolean)((String)text.get()).contains("testType"));
            Assert.assertTrue((boolean)((String)text.get()).contains("testName"));
            Assert.assertFalse((boolean)((String)text.get()).contains("foo"));
            Assert.assertFalse((boolean)((String)text.get()).contains("bar"));
        }
    }

    class RelevantMetrics {
        private final RelevanceConfig config;
        private AtomicReference<Set<String>> topN;
        private AtomicReference<Set<String>> deletedSet;
        private final AtomicReference<State> state = new AtomicReference<State>(State.DoNotSort);
        private ConcurrentHashMap<String, TurbineData> dataMap = new ConcurrentHashMap();

        public RelevantMetrics(RelevanceConfig c) {
            this.config = c;
            if (this.config != null) {
                logger.info("Relevance metrics are enabled: " + this.config.toString());
                this.topN = new AtomicReference<Object>(null);
                this.deletedSet = new AtomicReference<Object>(null);
            }
        }

        public void put(String key, TurbineData data) {
            this.dataMap.put(key, data);
        }

        public boolean sort() {
            ConcurrentSkipListSet<RelevanceKey> set = new ConcurrentSkipListSet<RelevanceKey>(new RelevanceKey.RelevanceComparator());
            for (String s : this.dataMap.keySet()) {
                RelevanceKey key = new RelevanceKey(s, this.config.items, this.dataMap.get(s).getNumericAttributes());
                set.add(key);
            }
            HashSet<String> newSet = new HashSet<String>();
            Iterator<RelevanceKey> iter = set.descendingIterator();
            for (int n = 0; iter.hasNext() && n < this.config.topN; ++n) {
                newSet.add(iter.next().getName());
            }
            Set oldSet = this.topN.get();
            this.topN.set(newSet);
            this.deletedSet.set(null);
            if (oldSet == null) {
                oldSet = this.dataMap.keySet();
            }
            HashSet previousSet = new HashSet(oldSet);
            previousSet.removeAll(newSet);
            if (previousSet.size() > 0) {
                this.deletedSet.set(previousSet);
                return true;
            }
            return false;
        }

        public boolean isInTopN(String key) {
            Set<String> topNSet = this.topN.get();
            if (topNSet == null || topNSet.size() == 0) {
                return true;
            }
            return topNSet.contains(key);
        }

        public String toString() {
            return this.config.toString();
        }
    }

    private static enum State {
        DoNotSort,
        StartSorting,
        Sorted;

    }

    private class BroweserPerfCriteria
    implements PerformanceCriteria {
        private BroweserPerfCriteria() {
        }

        @Override
        public boolean isCritical() {
            return false;
        }

        @Override
        public int getMaxQueueSize() {
            return 1000;
        }

        @Override
        public int numThreads() {
            return 1;
        }
    }
}

