/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.longtime.processor;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.longtime.LongTimePool;
import com.alibaba.dts.client.executor.longtime.unit.ExecutorUnit;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Background thread that moves {@link TaskSnapshot}s from an {@link ExecutorUnit}'s
 * completed queue back into its main queue.
 *
 * <p>While running, the thread repeatedly checks whether the unit is paused (its release-task
 * or pull-task flag is set). When paused it sleeps for {@link #REFILL_SLEEP_TIME_INTERVAL}
 * milliseconds; otherwise it sleeps for {@link #REFILL_PULLANDPUT_TIME_INTERVAL} milliseconds
 * and then drains the completed queue into the main queue. When the loop ends, for any reason,
 * the unit's {@link LongTimePool} is asked to stop the task.
 *
 * <p>The loop is ended by {@link #setStop(boolean) setStop(true)}. Interrupts and other errors
 * raised while sleeping or transferring are logged and the loop carries on.
 */
public class ReFillingProcessor
extends Thread
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(ReFillingProcessor.class);
    private ExecutorUnit executorUnit;
    /** Loop-exit flag; starts as {@code true} and is cleared when {@link #run()} begins. */
    private volatile boolean stop = true;
    /** Milliseconds to sleep per iteration while the executor unit is paused. */
    private static final long REFILL_SLEEP_TIME_INTERVAL = 1000L;
    /** Milliseconds to sleep before each transfer pass while the executor unit is not paused. */
    private static final long REFILL_PULLANDPUT_TIME_INTERVAL = 100L;
    /** Maximum milliseconds to wait for one element of the completed queue. */
    private static final long COMPLETE_QUEUE_POLL_TIMEOUT = 10000L;
    private final ClientContextImpl clientContext;

    /**
     * Creates the processor and names the thread after the job and job instance it serves.
     *
     * @param clientContext client context; its client config is consulted for the finish-log switch
     * @param executorUnit  unit whose queues this thread works on
     */
    public ReFillingProcessor(ClientContextImpl clientContext, ExecutorUnit executorUnit) {
        this.clientContext = clientContext;
        this.executorUnit = executorUnit;
        super.setName(threadNameFor(executorUnit));
    }

    /**
     * Points this processor at another executor unit and renames the thread accordingly.
     *
     * <p>NOTE(review): a running loop keeps the queue references it fetched when {@link #run()}
     * started, so only the pause checks, log messages and final stop call see the new unit.
     *
     * @param executorUnit the unit to use from now on
     */
    public void refresh(ExecutorUnit executorUnit) {
        this.executorUnit = executorUnit;
        super.setName(threadNameFor(executorUnit));
    }

    /** Builds the thread name from job id, job processor, instance id, fire time and retry count. */
    private static String threadNameFor(ExecutorUnit unit) {
        return "DtsReFillingProcessor-" + unit.getExecutableTask().getJob().getId()
                + "-" + unit.getExecutableTask().getJob().getJobProcessor()
                + "-" + unit.getExecutableTask().getJobInstanceSnapshot().getId()
                + "-" + unit.getExecutableTask().getJobInstanceSnapshot().getFireTime()
                + "-" + unit.getExecutableTask().getJobInstanceSnapshot().getRetryCount();
    }

    /**
     * Runs the refill loop until {@link #setStop(boolean) setStop(true)} is observed, then asks
     * the unit's {@link LongTimePool} to stop the task. Nothing is thrown to the caller from the
     * loop itself: every {@link Throwable} is logged.
     */
    @Override
    public void run() {
        try {
            this.stop = false;
            // The queue references are fetched once, before the loop starts.
            BlockingQueue<TaskSnapshot> queue = this.executorUnit.getQueue();
            BlockingQueue<TaskSnapshot> completeQueue = this.executorUnit.getCompletedqueue();
            while (!this.stop) {
                if (this.isPause()) {
                    // Paused: back off for the longer interval and re-check the flags.
                    this.sleepLoggingFailure(REFILL_SLEEP_TIME_INTERVAL);
                    continue;
                }
                this.sleepLoggingFailure(REFILL_PULLANDPUT_TIME_INTERVAL);
                try {
                    this.pullAndPut(queue, completeQueue);
                }
                catch (Throwable e) {
                    logger.error("[RefillingProcessor]: pullAndPut error, instanceId:" + this.instanceIdText(), e);
                }
            }
        }
        catch (Throwable e) {
            logger.error("[RefillingProcessor]: run error, instanceId:" + this.instanceIdText(), e);
        }
        finally {
            try {
                LongTimePool longTimePool = this.executorUnit.getLongTimePool();
                longTimePool.stopTask(this.executorUnit.getExecutableTask().getJob().getId(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            }
            catch (Throwable e) {
                logger.error("[RefillingProcessor]: finally stopTask error, instanceId:" + this.instanceIdText(), e);
            }
            finally {
                if (this.clientContext.getClientConfig().isFinishLog()) {
                    logger.warn("[RefillingProcessor]: finally stopTask, instanceId:" + this.instanceIdText());
                }
            }
        }
    }

    /**
     * Sleeps for the given time, logging (not propagating) anything thrown by the sleep.
     * An interrupt is therefore only logged; the interrupt status is not restored, because
     * the caller's loop keeps running and would otherwise fail every following sleep at once.
     */
    private void sleepLoggingFailure(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (Throwable e) {
            logger.error("[RefillingProcessor]:  RefillingProcessor sleep error, instanceId:" + this.instanceIdText(), e);
        }
    }

    /** Returns the current job instance id rendered exactly as string concatenation would render it. */
    private String instanceIdText() {
        return "" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId();
    }

    /** Returns {@code true} when the unit's release-task flag or pull-task flag is set. */
    private boolean isPause() {
        return this.executorUnit.isReleaseTaskFlag() || this.executorUnit.isPullTaskFlag();
    }

    /** Renders both pause flags for diagnostics. */
    private String isPausePrint() {
        return "isReleaseTaskFlag:" + this.executorUnit.isReleaseTaskFlag() + ",isPullTaskFlag:" + this.executorUnit.isPullTaskFlag();
    }

    /**
     * Moves elements from {@code completeQueue} to {@code queue} until the unit becomes paused
     * or {@code completeQueue} is empty.
     *
     * <p>Failures of a single poll or put are logged and the loop moves on to the next element.
     * NOTE(review): when {@code queue.put} fails, the element already removed from
     * {@code completeQueue} is not put back.
     *
     * @param queue         destination queue
     * @param completeQueue source queue
     */
    private void pullAndPut(BlockingQueue<TaskSnapshot> queue, BlockingQueue<TaskSnapshot> completeQueue) {
        while (!this.isPause() && !completeQueue.isEmpty()) {
            TaskSnapshot taskSnapshot = null;
            try {
                taskSnapshot = completeQueue.poll(COMPLETE_QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
                // poll returns null on timeout (e.g. another consumer emptied the queue after the
                // isEmpty check); only report success when an element was actually obtained.
                if (null != taskSnapshot) {
                    logger.info("[ReFillingProcessor]: completeQueue poll success, instanceId:" + taskSnapshot.getJobInstanceId() + ", taskid:" + taskSnapshot.getId());
                }
            }
            catch (Throwable e) {
                logger.error("[RefillingProcessor]: take executableTask error, instanceId:" + this.instanceIdText(), e);
            }
            if (null == taskSnapshot) {
                continue;
            }
            try {
                queue.put(taskSnapshot);
                logger.info("[ReFillingProcessor]: queue put success, instanceId:" + taskSnapshot.getJobInstanceId() + ", taskid:" + taskSnapshot.getId());
            }
            catch (Throwable e) {
                logger.error("[RefillingProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
            }
        }
    }

    /**
     * @return {@code true} if the loop has not started yet or has been asked to stop
     */
    public boolean isStop() {
        return this.stop;
    }

    /**
     * Sets the loop-exit flag. Passing {@code true} makes the refill loop end at its next check.
     *
     * <p>Note that {@link #run()} resets the flag to {@code false} when it starts, so a stop
     * request made before the thread begins running is overwritten.
     *
     * @param stop new value of the flag
     */
    public void setStop(boolean stop) {
        this.stop = stop;
    }
}

