/*
 * Decompiled with CFR 0.152.
 */
package udt;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import udt.CongestionControl;
import udt.UDPEndPoint;
import udt.UDTClient;
import udt.UDTPacket;
import udt.UDTSession;
import udt.packets.Acknowledgement;
import udt.packets.Acknowledgment2;
import udt.packets.DataPacket;
import udt.packets.KeepAlive;
import udt.packets.NegativeAcknowledgement;
import udt.sender.SenderLossList;
import udt.util.MeanThroughput;
import udt.util.MeanValue;
import udt.util.UDTStatistics;
import udt.util.UDTThreadFactory;
import udt.util.Util;

public class UDTSender {
    private static final Logger logger = Logger.getLogger(UDTClient.class.getName());
    private final UDPEndPoint endpoint;
    private final UDTSession session;
    private final UDTStatistics statistics;
    private final SenderLossList senderLossList;
    private final Map<Long, DataPacket> sendBuffer;
    private final BlockingQueue<DataPacket> sendQueue;
    private Thread senderThread;
    private final Object sendLock = new Object();
    private final AtomicInteger unacknowledged = new AtomicInteger(0);
    private volatile long currentSequenceNumber = 0L;
    private volatile long largestSentSequenceNumber = -1L;
    private volatile long lastAckSequenceNumber;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private volatile boolean paused = false;
    private volatile CountDownLatch startLatch = new CountDownLatch(1);
    private final AtomicReference<CountDownLatch> waitForAckLatch = new AtomicReference();
    private final AtomicReference<CountDownLatch> waitForSeqAckLatch = new AtomicReference();
    private final boolean storeStatistics;
    private MeanValue dgSendTime;
    private MeanValue dgSendInterval;
    private MeanThroughput throughput;
    long iterationStart;

    public UDTSender(UDTSession session, UDPEndPoint endpoint) {
        if (!session.isReady()) {
            throw new IllegalStateException("UDTSession is not ready.");
        }
        this.endpoint = endpoint;
        this.session = session;
        this.statistics = session.getStatistics();
        this.senderLossList = new SenderLossList();
        this.sendBuffer = new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(), 0.75f, 2);
        this.sendQueue = new ArrayBlockingQueue<DataPacket>(1000);
        this.lastAckSequenceNumber = session.getInitialSequenceNumber();
        this.waitForAckLatch.set(new CountDownLatch(1));
        this.waitForSeqAckLatch.set(new CountDownLatch(1));
        this.storeStatistics = Boolean.getBoolean("udt.sender.storeStatistics");
        this.initMetrics();
        this.doStart();
    }

    private void initMetrics() {
        if (!this.storeStatistics) {
            return;
        }
        this.dgSendTime = new MeanValue("Datagram send time");
        this.statistics.addMetric(this.dgSendTime);
        this.dgSendInterval = new MeanValue("Datagram send interval");
        this.statistics.addMetric(this.dgSendInterval);
        this.throughput = new MeanThroughput("Throughput", this.session.getDatagramSize());
        this.statistics.addMetric(this.throughput);
    }

    public void start() {
        logger.info("Starting sender for " + this.session);
        this.startLatch.countDown();
        this.started = true;
    }

    private void doStart() {
        Runnable r = new Runnable(){

            @Override
            public void run() {
                try {
                    while (!UDTSender.this.stopped) {
                        UDTSender.this.startLatch.await();
                        UDTSender.this.paused = false;
                        UDTSender.this.senderAlgorithm();
                    }
                }
                catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
                catch (IOException ex) {
                    ex.printStackTrace();
                    logger.log(Level.SEVERE, "", ex);
                }
                logger.info("STOPPING SENDER for " + UDTSender.this.session);
            }
        };
        this.senderThread = UDTThreadFactory.get().newThread(r);
        this.senderThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send(DataPacket p) throws IOException {
        Object object = this.sendLock;
        synchronized (object) {
            if (this.storeStatistics) {
                this.dgSendInterval.end();
                this.dgSendTime.begin();
            }
            this.endpoint.doSend(p);
            if (this.storeStatistics) {
                this.dgSendTime.end();
                this.dgSendInterval.begin();
                this.throughput.end();
                this.throughput.begin();
            }
            this.sendBuffer.put(p.getPacketSequenceNumber(), p);
            this.unacknowledged.incrementAndGet();
        }
        this.statistics.incNumberOfSentDataPackets();
    }

    protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units) throws IOException, InterruptedException {
        if (!this.started) {
            this.start();
        }
        logger.info("Sending packet!!");
        return this.sendQueue.offer(p, timeout, units);
    }

    protected void receive(UDTPacket p) throws IOException {
        if (p instanceof Acknowledgement) {
            Acknowledgement acknowledgement = (Acknowledgement)p;
            this.onAcknowledge(acknowledgement);
        } else if (p instanceof NegativeAcknowledgement) {
            NegativeAcknowledgement nak = (NegativeAcknowledgement)p;
            this.onNAKPacketReceived(nak);
        } else if (p instanceof KeepAlive) {
            this.session.getSocket().getReceiver().resetEXPCount();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onAcknowledge(Acknowledgement acknowledgement) throws IOException {
        long rate;
        this.waitForAckLatch.get().countDown();
        this.waitForSeqAckLatch.get().countDown();
        CongestionControl cc = this.session.getCongestionControl();
        long rtt = acknowledgement.getRoundTripTime();
        if (rtt > 0L) {
            long rttVar = acknowledgement.getRoundTripTimeVar();
            cc.setRTT(rtt, rttVar);
            this.statistics.setRTT(rtt, rttVar);
        }
        if ((rate = acknowledgement.getPacketReceiveRate()) > 0L) {
            long linkCapacity = acknowledgement.getEstimatedLinkCapacity();
            cc.updatePacketArrivalRate(rate, linkCapacity);
            this.statistics.setPacketArrivalRate(cc.getPacketArrivalRate(), cc.getEstimatedLinkCapacity());
        }
        long ackNumber = acknowledgement.getAckNumber();
        cc.onACK(ackNumber);
        this.statistics.setCongestionWindowSize((long)cc.getCongestionWindowSize());
        boolean removed = false;
        for (long s = this.lastAckSequenceNumber; s < ackNumber; ++s) {
            Object object = this.sendLock;
            synchronized (object) {
                removed = this.sendBuffer.remove(s) != null;
            }
            if (!removed) continue;
            this.unacknowledged.decrementAndGet();
        }
        this.lastAckSequenceNumber = Math.max(this.lastAckSequenceNumber, ackNumber);
        this.sendAck2(ackNumber);
        this.statistics.incNumberOfACKReceived();
        if (this.storeStatistics) {
            this.statistics.storeParameters();
        }
    }

    protected void onNAKPacketReceived(NegativeAcknowledgement nak) {
        for (Integer i : nak.getDecodedLossInfo()) {
            this.senderLossList.insert((long)i);
        }
        this.session.getCongestionControl().onLoss(nak.getDecodedLossInfo());
        this.session.getSocket().getReceiver().resetEXPTimer();
        this.statistics.incNumberOfNAKReceived();
        if (logger.isLoggable(Level.FINER)) {
            logger.finer("NAK for " + nak.getDecodedLossInfo().size() + " packets lost, " + "set send period to " + this.session.getCongestionControl().getSendInterval());
        }
    }

    protected void sendKeepAlive() throws Exception {
        KeepAlive keepAlive = new KeepAlive();
        keepAlive.setSession(this.session);
        this.endpoint.doSend(keepAlive);
    }

    protected void sendAck2(long ackSequenceNumber) throws IOException {
        Acknowledgment2 ackOfAckPkt = new Acknowledgment2();
        ackOfAckPkt.setAckSequenceNumber(ackSequenceNumber);
        ackOfAckPkt.setSession(this.session);
        ackOfAckPkt.setDestinationID(this.session.getDestination().getSocketID());
        this.endpoint.doSend(ackOfAckPkt);
    }

    public void senderAlgorithm() throws InterruptedException, IOException {
        while (!this.paused) {
            this.iterationStart = Util.getCurrentTime();
            if (!this.senderLossList.isEmpty()) {
                Long entry = this.senderLossList.getFirstEntry();
                this.handleResubmit(entry);
            } else {
                int unAcknowledged = this.unacknowledged.get();
                if ((double)unAcknowledged < this.session.getCongestionControl().getCongestionWindowSize() && unAcknowledged < this.session.getFlowWindowSize()) {
                    DataPacket dp = this.sendQueue.poll(10000L, TimeUnit.MICROSECONDS);
                    if (dp != null) {
                        this.send(dp);
                        this.largestSentSequenceNumber = dp.getPacketSequenceNumber();
                    } else {
                        this.statistics.incNumberOfMissingDataEvents();
                    }
                } else {
                    if ((double)unAcknowledged >= this.session.getCongestionControl().getCongestionWindowSize()) {
                        this.statistics.incNumberOfCCWindowExceededEvents();
                    }
                    this.waitForAck();
                }
            }
            if (this.largestSentSequenceNumber % 16L == 0L) continue;
            long snd = (long)this.session.getCongestionControl().getSendInterval();
            long passed = Util.getCurrentTime() - this.iterationStart;
            int x = 0;
            while (snd - passed > 0L) {
                if (x == 0) {
                    this.statistics.incNumberOfCCSlowDownEvents();
                    ++x;
                }
                passed = Util.getCurrentTime() - this.iterationStart;
                if (!this.stopped) continue;
                return;
            }
        }
    }

    protected void handleResubmit(Long seqNumber) {
        try {
            DataPacket pktToRetransmit = this.sendBuffer.get(seqNumber);
            if (pktToRetransmit != null) {
                this.endpoint.doSend(pktToRetransmit);
                this.statistics.incNumberOfRetransmittedDataPackets();
            }
        }
        catch (Exception e) {
            logger.log(Level.WARNING, "", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void putUnacknowledgedPacketsIntoLossList() {
        Object object = this.sendLock;
        synchronized (object) {
            for (Long l : this.sendBuffer.keySet()) {
                this.senderLossList.insert(l);
            }
        }
    }

    public long getNextSequenceNumber() {
        ++this.currentSequenceNumber;
        return this.currentSequenceNumber;
    }

    public long getCurrentSequenceNumber() {
        return this.currentSequenceNumber;
    }

    public long getLargestSentSequenceNumber() {
        return this.largestSentSequenceNumber;
    }

    public long getLastAckSequenceNumber() {
        return this.lastAckSequenceNumber;
    }

    boolean haveAcknowledgementFor(long sequenceNumber) {
        return sequenceNumber <= this.lastAckSequenceNumber;
    }

    boolean isSentOut(long sequenceNumber) {
        return this.largestSentSequenceNumber >= sequenceNumber;
    }

    boolean haveLostPackets() {
        return !this.senderLossList.isEmpty();
    }

    public void waitForAck(long sequenceNumber) throws InterruptedException {
        while (!this.session.isShutdown() && !this.haveAcknowledgementFor(sequenceNumber)) {
            this.waitForSeqAckLatch.set(new CountDownLatch(1));
            this.waitForSeqAckLatch.get().await(10L, TimeUnit.MILLISECONDS);
        }
    }

    public void waitForAck() throws InterruptedException {
        this.waitForAckLatch.set(new CountDownLatch(1));
        this.waitForAckLatch.get().await(2L, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this.stopped = true;
    }

    public void pause() {
        this.startLatch = new CountDownLatch(1);
        this.paused = true;
    }
}

