/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.thrift.thriftlib.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.cassandra.thrift.thriftlib.TByteArrayOutputStream;
import org.apache.cassandra.thrift.thriftlib.TException;
import org.apache.cassandra.thrift.thriftlib.protocol.TProtocol;
import org.apache.cassandra.thrift.thriftlib.server.TServer;
import org.apache.cassandra.thrift.thriftlib.transport.TFramedTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TIOStreamTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TMemoryInputTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TNonblockingServerTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TNonblockingTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TTransport;
import org.apache.cassandra.thrift.thriftlib.transport.TTransportException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TNonblockingServer
extends TServer {
    private static final Log LOGGER = LogFactory.getLog(TNonblockingServer.class);
    private volatile boolean stopped_ = true;
    private SelectThread selectThread_;
    private final long MAX_READ_BUFFER_BYTES;
    private long readBufferBytesAllocated = 0L;

    public TNonblockingServer(AbstractNonblockingServerArgs args) {
        super(args);
        this.MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
    }

    @Override
    public void serve() {
        if (!this.startListening()) {
            return;
        }
        if (!this.startSelectorThread()) {
            return;
        }
        this.setServing(true);
        this.joinSelector();
        this.setServing(false);
        this.stopListening();
    }

    protected boolean startListening() {
        try {
            this.serverTransport_.listen();
            return true;
        }
        catch (TTransportException ttx) {
            LOGGER.warn((Object)"Failed to start listening on server socket!", (Throwable)ttx);
            return false;
        }
    }

    protected void stopListening() {
        this.serverTransport_.close();
    }

    protected boolean startSelectorThread() {
        try {
            this.selectThread_ = new SelectThread((TNonblockingServerTransport)this.serverTransport_);
            this.stopped_ = false;
            this.selectThread_.start();
            return true;
        }
        catch (IOException e) {
            LOGGER.warn((Object)"Failed to start selector thread!", (Throwable)e);
            return false;
        }
    }

    protected void joinSelector() {
        try {
            this.selectThread_.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void stop() {
        this.stopped_ = true;
        if (this.selectThread_ != null) {
            this.selectThread_.wakeupSelector();
        }
    }

    protected boolean requestInvoke(FrameBuffer frameBuffer) {
        frameBuffer.invoke();
        return true;
    }

    protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
        this.selectThread_.requestSelectInterestChange(frameBuffer);
    }

    public boolean isStopped() {
        return this.selectThread_.isStopped();
    }

    protected class FrameBuffer {
        private static final int READING_FRAME_SIZE = 1;
        private static final int READING_FRAME = 2;
        private static final int READ_FRAME_COMPLETE = 3;
        private static final int AWAITING_REGISTER_WRITE = 4;
        private static final int WRITING = 6;
        private static final int AWAITING_REGISTER_READ = 7;
        private static final int AWAITING_CLOSE = 8;
        public final TNonblockingTransport trans_;
        private final SelectionKey selectionKey_;
        private int state_ = 1;
        private ByteBuffer buffer_;
        private TByteArrayOutputStream response_;

        public FrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey) {
            this.trans_ = trans;
            this.selectionKey_ = selectionKey;
            this.buffer_ = ByteBuffer.allocate(4);
        }

        public boolean read() {
            if (this.state_ == 1) {
                if (!this.internalRead()) {
                    return false;
                }
                if (this.buffer_.remaining() == 0) {
                    int frameSize = this.buffer_.getInt(0);
                    if (frameSize <= 0) {
                        LOGGER.warn((Object)("Read an invalid frame size of " + frameSize + ". Are you using TFramedTransport on the client side?"));
                        return false;
                    }
                    if ((long)frameSize > TNonblockingServer.this.MAX_READ_BUFFER_BYTES) {
                        LOGGER.error((Object)("Read a frame size of " + frameSize + ", which is bigger than the maximum allowable buffer size for ALL connections."));
                        return false;
                    }
                    if (TNonblockingServer.this.readBufferBytesAllocated + (long)frameSize > TNonblockingServer.this.MAX_READ_BUFFER_BYTES) {
                        return true;
                    }
                    TNonblockingServer.this.readBufferBytesAllocated = TNonblockingServer.this.readBufferBytesAllocated + (long)frameSize;
                    this.buffer_ = ByteBuffer.allocate(frameSize);
                    this.state_ = 2;
                } else {
                    return true;
                }
            }
            if (this.state_ == 2) {
                if (!this.internalRead()) {
                    return false;
                }
                if (this.buffer_.remaining() == 0) {
                    this.selectionKey_.interestOps(0);
                    this.state_ = 3;
                }
                return true;
            }
            LOGGER.warn((Object)("Read was called but state is invalid (" + this.state_ + ")"));
            return false;
        }

        public boolean write() {
            if (this.state_ == 6) {
                try {
                    if (this.trans_.write(this.buffer_) < 0) {
                        return false;
                    }
                }
                catch (IOException e) {
                    LOGGER.warn((Object)"Got an IOException during write!", (Throwable)e);
                    return false;
                }
                if (this.buffer_.remaining() == 0) {
                    this.prepareRead();
                }
                return true;
            }
            LOGGER.warn((Object)("Write was called, but state is invalid (" + this.state_ + ")"));
            return false;
        }

        public void changeSelectInterests() {
            if (this.state_ == 4) {
                this.selectionKey_.interestOps(4);
                this.state_ = 6;
            } else if (this.state_ == 7) {
                this.prepareRead();
            } else if (this.state_ == 8) {
                this.close();
                this.selectionKey_.cancel();
            } else {
                LOGGER.warn((Object)("changeSelectInterest was called, but state is invalid (" + this.state_ + ")"));
            }
        }

        public void close() {
            if (this.state_ == 2 || this.state_ == 3) {
                TNonblockingServer.this.readBufferBytesAllocated = TNonblockingServer.this.readBufferBytesAllocated - (long)this.buffer_.array().length;
            }
            this.trans_.close();
        }

        public boolean isFrameFullyRead() {
            return this.state_ == 3;
        }

        public void responseReady() {
            TNonblockingServer.this.readBufferBytesAllocated = TNonblockingServer.this.readBufferBytesAllocated - (long)this.buffer_.array().length;
            if (this.response_.len() == 0) {
                this.state_ = 7;
                this.buffer_ = null;
            } else {
                this.buffer_ = ByteBuffer.wrap(this.response_.get(), 0, this.response_.len());
                this.state_ = 4;
            }
            this.requestSelectInterestChange();
        }

        public void invoke() {
            TTransport inTrans = this.getInputTransport();
            TProtocol inProt = TNonblockingServer.this.inputProtocolFactory_.getProtocol(inTrans);
            TProtocol outProt = TNonblockingServer.this.outputProtocolFactory_.getProtocol(this.getOutputTransport());
            try {
                TNonblockingServer.this.processorFactory_.getProcessor(inTrans).process(inProt, outProt);
                this.responseReady();
                return;
            }
            catch (TException te) {
                LOGGER.warn((Object)"Exception while invoking!", (Throwable)te);
            }
            catch (Exception e) {
                LOGGER.warn((Object)"Unexpected exception while invoking!", (Throwable)e);
            }
            this.state_ = 8;
            this.requestSelectInterestChange();
        }

        private TTransport getInputTransport() {
            return new TMemoryInputTransport(this.buffer_.array());
        }

        private TTransport getOutputTransport() {
            this.response_ = new TByteArrayOutputStream();
            return TNonblockingServer.this.outputTransportFactory_.getTransport(new TIOStreamTransport(this.response_));
        }

        private boolean internalRead() {
            try {
                return this.trans_.read(this.buffer_) >= 0;
            }
            catch (IOException e) {
                LOGGER.warn((Object)"Got an IOException in internalRead!", (Throwable)e);
                return false;
            }
        }

        private void prepareRead() {
            this.selectionKey_.interestOps(1);
            this.buffer_ = ByteBuffer.allocate(4);
            this.state_ = 1;
        }

        private void requestSelectInterestChange() {
            if (Thread.currentThread() == TNonblockingServer.this.selectThread_) {
                this.changeSelectInterests();
            } else {
                TNonblockingServer.this.requestSelectInterestChange(this);
            }
        }
    }

    protected class SelectThread
    extends Thread {
        private final TNonblockingServerTransport serverTransport;
        private final Selector selector;
        private final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();

        public SelectThread(TNonblockingServerTransport serverTransport) throws IOException {
            this.serverTransport = serverTransport;
            this.selector = SelectorProvider.provider().openSelector();
            serverTransport.registerSelector(this.selector);
        }

        public boolean isStopped() {
            return TNonblockingServer.this.stopped_;
        }

        @Override
        public void run() {
            try {
                while (!TNonblockingServer.this.stopped_) {
                    this.select();
                    this.processInterestChanges();
                }
            }
            catch (Throwable t) {
                LOGGER.warn((Object)"run() exiting due to uncaught exception", t);
            }
            finally {
                TNonblockingServer.this.stopped_ = true;
            }
        }

        public void wakeupSelector() {
            this.selector.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void requestSelectInterestChange(FrameBuffer frameBuffer) {
            Set<FrameBuffer> set = this.selectInterestChanges;
            synchronized (set) {
                this.selectInterestChanges.add(frameBuffer);
            }
            this.selector.wakeup();
        }

        private void select() {
            try {
                this.selector.select();
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (!TNonblockingServer.this.stopped_ && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) {
                        this.cleanupSelectionkey(key);
                        continue;
                    }
                    if (key.isAcceptable()) {
                        this.handleAccept();
                        continue;
                    }
                    if (key.isReadable()) {
                        this.handleRead(key);
                        continue;
                    }
                    if (key.isWritable()) {
                        this.handleWrite(key);
                        continue;
                    }
                    LOGGER.warn((Object)("Unexpected state in select! " + key.interestOps()));
                }
            }
            catch (IOException e) {
                LOGGER.warn((Object)"Got an IOException while selecting!", (Throwable)e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processInterestChanges() {
            Set<FrameBuffer> set = this.selectInterestChanges;
            synchronized (set) {
                for (FrameBuffer fb : this.selectInterestChanges) {
                    fb.changeSelectInterests();
                }
                this.selectInterestChanges.clear();
            }
        }

        private void handleAccept() throws IOException {
            block3: {
                SelectionKey clientKey = null;
                TNonblockingTransport client = null;
                try {
                    client = (TNonblockingTransport)this.serverTransport.accept();
                    clientKey = client.registerSelector(this.selector, 1);
                    FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
                    clientKey.attach(frameBuffer);
                }
                catch (TTransportException tte) {
                    LOGGER.warn((Object)"Exception trying to accept!", (Throwable)tte);
                    tte.printStackTrace();
                    if (clientKey != null) {
                        this.cleanupSelectionkey(clientKey);
                    }
                    if (client == null) break block3;
                    client.close();
                }
            }
        }

        private void handleRead(SelectionKey key) {
            FrameBuffer buffer = (FrameBuffer)key.attachment();
            if (!buffer.read()) {
                this.cleanupSelectionkey(key);
                return;
            }
            if (buffer.isFrameFullyRead() && !TNonblockingServer.this.requestInvoke(buffer)) {
                this.cleanupSelectionkey(key);
            }
        }

        private void handleWrite(SelectionKey key) {
            FrameBuffer buffer = (FrameBuffer)key.attachment();
            if (!buffer.write()) {
                this.cleanupSelectionkey(key);
            }
        }

        private void cleanupSelectionkey(SelectionKey key) {
            FrameBuffer buffer = (FrameBuffer)key.attachment();
            if (buffer != null) {
                buffer.close();
            }
            key.cancel();
        }
    }

    public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>>
    extends TServer.AbstractServerArgs<T> {
        public long maxReadBufferBytes = Long.MAX_VALUE;

        public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
            super(transport);
            this.transportFactory(new TFramedTransport.Factory());
        }
    }

    public static class Args
    extends AbstractNonblockingServerArgs<Args> {
        public Args(TNonblockingServerTransport transport) {
            super(transport);
        }
    }
}

