/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.support;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.IntFunction;
import java.util.function.ObjIntConsumer;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import org.infinispan.Cache;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.AbstractSegmentedStoreConfiguration;
import org.infinispan.configuration.cache.HashConfiguration;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.persistence.InitializationContextImpl;
import org.infinispan.persistence.internal.PersistenceUtil;
import org.infinispan.persistence.spi.AdvancedCacheExpirationWriter;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.support.AbstractSegmentedAdvancedLoadWriteStore;
import org.infinispan.util.concurrent.CompletionStages;
import org.reactivestreams.Publisher;

/**
 * Segmented store that is composed of one delegate {@link AdvancedLoadWriteStore} per segment.
 * <p>
 * The delegates are kept in an {@link AtomicReferenceArray} indexed by segment. A {@code null} slot means that
 * no delegate is currently running for that segment; every operation treats such a segment as empty and
 * silently skips it.
 *
 * @param <K> key type
 * @param <V> value type
 * @param <T> type of the per-segment store configuration
 */
public class ComposedSegmentedLoadWriteStore<K, V, T extends AbstractSegmentedStoreConfiguration>
extends AbstractSegmentedAdvancedLoadWriteStore<K, V> {
    private final AbstractSegmentedStoreConfiguration<T> configuration;
    Cache<K, V> cache;
    KeyPartitioner keyPartitioner;
    InitializationContext ctx;
    /** When {@code true}, {@link #removeSegments(IntSet)} destroys the delegates instead of just clearing them. */
    boolean shouldStopSegments;
    /** Delegate stores indexed by segment; assigned in {@link #start()}, {@code null} before that. */
    AtomicReferenceArray<AdvancedLoadWriteStore<K, V>> stores;

    /**
     * @param configuration configuration used to derive the configuration of each per-segment delegate
     */
    public ComposedSegmentedLoadWriteStore(AbstractSegmentedStoreConfiguration<T> configuration) {
        this.configuration = configuration;
    }

    /**
     * @return the key partitioner obtained in {@link #start()}, used to map a key to its segment
     */
    @Override
    public ToIntFunction<Object> getKeyMapper() {
        return this.keyPartitioner;
    }

    /**
     * Loads the entry for {@code key} from the delegate of {@code segment}.
     *
     * @return the loaded entry, or {@code null} when the segment has no delegate (or the delegate returns
     *         {@code null})
     */
    @Override
    public MarshallableEntry<K, V> get(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store != null ? store.loadEntry(key) : null;
    }

    /**
     * @return {@code true} only if the segment has a delegate and that delegate contains {@code key}
     */
    @Override
    public boolean contains(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store != null && store.contains(key);
    }

    /**
     * Writes {@code entry} to the delegate of {@code segment}; a no-op when the segment has no delegate.
     */
    @Override
    public void write(int segment, MarshallableEntry<? extends K, ? extends V> entry) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        if (store != null) {
            store.write(entry);
        }
    }

    /**
     * @return {@code true} only if the segment has a delegate and that delegate reports the key as deleted
     */
    @Override
    public boolean delete(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store != null && store.delete(key);
    }

    /**
     * Sums the sizes of the delegates of the given segments.
     *
     * @return the total size, capped at {@link Integer#MAX_VALUE} if the running sum overflows
     */
    @Override
    public int size(IntSet segments) {
        int size = 0;
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segmentIterator.nextInt());
            if (store == null) {
                continue;
            }
            size += store.size();
            // A negative running total means the int sum wrapped around
            if (size < 0) {
                return Integer.MAX_VALUE;
            }
        }
        return size;
    }

    /**
     * Sums the sizes of all running delegates.
     *
     * @return the total size, capped at {@link Integer#MAX_VALUE} if the running sum overflows
     */
    @Override
    public int size() {
        int size = 0;
        for (int i = 0; i < this.stores.length(); ++i) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(i);
            if (store == null) {
                continue;
            }
            size += store.size();
            // A negative running total means the int sum wrapped around
            if (size < 0) {
                return Integer.MAX_VALUE;
            }
        }
        return size;
    }

    /**
     * Publishes the keys of the given segments, concatenating the per-segment publishers in iteration order of
     * {@code segments}. Segments without a delegate contribute an empty publisher.
     */
    @Override
    public Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter) {
        IntFunction<Publisher<K>> publisherFunction = segment -> {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store != null) {
                return store.publishKeys(filter);
            }
            return Flowable.empty();
        };
        // Single segment: hand back the delegate's publisher directly instead of wrapping it
        if (segments.size() == 1) {
            return publisherFunction.apply(segments.iterator().nextInt());
        }
        return Flowable.fromStream(segments.intStream().mapToObj(publisherFunction))
                .concatMap(RxJavaInterop.identityFunction());
    }

    /**
     * Publishes the keys of every segment slot of this store.
     */
    @Override
    public Publisher<K> publishKeys(Predicate<? super K> filter) {
        return this.publishKeys(IntSets.immutableRangeSet((int)this.stores.length()), filter);
    }

    /**
     * Publishes the entries of the given segments, concatenating the per-segment publishers in iteration order
     * of {@code segments}. Segments without a delegate contribute an empty publisher.
     */
    @Override
    public Publisher<MarshallableEntry<K, V>> entryPublisher(IntSet segments, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        IntFunction<Publisher<MarshallableEntry<K, V>>> publisherFunction = segment -> {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store != null) {
                return store.entryPublisher(filter, fetchValue, fetchMetadata);
            }
            return Flowable.empty();
        };
        // Single segment: hand back the delegate's publisher directly instead of wrapping it
        if (segments.size() == 1) {
            return publisherFunction.apply(segments.iterator().nextInt());
        }
        return Flowable.fromStream(segments.intStream().mapToObj(publisherFunction))
                .concatMap(RxJavaInterop.identityFunction());
    }

    /**
     * Publishes the entries of every segment slot of this store.
     */
    @Override
    public Publisher<MarshallableEntry<K, V>> entryPublisher(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        return this.entryPublisher(IntSets.immutableRangeSet((int)this.stores.length()), filter, fetchValue, fetchMetadata);
    }

    /**
     * Clears every running delegate.
     */
    @Override
    public void clear() {
        for (int i = 0; i < this.stores.length(); ++i) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(i);
            if (store != null) {
                store.clear();
            }
        }
    }

    /**
     * Invokes {@code purge} on every running delegate. Delegates that implement
     * {@link AdvancedCacheExpirationWriter} are called through that interface.
     */
    @Override
    public void purge(Executor executor, AdvancedCacheExpirationWriter.ExpirationPurgeListener<K, V> listener) {
        for (int i = 0; i < this.stores.length(); ++i) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(i);
            if (store instanceof AdvancedCacheExpirationWriter) {
                // Raw cast is intentional: call purge through the expiration-writer interface
                ((AdvancedCacheExpirationWriter)store).purge(executor, listener);
            } else if (store != null) {
                store.purge(executor, listener);
            }
        }
    }

    /**
     * Clears the delegates of the given segments; segments without a delegate are skipped.
     */
    @Override
    public void clear(IntSet segments) {
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segmentIterator.nextInt());
            if (store != null) {
                store.clear();
            }
        }
    }

    /**
     * Deletes the given keys, grouped by segment and split into batches of at most the configured
     * {@code maxBatchSize}. Blocks until all batches have been processed.
     * <p>
     * Keys that map to a segment without a delegate are skipped, consistent with {@link #delete(int, Object)}.
     */
    @Override
    public void deleteBatch(Iterable<Object> keys) {
        int maxBatchSize = this.configuration.maxBatchSize();
        CompletionStage<Void> stage = Flowable.fromIterable(keys)
                // Separate the keys into one group per segment
                .groupBy(this.keyPartitioner::getSegment)
                .flatMap(groupedFlowable -> groupedFlowable
                        .buffer(maxBatchSize)
                        .doOnNext(batch -> {
                            AdvancedLoadWriteStore<K, V> store = this.stores.get(groupedFlowable.getKey());
                            // The segment may have no delegate (e.g. after removeSegments); nothing to delete then
                            if (store != null) {
                                store.deleteBatch(batch);
                            }
                        }),
                        // Allow as many concurrent groups as there are segments
                        this.stores.length())
                .ignoreElements()
                .toCompletionStage(null);
        CompletionStages.join(stage);
    }

    /**
     * Writes the published entries, grouped by segment and split into batches of at most the configured
     * {@code maxBatchSize}, delegating each batch to the {@code bulkUpdate} of the segment's delegate.
     * <p>
     * Entries that map to a segment without a delegate are skipped, consistent with
     * {@link #write(int, MarshallableEntry)}.
     *
     * @return a stage that completes once every batch has been handled
     */
    @Override
    public CompletionStage<Void> bulkUpdate(Publisher<MarshallableEntry<? extends K, ? extends V>> publisher) {
        int maxBatchSize = this.configuration.maxBatchSize();
        return Flowable.fromPublisher(publisher)
                // Separate the entries into one group per segment
                .groupBy(me -> this.keyPartitioner.getSegment(me.getKey()))
                .flatMapCompletable(groupedFlowable -> groupedFlowable
                        .buffer(maxBatchSize)
                        .flatMapCompletable(batch -> {
                            AdvancedLoadWriteStore<K, V> store = this.stores.get(groupedFlowable.getKey());
                            // The segment may have no delegate (e.g. after removeSegments); nothing to write then
                            if (store == null) {
                                return Completable.complete();
                            }
                            CompletionStage<Void> stage = store.bulkUpdate(Flowable.fromIterable(batch));
                            return Completable.fromCompletionStage(stage);
                        }),
                        // Allow as many concurrent groups as there are segments
                        false, this.stores.length())
                .toCompletionStage(null);
    }

    /**
     * Remembers the initialization context and its cache for use in {@link #start()}.
     */
    @Override
    public void init(InitializationContext ctx) {
        this.ctx = ctx;
        this.cache = ctx.getCache();
    }

    /**
     * Looks up the key partitioner, allocates one slot per configured segment and starts a delegate in each.
     */
    public void start() {
        ComponentRegistry componentRegistry = ComponentRegistry.of(this.cache);
        HashConfiguration hashConfiguration = this.cache.getCacheConfiguration().clustering().hash();
        this.keyPartitioner = componentRegistry.getComponent(KeyPartitioner.class);
        this.stores = new AtomicReferenceArray<>(hashConfiguration.numSegments());
        for (int i = 0; i < this.stores.length(); ++i) {
            this.startNewStoreForSegment(i);
        }
        this.shouldStopSegments = this.cache.getCacheConfiguration().clustering().cacheMode().isDistributed();
    }

    /**
     * Creates, initializes and starts a delegate for {@code segment} unless one is already present.
     */
    private void startNewStoreForSegment(int segment) {
        if (this.stores.get(segment) != null) {
            return;
        }
        AbstractSegmentedStoreConfiguration<?> storeConfiguration = (AbstractSegmentedStoreConfiguration<?>)this.configuration.newConfigurationFrom(segment, this.ctx);
        @SuppressWarnings("unchecked") // the created instance is expected to be a store for this cache's K/V types
        AdvancedLoadWriteStore<K, V> newStore = (AdvancedLoadWriteStore<K, V>)PersistenceUtil.createStoreInstance(storeConfiguration);
        newStore.init(new InitializationContextImpl(storeConfiguration, this.cache, this.keyPartitioner, this.ctx.getPersistenceMarshaller(), this.ctx.getTimeService(), this.ctx.getByteBufferFactory(), this.ctx.getMarshallableEntryFactory(), this.ctx.getNonBlockingExecutor(), this.ctx.getGlobalConfiguration(), this.ctx.getBlockingManager(), this.ctx.getNonBlockingManager()));
        newStore.start();
        // Publish the delegate only after it has been started
        this.stores.set(segment, newStore);
    }

    /**
     * Removes the delegate of {@code segment} from its slot and stops it, if there was one.
     */
    private void stopStoreForSegment(int segment) {
        AdvancedLoadWriteStore<K, V> store = this.stores.getAndSet(segment, null);
        if (store != null) {
            store.stop();
        }
    }

    /**
     * Removes the delegate of {@code segment} from its slot and destroys it, if there was one.
     */
    private void destroyStore(int segment) {
        AdvancedLoadWriteStore<K, V> store = this.stores.getAndSet(segment, null);
        if (store != null) {
            store.destroy();
        }
    }

    /**
     * Stops every running delegate. Does nothing if {@link #start()} has not been called.
     */
    public void stop() {
        if (this.stores == null) {
            return;
        }
        for (int i = 0; i < this.stores.length(); ++i) {
            this.stopStoreForSegment(i);
        }
    }

    /**
     * Destroys every running delegate. Does nothing if {@link #start()} has not been called.
     */
    @Override
    public void destroy() {
        if (this.stores == null) {
            return;
        }
        for (int i = 0; i < this.stores.length(); ++i) {
            this.destroyStore(i);
        }
    }

    /**
     * Starts a delegate for each of the given segments that does not have one yet.
     */
    @Override
    public void addSegments(IntSet segments) {
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            this.startNewStoreForSegment(segmentIterator.nextInt());
        }
    }

    /**
     * Removes the data of the given segments: destroys the delegates when {@code shouldStopSegments} is set
     * (assigned in {@link #start()} from the cache mode being distributed), otherwise only clears them.
     */
    @Override
    public void removeSegments(IntSet segments) {
        if (this.shouldStopSegments) {
            PrimitiveIterator.OfInt segmentIterator = segments.iterator();
            while (segmentIterator.hasNext()) {
                this.destroyStore(segmentIterator.nextInt());
            }
        } else {
            this.clear(segments);
        }
    }

    /**
     * Invokes {@code consumer} with every running delegate and its segment index, in ascending segment order.
     */
    public void forEach(ObjIntConsumer<? super AdvancedLoadWriteStore> consumer) {
        for (int i = 0; i < this.stores.length(); ++i) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(i);
            if (store != null) {
                consumer.accept(store, i);
            }
        }
    }
}

