/*
 * Decompiled with CFR 0.152.
 */
package hector.me.prettyprint.cassandra.connection;

import hector.me.prettyprint.cassandra.connection.HClientPool;
import hector.me.prettyprint.cassandra.connection.client.HClient;
import hector.me.prettyprint.cassandra.connection.client.HThriftClient;
import hector.me.prettyprint.cassandra.connection.factory.HClientFactory;
import hector.me.prettyprint.cassandra.service.CassandraHost;
import hector.me.prettyprint.hector.api.exceptions.HInactivePoolException;
import hector.me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import hector.me.prettyprint.hector.api.exceptions.HectorException;
import hector.me.prettyprint.hector.api.exceptions.HectorTransportException;
import java.util.HashSet;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ConcurrentHClientPool
implements HClientPool {
    private static final Log log = LogFactory.getLog(ConcurrentHClientPool.class);
    private final PriorityBlockingQueue<HClient> availableClientQueue;
    private final AtomicInteger activeClientsCount;
    private final AtomicInteger realActiveClientsCount;
    private final CassandraHost cassandraHost;
    private final AtomicInteger numBlocked;
    private final AtomicBoolean active;
    private final long maxWaitTimeWhenExhausted;
    private final HClientFactory clientFactory;
    private static final int MAX_IDLE_CLIENT = 3;

    public ConcurrentHClientPool(HClientFactory clientFactory, CassandraHost host) {
        this.clientFactory = clientFactory;
        this.cassandraHost = host;
        this.availableClientQueue = new PriorityBlockingQueue(this.cassandraHost.getMaxActive());
        this.activeClientsCount = new AtomicInteger(0);
        this.realActiveClientsCount = new AtomicInteger(0);
        this.numBlocked = new AtomicInteger();
        this.active = new AtomicBoolean(true);
        this.maxWaitTimeWhenExhausted = this.cassandraHost.getMaxWaitTimeWhenExhausted() < 0L ? 0L : this.cassandraHost.getMaxWaitTimeWhenExhausted();
        for (int i = 0; i < Math.min(3, this.cassandraHost.getMaxActive() / 3 + 1); ++i) {
            this.availableClientQueue.add(this.createClient());
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Concurrent Host pool started with %s active clients; max: %s exhausted wait: %s", this.getNumIdle(), this.cassandraHost.getMaxActive(), this.maxWaitTimeWhenExhausted));
        }
    }

    @Override
    public HClient borrowClient() throws HectorException {
        if (!this.active.get()) {
            throw new HInactivePoolException("Attempt to borrow on in-active pool: " + this.getName());
        }
        HClient cassandraClient = this.availableClientQueue.poll();
        int currentActiveClients = this.activeClientsCount.incrementAndGet();
        try {
            if (cassandraClient == null) {
                cassandraClient = currentActiveClients <= this.cassandraHost.getMaxActive() ? this.createClient() : this.waitForConnection();
            }
            if (cassandraClient == null) {
                throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
            }
        }
        catch (RuntimeException e) {
            this.activeClientsCount.decrementAndGet();
            throw e;
        }
        this.realActiveClientsCount.incrementAndGet();
        cassandraClient.startToUse();
        return cassandraClient;
    }

    private HClient waitForConnection() {
        HClient cassandraClient;
        block11: {
            cassandraClient = null;
            this.numBlocked.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("blocking on queue - current block count %s", this.numBlocked.get()));
            }
            try {
                if (this.maxWaitTimeWhenExhausted == 0L) {
                    while (cassandraClient == null && this.active.get()) {
                        try {
                            cassandraClient = this.availableClientQueue.poll(100L, TimeUnit.MILLISECONDS);
                        }
                        catch (InterruptedException ie) {
                            log.warn((Object)"InterruptedException poll operation on retry forever", (Throwable)ie);
                            break block11;
                        }
                    }
                    break block11;
                }
                try {
                    cassandraClient = this.availableClientQueue.poll(this.maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
                    if (cassandraClient == null) {
                        throw new HPoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s", Thread.currentThread().getName(), this.cassandraHost.getName()));
                    }
                }
                catch (InterruptedException ie) {
                    log.warn((Object)"Cassandra client acquisition interrupted", (Throwable)ie);
                }
            }
            finally {
                this.numBlocked.decrementAndGet();
            }
        }
        return cassandraClient;
    }

    private HClient createClient() {
        return this.clientFactory.createClient(this.cassandraHost).open();
    }

    @Override
    public void shutdown() {
        if (!this.active.compareAndSet(true, false)) {
            throw new IllegalArgumentException("shutdown() called for inactive pool: " + this.getName());
        }
        log.info((Object)String.format("Shutdown triggered on %s", this.getName()));
        HashSet clients = new HashSet();
        this.availableClientQueue.drainTo(clients);
        if (clients.size() > 0) {
            for (HClient hClient : clients) {
                hClient.close();
            }
        }
        log.info((Object)String.format("Shutdown complete on %s", this.getName()));
    }

    @Override
    public CassandraHost getCassandraHost() {
        return this.cassandraHost;
    }

    @Override
    public String getName() {
        return String.format("<ConcurrentCassandraClientPoolByHost>:{%s}", this.cassandraHost.getName());
    }

    @Override
    public int getNumActive() {
        return this.realActiveClientsCount.get();
    }

    @Override
    public int getNumBeforeExhausted() {
        return this.cassandraHost.getMaxActive() - this.realActiveClientsCount.get();
    }

    @Override
    public int getNumBlockedThreads() {
        return this.numBlocked.intValue();
    }

    @Override
    public int getNumIdle() {
        return this.availableClientQueue.size();
    }

    @Override
    public boolean isExhausted() {
        return this.getNumBeforeExhausted() == 0;
    }

    @Override
    public int getMaxActive() {
        return this.cassandraHost.getMaxActive();
    }

    @Override
    public boolean getIsActive() {
        return this.active.get();
    }

    @Override
    public String getStatusAsString() {
        return String.format("%s; IsActive?: %s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d", this.getName(), this.getIsActive(), this.getNumActive(), this.getNumBlockedThreads(), this.getNumIdle(), this.getNumBeforeExhausted());
    }

    @Override
    public void releaseClient(HClient client) throws HectorException {
        boolean open = client.isOpen();
        if (open) {
            if (this.active.get()) {
                this.addClientToPoolGently(client);
            } else {
                log.info((Object)String.format("Open client %s released to in-active pool for host %s. Closing.", client, this.cassandraHost));
                client.close();
            }
        } else {
            try {
                this.addClientToPoolGently(this.createClient());
            }
            catch (HectorTransportException e) {
                log.warn((Object)String.format("Transport exception in re-opening client in release on %s", this.getName()));
            }
        }
        this.realActiveClientsCount.decrementAndGet();
        this.activeClientsCount.decrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace((Object)String.format("Status of releaseClient %s to queue: %s", client.toString(), open));
        }
    }

    private void addClientToPoolGently(HClient client) {
        try {
            this.availableClientQueue.offer(client);
        }
        catch (IllegalStateException ise) {
            log.warn((Object)"Capacity hit adding client back to queue. Closing extra");
            client.close();
        }
    }

    @Override
    public void releaseIdleClients() {
        for (HThriftClient hThriftClient : this.availableClientQueue) {
            if (!hThriftClient.isIdle() || !this.availableClientQueue.remove(hThriftClient)) continue;
            hThriftClient.close();
        }
    }
}

