/*
 * Decompiled with CFR 0.152.
 */
package com.zx.sms.session;

import com.zx.sms.BaseMessage;
import com.zx.sms.common.SendFailException;
import com.zx.sms.common.SmsLifeTerminateException;
import com.zx.sms.common.storedMap.VersionObject;
import com.zx.sms.common.util.CachedMillisecondClock;
import com.zx.sms.config.PropertiesUtils;
import com.zx.sms.connect.manager.EndpointConnector;
import com.zx.sms.connect.manager.EndpointEntity;
import com.zx.sms.connect.manager.EndpointManager;
import com.zx.sms.session.cmpp.SessionState;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSessionStateManager<K, T extends BaseMessage>
extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSessionStateManager.class);
    private final Logger errlogger;
    private long msgReadCount = 0L;
    private long msgWriteCount = 0L;
    private EndpointEntity entity;
    private final long version = System.currentTimeMillis();
    private static final ScheduledThreadPoolExecutor msgResend = new ScheduledThreadPoolExecutor(Integer.valueOf(PropertiesUtils.getproperties("GlobalMsgResendThreadCount", "4")), new ThreadFactory(){
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "msgResend-" + this.threadNumber.getAndIncrement());
            t.setDaemon(true);
            if (t.getPriority() != 5) {
                t.setPriority(5);
            }
            return t;
        }
    }, new ThreadPoolExecutor.DiscardPolicy());
    private final ConcurrentHashMap<K, Entry> msgRetryMap = new ConcurrentHashMap();
    private final ConcurrentMap<K, VersionObject<T>> storeMap;
    private ChannelHandlerContext ctx;
    private boolean preSend;
    private boolean preSendover = false;

    public AbstractSessionStateManager(EndpointEntity entity, ConcurrentMap<K, VersionObject<T>> storeMap, boolean preSend) {
        this.entity = entity;
        this.errlogger = LoggerFactory.getLogger((String)("error." + entity.getId()));
        this.storeMap = storeMap;
        this.preSend = preSend;
    }

    public int getWaittingResp() {
        return this.storeMap.size();
    }

    public long getReadCount() {
        return this.msgReadCount;
    }

    public long getWriteCount() {
        return this.msgWriteCount;
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    private void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
        ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
        if (cob != null) {
            cob.setUserDefinedWritability(31, writable);
        }
    }

    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        ctx.executor().execute(new Runnable(){

            @Override
            public void run() {
                EndpointConnector<?> conn = EndpointManager.INS.getEndpointConnector(AbstractSessionStateManager.this.entity);
                Iterator itor = AbstractSessionStateManager.this.msgRetryMap.entrySet().iterator();
                while (itor.hasNext()) {
                    Channel ch;
                    boolean async;
                    Map.Entry entry = itor.next();
                    if (entry == null) continue;
                    Entry failedentry = (Entry)entry.getValue();
                    Object requestmsg = failedentry.request;
                    boolean bl = async = !failedentry.sync;
                    if (conn != null && (ch = conn.fetch()) != null && ch.isActive()) {
                        if (AbstractSessionStateManager.this.entity.isReSendFailMsg() && async) {
                            ch.writeAndFlush(requestmsg);
                            logger.warn("current channel {} is closed.send requestMsg {} from other channel {} which is active.", new Object[]{ctx.channel(), requestmsg, ch});
                        } else {
                            AbstractSessionStateManager.this.errlogger.error("Channel closed . Msg {} may not send Success. ", requestmsg);
                        }
                    }
                    AbstractSessionStateManager.this.cancelRetry(failedentry, ctx.channel());
                    AbstractSessionStateManager.this.responseFutureDone(failedentry, new IOException("channel closed."));
                    itor.remove();
                }
                if (AbstractSessionStateManager.this.preSend && !AbstractSessionStateManager.this.preSendover) {
                    for (Map.Entry storeentry : AbstractSessionStateManager.this.storeMap.entrySet()) {
                        if (conn == null) break;
                        Channel ch = conn.fetch();
                        if (ch == null || !ch.isActive()) continue;
                        Object key = storeentry.getKey();
                        VersionObject vobj = (VersionObject)storeentry.getValue();
                        long v = vobj.getVersion();
                        BaseMessage msg = (BaseMessage)vobj.getObj();
                        if (AbstractSessionStateManager.this.version <= v || msg == null) continue;
                        logger.debug("Send last failed msg . {}", (Object)msg);
                        ch.writeAndFlush((Object)msg);
                    }
                }
            }
        });
        ctx.fireChannelInactive();
    }

    protected abstract K getSequenceId(T var1);

    protected abstract boolean needSendAgainByResponse(T var1, T var2);

    protected abstract boolean closeWhenRetryFailed(T var1);

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ++this.msgReadCount;
        if (msg instanceof BaseMessage && ((BaseMessage)msg).isResponse()) {
            BaseMessage response = (BaseMessage)msg;
            K key = this.getSequenceId(response);
            VersionObject vobj = (VersionObject)this.storeMap.remove(key);
            if (vobj != null) {
                BaseMessage request = (BaseMessage)vobj.getObj();
                long sendtime = vobj.getVersion();
                response.setRequest(request);
                long delay = this.delaycheck(sendtime);
                if (delay > (long)(this.entity.getRetryWaitTimeSec() * 1000 / 4)) {
                    this.errlogger.warn("delaycheck . delay :{} , SequenceId :{}", (Object)delay, this.getSequenceId(response));
                    this.setchannelunwritable(ctx, 1000L);
                }
                Entry entry = this.msgRetryMap.get(key);
                if (this.needSendAgainByResponse(request, response)) {
                    this.cancelRetry(entry, ctx.channel());
                    this.setchannelunwritable(ctx, 40L);
                    this.reWriteLater(ctx, entry.request, ctx.newPromise(), 400);
                } else {
                    this.cancelRetry(entry, ctx.channel());
                    this.responseFutureDone(entry, response);
                    this.msgRetryMap.remove(key);
                }
            } else {
                this.errlogger.warn("receive ResponseMessage ,but not found related Request Msg. response:{}", (Object)response);
            }
        }
        ctx.fireChannelRead(msg);
    }

    private long delaycheck(long sendtime) {
        return CachedMillisecondClock.INS.now() - sendtime;
    }

    private void setchannelunwritable(final ChannelHandlerContext ctx, long millitime) {
        if (ctx.channel().isWritable()) {
            this.setUserDefinedWritability(ctx, false);
            ctx.executor().schedule(new Runnable(){

                @Override
                public void run() {
                    AbstractSessionStateManager.this.setUserDefinedWritability(ctx, true);
                }
            }, millitime, TimeUnit.MILLISECONDS);
        }
    }

    public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) throws Exception {
        if (message instanceof BaseMessage) {
            BaseMessage msg = (BaseMessage)message;
            if (msg.isRequest()) {
                this.writeWithWindow(ctx, msg, promise);
            } else {
                ctx.write((Object)msg, promise);
            }
        } else {
            ctx.write(message, promise);
        }
    }

    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == SessionState.Connect) {
            ctx.executor().execute(new Runnable(){

                @Override
                public void run() {
                    AbstractSessionStateManager.this.preSendMsg(ctx);
                }
            });
        }
        ctx.fireUserEventTriggered(evt);
    }

    private boolean writeWithWindow(ChannelHandlerContext ctx, T message, ChannelPromise promise) {
        try {
            this.safewrite(ctx, message, promise, false);
        }
        catch (Exception e) {
            promise.tryFailure((Throwable)e);
            logger.error("writeWithWindow: ", e.getCause() != null ? e.getCause() : e);
        }
        return true;
    }

    private void scheduleRetryMsg(final ChannelHandlerContext ctx, T message) {
        K seq = this.getSequenceId(message);
        final Entry entry = this.msgRetryMap.get(seq);
        if (entry != null) {
            AtomicReference ref = new AtomicReference();
            Runnable task = new Runnable((BaseMessage)message, ref, seq){
                private final /* synthetic */ BaseMessage val$message;
                private final /* synthetic */ AtomicReference val$ref;
                private final /* synthetic */ Object val$seq;
                {
                    this.val$message = baseMessage;
                    this.val$ref = atomicReference;
                    this.val$seq = object;
                }

                @Override
                public void run() {
                    try {
                        if (!ctx.channel().isActive()) {
                            return;
                        }
                        int times = entry.cnt.get();
                        logger.warn("entity : {} , retry Send Msg : {}", (Object)AbstractSessionStateManager.this.entity.getId(), (Object)this.val$message);
                        if (times >= AbstractSessionStateManager.this.entity.getMaxRetryCnt()) {
                            Future future = (Future)this.val$ref.get();
                            if (future != null) {
                                future.cancel(false);
                            }
                            AbstractSessionStateManager.this.cancelRetry(entry, ctx.channel());
                            AbstractSessionStateManager.this.responseFutureDone(entry, new SendFailException("retry send msg over " + times + " times"));
                            AbstractSessionStateManager.this.msgRetryMap.remove(this.val$seq);
                            AbstractSessionStateManager.this.storeMap.remove(this.val$seq);
                            AbstractSessionStateManager.this.errlogger.error("entity : {} , RetryFailed: {}", (Object)AbstractSessionStateManager.this.entity.getId(), (Object)this.val$message);
                            if (AbstractSessionStateManager.this.closeWhenRetryFailed(this.val$message)) {
                                logger.error("entity : {} , retry send {} times Message {} ,the connection may die.close it", new Object[]{AbstractSessionStateManager.this.entity.getId(), times, this.val$message});
                                ctx.close();
                            }
                        } else {
                            AbstractSessionStateManager abstractSessionStateManager = AbstractSessionStateManager.this;
                            abstractSessionStateManager.msgWriteCount = abstractSessionStateManager.msgWriteCount + 1L;
                            entry.cnt.incrementAndGet();
                            ctx.writeAndFlush((Object)this.val$message, ctx.newPromise());
                        }
                    }
                    catch (Throwable e) {
                        logger.error("retry Send Msg Error: {}", (Object)this.val$message);
                        logger.error("retry send Msg Error.", e);
                    }
                }
            };
            ScheduledFuture<?> future = msgResend.scheduleWithFixedDelay(task, this.entity.getRetryWaitTimeSec(), this.entity.getRetryWaitTimeSec(), TimeUnit.SECONDS);
            ref.set(future);
            entry.future = future;
            if (this.msgRetryMap.get(seq) == null) {
                future.cancel(false);
            }
        } else if (entry == null) {
            logger.warn("receive seq {} not exists in msgRetryMap,maybe response received before create retrytask .", seq);
        }
    }

    private Entry responseFutureDone(Entry entry, T response) {
        if (entry != null && entry.resfuture != null) {
            entry.resfuture.setSuccess(response);
            return entry;
        }
        return null;
    }

    private Entry responseFutureDone(Entry entry, Throwable cause) {
        if (entry != null && entry.resfuture != null) {
            entry.resfuture.tryFailure(cause);
            return entry;
        }
        return null;
    }

    private Entry cancelRetry(Entry entry, Channel channel) {
        if (entry != null && entry.future != null) {
            entry.future.cancel(false);
            if (entry.future instanceof RunnableScheduledFuture) {
                msgResend.remove((RunnableScheduledFuture)entry.future);
            }
            entry.future = null;
        } else {
            logger.debug("cancelRetry task failed.");
        }
        return entry;
    }

    private void preSendMsg(ChannelHandlerContext ctx) {
        boolean isbreak = false;
        if (this.preSend) {
            for (Map.Entry entry : this.storeMap.entrySet()) {
                if (!ctx.channel().isActive()) {
                    isbreak = true;
                    break;
                }
                Object key = entry.getKey();
                VersionObject vobj = (VersionObject)entry.getValue();
                long v = vobj.getVersion();
                BaseMessage msg = (BaseMessage)vobj.getObj();
                if (this.version <= v || msg == null) continue;
                logger.debug("Send last failed msg . {}", (Object)msg);
                this.writeWithWindow(ctx, msg, ctx.newPromise());
            }
        }
        this.preSendover = !isbreak;
    }

    private Promise<T> safewrite(final ChannelHandlerContext ctx, T message, ChannelPromise promise, boolean syn) {
        if (ctx.channel().isActive()) {
            if (message.isTerminated()) {
                this.errlogger.error("Msg Life over .{}", message);
                promise.tryFailure((Throwable)new SmsLifeTerminateException("Msg Life over"));
                DefaultPromise failed = new DefaultPromise(ctx.executor());
                failed.tryFailure((Throwable)new SmsLifeTerminateException("Msg Life over"));
                return failed;
            }
            K seq = this.getSequenceId(message);
            boolean has = this.msgRetryMap.containsKey(seq);
            Entry tmpentry = new Entry(this, message, syn);
            if (has) {
                Entry old = this.msgRetryMap.get(seq);
                if (!message.equals(old.request)) {
                    logger.error("has repeat Sequense {}", seq);
                    if (syn) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("seqId:").append(seq);
                        sb.append(".it Has a same sequenceId with another message:").append(old.request).append(". wait it complete.");
                        IOException cause = new IOException(sb.toString());
                        DefaultPromise failed = new DefaultPromise(ctx.executor());
                        failed.tryFailure((Throwable)cause);
                        return failed;
                    }
                    this.reWriteLater(ctx, message, promise, 250);
                    return null;
                }
            } else {
                tmpentry.resfuture = new DefaultPromise(ctx.executor());
                this.msgRetryMap.put(seq, tmpentry);
            }
            ++this.msgWriteCount;
            this.storeMap.put(seq, new VersionObject<T>(message));
            promise.addListener((GenericFutureListener)new ChannelFutureListener((BaseMessage)message, seq){
                private final /* synthetic */ BaseMessage val$message;
                private final /* synthetic */ Object val$seq;
                {
                    this.val$message = baseMessage;
                    this.val$seq = object;
                }

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        AbstractSessionStateManager.this.scheduleRetryMsg(ctx, this.val$message);
                    } else {
                        logger.error("remove fail message Sequense {}", this.val$seq);
                        AbstractSessionStateManager.this.storeMap.remove(this.val$seq);
                        Entry entry = (Entry)AbstractSessionStateManager.this.msgRetryMap.remove(this.val$seq);
                        AbstractSessionStateManager.this.responseFutureDone(entry, future.cause());
                    }
                }
            });
            ctx.writeAndFlush(message, promise);
            return tmpentry.resfuture;
        }
        StringBuilder sb = new StringBuilder();
        sb.append("Connection ").append(ctx.channel()).append(" has closed");
        IOException cause = new IOException(sb.toString());
        if (promise != null && !promise.isDone()) {
            promise.tryFailure((Throwable)cause);
        }
        DefaultPromise failed = new DefaultPromise(ctx.executor());
        failed.tryFailure((Throwable)cause);
        return failed;
    }

    private void reWriteLater(final ChannelHandlerContext ctx, T message, ChannelPromise promise, int delay) {
        msgResend.schedule(new Runnable((BaseMessage)message, promise){
            private final /* synthetic */ BaseMessage val$message;
            private final /* synthetic */ ChannelPromise val$promise;
            {
                this.val$message = baseMessage;
                this.val$promise = channelPromise;
            }

            @Override
            public void run() {
                try {
                    AbstractSessionStateManager.this.write(ctx, this.val$message, this.val$promise);
                }
                catch (Exception e) {
                    logger.error("has repeat Sequense ,and write Msg err {}", (Object)this.val$message);
                }
            }
        }, (long)delay, TimeUnit.MILLISECONDS);
    }

    public Promise<T> writeMessagesync(T message) {
        return this.safewrite(this.ctx, message, this.ctx.newPromise(), true);
    }

    public EndpointEntity getEntity() {
        return this.entity;
    }

    private class Entry {
        volatile Future future;
        AtomicInteger cnt = new AtomicInteger(1);
        T request;
        boolean sync = false;
        DefaultPromise<T> resfuture;
        final /* synthetic */ AbstractSessionStateManager this$0;

        /*
         * WARNING - Possible parameter corruption
         */
        Entry(T request, boolean sync) {
            this.this$0 = (AbstractSessionStateManager)n;
            this.request = request;
            this.sync = sync;
        }
    }
}

