/*
 * Decompiled with CFR 0.152.
 */
package alluxio.worker.nio;

import alluxio.Configuration;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.DataServer;
import alluxio.worker.DataServerMessage;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A non-blocking NIO server that answers block read requests.
 *
 * <p>The constructor binds a {@link ServerSocketChannel} to the given address, registers it with
 * a {@link Selector} and starts a listener thread that runs {@link #run()}. For every accepted
 * connection the server reads one block request message, fetches the requested range through
 * the {@link BlockWorker} and writes a block response message back; the connection is closed
 * once the response has been fully sent.
 */
@NotThreadSafe
public final class NIODataServer implements Runnable, DataServer {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");

    /** Address the server socket is bound to. */
    private final InetSocketAddress mAddress;
    /** Listening channel; created in {@link #initSelector()}. */
    private ServerSocketChannel mServerChannel;
    /** Selector multiplexing the listening channel and all client channels. */
    private Selector mSelector;
    /** Responses that still have to be (completely) written, keyed by client channel. */
    private final Map<SocketChannel, DataServerMessage> mSendingData =
            Collections.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());
    /** Requests that are being received, keyed by client channel. */
    private final Map<SocketChannel, DataServerMessage> mReceivingData =
            Collections.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());
    private final BlockWorker mBlockWorker;
    /** Thread executing {@link #run()}. */
    private final Thread mListenerThread;
    /** Set by {@link #close()} to ask the selector loop to stop. */
    private volatile boolean mShutdown = false;
    /** Set by {@link #run()} once the selector loop has exited. */
    private volatile boolean mShutdownComplete = false;

    /**
     * Creates the server, binds it to {@code address} and starts the listener thread.
     *
     * @param address the address to bind to; must be non-null and carry a valid port
     * @param blockWorker the block worker used to read block data; must be non-null
     * @param configuration not used by this implementation
     * @throws RuntimeException wrapping the {@link IOException} if the selector or the server
     *         socket cannot be set up
     */
    public NIODataServer(InetSocketAddress address, BlockWorker blockWorker, Configuration configuration) {
        LOG.info("Starting DataServer @ {}", address);
        NetworkAddressUtils.assertValidPort(Preconditions.checkNotNull(address));
        mAddress = address;
        mBlockWorker = Preconditions.checkNotNull(blockWorker);
        try {
            mSelector = initSelector();
            mListenerThread = new Thread(this);
            mListenerThread.start();
        } catch (IOException e) {
            LOG.error(e.getMessage() + mAddress, e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reading.
     *
     * @param key the selection key of the listening channel
     * @throws IOException if accepting or configuring the new channel fails
     */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // A non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(mSelector, SelectionKey.OP_READ);
    }

    /**
     * Requests shutdown of the selector loop and closes the listening channel and the selector.
     *
     * @throws IOException if closing the server channel or the selector fails
     */
    @Override
    public void close() throws IOException {
        mShutdown = true;
        try {
            mServerChannel.close();
        } finally {
            // Always close the selector, even if closing the server channel failed.
            mSelector.close();
        }
    }

    /**
     * @return the host address the server socket is bound to
     */
    @Override
    public String getBindHost() {
        return mServerChannel.socket().getInetAddress().getHostAddress();
    }

    /**
     * @return the local port the server socket is bound to
     */
    @Override
    public int getPort() {
        return mServerChannel.socket().getLocalPort();
    }

    /**
     * @return true once the selector loop in {@link #run()} has exited
     */
    @Override
    public boolean isClosed() {
        return mShutdownComplete;
    }

    /**
     * Opens a selector, then opens the non-blocking server channel, binds it to
     * {@link #mAddress} and registers it for accept events.
     *
     * @return the selector the server channel is registered with
     * @throws IOException if any of the channel or selector operations fails; everything opened
     *         so far is closed before the exception is rethrown
     */
    private Selector initSelector() throws IOException {
        Selector socketSelector = SelectorProvider.provider().openSelector();
        try {
            mServerChannel = ServerSocketChannel.open();
            mServerChannel.configureBlocking(false);
            mServerChannel.socket().bind(mAddress);
            mServerChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
            return socketSelector;
        } catch (IOException | RuntimeException e) {
            // Release the server channel as well, otherwise it leaks when bind/register fails.
            if (mServerChannel != null) {
                try {
                    mServerChannel.close();
                } catch (IOException ex) {
                    LOG.warn("Unable to close server channel. Exception: {}", ex.getMessage());
                }
            }
            try {
                socketSelector.close();
            } catch (IOException ex) {
                LOG.warn("Unable to close socket selector. Exception: {}", ex.getMessage());
            }
            throw e;
        }
    }

    /**
     * Cancels the key, closes the client channel and forgets any request or response that is
     * still associated with it. A failure to close the channel is logged and not propagated.
     *
     * @param key the selection key of the client channel
     * @param socketChannel the client channel to close
     */
    private void closeConnection(SelectionKey key, SocketChannel socketChannel) {
        key.cancel();
        try {
            socketChannel.close();
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
        }
        mReceivingData.remove(socketChannel);
        mSendingData.remove(socketChannel);
    }

    /**
     * Reads (part of) a block request from a client. Once the request is complete, the
     * requested range is read through the block worker, a response message is queued for the
     * channel and the key is switched to write interest.
     *
     * @param key the selection key of the readable client channel
     * @throws Exception if reading the block or building the response fails; the caller
     *         ({@link #run()}) treats this as fatal
     */
    private void read(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Continue a partially received request, or start a new one for this channel.
        DataServerMessage tMessage = mReceivingData.get(socketChannel);
        if (tMessage == null) {
            tMessage = DataServerMessage.createBlockRequestMessage();
            mReceivingData.put(socketChannel, tMessage);
        }

        int numRead;
        try {
            numRead = tMessage.recv(socketChannel);
        } catch (IOException e) {
            // The receive failed; drop this connection and keep serving the others.
            closeConnection(key, socketChannel);
            return;
        }
        if (numRead == -1) {
            // recv reported -1 for this channel; nothing more will arrive, so clean up.
            closeConnection(key, socketChannel);
            return;
        }
        if (!tMessage.isMessageReady()) {
            // Request not complete yet; wait for the next read event.
            return;
        }

        long blockId = tMessage.getBlockId();
        if (blockId <= 0L) {
            // NOTE(review): the connection and its buffered request are left in place here;
            // confirm whether the channel should be closed for an invalid request.
            LOG.error("Invalid block id {}", blockId);
            return;
        }
        LOG.info("Get request for blockId: {}", blockId);
        long lockId = tMessage.getLockId();
        long sessionId = tMessage.getSessionId();

        ByteBuffer data;
        int dataLen;
        try (BlockReader reader = mBlockWorker.readBlockRemote(sessionId, blockId, lockId)) {
            data = reader.read(tMessage.getOffset(), tMessage.getLength());
            mBlockWorker.accessBlock(sessionId, blockId);
            dataLen = data.limit();
        }
        DataServerMessage tResponseMessage = DataServerMessage.createBlockResponseMessage(
                true, blockId, tMessage.getOffset(), dataLen, data);
        tResponseMessage.setLockId(lockId);
        mSendingData.put(socketChannel, tResponseMessage);
        // Ask for write events only once there is a response to write.
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Selector loop: dispatches accept, read and write events until {@link #close()} is called.
     * Any exception raised while the server is not shutting down closes the server and is
     * rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        while (!mShutdown) {
            try {
                mSelector.select();
                if (mShutdown) {
                    break;
                }
                Iterator<SelectionKey> selectKeys = mSelector.selectedKeys().iterator();
                while (selectKeys.hasNext()) {
                    SelectionKey key = selectKeys.next();
                    // Selected keys must be removed manually or they are reported again.
                    selectKeys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        read(key);
                    } else if (key.isWritable()) {
                        write(key);
                    }
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                if (mShutdown) {
                    // Exceptions caused by close() racing with select() are expected.
                    break;
                }
                try {
                    close();
                } catch (Exception e2) {
                    LOG.error("Exception when closing data server. message: {}", e2.getMessage());
                }
                mShutdownComplete = true;
                throw new RuntimeException(e);
            }
        }
        mShutdownComplete = true;
    }

    /**
     * Writes (part of) the queued response to a client and closes the connection once the
     * response has been sent completely or sending failed.
     *
     * @param key the selection key of the writable client channel
     */
    private void write(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        DataServerMessage sendMessage = mSendingData.get(socketChannel);
        if (sendMessage == null) {
            // Nothing is queued for this channel; close it instead of failing with an NPE.
            LOG.error("No response queued for writable channel {}", socketChannel);
            closeConnection(key, socketChannel);
            return;
        }

        boolean closeChannel = false;
        try {
            sendMessage.send(socketChannel);
        } catch (IOException e) {
            closeChannel = true;
            LOG.error(e.getMessage(), e);
        }
        if (sendMessage.finishSending() || closeChannel) {
            closeConnection(key, socketChannel);
            sendMessage.close();
        }
    }
}

