/*
 * Decompiled with CFR 0.152.
 */
package udt;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import udt.UDTSocket;
import udt.util.UDTStatistics;

/**
 * An {@link InputStream} that hands out, in sequence-number order, the payload chunks supplied
 * through {@link #haveNewData(long, byte[])}.
 *
 * <p>Incoming chunks are kept in a priority queue ordered by sequence number. A chunk is only
 * delivered to readers when its sequence number is exactly one above the highest number
 * delivered so far; chunks at or below that number are counted as duplicates and dropped, and
 * chunks further ahead are put back in the queue until the gap is filled.
 *
 * <p>NOTE(review): the read methods are not synchronized and the partially consumed chunk is
 * kept in plain fields, so this class presumably supports a single reading thread only.
 */
public class UDTInputStream
extends InputStream {
    /** First back-off, and the amount by which the back-off grows per empty read attempt. */
    private static final long RETRY_SLEEP_STEP_MILLIS = 200L;
    /** Upper bound for the back-off so an idle reader still notices new data promptly. */
    private static final long MAX_RETRY_SLEEP_MILLIS = 1000L;

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final UDTSocket socket;
    /** Received but not yet delivered chunks, ordered by sequence number. */
    private final PriorityBlockingQueue<AppData> appData;
    private final UDTStatistics statistics;
    /** Sequence number of the last chunk that was handed to a reader. */
    private volatile long highestSequenceNumber = 0L;
    private final AtomicBoolean expectMoreData = new AtomicBoolean(true);
    private volatile boolean closed = false;
    private volatile boolean blocking = true;
    /** Scratch buffer used by {@link #read()}. */
    private final byte[] single = new byte[1];
    /** Chunk currently being consumed, or {@code null} when a new one must be fetched. */
    private AppData currentChunk = null;
    /** Read position inside {@link #currentChunk}. */
    int offset = 0;

    /**
     * Creates a stream for the given socket.
     *
     * @param socket the owning socket; may be {@code null}, in which case a default initial
     *        queue capacity of 64 is used
     * @param statistics receives the duplicate-packet count
     * @throws IOException declared for callers; not thrown by this constructor itself
     */
    public UDTInputStream(UDTSocket socket, UDTStatistics statistics) throws IOException {
        this.socket = socket;
        this.statistics = statistics;
        int capacity = socket != null ? 4 * socket.getSession().getFlowWindowSize() : 64;
        // PriorityBlockingQueue rejects an initial capacity below 1.
        this.appData = new PriorityBlockingQueue<AppData>(Math.max(1, capacity));
    }

    /**
     * Creates a stream for the given socket, using the statistics object of the socket's session.
     *
     * @param socket the owning socket; must not be {@code null} because its session is
     *        dereferenced here
     * @throws IOException declared for callers; not thrown by this constructor itself
     */
    public UDTInputStream(UDTSocket socket) throws IOException {
        this(socket, socket.getSession().getStatistics());
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as a value in the range 0..255, or -1 at end of stream
     * @throws IOException if the underlying read fails or the thread is interrupted
     */
    @Override
    public int read() throws IOException {
        this.log.info("Reading single byte");
        int count = 0;
        while (count == 0) {
            count = this.read(this.single);
        }
        if (count > 0) {
            // Mask to 0..255: returning the signed byte would make 0xFF look like end of stream
            // and every other byte >= 0x80 a negative value.
            return this.single[0] & 0xFF;
        }
        return count;
    }

    /**
     * Reads up to {@code len} bytes into {@code target}, starting at index {@code off}.
     *
     * <p>Returns as soon as at least one byte could be copied. While no data is available the
     * call sleeps and retries until data arrives, the stream is closed, or no more data is
     * expected and the queue is empty.
     *
     * @param target destination buffer
     * @param off index in {@code target} of the first byte to write
     * @param len maximum number of bytes to read
     * @return the number of bytes read (never more than {@code len}), 0 if {@code len} is 0,
     *         or -1 at end of stream
     * @throws NullPointerException if {@code target} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
     *         {@code len} exceeds {@code target.length - off}
     * @throws IOException if the read fails or the thread is interrupted while waiting
     */
    @Override
    public int read(byte[] target, int off, int len) throws IOException {
        this.log.info("Reading data with offset '{}' and len '{}'", off, len);
        return this.readInto(target, off, len);
    }

    /**
     * Reads up to {@code target.length} bytes into {@code target}.
     *
     * @param target destination buffer
     * @return the number of bytes read, 0 if {@code target} is empty, or -1 at end of stream
     * @throws NullPointerException if {@code target} is {@code null}
     * @throws IOException if the read fails or the thread is interrupted while waiting
     */
    @Override
    public int read(byte[] target) throws IOException {
        this.log.info("Reading with straight byte array");
        if (target == null) {
            throw new NullPointerException();
        }
        return this.readInto(target, 0, target.length);
    }

    /**
     * Shared implementation of the array reads: validates the arguments, then copies whatever
     * is available, sleeping with a bounded back-off between attempts while nothing is.
     */
    private int readInto(byte[] target, int off, int len) throws IOException {
        if (target == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || len > target.length - off) {
            this.log.error("Throwing index out of bounds!");
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        try {
            long delay = RETRY_SLEEP_STEP_MILLIS;
            while (true) {
                int copied = this.copyAvailable(target, off, len);
                if (copied == len) {
                    this.log.info("Returning read of: {}", copied);
                    return copied;
                }
                if (copied > 0) {
                    this.log.info("Returning positive read");
                    return copied;
                }
                if (this.closed) {
                    this.log.info("Closed, returning -1");
                    return -1;
                }
                if (!this.expectMoreData.get() && this.appData.isEmpty()) {
                    this.log.info("Reached end -- no more data!!");
                    return -1;
                }
                this.log.info("Waiting for more data");
                this.sleepBeforeRetry(delay);
                delay = Math.min(delay + RETRY_SLEEP_STEP_MILLIS, MAX_RETRY_SLEEP_MILLIS);
            }
        }
        catch (RuntimeException ex) {
            // Unexpected runtime failures inside the read are surfaced to callers as IOException.
            throw new IOException("Exception during read!!", ex);
        }
    }

    /**
     * Copies bytes from the current and any immediately following in-order chunks into
     * {@code target[off .. off + len)}.
     *
     * @return the number of bytes copied, between 0 and {@code len}
     */
    private int copyAvailable(byte[] target, int off, int len) throws IOException {
        int read = 0;
        this.log.debug("About to update chunk");
        this.updateCurrentChunk(false);
        this.log.debug("Updated chunk...starting while");
        while (this.currentChunk != null) {
            byte[] data = this.currentChunk.data;
            // Bound by what the caller still wants (len - read), not by len itself; otherwise a
            // read spanning several chunks could write past off + len.
            int length = Math.min(len - read, data.length - this.offset);
            System.arraycopy(data, this.offset, target, off + read, length);
            read += length;
            this.offset += length;
            if (this.offset >= data.length) {
                // Chunk fully consumed; the next refill fetches a new one.
                this.currentChunk = null;
                this.offset = 0;
            }
            if (read == len) {
                break;
            }
            // Only wait for the next chunk if nothing has been copied so far.
            this.updateCurrentChunk(this.blocking && read == 0);
        }
        return read;
    }

    /** Sleeps for {@code millis}, turning an interruption into an {@link IOException}. */
    private void sleepBeforeRetry(long millis) throws IOException {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException ie) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Exception during read!!", ie);
        }
    }

    /**
     * Makes sure {@link #currentChunk} holds the next in-order chunk if one is available.
     *
     * <p>Does nothing when a chunk is already being consumed. Otherwise polls the queue:
     * duplicates (sequence number not above the highest delivered one) are dropped and counted,
     * a chunk that is ahead of the expected sequence number is put back and the method returns
     * with no current chunk.
     *
     * @param block if {@code true}, keep polling until a chunk arrives or the stream is closed;
     *        if {@code false}, wait at most 10 ms
     * @throws IOException if the thread is interrupted while polling
     */
    private void updateCurrentChunk(boolean block) throws IOException {
        if (this.currentChunk != null) {
            return;
        }
        while (true) {
            AppData next;
            try {
                if (block) {
                    next = this.appData.poll(1L, TimeUnit.MILLISECONDS);
                    while (!this.closed && next == null) {
                        next = this.appData.poll(1000L, TimeUnit.MILLISECONDS);
                    }
                } else {
                    next = this.appData.poll(10L, TimeUnit.MILLISECONDS);
                }
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", ie);
            }
            if (next == null) {
                return;
            }
            if (next.sequenceNumber == this.highestSequenceNumber + 1L) {
                ++this.highestSequenceNumber;
                this.currentChunk = next;
                return;
            }
            if (next.sequenceNumber > this.highestSequenceNumber) {
                // Out of order: a lower-numbered chunk is still missing, so put this one back.
                this.appData.offer(next);
                return;
            }
            // Already delivered: count it as a duplicate and try the next queued chunk.
            this.statistics.incNumberOfDuplicateDataPackets();
        }
    }

    /**
     * Offers a newly received chunk to this stream.
     *
     * @param sequenceNumber sequence number of the chunk
     * @param data payload of the chunk
     * @return {@code true} if the sequence number is not above the highest one already
     *         delivered (the chunk is then not queued); otherwise the result of offering the
     *         chunk to the queue
     * @throws IOException declared for overriding implementations; not thrown here
     */
    protected boolean haveNewData(long sequenceNumber, byte[] data) throws IOException {
        if (sequenceNumber <= this.highestSequenceNumber) {
            return true;
        }
        return this.appData.offer(new AppData(sequenceNumber, data));
    }

    /**
     * Marks the stream as closed and signals that no more data is expected. Calling it again
     * has no further effect.
     */
    @Override
    public void close() throws IOException {
        this.log.info("Closing input stream.");
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.noMoreData();
    }

    /** Returns the socket passed to the constructor (may be {@code null}). */
    public UDTSocket getSocket() {
        return this.socket;
    }

    /**
     * Sets the blocking flag. The flag is consulted when a chunk has to be fetched after a copy
     * step that transferred no bytes: if set, that fetch waits until a chunk arrives or the
     * stream is closed.
     */
    public void setBlocking(boolean block) {
        this.blocking = block;
    }

    /**
     * Signals that no further data is expected; reads return -1 once the queue is drained.
     */
    protected void noMoreData() throws IOException {
        this.expectMoreData.set(false);
    }

    /**
     * A received payload chunk together with its sequence number. Ordering, equality and hash
     * code are all based on the sequence number only.
     */
    public static class AppData
    implements Comparable<AppData> {
        final long sequenceNumber;
        final byte[] data;

        AppData(long sequenceNumber, byte[] data) {
            this.sequenceNumber = sequenceNumber;
            this.data = data;
        }

        /**
         * Orders chunks by ascending sequence number.
         *
         * @return -1, 0 or 1
         */
        @Override
        public int compareTo(AppData o) {
            // Compare explicitly: narrowing the long difference to int can flip its sign.
            long mine = this.sequenceNumber;
            long theirs = o.sequenceNumber;
            if (mine < theirs) {
                return -1;
            }
            return mine == theirs ? 0 : 1;
        }

        /** Returns {@code "<sequenceNumber>[<payload length>]"}. */
        @Override
        public String toString() {
            return this.sequenceNumber + "[" + this.data.length + "]";
        }

        @Override
        public int hashCode() {
            return 31 + (int)(this.sequenceNumber ^ (this.sequenceNumber >>> 32));
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            AppData other = (AppData)obj;
            return this.sequenceNumber == other.sequenceNumber;
        }
    }
}

