/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Preconditions;

/**
 * A {@link SplitEnumeratorContext} for tests that keeps everything in memory.
 *
 * <p>Split assignments, "no more splits" signals and events sent to readers are recorded per
 * subtask so a test can inspect them afterwards. Asynchronous calls are handed to a
 * {@link ManuallyTriggeredScheduledExecutorService}; the test decides when they run, e.g. via
 * {@link #triggerAllActions()}.
 *
 * <p>The maps returned by the accessors are the live internal maps, not copies.
 *
 * @param <SplitT> the type of the splits handled by this context
 */
public class TestingSplitEnumeratorContext<SplitT extends SourceSplit>
        implements SplitEnumeratorContext<SplitT> {

    private final ManuallyTriggeredScheduledExecutorService executor =
            new ManuallyTriggeredScheduledExecutorService();

    /** Recorded assignment state, keyed by subtask index. */
    private final Map<Integer, SplitAssignmentState<SplitT>> splitAssignments = new HashMap<>();

    /** Events passed to {@link #sendEventToSourceReader(int, SourceEvent)}, keyed by subtask. */
    private final Map<Integer, List<SourceEvent>> events = new HashMap<>();

    /** Readers added through {@link #registerReader(int, String)}, keyed by subtask index. */
    private final Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();

    private final int parallelism;

    /**
     * Creates a context that reports the given parallelism.
     *
     * @param parallelism the value returned by {@link #currentParallelism()}
     */
    public TestingSplitEnumeratorContext(int parallelism) {
        this.parallelism = parallelism;
    }

    // ------------------------------------------------------------------------
    //  Test utilities
    // ------------------------------------------------------------------------

    /**
     * Calls {@code triggerPeriodicScheduledTasks()} and then {@code triggerAll()} on the
     * underlying executor.
     */
    public void triggerAllActions() {
        executor.triggerPeriodicScheduledTasks();
        executor.triggerAll();
    }

    /**
     * Returns the executor that receives all asynchronous and coordinator-thread calls.
     *
     * @return the manually triggered executor used by this context
     */
    public ManuallyTriggeredScheduledExecutorService getExecutorService() {
        return executor;
    }

    /**
     * Returns the recorded split assignments.
     *
     * @return the live internal map from subtask index to its assignment state
     */
    public Map<Integer, SplitAssignmentState<SplitT>> getSplitAssignments() {
        return splitAssignments;
    }

    /**
     * Returns the recorded source events.
     *
     * @return the live internal map from subtask index to the events sent to that subtask, in
     *     the order they were sent
     */
    public Map<Integer, List<SourceEvent>> getSentEvents() {
        return events;
    }

    /**
     * Registers a reader so that it shows up in {@link #registeredReaders()}.
     *
     * @param subtask the subtask index of the reader
     * @param hostname the hostname stored in the created {@link ReaderInfo}
     * @throws IllegalStateException if a reader is already registered for {@code subtask}
     */
    public void registerReader(int subtask, String hostname) {
        Preconditions.checkState(
                !registeredReaders.containsKey(subtask), "Reader already registered");
        registeredReaders.put(subtask, new ReaderInfo(subtask, hostname));
    }

    // ------------------------------------------------------------------------
    //  Context methods
    // ------------------------------------------------------------------------

    /**
     * Returns a metric group that can be used without any setup.
     *
     * @return a new {@link UnregisteredMetricsGroup} on every call
     */
    public MetricGroup metricGroup() {
        return new UnregisteredMetricsGroup();
    }

    /**
     * Records the event for the given subtask instead of sending it anywhere.
     *
     * @param subtaskId the subtask index the event is addressed to
     * @param event the event to record
     */
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        events.computeIfAbsent(subtaskId, key -> new ArrayList<>()).add(event);
    }

    /**
     * Returns the parallelism given at construction time.
     *
     * @return the configured parallelism
     */
    public int currentParallelism() {
        return parallelism;
    }

    /**
     * Returns the readers registered through {@link #registerReader(int, String)}.
     *
     * @return the live internal map from subtask index to reader info
     */
    public Map<Integer, ReaderInfo> registeredReaders() {
        return registeredReaders;
    }

    /**
     * Appends the splits of the given assignment to the recorded state of each target subtask,
     * creating that state on first use.
     *
     * @param newSplitAssignments the assignment whose splits are recorded
     */
    public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) {
        for (Map.Entry<Integer, List<SplitT>> entry :
                newSplitAssignments.assignment().entrySet()) {
            final SplitAssignmentState<SplitT> assignment =
                    splitAssignments.computeIfAbsent(
                            entry.getKey(), key -> new SplitAssignmentState<>());
            assignment.splits.addAll(entry.getValue());
        }
    }

    /**
     * Marks the given subtask as having received the "no more splits" signal, creating its
     * assignment state on first use.
     *
     * @param subtask the subtask index to mark
     */
    public void signalNoMoreSplits(int subtask) {
        final SplitAssignmentState<SplitT> assignment =
                splitAssignments.computeIfAbsent(subtask, key -> new SplitAssignmentState<>());
        assignment.noMoreSplits = true;
    }

    /**
     * Submits a one-time task to the executor that runs the callable and passes its outcome to
     * the handler. Nothing runs until the executor is triggered.
     *
     * @param callable the computation to run
     * @param handler receives {@code (result, null)} on success or {@code (null, throwable)} on
     *     failure
     * @param <T> the result type of the callable
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        executor.execute(callableWithResultHandler(callable, handler));
    }

    /**
     * Schedules a fixed-delay task on the executor that runs the callable and passes its outcome
     * to the handler.
     *
     * @param callable the computation to run
     * @param handler receives {@code (result, null)} on success or {@code (null, throwable)} on
     *     failure
     * @param initialDelay the initial delay, in milliseconds
     * @param period the delay between runs, in milliseconds
     * @param <T> the result type of the callable
     */
    public <T> void callAsync(
            Callable<T> callable,
            BiConsumer<T, Throwable> handler,
            long initialDelay,
            long period) {
        executor.scheduleWithFixedDelay(
                callableWithResultHandler(callable, handler),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Submits the runnable to the same manually triggered executor as the async calls.
     *
     * @param runnable the action to enqueue
     */
    public void runInCoordinatorThread(Runnable runnable) {
        executor.execute(runnable);
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /**
     * Wraps a callable and its handler into a single {@link Runnable}.
     *
     * @param callable the computation to run
     * @param handler receives {@code (result, null)} on success or {@code (null, throwable)} on
     *     failure
     * @param <T> the result type of the callable
     * @return a runnable that executes the callable and reports to the handler
     */
    private static <T> Runnable callableWithResultHandler(
            Callable<T> callable, BiConsumer<T, Throwable> handler) {
        return () -> {
            try {
                final T result = callable.call();
                // NOTE: this call sits inside the try, so a throwable raised by the handler on
                // the success path is reported back to the same handler by the catch below.
                handler.accept(result, null);
            } catch (Throwable t) {
                handler.accept(null, t);
            }
        };
    }

    // ------------------------------------------------------------------------
    //  Result types
    // ------------------------------------------------------------------------

    /**
     * The recorded assignment state of a single subtask: the splits assigned so far and whether
     * the "no more splits" signal was received.
     *
     * @param <SplitT> the type of the recorded splits
     */
    public static final class SplitAssignmentState<SplitT extends SourceSplit> {

        /** Splits assigned so far, in assignment order. */
        final List<SplitT> splits = new ArrayList<>();

        /** Set once {@code signalNoMoreSplits} was called for the owning subtask. */
        boolean noMoreSplits;

        /**
         * Returns the splits assigned so far.
         *
         * @return the live internal list of assigned splits
         */
        public List<SplitT> getAssignedSplits() {
            return splits;
        }

        /**
         * Tells whether the "no more splits" signal was recorded.
         *
         * @return {@code true} once the signal was received, {@code false} otherwise
         */
        public boolean hasReceivedNoMoreSplitsSignal() {
            return noMoreSplits;
        }
    }
}

