/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.grid;

import com.alibaba.dts.client.executor.grid.flowcontrol.FlowControlParameterWatcher;
import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.grid.queue.send.SendManager;
import com.alibaba.dts.client.executor.grid.unit.FlexibleThreadPoolExecutor;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.AccessException;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.util.BytesUtil;
import com.alibaba.dts.common.util.BytesUtil4Client;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class GridTaskSender {
    // Class-wide logger.
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(GridTaskSender.class);
    // Routes task events to their send queues; initialised with the client context in initSchedulerXSendQueue().
    private SendManager sendManager = new SendManager();
    // Upper bound compared against a serialized task body's length (bytes); loaded from the client config in init().
    private long maxBodySize;
    // Buffered batches are only persisted while FlowControlParameterWatcher.dbTasksCount stays below this value.
    private long flowControlCountGate = 500000L;
    private final ClientContextImpl clientContext;
    // Per job instance id: queue of task-event batches waiting to be persisted and routed by TaskForInsertConsumer.
    private ConcurrentHashMap<Long, BlockingQueue<List<TaskEvent>>> tasksForInsertBufferMap = new ConcurrentHashMap<Long, BlockingQueue<List<TaskEvent>>>();
    // Job instance ids for which TaskForInsertConsumer has already submitted a worker.
    private ConcurrentHashMap<Long, Boolean> tasksForInsertBufferMapFlag = new ConcurrentHashMap<Long, Boolean>();
    // Executor handed out through getReSendExecutorService(); not used inside this class.
    // NOTE(review): 8/16/60s look like core size / max size / keep-alive - confirm against FlexibleThreadPoolExecutor.
    private ExecutorService reSendExecutorService = new FlexibleThreadPoolExecutor(8, 16, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new NamedThreadFactory("SchedulerX-Tasks-ReSend-Thread-"));

    /**
     * Creates a sender bound to the given client context.
     * <p>
     * The context is only stored here; {@link #init()} must be called before the sender is used,
     * because that is where the send manager and the insert-buffer consumer are started.
     *
     * @param clientContext the client context every collaborator (config, stores, node remoting) is read from
     */
    public GridTaskSender(ClientContextImpl clientContext) {
        this.clientContext = clientContext;
    }

    /**
     * Initialises the {@link SendManager} with the client context.
     *
     * @throws InitException if no client context was supplied, or if the send manager fails to initialise
     */
    private void initSchedulerXSendQueue() throws InitException {
        if (this.clientContext == null) {
            // Log the failure before throwing; the previous logger.equals(...) call was a no-op comparison.
            logger.error("[GridTaskSender] clientContext is null!");
            throw new InitException("[GridTaskSender] clientContext is null!");
        }
        this.sendManager.init(this.clientContext);
    }

    /**
     * Prepares the sender for use: loads the maximum task body size from the client config,
     * initialises the send manager and starts the insert-buffer consumer.
     * <p>
     * Any {@link Throwable} raised along the way is logged and not rethrown, so callers are
     * not notified of a failed initialisation even though the signature declares
     * {@link InitException}.
     *
     * @throws InitException declared for callers; never propagated by the current implementation
     */
    public void init() throws InitException {
        try {
            final long configuredMaxBodySize = this.clientContext.getClientConfig().getMaxBodySize();
            this.maxBodySize = configuredMaxBodySize;
            initSchedulerXSendQueue();
            initTasksForInsertBufferConsumer();
            logger.info("[GridTaskSender]: init over");
        }
        catch (Throwable initFailure) {
            logger.error("[GridTaskSender]: init error:", initFailure);
        }
    }

    /**
     * Creates a {@link TaskForInsertConsumer} and starts it.
     *
     * @throws InitException if the consumer cannot be started
     */
    private void initTasksForInsertBufferConsumer() throws InitException {
        final TaskForInsertConsumer consumer = new TaskForInsertConsumer();
        consumer.init();
    }

    /**
     * Dispatches {@code taskList} for the job instance described by {@code jobContext}.
     * <p>
     * The behaviour depends on {@code dispatchMode}:
     * <ul>
     *   <li>{@code 1} - each element is an arbitrary task object; it is serialized into a new
     *       {@link TaskSnapshot}, and tasks are persisted and queued in batches of 3000 after
     *       passing the flow-control chain;</li>
     *   <li>{@code 2} - each element is a {@link TaskSnapshot}; it is flagged as re-sent and as
     *       compensation, then queued individually;</li>
     *   <li>{@code 3} - each element is a {@link TaskSnapshot}; it is queued individually.</li>
     * </ul>
     * Elements are skipped for any other mode. Before dispatching, the method waits for working
     * nodes (up to 10 lookups, sleeping 10 seconds between them).
     *
     * @param taskList     the tasks (mode 1) or task snapshots (modes 2 and 3) to dispatch
     * @param taskName     the task name stored on snapshots created in mode 1; unused otherwise
     * @param jobContext   supplies the job and job instance the tasks belong to
     * @param dispatchMode {@code 1}, {@code 2} or {@code 3} as described above
     * @return a result wrapping {@code true} on success; {@code false} if the instance was
     *         interrupted (with {@link ResultCode#TASK_SEND_INTERRUPT} when detected mid-loop)
     *         or if any error occurred - errors are logged, not thrown
     */
    public Result<Boolean> dispatchTaskList(List<? extends Object> taskList, String taskName, JobContext jobContext, int dispatchMode) {
        long jobId = jobContext.getJob().getId();
        long jobInstanceId = jobContext.getJobInstanceSnapshot().getId();
        logger.info("{} tasks dispatched, jobId={}, jobInstanceId={}", taskList.size(), jobId, jobInstanceId);
        if (this.clientContext.getGridJobManager().containsInterruptedJobInstance(jobContext.getJobInstanceSnapshot().getId())) {
            logger.warn("[GridTaskSender]: dispatchTaskList is interrupted forcedly:,jobId=" + jobId + ",jobInstanceId=" + jobInstanceId + ",total tasks=" + taskList.size());
            return new Result<Boolean>(false);
        }
        boolean result = true;
        ArrayList<TaskEvent> taskEvents = new ArrayList<TaskEvent>();
        try {
            // Wait until at least one working node is available, giving up after 10 lookups.
            List<RemoteMachine> nodes = null;
            int retryCount = 10;
            while (retryCount > 0 && ((nodes = this.clientContext.getNodeRemoting().getNodes(this.clientContext.getClientConfig().getGroupId(), jobId, jobContext.getJob().getType())) == null || nodes.isEmpty())) {
                logger.warn("there is no working nodes existed, groupId={}, jobId={}, so it will retry {} times", this.clientContext.getClientConfig().getGroupId(), jobId, retryCount);
                if (--retryCount <= 0) {
                    // Build the message by concatenation: exception messages do not expand "{}" placeholders.
                    throw new RuntimeException("there is no working nodes existed, groupId=" + this.clientContext.getClientConfig().getGroupId() + ", jobId=" + jobId + ", and it has retried for 10 times");
                }
                TimeUnit.SECONDS.sleep(10L);
            }
            this.sendManager.resetRoutesMachines(jobContext.getJob().getId(), nodes);
            long successProcess = 0L;
            long totalStart = System.currentTimeMillis();
            int batchSize = 3000;
            for (int position = 0; position < taskList.size(); ++position) {
                // Re-check on every task so a forced interruption stops a long dispatch promptly.
                if (this.isInterruptedInstance(jobContext.getJobInstanceSnapshot().getId())) {
                    logger.warn("[GridTaskSender]: dispatchTaskList is interrupted forcedly:,jobId:" + jobContext.getJob().getId() + ",jobInstanceId:" + jobContext.getJobInstanceSnapshot().getId() + ",total tasks:" + taskList.size() + ",put schedulerXSendQueue tasks:" + successProcess);
                    Result<Boolean> resultIntercept = new Result<Boolean>(false);
                    resultIntercept.setResultCode(ResultCode.TASK_SEND_INTERRUPT);
                    return resultIntercept;
                }
                if (dispatchMode == 1) {
                    long start = System.currentTimeMillis();
                    ExecutableTask executableTask = new ExecutableTask(jobContext.getJob(), jobContext.getJobInstanceSnapshot());
                    this.fillTaskSnapshot(executableTask, taskList.get(position), taskName);
                    TaskEvent taskEvent = new TaskEvent();
                    taskEvent.setExecutableTask(executableTask);
                    taskEvent.setTask(taskList.get(position));
                    taskEvents.add(taskEvent);
                    // Flush a full batch: flow control first, then persist, then hand over for routing.
                    if ((position + 1) % batchSize == 0) {
                        this.clientContext.getFlowControlChain().control(jobContext);
                        this.persistTasks(taskEvents, jobInstanceId);
                        this.sendManager.putTasksToRouteQueue(taskEvents, jobInstanceId);
                        long end = System.currentTimeMillis();
                        logger.debug("[GridTaskSender]: dispatchTaskList,executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, taskName={}, consumptionTime={}, size={}", executableTask.getJob().getId(), executableTask.getJobInstanceSnapshot().getId(), taskName, end - start, taskEvents.size());
                        taskEvents.clear();
                    }
                    ++successProcess;
                    continue;
                }
                if (dispatchMode == 2 || dispatchMode == 3) {
                    TaskSnapshot taskSnapshot = (TaskSnapshot)taskList.get(position);
                    ExecutableTask executableTask = new ExecutableTask(jobContext.getJob(), jobContext.getJobInstanceSnapshot());
                    executableTask.setTaskSnapshot(taskSnapshot);
                    if (dispatchMode == 2) {
                        // Compensation dispatch additionally marks the snapshot and the task.
                        executableTask.getTaskSnapshot().setReSent(true);
                        executableTask.setCompensation(true);
                    }
                    TaskEvent taskEvent = new TaskEvent();
                    taskEvent.setExecutableTask(executableTask);
                    taskEvent.setTask(this.getTaskObject(taskSnapshot));
                    this.sendManager.putSingleTaskToRouteQueue(taskEvent);
                    ++successProcess;
                }
            }
            // Flush the trailing, partially filled batch.
            if (dispatchMode == 1 && taskEvents.size() > 0) {
                this.clientContext.getFlowControlChain().control(jobContext);
                this.persistTasks(taskEvents, jobInstanceId);
                this.sendManager.putTasksToRouteQueue(taskEvents, jobInstanceId);
                taskEvents.clear();
            }
            long totalEnd = System.currentTimeMillis();
            logger.debug("[GridTaskSender]: dispatchTaskList,executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, total={}, put schedulerXSendQueue tasks={}, taskName={}, consumptionTime={}", jobContext.getJob().getId(), jobContext.getJobInstanceSnapshot().getId(), taskList.size(), successProcess, taskName, totalEnd - totalStart);
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            result = false;
            logger.error("[GridTaskSender]: dispatchTaskList error,", e);
        }
        return new Result<Boolean>(result);
    }

    /**
     * Determines how many tasks to insert per batch.
     * <p>
     * A non-zero configured batch size (node config) is returned as is. Otherwise the size is
     * derived so that the tasks are spread over roughly two batches per node, clamped to the
     * range [1, 3000]. A {@code null} or empty node list is treated as having no nodes instead
     * of failing with a division by zero.
     *
     * @param taskList the tasks about to be dispatched
     * @param nodes    the working nodes the tasks will be routed to; may be {@code null} or empty
     * @return the batch size to use; at least 1 when derived
     */
    private int calculateBatchSize(List<? extends Object> taskList, List<RemoteMachine> nodes) {
        int configuredBatchSize = this.clientContext.getNodeConfig().getTaskInsertBatchSize();
        if (configuredBatchSize != 0) {
            return configuredBatchSize;
        }
        int nodeCount = nodes == null ? 0 : nodes.size();
        // Never divide by zero: with no nodes the divisor falls back to 1.
        int divisor = Math.max(1, nodeCount * 2);
        int derivedBatchSize = taskList.size() / divisor;
        return Math.max(1, Math.min(derivedBatchSize, 3000));
    }

    /**
     * Dispatches freshly produced tasks, delegating to the four-argument overload with
     * dispatch mode {@code 1}.
     *
     * @param taskList   the task objects to dispatch
     * @param taskName   the task name recorded on each created snapshot
     * @param jobContext supplies the job and job instance the tasks belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchTaskList(List<? extends Object> taskList, String taskName, JobContext jobContext) {
        final int newTaskMode = 1;
        return dispatchTaskList(taskList, taskName, jobContext, newTaskMode);
    }

    /**
     * Dispatches existing task snapshots as compensation tasks, delegating to
     * {@code dispatchTaskList} with a {@code null} task name and dispatch mode {@code 2}.
     *
     * @param taskList   the snapshots to dispatch again
     * @param jobContext supplies the job and job instance the snapshots belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchCompensateTaskList(List<TaskSnapshot> taskList, JobContext jobContext) {
        final int compensateMode = 2;
        final String noTaskName = null;
        return dispatchTaskList(taskList, noTaskName, jobContext, compensateMode);
    }

    /**
     * Dispatches existing task snapshots for retry, delegating to {@code dispatchTaskList}
     * with a {@code null} task name and dispatch mode {@code 3}.
     *
     * @param taskList   the snapshots to retry
     * @param jobContext supplies the job and job instance the snapshots belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchRetryTaskList(List<TaskSnapshot> taskList, JobContext jobContext) {
        final int retryMode = 3;
        final String noTaskName = null;
        return dispatchTaskList(taskList, noTaskName, jobContext, retryMode);
    }

    /**
     * Serializes {@code task} and attaches a freshly created {@link TaskSnapshot} carrying the
     * serialized body to {@code executableTask}.
     * <p>
     * If serialization fails or yields an empty body, the problem is logged and the method
     * returns without attaching a snapshot.
     *
     * @param executableTask the task wrapper that receives the new snapshot
     * @param task           the user task object to serialize
     * @param taskName       the task name stored on the snapshot
     * @throws RuntimeException if the serialized body is larger than the configured maximum body size
     */
    private void fillTaskSnapshot(ExecutableTask executableTask, Object task, String taskName) {
        byte[] body = null;
        try {
            body = BytesUtil4Client.objectToBytes(task);
        }
        catch (Throwable e) {
            logger.error("[GridTaskSender]: fillTaskSnapshot objectToBytes error, taskName:" + taskName + ", task:" + task, e);
        }
        if (BytesUtil4Client.isEmpty(body)) {
            logger.error("[GridTaskSender]: fillTaskSnapshot objectToBytes body is empty, taskName:" + taskName + ", task:" + task);
            return;
        }
        if ((long)body.length > this.maxBodySize) {
            // Report the real numbers: the limit comes from configuration, not a fixed 64KB.
            throw new RuntimeException("[GridTaskSender]: single task is too large, bodySize=" + body.length + " bytes, maxBodySize=" + this.maxBodySize + " bytes, taskName:" + taskName);
        }
        // Use one clock reading so creation and modification timestamps are identical,
        // but distinct Date instances because Date is mutable.
        long now = System.currentTimeMillis();
        TaskSnapshot taskSnapshot = new TaskSnapshot();
        taskSnapshot.setGmtCreate(new Date(now));
        taskSnapshot.setGmtModified(new Date(now));
        taskSnapshot.setJobInstanceId(executableTask.getJobInstanceSnapshot().getId());
        taskSnapshot.setJobProcessor(executableTask.getJob().getJobProcessor());
        taskSnapshot.setBody(body);
        taskSnapshot.setStatus(0);
        taskSnapshot.setTaskName(taskName);
        taskSnapshot.setRetryCount(0);
        executableTask.setTaskSnapshot(taskSnapshot);
    }

    /**
     * Deserializes the body of {@code taskSnapshot} back into a task object.
     * <p>
     * Snapshots named {@code "defaultTaskName4DtsServerSelf"} are decoded with {@link BytesUtil};
     * all others with {@link BytesUtil4Client}. An empty body or a decoding failure is logged.
     *
     * @param taskSnapshot the snapshot whose body should be decoded
     * @return the decoded task object, or {@code null} if the body is empty or cannot be decoded
     */
    protected Object getTaskObject(TaskSnapshot taskSnapshot) {
        final boolean serverSelfTask = "defaultTaskName4DtsServerSelf".equals(taskSnapshot.getTaskName());
        if (serverSelfTask) {
            if (BytesUtil.isEmpty(taskSnapshot.getBody())) {
                logger.error("[GridTaskSender]: BytesUtil setTask bytesToObject error, body is empty, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId());
                return null;
            }
            try {
                return BytesUtil.bytesToObject(taskSnapshot.getBody());
            }
            catch (Throwable decodeFailure) {
                logger.error("[GridTaskSender]: BytesUtil setTask bytesToObject error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), decodeFailure);
                return null;
            }
        }
        if (BytesUtil4Client.isEmpty(taskSnapshot.getBody())) {
            logger.error("[GridTaskSender]: BytesUtil4Client setTask bytesToObject error, body is empty, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId());
            return null;
        }
        try {
            return BytesUtil4Client.bytesToObject(taskSnapshot.getBody());
        }
        catch (Throwable decodeFailure) {
            logger.error("[GridTaskSender]: BytesUtil4Client setTask bytesToObject error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), decodeFailure);
            return null;
        }
    }

    /**
     * Assigns a new id to the snapshot of every task event and inserts all snapshots in one batch.
     * <p>
     * Nothing happens when the list is empty or the job instance has been interrupted.
     * A failed insert is logged and not rethrown.
     *
     * @param taskEvents    the task events whose snapshots should be stored
     * @param jobInstanceId the job instance the events belong to
     */
    private void persistTasks(List<TaskEvent> taskEvents, long jobInstanceId) {
        if (taskEvents.isEmpty() || this.clientContext.getGridJobManager().containsInterruptedJobInstance(jobInstanceId)) {
            return;
        }
        ArrayList<TaskSnapshot> snapshotsToInsert = new ArrayList<TaskSnapshot>();
        for (TaskEvent event : taskEvents) {
            TaskSnapshot snapshot = event.getExecutableTask().getTaskSnapshot();
            snapshot.setId(this.clientContext.getIdWorker().nextId());
            snapshotsToInsert.add(snapshot);
        }
        try {
            this.clientContext.getStore().getTaskSnapshotDao().insertBatch(snapshotsToInsert);
        }
        catch (AccessException insertFailure) {
            logger.error("failed to insert tasksnapshots batch. jobInstanceId=" + jobInstanceId, insertFailure);
        }
    }

    /**
     * Registers {@code instanceId} as interrupted with the grid job manager.
     * Failures are logged and not rethrown.
     *
     * @param instanceId the job instance id to mark as interrupted
     */
    public void addInterruptedJobInstance(long instanceId) {
        try {
            this.clientContext.getGridJobManager().addInterruptedJobInstance(instanceId);
            final String registeredMessage = "[GridTaskSender]: addInterceptInstance instanceId:" + instanceId;
            logger.info(registeredMessage);
        }
        catch (Throwable failure) {
            final String failureMessage = "[GridTaskSender]: addInterceptInstance error,instanceId:" + instanceId;
            logger.error(failureMessage, failure);
        }
    }

    /**
     * Removes {@code instanceId} from the grid job manager's interrupted instances.
     * Failures are logged and not rethrown.
     *
     * @param instanceId the job instance id to clear
     */
    public void removeInterruptedJobInstance(long instanceId) {
        try {
            this.clientContext.getGridJobManager().removeInterruptedJobInstance(instanceId);
            final String removedMessage = "[GridTaskSender]: removeInterceptInstance instanceId:" + instanceId;
            logger.info(removedMessage);
        }
        catch (Throwable failure) {
            final String failureMessage = "[GridTaskSender]: removeInterceptInstance error,instanceId:" + instanceId;
            logger.error(failureMessage, failure);
        }
    }

    /**
     * Tells whether the grid job manager currently lists {@code instanceId} as interrupted.
     *
     * @param instanceId the job instance id to look up
     * @return {@code true} if the instance is registered as interrupted
     */
    public boolean isInterruptedInstance(long instanceId) {
        final boolean interrupted = this.clientContext.getGridJobManager().containsInterruptedJobInstance(instanceId);
        return interrupted;
    }

    /**
     * @return the executor service held in {@code reSendExecutorService}
     */
    public ExecutorService getReSendExecutorService() {
        return reSendExecutorService;
    }

    /**
     * Exposes the live insert-buffer map (not a copy); entries added to it are picked up by
     * the insert-buffer consumer.
     *
     * @return the map from job instance id to the queue of buffered task-event batches
     */
    public ConcurrentHashMap<Long, BlockingQueue<List<TaskEvent>>> getTasksForInsertBufferMap() {
        return tasksForInsertBufferMap;
    }

    /**
     * Drops the insert buffer of a job instance together with its "worker started" flag.
     * The buffer entry is removed first, then the flag.
     *
     * @param jobInstanceId the job instance whose buffer should be discarded
     */
    public void clearInsertBuffer(long jobInstanceId) {
        final Long instanceKey = jobInstanceId;
        tasksForInsertBufferMap.remove(instanceKey);
        tasksForInsertBufferMapFlag.remove(instanceKey);
    }

    /**
     * @return the send manager this sender routes task events through
     */
    public SendManager getSendManager() {
        return sendManager;
    }

    /**
     * Background consumer of {@code tasksForInsertBufferMap}.
     * <p>
     * A single "boss" thread scans the buffer map once per second and, for every job instance
     * that has no worker yet, submits a worker to a fixed pool of 16 threads. Each worker
     * drains the instance's queue: it persists every batch and hands it to the send manager,
     * waiting while the database task count is at or above the flow-control gate. A worker
     * stops once its job instance is no longer present in the buffer map.
     */
    class TaskForInsertConsumer {
        private final ExecutorService bossThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory("SchedulerX-TaskForInsertConsumer-Boss-"));
        private final ExecutorService workerThreadPool = Executors.newFixedThreadPool(16, new NamedThreadFactory("SchedulerX-TaskForInsertConsumer-Worker-"));

        TaskForInsertConsumer() {
        }

        /**
         * Starts the boss thread. The boss loop never terminates on its own; every error,
         * including an interruption, is logged and the scan continues.
         *
         * @throws InitException if the boss task cannot be submitted
         */
        public void init() throws InitException {
            try {
                this.bossThreadPool.submit(new Runnable(){

                    @Override
                    public void run() {
                        while (true) {
                            try {
                                TaskForInsertConsumer.this.startWorkersForNewInstances();
                                TimeUnit.SECONDS.sleep(1L);
                            }
                            catch (Throwable throwable) {
                                logger.error("TaskForInsertConsumer error", throwable);
                            }
                        }
                    }
                });
            }
            catch (Throwable throwable) {
                throw new InitException(throwable);
            }
        }

        /** Submits one worker for every buffered job instance that does not have one yet. */
        private void startWorkersForNewInstances() {
            for (Map.Entry<Long, BlockingQueue<List<TaskEvent>>> entry : GridTaskSender.this.tasksForInsertBufferMap.entrySet()) {
                final Long jobInstanceId = entry.getKey();
                final BlockingQueue<List<TaskEvent>> queue = entry.getValue();
                // Atomically claim the instance; a non-null previous value means a worker already exists.
                if (GridTaskSender.this.tasksForInsertBufferMapFlag.putIfAbsent(jobInstanceId, Boolean.TRUE) != null) {
                    continue;
                }
                this.workerThreadPool.submit(new Runnable(){

                    @Override
                    public void run() {
                        TaskForInsertConsumer.this.drainQueue(jobInstanceId, queue);
                    }
                });
            }
        }

        /**
         * Persists and routes the batches of one job instance until the instance is removed
         * from the buffer map. Errors are logged and the affected batch is dropped.
         */
        private void drainQueue(Long jobInstanceId, BlockingQueue<List<TaskEvent>> queue) {
            while (GridTaskSender.this.tasksForInsertBufferMap.containsKey(jobInstanceId)) {
                try {
                    // Bounded wait so the loop condition is re-evaluated at least every 10 seconds.
                    List<TaskEvent> taskEvents = queue.poll(10L, TimeUnit.SECONDS);
                    if (taskEvents == null || taskEvents.isEmpty()) {
                        continue;
                    }
                    // Flow control: hold the batch back while the database holds too many tasks.
                    while (FlowControlParameterWatcher.dbTasksCount.get() >= GridTaskSender.this.flowControlCountGate) {
                        TimeUnit.SECONDS.sleep(5L);
                    }
                    TaskEvent taskEvent = taskEvents.get(0);
                    GridTaskSender.this.persistTasks(taskEvents, jobInstanceId);
                    GridTaskSender.this.sendManager.putTasksToRouteQueue(taskEvents, jobInstanceId);
                    logger.debug("[TaskForInsertConsumer]: dispatchTaskList, executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, size={}", taskEvent.getExecutableTask().getJob().getId(), taskEvent.getExecutableTask().getJobInstanceSnapshot().getId(), taskEvents.size());
                }
                catch (Throwable throwable) {
                    logger.error("TaskForInsertConsumer error", throwable);
                }
            }
        }
    }
}

