/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.elasticsearch.processor.impl;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.search.elasticsearch.logging.impl.Log;
import org.hibernate.search.elasticsearch.processor.impl.AbstractBarrierElasticsearchWorkOrchestrator;
import org.hibernate.search.elasticsearch.processor.impl.BarrierElasticsearchWorkOrchestrator;
import org.hibernate.search.elasticsearch.processor.impl.FlushableElasticsearchWorkOrchestrator;
import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWork;
import org.hibernate.search.exception.ErrorHandler;
import org.hibernate.search.util.impl.Closer;
import org.hibernate.search.util.impl.CollectionHelper;
import org.hibernate.search.util.impl.Executors;
import org.hibernate.search.util.impl.Futures;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
 * Orchestrator that queues submitted changesets and hands them to a delegate
 * in batches of at most {@code maxChangesetsPerBatch}, using a single
 * background thread.
 *
 * <p>Submitters enqueue a {@link Changeset} (blocking while the bounded queue
 * is full) and make sure a batch-processing task is scheduled. The processing
 * task drains the queue into a buffer, submits each changeset to the delegate,
 * flushes the delegate and waits for the flush to complete.
 *
 * <p>A {@link Phaser} tracks scheduled-but-unfinished processing tasks so that
 * {@link #awaitCompletion()} can block until the task in flight has finished.
 */
class BatchingSharedElasticsearchWorkOrchestrator
        extends AbstractBarrierElasticsearchWorkOrchestrator
        implements BarrierElasticsearchWorkOrchestrator, AutoCloseable {

    private static final Log LOG = (Log) LoggerFactory.make(Log.class, MethodHandles.lookup());

    private final FlushableElasticsearchWorkOrchestrator delegate;
    private final ErrorHandler errorHandler;
    /** Maximum number of changesets drained from the queue per batch. */
    private final int changesetsPerBatch;
    /** Single-threaded executor running {@link #processBatch()}. */
    private final ExecutorService executor;
    /** Bounded queue of changesets waiting to be batched. */
    private final BlockingQueue<Changeset> changesetQueue;
    /** Scratch buffer reused by each batch; only touched inside {@link #processBatch()}. */
    private final List<Changeset> changesetBuffer;
    /** {@code true} while a processing task has been scheduled and has not yet released the flag. */
    private final AtomicBoolean processingScheduled;
    /**
     * One party is registered per scheduled processing task.
     * {@code onAdvance} is overridden to return {@code false} so that the phaser
     * does not terminate when the number of registered parties drops to zero
     * (which is what the default implementation would do); it must stay usable
     * for subsequent batches.
     */
    private final Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            return false;
        }
    };

    /**
     * @param name the name of this orchestrator; also passed to the thread pool factory
     * @param maxChangesetsPerBatch capacity of the changeset queue and maximum size of a batch
     * @param fair whether the changeset queue uses a fair ordering policy for blocked threads
     * @param delegate the orchestrator each batch is submitted to and flushed through
     * @param errorHandler receives failures raised while processing batches in the background
     */
    public BatchingSharedElasticsearchWorkOrchestrator(String name, int maxChangesetsPerBatch, boolean fair,
            FlushableElasticsearchWorkOrchestrator delegate, ErrorHandler errorHandler) {
        super(name);
        this.delegate = delegate;
        this.errorHandler = errorHandler;
        this.changesetsPerBatch = maxChangesetsPerBatch;
        this.changesetQueue = new ArrayBlockingQueue<>(maxChangesetsPerBatch, fair);
        this.changesetBuffer = CollectionHelper.newArrayList(maxChangesetsPerBatch);
        this.executor = Executors.newFixedThreadPool(1, name);
        this.processingScheduled = new AtomicBoolean(false);
    }

    /**
     * Creates a child orchestrator that forwards its works and its
     * completion barrier to this shared orchestrator.
     *
     * @param name the name of the child orchestrator
     * @return a new child orchestrator bound to this instance
     */
    public BarrierElasticsearchWorkOrchestrator createChild(String name) {
        return new ChildOrchestrator(name);
    }

    /**
     * Enqueues the works as one changeset and makes sure a processing task is scheduled.
     *
     * @param works the works forming the changeset
     * @return a future completed when the changeset has been handled (successfully or not)
     * @throws InterruptedException if interrupted while waiting for room in the queue
     */
    @Override
    protected CompletableFuture<Void> doSubmit(Iterable<ElasticsearchWork<?>> works) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Blocks while the bounded queue is full.
        changesetQueue.put(new Changeset(works, future));
        ensureProcessingScheduled();
        return future;
    }

    /**
     * Schedules a {@link #processBatch()} task unless one is already scheduled.
     *
     * <p>The phaser party is registered <em>before</em> the flag is flipped, and
     * deregistered again on every path that does not end up with a submitted
     * task, so that threads blocked in {@link #awaitCompletion()} are never
     * left waiting for a task that will not run.
     */
    private void ensureProcessingScheduled() {
        if (processingScheduled.get()) {
            // A task is already scheduled; it will pick up whatever is in the queue.
            return;
        }

        phaser.register();
        try {
            if (processingScheduled.compareAndSet(false, true)) {
                try {
                    executor.submit(this::processBatch);
                }
                catch (Throwable e) {
                    // Submission failed: release the flag so that a later call can retry.
                    try {
                        processingScheduled.set(false);
                    }
                    catch (Throwable e2) {
                        e.addSuppressed(e2);
                    }
                    throw e;
                }
            }
            else {
                // Another thread scheduled the task just after we registered: cancel our registration.
                phaser.arriveAndDeregister();
            }
        }
        catch (Throwable e) {
            // No task will arrive on our behalf: cancel our registration before propagating.
            try {
                phaser.arriveAndDeregister();
            }
            catch (Throwable e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    /**
     * Blocks until the processing task(s) registered at the time of the call have finished.
     * Returns immediately when no task is registered.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void awaitCompletion() throws InterruptedException {
        // Read the phase BEFORE checking for unarrived parties: if the running task finishes
        // in between, the phase will already have advanced and the wait returns immediately.
        int phaseBeforeUnarrivedPartiesCheck = phaser.getPhase();
        if (phaser.getUnarrivedParties() > 0) {
            phaser.awaitAdvanceInterruptibly(phaseBeforeUnarrivedPartiesCheck);
        }
    }

    /**
     * Waits for in-flight processing, then shuts down the executor and terminates the phaser.
     * NOTE(review): relies on {@code Closer} running the pushed actions in order and
     * aggregating their failures - confirm against the Closer implementation.
     */
    @Override
    protected void doClose() {
        try (Closer closer = new Closer()) {
            closer.push(() -> this.awaitCompletionBeforeClose(this.getName()));
            closer.push(this.executor::shutdownNow);
            closer.push(this.phaser::forceTermination);
        }
    }

    /**
     * Waits for completion; if interrupted, logs the interruption and restores
     * the thread's interrupt flag instead of propagating the exception.
     *
     * @param name the orchestrator name reported in the log message
     */
    private void awaitCompletionBeforeClose(String name) {
        try {
            awaitCompletion();
        }
        catch (InterruptedException e) {
            LOG.interruptedWhileWaitingForIndexActivity(name, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes one batch: drains up to {@code changesetsPerBatch} changesets from the queue,
     * submits them to the delegate, flushes the delegate and waits for the flush to complete.
     *
     * <p>Runs on the executor thread. Failures are reported to the error handler rather than
     * thrown, and the phaser party registered for this task is always released at the end.
     */
    private void processBatch() {
        try {
            CompletableFuture<?> flushFuture;
            try {
                synchronized (delegate) {
                    delegate.reset();
                    changesetBuffer.clear();
                    changesetQueue.drainTo(changesetBuffer, changesetsPerBatch);

                    int batchSize = changesetBuffer.size();
                    for (int i = 0; i < batchSize; i++) {
                        Changeset changeset = changesetBuffer.get(i);
                        try {
                            // presumably copies the delegate's outcome into the caller-visible future
                            delegate.submit(changeset.works).whenComplete(Futures.copyHandler(changeset.future));
                        }
                        catch (Throwable e) {
                            changeset.future.completeExceptionally(e);
                            /*
                             * The batch is aborted here. The remaining changesets were already
                             * removed from the queue and will never be submitted, so fail their
                             * futures too: otherwise callers waiting on them would hang forever.
                             */
                            for (int j = i + 1; j < batchSize; j++) {
                                changesetBuffer.get(j).future.completeExceptionally(e);
                            }
                            throw e;
                        }
                    }
                    flushFuture = delegate.flush();
                }
            }
            finally {
                /*
                 * Release the flag before waiting for the flush, so that changesets submitted
                 * in the meantime can schedule the next batch; also reschedule ourselves if
                 * the queue still holds changesets that nobody scheduled processing for.
                 */
                try {
                    processingScheduled.set(false);
                    if (!changesetQueue.isEmpty()) {
                        ensureProcessingScheduled();
                    }
                }
                catch (Throwable e) {
                    errorHandler.handleException("Error while ensuring the next submitted asynchronous Elasticsearch works will be processed", e);
                }
            }
            // Only reached when the batch was submitted and flushed without throwing.
            flushFuture.join();
        }
        catch (Throwable e) {
            errorHandler.handleException("Error while processing Elasticsearch works", e);
        }
        finally {
            // Signal waiters in awaitCompletion() that this task is done.
            phaser.arriveAndDeregister();
        }
    }

    /**
     * Child orchestrator sharing the parent's queue, executor and completion barrier.
     * Closing a child only waits for pending processing; it does not shut the parent down.
     */
    private class ChildOrchestrator
            extends AbstractBarrierElasticsearchWorkOrchestrator
            implements BarrierElasticsearchWorkOrchestrator {

        protected ChildOrchestrator(String name) {
            super(name);
        }

        @Override
        protected CompletableFuture<Void> doSubmit(Iterable<ElasticsearchWork<?>> works) throws InterruptedException {
            return BatchingSharedElasticsearchWorkOrchestrator.this.submit(works);
        }

        @Override
        public void awaitCompletion() throws InterruptedException {
            BatchingSharedElasticsearchWorkOrchestrator.this.awaitCompletion();
        }

        @Override
        protected void doClose() {
            // Report the child's own name if the wait is interrupted.
            BatchingSharedElasticsearchWorkOrchestrator.this.awaitCompletionBeforeClose(this.getName());
        }
    }

    /** A set of works submitted together, paired with the future reported back to the submitter. */
    private static class Changeset {
        private final Iterable<ElasticsearchWork<?>> works;
        private final CompletableFuture<Void> future;

        public Changeset(Iterable<ElasticsearchWork<?>> works, CompletableFuture<Void> future) {
            this.works = works;
            this.future = future;
        }
    }
}

