/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask;
import org.apache.flink.connector.base.source.reader.fetcher.FetchTask;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");
    private final int id;
    private final BlockingDeque<SplitFetcherTask> taskQueue;
    private final Map<String, SplitT> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final AtomicBoolean wakeUp;
    private final AtomicBoolean closed;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private volatile boolean isIdle;

    SplitFetcher(int id, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitReader<E, SplitT> splitReader, Consumer<Throwable> errorHandler, Runnable shutdownHook) {
        this.id = id;
        this.taskQueue = new LinkedBlockingDeque<SplitFetcherTask>();
        this.elementsQueue = elementsQueue;
        this.assignedSplits = new HashMap<String, SplitT>();
        this.splitReader = splitReader;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
        this.isIdle = true;
        this.wakeUp = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
        this.fetchTask = new FetchTask<E, SplitT>(splitReader, elementsQueue, ids -> {
            ids.forEach(this.assignedSplits::remove);
            LOG.info("Finished reading from splits {}", ids);
        }, id);
    }

    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", (Object)this.id);
        try {
            while (!this.closed.get()) {
                this.runOnce();
            }
        }
        catch (Throwable t) {
            this.errorHandler.accept(t);
        }
        finally {
            try {
                this.splitReader.close();
            }
            catch (Exception e) {
                this.errorHandler.accept(e);
            }
            LOG.info("Split fetcher {} exited.", (Object)this.id);
            this.shutdownHook.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runOnce() {
        try {
            this.runningTask = this.shouldRunFetchTask() ? this.fetchTask : this.taskQueue.take();
            LOG.debug("Prepare to run {}", (Object)this.runningTask);
            if (!this.wakeUp.get() && this.runningTask.run()) {
                LOG.debug("Finished running task {}", (Object)this.runningTask);
                this.runningTask = null;
                this.checkAndSetIdle();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("SplitFetcher thread %d received unexpected exception while polling the records", this.id), e);
        }
        this.maybeEnqueueTask(this.runningTask);
        AtomicBoolean atomicBoolean = this.wakeUp;
        synchronized (atomicBoolean) {
            this.runningTask = null;
            this.wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    public void addSplits(List<SplitT> splitsToAdd) {
        this.enqueueTask(new AddSplitsTask<SplitT>(this.splitReader, splitsToAdd, this.assignedSplits));
        this.wakeUp(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void enqueueTask(SplitFetcherTask task) {
        Object object = this.lock;
        synchronized (object) {
            this.taskQueue.offer(task);
            this.isIdle = false;
        }
    }

    public SplitReader<E, SplitT> getSplitReader() {
        return this.splitReader;
    }

    public void shutdown() {
        if (this.closed.compareAndSet(false, true)) {
            LOG.info("Shutting down split fetcher {}", (Object)this.id);
            this.wakeUp(false);
        }
    }

    Map<String, SplitT> assignedSplits() {
        return this.assignedSplits;
    }

    boolean isIdle() {
        return this.isIdle;
    }

    boolean shouldRunFetchTask() {
        return this.taskQueue.isEmpty() && !this.assignedSplits.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void wakeUp(boolean taskOnly) {
        AtomicBoolean atomicBoolean = this.wakeUp;
        synchronized (atomicBoolean) {
            this.wakeUp.set(true);
            SplitFetcherTask currentTask = this.runningTask;
            if (this.isRunningTask(currentTask)) {
                LOG.debug("Waking up running task {}", (Object)currentTask);
                currentTask.wakeUp();
            } else if (!taskOnly) {
                LOG.debug("Waking up fetcher thread.");
                this.taskQueue.add(WAKEUP_TASK);
            }
        }
    }

    private void maybeEnqueueTask(SplitFetcherTask task) {
        if (!this.closed.get() && this.isRunningTask(task) && task != this.fetchTask && !this.taskQueue.offerFirst(task)) {
            throw new RuntimeException("The task queue is full. This is only theoretically possible when really bad thing happens.");
        }
        if (task != null) {
            LOG.debug("Enqueued task {}", (Object)task);
        }
    }

    private boolean isRunningTask(SplitFetcherTask task) {
        return task != null && task != WAKEUP_TASK;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkAndSetIdle() {
        if (this.shouldIdle()) {
            Object object = this.lock;
            synchronized (object) {
                if (this.shouldIdle()) {
                    this.isIdle = true;
                }
            }
            this.elementsQueue.notifyAvailable();
        }
    }

    private boolean shouldIdle() {
        return this.assignedSplits.isEmpty() && this.taskQueue.isEmpty();
    }

    private static class DummySplitFetcherTask
    implements SplitFetcherTask {
        private final String name;

        private DummySplitFetcherTask(String name) {
            this.name = name;
        }

        @Override
        public boolean run() {
            return false;
        }

        @Override
        public void wakeUp() {
        }

        public String toString() {
            return this.name;
        }
    }
}

