/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file;

import alluxio.client.AlluxioStorageType;
import alluxio.client.BoundedStream;
import alluxio.client.Seekable;
import alluxio.client.block.BlockInStream;
import alluxio.client.block.BufferedBlockOutStream;
import alluxio.client.block.LocalBlockInStream;
import alluxio.client.block.UnderStoreBlockInStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.policy.FileWriteLocationPolicy;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.master.block.BlockId;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputStream} over a single file that is read one block at a time.
 *
 * <p>The stream keeps a logical position ({@code mPos}) in the file and a stream over the block
 * that contains that position. Block streams are obtained from the block store exposed by the
 * {@link FileSystemContext}; when that fails and the file is persisted, the block is read from
 * the under file storage path reported by the {@link URIStatus} instead.
 *
 * <p>When the configured {@link AlluxioStorageType} reports {@code isStore()}, data read from a
 * block stream that is not a {@link LocalBlockInStream} is additionally written to a
 * {@link BufferedBlockOutStream} so the block can be cached. A failure to cache is logged and
 * disables caching for the current block; it does not fail the read.
 *
 * <p>This class is not thread safe.
 */
@NotThreadSafe
public class FileInStream
extends InputStream
implements BoundedStream,
Seekable {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    /** Log message template used whenever a block cannot be cached. */
    private static final String BLOCK_ID_NOT_CACHED = "The block with ID {} could not be cached into Alluxio storage.";

    /** Storage type requested by the caller; decides promotion and caching. */
    private final AlluxioStorageType mAlluxioStorageType;
    /** Block size of the file, as reported by the file status. */
    private final long mBlockSize;
    /** Policy used to pick the worker that receives a cached block. */
    private final FileWriteLocationPolicy mLocationPolicy;
    /** Total length of the file, as reported by the file status. */
    private final long mFileLength;
    /** Context providing access to the block store. */
    private final FileSystemContext mContext;
    /** Status of the file being read. */
    private final URIStatus mStatus;

    /** Whether {@link #close()} has completed (successfully or not). */
    private boolean mClosed;
    /** Whether bytes read from the current block should also be written to the cache stream. */
    private boolean mShouldCacheCurrentBlock;
    /** Current position of the stream within the file. */
    private long mPos;
    /** Stream over the block currently being read; {@code null} until the first block is opened. */
    private BlockInStream mCurrentBlockInStream;
    /** Stream the current block is being cached to; may refer to an already finished stream. */
    private BufferedBlockOutStream mCurrentCacheStream;

    /**
     * Creates a new file input stream.
     *
     * @param status the status of the file to read
     * @param options the options providing the storage type and the cache location policy
     * @throws NullPointerException if the storage type requires caching but no location policy
     *         is set in {@code options}
     */
    public FileInStream(URIStatus status, InStreamOptions options) {
        this.mStatus = status;
        this.mBlockSize = status.getBlockSizeBytes();
        this.mFileLength = status.getLength();
        this.mContext = FileSystemContext.INSTANCE;
        this.mAlluxioStorageType = options.getAlluxioStorageType();
        this.mShouldCacheCurrentBlock = this.mAlluxioStorageType.isStore();
        this.mClosed = false;
        this.mLocationPolicy = options.getLocationPolicy();
        if (this.mShouldCacheCurrentBlock) {
            Preconditions.checkNotNull(options.getLocationPolicy(), "The location policy is not specified");
        }
    }

    /**
     * Closes the current block stream and finishes the cache stream. Calling this method on an
     * already closed stream has no effect.
     *
     * @throws IOException if closing the block stream or the cache stream fails
     */
    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        try {
            if (this.mCurrentBlockInStream != null) {
                this.mCurrentBlockInStream.close();
            }
        } finally {
            // Always release the cache stream and mark the stream closed, even if closing the
            // block stream failed, so that resources are not leaked on a partial failure.
            try {
                this.closeCacheStream();
            } finally {
                this.mClosed = true;
            }
        }
    }

    /**
     * Reads a single byte from the file.
     *
     * @return the byte read as an int in the range 0-255, or -1 if the end of the file has been
     *         reached or the current block stream ended unexpectedly
     * @throws IOException if the block stream cannot be opened or read
     */
    @Override
    public int read() throws IOException {
        if (this.mPos >= this.mFileLength) {
            return -1;
        }
        this.checkAndAdvanceBlockInStream();
        int data = this.mCurrentBlockInStream.read();
        if (data == -1) {
            // The block stream ended early. Do not advance the position and do not write the
            // end-of-stream marker into the cache as if it were a data byte.
            return -1;
        }
        if (this.mShouldCacheCurrentBlock) {
            // Cache before advancing mPos so that a failure is logged against the block the
            // byte was actually read from (same ordering as the bulk read).
            try {
                this.mCurrentCacheStream.write(data);
            }
            catch (IOException e) {
                LOG.warn(BLOCK_ID_NOT_CACHED, this.getCurrentBlockId(), e);
                this.mShouldCacheCurrentBlock = false;
            }
        }
        ++this.mPos;
        return data;
    }

    /**
     * Reads up to {@code b.length} bytes into {@code b}.
     *
     * @param b the buffer to read into
     * @return the number of bytes read, or -1 if the end of the file has been reached
     * @throws IllegalArgumentException if {@code b} is null
     * @throws IOException if the block stream cannot be opened or read
     */
    @Override
    public int read(byte[] b) throws IOException {
        // Validate here as well: b.length below would otherwise fail before the three-argument
        // overload gets a chance to report the null buffer.
        Preconditions.checkArgument(b != null, "Read buffer cannot be null");
        return this.read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}, crossing block
     * boundaries as needed.
     *
     * @param b the buffer to read into
     * @param off the offset in {@code b} at which to start storing bytes
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1 if no byte could be read
     *         because the end of the file (or of the underlying block stream) was reached
     * @throws IllegalArgumentException if {@code b} is null or the range is outside the buffer
     * @throws IOException if the block stream cannot be opened or read
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Preconditions.checkArgument(b != null, "Read buffer cannot be null");
        // "len <= b.length - off" instead of "len + off <= b.length": the sum can overflow int
        // and wrongly pass the check.
        Preconditions.checkArgument(off >= 0 && len >= 0 && len <= b.length - off, "Buffer length: %s, offset: %s, len: %s", b.length, off, len);
        if (len == 0) {
            return 0;
        }
        if (this.mPos >= this.mFileLength) {
            return -1;
        }
        int currentOffset = off;
        int bytesLeftToRead = len;
        while (bytesLeftToRead > 0 && this.mPos < this.mFileLength) {
            this.checkAndAdvanceBlockInStream();
            // Never read past the end of the current block in one call.
            int bytesToRead = (int)Math.min((long)bytesLeftToRead, this.mCurrentBlockInStream.remaining());
            int bytesRead = this.mCurrentBlockInStream.read(b, currentOffset, bytesToRead);
            if (bytesRead == -1) {
                // The block stream ended early. Retrying would not change any state and could
                // loop forever, so stop and report what has been read so far.
                break;
            }
            if (bytesRead > 0 && this.mShouldCacheCurrentBlock) {
                try {
                    this.mCurrentCacheStream.write(b, currentOffset, bytesRead);
                }
                catch (IOException e) {
                    LOG.warn(BLOCK_ID_NOT_CACHED, this.getCurrentBlockId(), e);
                    this.mShouldCacheCurrentBlock = false;
                }
            }
            this.mPos += (long)bytesRead;
            bytesLeftToRead -= bytesRead;
            currentOffset += bytesRead;
        }
        int totalRead = len - bytesLeftToRead;
        // totalRead can only be 0 here when the very first block read hit end of stream.
        return totalRead == 0 ? -1 : totalRead;
    }

    /**
     * @return the number of bytes between the current position and the end of the file
     */
    @Override
    public long remaining() {
        return this.mFileLength - this.mPos;
    }

    /**
     * Moves the stream to the given position in the file. Seeking to the current position is a
     * no-op and is not validated.
     *
     * @param pos the position to seek to; must be non-negative and less than the file length
     * @throws IllegalArgumentException if {@code pos} is negative or not before the end of file
     * @throws IOException if the block stream for the target position cannot be opened or sought
     */
    @Override
    public void seek(long pos) throws IOException {
        if (this.mPos == pos) {
            return;
        }
        Preconditions.checkArgument(pos >= 0L, "Seek position is negative: %s", pos);
        Preconditions.checkArgument(pos < this.mFileLength, "Seek position past end of file: %s", pos);
        this.seekBlockInStream(pos);
        this.checkAndAdvanceBlockInStream();
        this.mCurrentBlockInStream.seek(this.mPos % this.mBlockSize);
    }

    /**
     * Skips up to {@code n} bytes, stopping at the end of the file.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes actually skipped; 0 if {@code n} is not positive or the stream
     *         is already at the end of the file
     * @throws IOException if the target block cannot be opened or the block stream skips fewer
     *         bytes than requested
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        long toSkip = Math.min(n, this.mFileLength - this.mPos);
        if (toSkip <= 0L) {
            // Already at the end of the file (this also covers empty files).
            return 0L;
        }
        long newPos = this.mPos + toSkip;
        if (newPos == this.mFileLength) {
            // There is no block at the end-of-file position (getCurrentBlockId() returns -1
            // there), so do not try to open one. Finish the cache stream for the block being
            // left and just move the position; the block stream is released by the next seek
            // or by close().
            this.closeCacheStream();
            this.mShouldCacheCurrentBlock = false;
            this.mPos = newPos;
            return toSkip;
        }
        // When the skip lands in a later block, a fresh block stream is opened at offset 0, so
        // only the offset within that block has to be skipped.
        long toSkipInBlock = newPos / this.mBlockSize > this.mPos / this.mBlockSize ? newPos % this.mBlockSize : toSkip;
        this.seekBlockInStream(newPos);
        this.checkAndAdvanceBlockInStream();
        if (toSkipInBlock != this.mCurrentBlockInStream.skip(toSkipInBlock)) {
            throw new IOException(ExceptionMessage.INSTREAM_CANNOT_SKIP.getMessage(toSkip));
        }
        return toSkip;
    }

    /**
     * Ensures there is a block stream with data left for the current position. If there is no
     * block stream yet, or the current one is exhausted, the cache stream is finished, the block
     * stream for the current position is opened and, if caching applies, a new cache stream is
     * opened for it.
     *
     * @throws IOException if the block stream cannot be opened
     */
    private void checkAndAdvanceBlockInStream() throws IOException {
        long currentBlockId = this.getCurrentBlockId();
        if (this.mCurrentBlockInStream == null || this.mCurrentBlockInStream.remaining() == 0L) {
            this.closeCacheStream();
            this.updateBlockInStream(currentBlockId);
            if (this.mShouldCacheCurrentBlock) {
                this.openCacheStream(currentBlockId);
            }
        }
    }

    /**
     * Opens a cache stream for the given block on the worker chosen by the location policy.
     * An {@link IOException} is logged and disables caching for the block; an
     * {@link AlluxioException} is logged and rethrown wrapped in an {@link IOException}.
     *
     * @param blockId the ID of the block to cache
     * @throws IOException if an {@link AlluxioException} occurs while opening the cache stream
     */
    private void openCacheStream(long blockId) throws IOException {
        try {
            long blockSize = this.getCurrentBlockSize();
            WorkerNetAddress address = this.mLocationPolicy.getWorkerForNextBlock(this.mContext.getAluxioBlockStore().getWorkerInfoList(), blockSize);
            this.mCurrentCacheStream = this.mContext.getAluxioBlockStore().getOutStream(blockId, blockSize, address);
        }
        catch (IOException e) {
            LOG.warn(BLOCK_ID_NOT_CACHED, blockId, e);
            this.mShouldCacheCurrentBlock = false;
        }
        catch (AlluxioException e) {
            LOG.warn(BLOCK_ID_NOT_CACHED, blockId, e);
            throw new IOException(e);
        }
    }

    /**
     * Finishes the current cache stream, if any: closes it when the whole block has been written
     * (nothing remaining), cancels it otherwise. Caching is disabled afterwards until the next
     * block stream is opened.
     *
     * @throws IOException if closing or cancelling the cache stream fails
     */
    private void closeCacheStream() throws IOException {
        if (this.mCurrentCacheStream == null) {
            return;
        }
        if (this.mCurrentCacheStream.remaining() == 0L) {
            this.mCurrentCacheStream.close();
        } else {
            this.mCurrentCacheStream.cancel();
        }
        this.mShouldCacheCurrentBlock = false;
    }

    /**
     * @return the ID of the block containing the current position, or -1 if the position is at
     *         the end of the file
     * @throws IllegalStateException if the block index of the current position is not present
     *         in the block ID list of the file status
     */
    private long getCurrentBlockId() {
        if (this.mPos == this.mFileLength) {
            return -1L;
        }
        int index = (int)(this.mPos / this.mBlockSize);
        Preconditions.checkState(index < this.mStatus.getBlockIds().size(), "Current block index exceeds max index");
        return this.mStatus.getBlockIds().get(index);
    }

    /**
     * @return the full block size, unless the bytes left from the current position do not exceed
     *         {@code fileLength % blockSize}, in which case that remainder is returned
     */
    private long getCurrentBlockSize() {
        long lastBlockSize = this.mFileLength % this.mBlockSize;
        if (this.mFileLength - this.mPos > lastBlockSize) {
            return this.mBlockSize;
        }
        return lastBlockSize;
    }

    /**
     * Moves the logical position to {@code newPos} and finishes the current cache stream. If the
     * new position lies in a different block, the block stream for it is opened; caching of that
     * block is only attempted when the new position is at the very start of the block, since a
     * block read from the middle cannot be cached completely.
     *
     * @param newPos the new position in the file
     * @throws IOException if the block stream cannot be opened
     */
    private void seekBlockInStream(long newPos) throws IOException {
        long oldBlockId = this.getCurrentBlockId();
        this.mPos = newPos;
        this.closeCacheStream();
        long currentBlockId = this.getCurrentBlockId();
        if (oldBlockId == currentBlockId) {
            return;
        }
        this.updateBlockInStream(currentBlockId);
        if (this.mPos % this.mBlockSize == 0L && this.mShouldCacheCurrentBlock) {
            this.openCacheStream(currentBlockId);
        } else {
            this.mShouldCacheCurrentBlock = false;
        }
    }

    /**
     * Replaces the current block stream with a stream over the given block and decides whether
     * the block should be cached.
     *
     * <p>If the storage type requests promotion, the block is promoted first; a promotion
     * failure is only logged. If the block stream cannot be obtained from the block store and
     * the file is persisted, the block is read from the under file storage instead.
     *
     * @param blockId the ID of the block to open
     * @throws IOException if the previous block stream cannot be closed, or the block cannot be
     *         obtained from the block store and the file is not persisted
     */
    private void updateBlockInStream(long blockId) throws IOException {
        if (this.mCurrentBlockInStream != null) {
            this.mCurrentBlockInStream.close();
        }
        try {
            if (this.mAlluxioStorageType.isPromote()) {
                try {
                    this.mContext.getAluxioBlockStore().promote(blockId);
                }
                catch (IOException e) {
                    // Promotion is best effort; the read proceeds regardless.
                    LOG.warn("Promotion of block with ID {} failed.", blockId, e);
                }
            }
            this.mCurrentBlockInStream = this.mContext.getAluxioBlockStore().getInStream(blockId);
            // Blocks served by a local block stream are not cached again.
            this.mShouldCacheCurrentBlock = !(this.mCurrentBlockInStream instanceof LocalBlockInStream) && this.mAlluxioStorageType.isStore();
        }
        catch (IOException e) {
            LOG.debug("Failed to get BlockInStream for block with ID {}, using UFS instead. {}", blockId, e);
            if (!this.mStatus.isPersisted()) {
                LOG.error("Could not obtain data for block with ID {} from Alluxio. The block will not be persisted in the under file storage.", (Object)blockId);
                throw e;
            }
            long blockStart = BlockId.getSequenceNumber(blockId) * this.mBlockSize;
            this.mCurrentBlockInStream = new UnderStoreBlockInStream(blockStart, this.mBlockSize, this.mStatus.getUfsPath());
            this.mShouldCacheCurrentBlock = this.mAlluxioStorageType.isStore();
        }
    }
}

