/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Tests for {@link JobVertexThreadInfoTracker}: the initial stats request, reuse and refresh of
 * cached stats, cache clean-up, and shutdown. The tracker is wired to a coordinator stub
 * ({@code TestingThreadInfoRequestCoordinator}) that replays pre-built
 * {@link JobVertexThreadInfoStats} instead of sampling real tasks.
 */
public class JobVertexThreadInfoTrackerTest
extends TestLogger {
    // NOTE(review): presumably the request id of the default stats fixture (setUp passes the
    // same value, 0, to createThreadInfoStats) — confirm.
    private static final int REQUEST_ID = 0;
    // Execution job vertex fixture shared by all tests; created during class initialization.
    private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = JobVertexThreadInfoTrackerTest.createExecutionJobVertex();
    // Subtask vertices of the fixture; used to build per-attempt samples and to check subtask counts.
    private static final ExecutionVertex[] TASK_VERTICES = EXECUTION_JOB_VERTEX.getTaskVertices();
    // Job id under which all stats lookups in this class are made.
    private static final JobID JOB_ID = new JobID();
    // Sample of the test thread itself; assigned in setUp().
    private static ThreadInfoSample threadInfoSample;
    // Stats returned by the coordinator stub for the first request; assigned in setUp().
    private static JobVertexThreadInfoStats threadInfoStatsDefaultSample;
    // The Duration constants below receive their values in the static initializer further down.
    private static final Duration CLEAN_UP_INTERVAL;
    private static final Duration STATS_REFRESH_INTERVAL;
    private static final Duration TIME_GAP;
    private static final Duration SMALL_TIME_GAP;
    private static final Duration REQUEST_TIMEOUT;
    // NOTE(review): same values as the literals passed to setNumSamples(...) and
    // setMaxThreadInfoDepth(...) in createThreadInfoTracker — presumably meant to be used there.
    private static final int NUMBER_OF_SAMPLES = 1;
    private static final int MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DELAY_BETWEEN_SAMPLES;
    // Fails any single test case that runs longer than 10 seconds.
    @Rule
    public Timeout caseTimeout = new Timeout(10L, TimeUnit.SECONDS);
    // Scheduled executor handed to every tracker; created in setUp(), stopped in tearDown().
    private static ScheduledExecutorService executor;

    /**
     * Builds the fixtures shared by all tests: a thread info sample of the current thread, the
     * default stats built from that sample, and the scheduled executor handed to the trackers.
     *
     * @throws IllegalStateException if no thread info sample could be created for the current
     *     thread
     */
    @BeforeClass
    public static void setUp() {
        // Fail with a descriptive message instead of a bare NoSuchElementException from get().
        threadInfoSample =
                JvmUtils.createThreadInfoSample(
                                Thread.currentThread().getId(), MAX_STACK_TRACE_DEPTH)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Could not create a thread info sample for the current thread."));
        threadInfoStatsDefaultSample =
                createThreadInfoStats(
                        REQUEST_ID, SMALL_TIME_GAP, Collections.singletonList(threadInfoSample));
        executor = Executors.newScheduledThreadPool(1);
    }

    /** Stops the shared executor, if {@code setUp()} got far enough to create it. */
    @AfterClass
    public static void tearDown() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
    }

    /** Checks that a tracker with default settings serves the stats of its first request. */
    @Test
    public void testGetThreadInfoStats() throws Exception {
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = createThreadInfoTracker();
        doInitialRequestAndVerifyResult(tracker);
    }

    /**
     * Checks that a lookup made within the stats refresh interval returns the cached stats of
     * the first request rather than the second set of stats the coordinator stub would hand out.
     */
    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        final int requestId2 = 1;
        final JobVertexThreadInfoStats threadInfoStats2 =
                createThreadInfoStats(requestId2, TIME_GAP, null);
        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL,
                        STATS_REFRESH_INTERVAL,
                        threadInfoStatsDefaultSample,
                        threadInfoStats2);

        // Puts threadInfoStatsDefaultSample into the tracker's cache.
        doInitialRequestAndVerifyResult(tracker);

        final Optional<JobVertexThreadInfoStats> result =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);

        // The cached stats must be served, not threadInfoStats2.
        Assert.assertTrue("Expected cached thread info stats to be present.", result.isPresent());
        Assert.assertEquals(threadInfoStatsDefaultSample, result.get());
    }

    /**
     * Checks that once the (shortened) stats refresh interval has passed, a lookup yields the
     * second set of stats from the coordinator stub instead of the initially cached ones.
     */
    @Test
    public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10L);
        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 10L;

        final int requestId2 = 1;
        final JobVertexThreadInfoStats threadInfoStats2 =
                createThreadInfoStats(
                        requestId2, TIME_GAP, Collections.singletonList(threadInfoSample));
        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL,
                        threadInfoStatsRefreshInterval2,
                        threadInfoStatsDefaultSample,
                        threadInfoStats2);

        // Puts threadInfoStatsDefaultSample into the tracker's cache.
        doInitialRequestAndVerifyResult(tracker);

        // Wait until the refresh interval of the first request has elapsed.
        Thread.sleep(waitingTime);

        final Optional<JobVertexThreadInfoStats> result =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);

        // Also asserts that the result is present, so get() below is safe.
        assertExpectedEqualsReceived(threadInfoStats2, result);
        // JUnit's argument order is (unexpected, actual).
        Assert.assertNotSame(threadInfoStatsDefaultSample, result.get());
    }

    /**
     * Checks that stats cached by the initial request are gone after the (shortened) clean-up
     * interval has passed and the cache clean-up has been run.
     */
    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        final Duration shortCleanUpInterval = Duration.ofMillis(10L);
        final long sleepMillis = shortCleanUpInterval.toMillis() + 10L;

        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        shortCleanUpInterval, STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);

        // Puts threadInfoStatsDefaultSample into the tracker's cache.
        doInitialRequestAndVerifyResult(tracker);

        // Let the clean-up interval elapse, then run the clean-up explicitly.
        Thread.sleep(sleepMillis);
        tracker.cleanUpVertexStatsCache();

        final boolean stillCached =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX).isPresent();
        Assert.assertFalse(stillCached);
    }

    /**
     * Checks that running the cache clean-up right after the initial request (default clean-up
     * interval) leaves the cached stats in place.
     */
    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker();

        // Puts threadInfoStatsDefaultSample into the tracker's cache.
        doInitialRequestAndVerifyResult(tracker);

        tracker.cleanUpVertexStatsCache();

        final Optional<JobVertexThreadInfoStats> afterCleanUp =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, afterCleanUp);
    }

    /** Checks that no stats are served by the tracker once it has been shut down. */
    @Test
    public void testShutDown() throws Exception {
        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker();

        // Puts threadInfoStatsDefaultSample into the tracker's cache.
        doInitialRequestAndVerifyResult(tracker);

        tracker.shutDown();

        // The previously cached stats must no longer be returned.
        final Optional<JobVertexThreadInfoStats> firstLookup =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        Assert.assertFalse(firstLookup.isPresent());

        // NOTE(review): the lookup is deliberately repeated — presumably to check that the
        // lookup above did not trigger a new request that repopulates the cache; confirm.
        final Optional<JobVertexThreadInfoStats> secondLookup =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        Assert.assertFalse(secondLookup.isPresent());
    }

    /**
     * Performs the first stats lookup on a fresh tracker and verifies its outcome: the first
     * lookup must return nothing, and after the tracker's result-available future completes a
     * second lookup must match {@code threadInfoStatsDefaultSample}.
     *
     * @param tracker tracker under test; expected to have no cached stats yet
     * @throws InterruptedException if interrupted while waiting for the result-available future
     * @throws ExecutionException if the result-available future completed exceptionally
     */
    private void doInitialRequestAndVerifyResult(JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker) throws InterruptedException, ExecutionException {
        final Optional<JobVertexThreadInfoStats> beforeResult =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        Assert.assertFalse(beforeResult.isPresent());

        // Block until the tracker signals that a result is available.
        tracker.getResultAvailableFuture().get();

        final Optional<JobVertexThreadInfoStats> afterResult =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, afterResult);
    }

    /**
     * Asserts that the received stats are present and agree with the expected ones in request
     * id and end time, that they cover as many subtasks as {@code TASK_VERTICES} has entries,
     * and that every subtask has at least one sample.
     *
     * @param expected stats the tracker is expected to serve
     * @param receivedOptional lookup result obtained from the tracker
     */
    private static void assertExpectedEqualsReceived(JobVertexThreadInfoStats expected, Optional<JobVertexThreadInfoStats> receivedOptional) {
        Assert.assertTrue("Expected thread info stats to be present.", receivedOptional.isPresent());
        final JobVertexThreadInfoStats received = receivedOptional.get();

        Assert.assertEquals("Unexpected request id.", expected.getRequestId(), received.getRequestId());
        Assert.assertEquals("Unexpected end time.", expected.getEndTime(), received.getEndTime());
        Assert.assertEquals("Unexpected number of subtasks.", TASK_VERTICES.length, received.getNumberOfSubtasks());

        for (List<?> samples : received.getSamplesBySubtask().values()) {
            Assert.assertFalse("Expected at least one sample per subtask.", samples.isEmpty());
        }
    }

    /**
     * Creates a tracker with the default clean-up and refresh intervals whose coordinator stub
     * only ever returns {@code threadInfoStatsDefaultSample}.
     *
     * @return the configured tracker
     */
    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker() {
        final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL, STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
        return tracker;
    }

    /**
     * Creates a tracker backed by a {@code TestingThreadInfoRequestCoordinator} that replays the
     * given stats, one per request, in order.
     *
     * @param cleanUpInterval clean-up interval configured on the tracker
     * @param threadInfoStatsRefreshInterval stats refresh interval configured on the tracker
     * @param stats stats the coordinator stub returns for successive requests
     * @return the configured tracker
     */
    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(Duration cleanUpInterval, Duration threadInfoStatsRefreshInterval, JobVertexThreadInfoStats ... stats) {
        // Runnable::run makes the coordinator run its tasks directly on the calling thread.
        final ThreadInfoRequestCoordinator coordinator =
                new TestingThreadInfoRequestCoordinator(Runnable::run, REQUEST_TIMEOUT, stats);

        // Use the declared constants rather than repeating their values as magic numbers.
        return JobVertexThreadInfoTrackerBuilder.newBuilder(
                        JobVertexThreadInfoTrackerTest::createMockResourceManagerGateway,
                        Function.identity(),
                        executor,
                        TestingUtils.TIMEOUT())
                .setCoordinator(coordinator)
                .setCleanUpInterval(cleanUpInterval)
                .setNumSamples(NUMBER_OF_SAMPLES)
                .setStatsRefreshInterval(threadInfoStatsRefreshInterval)
                .setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES)
                .setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH)
                .build();
    }

    /**
     * Builds stats that start now, end {@code timeGap} later, and map the current execution
     * attempt of every vertex in {@code TASK_VERTICES} to the same list of samples.
     *
     * @param requestId request id stored in the stats
     * @param timeGap distance between the start and end time of the stats
     * @param threadInfoSamples samples registered for every attempt; stored as given, so a
     *     {@code null} list ends up as a {@code null} map value
     * @return the assembled stats
     */
    private static JobVertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap, List<ThreadInfoSample> threadInfoSamples) {
        final long begin = System.currentTimeMillis();
        final long end = begin + timeGap.toMillis();

        final Map<ExecutionAttemptID, List<ThreadInfoSample>> samplesByAttempt = new HashMap<>();
        for (int i = 0; i < TASK_VERTICES.length; i++) {
            final ExecutionAttemptID attemptId =
                    TASK_VERTICES[i].getCurrentExecutionAttempt().getAttemptId();
            samplesByAttempt.put(attemptId, threadInfoSamples);
        }

        return new JobVertexThreadInfoStats(requestId, begin, end, samplesByAttempt);
    }

    /**
     * Creates the {@link ExecutionJobVertex} fixture from a job vertex named "testVertex" via
     * {@link ExecutionGraphTestUtils#getExecutionJobVertex}.
     *
     * @return the execution job vertex
     * @throws RuntimeException if the vertex cannot be created; the original failure is kept as
     *     the cause
     */
    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            JobVertex jobVertex = new JobVertex("testVertex");
            jobVertex.setInvokableClass(AbstractInvokable.class);
            return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex);
        }
        catch (Exception e) {
            // Keep the cause: this runs in a static initializer, where a bare message would
            // hide the real reason behind an ExceptionInInitializerError.
            throw new RuntimeException("Failed to create ExecutionJobVertex.", e);
        }
    }

    /**
     * Supplies an already completed future holding a {@link TestingResourceManagerGateway}
     * whose task executor gateway function answers every resource id with a completed future
     * containing {@code null}.
     *
     * @return completed future with the testing gateway
     */
    private static CompletableFuture<ResourceManagerGateway> createMockResourceManagerGateway() {
        final TestingResourceManagerGateway gateway = new TestingResourceManagerGateway();

        // No real task executor gateway is handed out; the lookup simply completes with null.
        final Function<ResourceID, CompletableFuture<TaskExecutorThreadInfoGateway>> nullGatewayLookup =
                ignoredResourceId -> CompletableFuture.completedFuture(null);
        gateway.setRequestTaskExecutorGatewayFunction(nullGatewayLookup);

        return CompletableFuture.completedFuture(gateway);
    }

    // Assigns the Duration constants declared at the top of the class.
    static {
        // The 60 s defaults exceed the 10 s per-test timeout (caseTimeout); tests that need an
        // interval to elapse pass their own, much shorter value instead.
        CLEAN_UP_INTERVAL = Duration.ofSeconds(60L);
        STATS_REFRESH_INTERVAL = Duration.ofSeconds(60L);
        // Start-to-end distance used for the stats built by createThreadInfoStats: TIME_GAP for
        // the second set of stats in the refresh tests, SMALL_TIME_GAP for the default sample.
        TIME_GAP = Duration.ofSeconds(60L);
        SMALL_TIME_GAP = Duration.ofMillis(1L);
        // Passed to the coordinator stub's constructor as its request timeout.
        REQUEST_TIMEOUT = Duration.ofSeconds(10L);
        // Passed to the tracker builder via setDelayBetweenSamples.
        DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);
    }

    /**
     * Coordinator stub that never samples anything: each request is answered immediately with
     * the next entry of a fixed array of stats, wrapping around to the first entry after the
     * last one.
     */
    private static class TestingThreadInfoRequestCoordinator
    extends ThreadInfoRequestCoordinator {
        /** Stats handed out for successive requests, in order. */
        private final JobVertexThreadInfoStats[] jobVertexThreadInfoStats;
        /** Number of requests answered so far; selects the next entry to return. */
        private int counter;

        /**
         * @param executor executor passed on to the superclass
         * @param requestTimeout request timeout passed on to the superclass
         * @param jobVertexThreadInfoStats stats to replay, one per request
         */
        TestingThreadInfoRequestCoordinator(Executor executor, Duration requestTimeout, JobVertexThreadInfoStats ... jobVertexThreadInfoStats) {
            super(executor, requestTimeout);
            this.jobVertexThreadInfoStats = jobVertexThreadInfoStats;
        }

        /**
         * Returns an already completed future with the next canned stats; all arguments are
         * ignored.
         *
         * <p>NOTE(review): presumably overrides the request trigger of
         * {@code ThreadInfoRequestCoordinator} — confirm the signature and add
         * {@code @Override}.
         */
        public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> ignored1, int ignored2, Duration ignored3, int ignored4) {
            final int requestNumber = this.counter++;
            final int index = requestNumber % this.jobVertexThreadInfoStats.length;
            final JobVertexThreadInfoStats next = this.jobVertexThreadInfoStats[index];
            return CompletableFuture.completedFuture(next);
        }
    }
}

