/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.parallel.unit;

import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.parallel.ParallelPool;
import com.alibaba.dts.client.executor.parallel.processor.ParallelTaskProcessor;
import com.alibaba.dts.client.executor.parallel.processor.PullProcessor;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.util.CollectionUtils;

public class ExecutorUnit
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(ExecutorUnit.class);
    private ExecutableTask executableTask;
    private PullProcessor pullProcessor = null;
    private BlockingQueue<TaskSnapshot> queue = null;
    private ParallelTaskProcessor[] parallelTaskProcessors = null;
    private final AtomicInteger threadCounter = new AtomicInteger();
    private final ParallelPool parallelPool;
    private final ClientContextImpl clientContext;

    public ExecutorUnit(ClientContextImpl clientContext, ParallelPool parallelPool, ExecutableTask executableTask) {
        this.clientContext = clientContext;
        this.parallelPool = parallelPool;
        this.executableTask = executableTask;
        int pageSize = this.clientContext.getClientConfig().getPageSize();
        Map<String, Integer> pageSizeMap = this.clientContext.getClientConfig().getPageSizeMap();
        if (!CollectionUtils.isEmpty(pageSizeMap) && pageSizeMap.get(executableTask.getJob().getJobProcessor()) != null) {
            pageSize = this.clientContext.getClientConfig().checkPageSize(pageSizeMap.get(executableTask.getJob().getJobProcessor()));
        }
        this.executableTask.setLength(pageSize);
    }

    public void refresh(ExecutableTask executableTask) {
        this.executableTask = executableTask;
        int pageSize = this.clientContext.getClientConfig().getPageSize();
        Map<String, Integer> pageSizeMap = this.clientContext.getClientConfig().getPageSizeMap();
        if (!CollectionUtils.isEmpty(pageSizeMap) && pageSizeMap.get(executableTask.getJob().getJobProcessor()) != null) {
            pageSize = this.clientContext.getClientConfig().checkPageSize(pageSizeMap.get(executableTask.getJob().getJobProcessor()));
        }
        this.executableTask.setLength(pageSize);
        for (int i = 0; i < this.parallelTaskProcessors.length; ++i) {
            this.parallelTaskProcessors[i].refresh(this, i);
        }
        this.pullProcessor.refresh(this);
    }

    public void init() throws InitException {
        this.pullProcessor = new PullProcessor(this.clientContext, this);
        this.queue = new LinkedBlockingQueue<TaskSnapshot>(this.clientContext.getClientConfig().getQueueSize());
        this.pullProcessor.start();
        int consumerThreads = this.clientContext.getClientConfig().getConsumerThreads();
        Map<String, Integer> consumerThreadsMap = this.clientContext.getClientConfig().getConsumerThreadsMap();
        if (!CollectionUtils.isEmpty(consumerThreadsMap) && consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()) != null) {
            consumerThreads = this.clientContext.getClientConfig().checkConsumerThreads(consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()));
        }
        if (this.executableTask.getRunThreads() > 0) {
            consumerThreads = this.executableTask.getRunThreads();
        }
        this.parallelTaskProcessors = new ParallelTaskProcessor[consumerThreads];
        for (int i = 0; i < consumerThreads; ++i) {
            this.parallelTaskProcessors[i] = new ParallelTaskProcessor(this.clientContext, this, i, this.threadCounter);
            this.parallelTaskProcessors[i].start();
        }
    }

    public void clear() {
        try {
            this.queue.clear();
        }
        catch (Throwable e) {
            logger.error("[ExecutorUnit]: clear error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
    }

    public void stopTask() {
        try {
            if (this.pullProcessor != null) {
                this.pullProcessor.setStop(true);
            }
            for (int i = 0; i < this.parallelTaskProcessors.length; ++i) {
                if (this.parallelTaskProcessors[i] == null) continue;
                this.parallelTaskProcessors[i].setStop(true);
            }
            this.clear();
        }
        catch (Throwable e) {
            logger.error("[ExecutorUnit]: stopTask error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
    }

    public void forceStopTask() {
        try {
            this.pullProcessor.stop();
        }
        catch (Throwable e) {
            logger.error("[ExecutorUnit]: forceStopTask pullProcessor error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
        }
        this.clear();
        for (int i = 0; i < this.parallelTaskProcessors.length; ++i) {
            try {
                this.parallelTaskProcessors[i].stop();
                continue;
            }
            catch (Throwable e) {
                logger.error("[ExecutorUnit]: forceStopTask parallelTaskProcessors error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), e);
            }
        }
    }

    public boolean isExecutorStop() {
        if (this.queue == null) {
            return true;
        }
        return this.queue.isEmpty() && this.threadCounter.get() == 0;
    }

    public boolean offer(TaskSnapshot taskSnapshot) {
        boolean result = false;
        try {
            result = this.queue.offer(taskSnapshot, 5000L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            logger.error("[ExecutorUnit]: offer error, jobInstanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), e);
        }
        return result;
    }

    public boolean isExistsProcessors() {
        Boolean result = false;
        if (this.parallelTaskProcessors == null) {
            return false;
        }
        for (int i = 0; i < this.parallelTaskProcessors.length; ++i) {
            if (this.parallelTaskProcessors[i].isStop()) continue;
            return true;
        }
        return result;
    }

    public ExecutableTask getExecutableTask() {
        return this.executableTask;
    }

    public BlockingQueue<TaskSnapshot> getQueue() {
        return this.queue;
    }

    public ParallelTaskProcessor[] getParallelTaskProcessors() {
        return this.parallelTaskProcessors;
    }

    public AtomicInteger getThreadCounter() {
        return this.threadCounter;
    }

    public ParallelPool getParallelPool() {
        return this.parallelPool;
    }

    public String toString() {
        return "ExecutorUnit [executableTask=" + this.executableTask + "]";
    }
}

