/*
 * Decompiled with CFR 0.152.
 */
package alluxio.worker.netty;

import alluxio.Configuration;
import alluxio.StorageTierAssoc;
import alluxio.WorkerStorageTierAssoc;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.InvalidWorkerStateException;
import alluxio.network.protocol.RPCBlockReadRequest;
import alluxio.network.protocol.RPCBlockReadResponse;
import alluxio.network.protocol.RPCBlockWriteRequest;
import alluxio.network.protocol.RPCBlockWriteResponse;
import alluxio.network.protocol.RPCErrorResponse;
import alluxio.network.protocol.RPCMessage;
import alluxio.network.protocol.RPCResponse;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataByteBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.netty.ClosableResourceChannelListener;
import alluxio.worker.netty.FileTransferType;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
@NotThreadSafe
public final class DataServerHandler
extends SimpleChannelInboundHandler<RPCMessage> {
    private static final Logger LOG = LoggerFactory.getLogger((String)"alluxio.logger.type");
    private final BlockWorker mBlockWorker;
    private final Configuration mConfiguration;
    private final StorageTierAssoc mStorageTierAssoc;
    private final FileTransferType mTransferType;

    public DataServerHandler(BlockWorker blockWorker, Configuration configuration) {
        this.mBlockWorker = (BlockWorker)Preconditions.checkNotNull((Object)blockWorker);
        this.mConfiguration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.mStorageTierAssoc = new WorkerStorageTierAssoc(this.mConfiguration);
        this.mTransferType = (FileTransferType)this.mConfiguration.getEnum("alluxio.worker.network.netty.file.transfer", FileTransferType.class);
    }

    public void channelRead0(ChannelHandlerContext ctx, RPCMessage msg) throws IOException {
        switch (msg.getType()) {
            case RPC_BLOCK_READ_REQUEST: {
                assert (msg instanceof RPCBlockReadRequest);
                this.handleBlockReadRequest(ctx, (RPCBlockReadRequest)msg);
                break;
            }
            case RPC_BLOCK_WRITE_REQUEST: {
                assert (msg instanceof RPCBlockWriteRequest);
                this.handleBlockWriteRequest(ctx, (RPCBlockWriteRequest)msg);
                break;
            }
            default: {
                RPCErrorResponse resp = new RPCErrorResponse(RPCResponse.Status.UNKNOWN_MESSAGE_ERROR);
                ctx.writeAndFlush((Object)resp);
                throw new IllegalArgumentException("No handler implementation for rpc msg type: " + msg.getType());
            }
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.warn("Exception thrown while processing request", cause);
        ctx.close();
    }

    private void handleBlockReadRequest(ChannelHandlerContext ctx, RPCBlockReadRequest req) throws IOException {
        block5: {
            BlockReader reader;
            long blockId = req.getBlockId();
            long offset = req.getOffset();
            long len = req.getLength();
            long lockId = req.getLockId();
            long sessionId = req.getSessionId();
            try {
                reader = this.mBlockWorker.readBlockRemote(sessionId, blockId, lockId);
            }
            catch (BlockDoesNotExistException e) {
                throw new IOException(e);
            }
            catch (InvalidWorkerStateException e) {
                throw new IOException(e);
            }
            try {
                req.validate();
                long fileLength = reader.getLength();
                this.validateBounds(req, fileLength);
                long readLength = this.returnLength(offset, len, fileLength);
                RPCBlockReadResponse resp = new RPCBlockReadResponse(blockId, offset, readLength, this.getDataBuffer(req, reader, readLength), RPCResponse.Status.SUCCESS);
                ChannelFuture future = ctx.writeAndFlush((Object)resp);
                future.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                future.addListener((GenericFutureListener)new ClosableResourceChannelListener((Closeable)reader));
                this.mBlockWorker.accessBlock(sessionId, blockId);
                LOG.info("Preparation for responding to remote block request for: {} done.", (Object)blockId);
            }
            catch (Exception e) {
                LOG.error("The file is not here : {}", (Object)e.getMessage(), (Object)e);
                RPCBlockReadResponse resp = RPCBlockReadResponse.createErrorResponse((RPCBlockReadRequest)req, (RPCResponse.Status)RPCResponse.Status.FILE_DNE);
                ChannelFuture future = ctx.writeAndFlush((Object)resp);
                future.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                if (reader == null) break block5;
                reader.close();
            }
        }
    }

    private void handleBlockWriteRequest(ChannelHandlerContext ctx, RPCBlockWriteRequest req) throws IOException {
        block4: {
            long sessionId = req.getSessionId();
            long blockId = req.getBlockId();
            long offset = req.getOffset();
            long length = req.getLength();
            DataBuffer data = req.getPayloadDataBuffer();
            BlockWriter writer = null;
            try {
                req.validate();
                ByteBuffer buffer = data.getReadOnlyByteBuffer();
                if (offset == 0L) {
                    this.mBlockWorker.createBlockRemote(sessionId, blockId, this.mStorageTierAssoc.getAlias(0), length);
                } else {
                    this.mBlockWorker.requestSpace(sessionId, blockId, length);
                }
                writer = this.mBlockWorker.getTempBlockWriterRemote(sessionId, blockId);
                writer.append(buffer);
                RPCBlockWriteResponse resp = new RPCBlockWriteResponse(sessionId, blockId, offset, length, RPCResponse.Status.SUCCESS);
                ChannelFuture future = ctx.writeAndFlush((Object)resp);
                future.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                future.addListener((GenericFutureListener)new ClosableResourceChannelListener((Closeable)writer));
            }
            catch (Exception e) {
                LOG.error("Error writing remote block : {}", (Object)e.getMessage(), (Object)e);
                RPCBlockWriteResponse resp = RPCBlockWriteResponse.createErrorResponse((RPCBlockWriteRequest)req, (RPCResponse.Status)RPCResponse.Status.WRITE_ERROR);
                ChannelFuture future = ctx.writeAndFlush((Object)resp);
                future.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                if (writer == null) break block4;
                writer.close();
            }
        }
    }

    private long returnLength(long offset, long len, long fileLength) {
        return len == -1L ? fileLength - offset : len;
    }

    private void validateBounds(RPCBlockReadRequest req, long fileLength) {
        Preconditions.checkArgument((req.getOffset() <= fileLength ? 1 : 0) != 0, (String)"Offset(%s) is larger than file length(%s)", (Object[])new Object[]{req.getOffset(), fileLength});
        Preconditions.checkArgument((req.getLength() == -1L || req.getOffset() + req.getLength() <= fileLength ? 1 : 0) != 0, (String)"Offset(%s) plus length(%s) is larger than file length(%s)", (Object[])new Object[]{req.getOffset(), req.getLength(), fileLength});
    }

    private DataBuffer getDataBuffer(RPCBlockReadRequest req, BlockReader reader, long readLength) throws IOException, IllegalArgumentException {
        switch (this.mTransferType) {
            case MAPPED: {
                ByteBuffer data = reader.read(req.getOffset(), (long)((int)readLength));
                return new DataByteBuffer(data, readLength);
            }
        }
        if (reader.getChannel() instanceof FileChannel) {
            return new DataFileChannel((FileChannel)reader.getChannel(), req.getOffset(), readLength);
        }
        reader.close();
        throw new IllegalArgumentException("Only FileChannel is supported!");
    }
}

