/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.listener;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ReactiveRedisMessageListenerContainer
implements DisposableBean {
    // String serialization pair: encodes topic names, decodes pattern names, and is the
    // default channel/message serializer for the String-typed receive methods.
    private final RedisSerializationContext.SerializationPair<String> stringSerializationPair = RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.string());
    // Subscriptions tracked by this container, each mapped to its subscriber counter.
    private final Map<ReactiveSubscription, Subscribers> subscriptions = new ConcurrentHashMap<ReactiveSubscription, Subscribers>();
    // Connection obtained in the constructor; doDestroy() sets it to null, which isActive()
    // treats as "container disposed".
    @Nullable
    private volatile ReactiveRedisConnection connection;

    public ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory) {
        Assert.notNull((Object)connectionFactory, (String)"ReactiveRedisConnectionFactory must not be null!");
        this.connection = connectionFactory.getReactiveConnection();
    }

    /**
     * Tears the container down synchronously by blocking until {@link #destroyLater()} completes.
     */
    public void destroy() {
        Mono<Void> teardown = this.destroyLater();
        teardown.block();
    }

    /**
     * Returns a deferred teardown: nothing happens until the returned {@link Mono} is subscribed to.
     *
     * @return a {@link Mono} that runs the teardown on subscription.
     */
    public Mono<Void> destroyLater() {
        return Mono.defer(() -> this.doDestroy());
    }

    /**
     * Cancels every tracked subscription and then closes the connection.
     *
     * @return a {@link Mono} completing once all cancellations and the connection close have completed,
     *         or an empty {@link Mono} if the container no longer holds a connection.
     */
    private Mono<Void> doDestroy() {
        // Read the volatile field exactly once: checking it and then re-reading it could observe a
        // concurrent teardown in between and fail instead of returning empty.
        ReactiveRedisConnection connection = this.connection;
        if (connection == null) {
            return Mono.empty();
        }
        Flux<Void> terminationSignals = null;
        // Keep draining until the map is empty; each pass works on a snapshot of the current entries.
        while (!this.subscriptions.isEmpty()) {
            Map<ReactiveSubscription, Subscribers> snapshot = new HashMap<>(this.subscriptions);
            List<Mono<Void>> cancellations = snapshot.keySet().stream().map(subscription -> {
                this.subscriptions.remove(subscription);
                return subscription.cancel();
            }).collect(Collectors.toList());
            Flux<Void> batch = Flux.concat(cancellations);
            terminationSignals = terminationSignals == null ? batch : terminationSignals.mergeWith(batch);
        }
        // Mark the container as disposed before handing back the close signal.
        this.connection = null;
        return terminationSignals != null ? terminationSignals.then(connection.closeLater()) : connection.closeLater();
    }

    /**
     * Lists the tracked subscriptions that currently have at least one registered subscriber.
     *
     * @return a new collection of subscriptions whose subscriber counter is positive.
     */
    public Collection<ReactiveSubscription> getActiveSubscriptions() {
        return this.subscriptions.entrySet().stream()
                .filter(tracked -> tracked.getValue().hasRegistration())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * Subscribes to the given channels and emits their messages with String channel names and bodies.
     *
     * @param channelTopics channels to subscribe to; must not be {@code null} or contain {@code null} elements.
     * @return the stream of received messages.
     */
    public Flux<ReactiveSubscription.Message<String, String>> receive(ChannelTopic ... channelTopics) {
        Assert.notNull(channelTopics, "ChannelTopics must not be null!");
        Assert.noNullElements(channelTopics, "ChannelTopics must not contain null elements!");
        List<ChannelTopic> topics = Arrays.asList(channelTopics);
        return this.receive(topics, this.stringSerializationPair, this.stringSerializationPair);
    }

    /**
     * Variant of {@code receive(ChannelTopic...)} that wraps the message stream in a {@link Mono},
     * delegating to the {@link Iterable}-based {@code receiveLater} with String serialization.
     *
     * @param channelTopics channels to subscribe to; must not be {@code null} or contain {@code null} elements.
     * @return a {@link Mono} emitting the stream of received messages.
     */
    public Mono<Flux<ReactiveSubscription.Message<String, String>>> receiveLater(ChannelTopic ... channelTopics) {
        Assert.notNull(channelTopics, "ChannelTopics must not be null!");
        Assert.noNullElements(channelTopics, "ChannelTopics must not contain null elements!");
        List<ChannelTopic> topics = Arrays.asList(channelTopics);
        return this.receiveLater(topics, this.stringSerializationPair, this.stringSerializationPair);
    }

    /**
     * Subscribes to the given patterns and emits the matching messages as pattern messages with
     * String pattern, channel and body.
     *
     * @param patternTopics patterns to subscribe to; must not be {@code null} or contain {@code null} elements.
     * @return the stream of received pattern messages.
     */
    public Flux<ReactiveSubscription.PatternMessage<String, String, String>> receive(PatternTopic ... patternTopics) {
        Assert.notNull(patternTopics, "PatternTopic must not be null!");
        Assert.noNullElements(patternTopics, "PatternTopic must not contain null elements!");
        List<PatternTopic> topics = Arrays.asList(patternTopics);
        Flux<ReactiveSubscription.Message<String, String>> messages = this.receive(topics, this.stringSerializationPair, this.stringSerializationPair);
        // Every element is cast to PatternMessage; a non-pattern message would fail here with a ClassCastException.
        return messages.map(message -> (ReactiveSubscription.PatternMessage<String, String, String>) message);
    }

    /**
     * Variant of {@code receive(PatternTopic...)} that wraps the message stream in a {@link Mono},
     * delegating to the {@link Iterable}-based {@code receiveLater} with String serialization.
     *
     * @param patternTopics patterns to subscribe to; must not be {@code null} or contain {@code null} elements.
     * @return a {@link Mono} emitting the stream of received pattern messages.
     */
    public Mono<Flux<ReactiveSubscription.PatternMessage<String, String, String>>> receiveLater(PatternTopic ... patternTopics) {
        Assert.notNull(patternTopics, "PatternTopic must not be null!");
        Assert.noNullElements(patternTopics, "PatternTopic must not contain null elements!");
        List<PatternTopic> topics = Arrays.asList(patternTopics);
        return this.receiveLater(topics, this.stringSerializationPair, this.stringSerializationPair)
                .map(stream -> stream.map(message -> (ReactiveSubscription.PatternMessage<String, String, String>) message));
    }

    /**
     * Subscribes to the given topics using String serialization for channel names and bodies,
     * passing the supplied listener on to the fully parameterized {@code receive} overload.
     *
     * @param topics topics to subscribe to.
     * @param subscriptionListener listener handed to the subscription being created.
     * @return the stream of received messages.
     */
    public Flux<ReactiveSubscription.Message<String, String>> receive(Iterable<? extends Topic> topics, SubscriptionListener subscriptionListener) {
        RedisSerializationContext.SerializationPair<String> strings = this.stringSerializationPair;
        return this.receive(topics, strings, strings, subscriptionListener);
    }

    /**
     * Subscribes to the given topics with custom serializers and the no-op subscription listener.
     *
     * @param topics topics to subscribe to.
     * @param channelSerializer serialization pair used to read channel names.
     * @param messageSerializer serialization pair used to read message bodies.
     * @return the stream of received messages.
     */
    public <C, B> Flux<ReactiveSubscription.Message<C, B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) {
        SubscriptionListener noOpListener = SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER;
        return this.receive(topics, channelSerializer, messageSerializer, noOpListener);
    }

    /**
     * Subscribes to the given topics and emits their messages, deserialized with the supplied serializers.
     *
     * @param topics topics to subscribe to; must not be {@code null}.
     * @param channelSerializer serialization pair used to read channel names; must not be {@code null}.
     * @param messageSerializer serialization pair used to read message bodies; must not be {@code null}.
     * @param subscriptionListener listener handed to the created subscription; must not be {@code null}.
     * @return the stream of received messages.
     * @throws IllegalStateException if the container has already been disposed.
     * @throws InvalidDataAccessApiUsageException if the topics contain neither a pattern topic nor a channel topic.
     */
    public <C, B> Flux<ReactiveSubscription.Message<C, B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, SubscriptionListener subscriptionListener) {
        Assert.notNull(topics, "Topics must not be null!");
        Assert.notNull(channelSerializer, "Channel serializer must not be null!");
        Assert.notNull(messageSerializer, "Message serializer must not be null!");
        Assert.notNull(subscriptionListener, "SubscriptionListener must not be null!");
        this.verifyConnection();

        // Split the topics by kind and encode their names.
        ByteBuffer[] patterns = this.getTargets(topics, PatternTopic.class);
        ByteBuffer[] channels = this.getTargets(topics, ChannelTopic.class);
        boolean hasPatterns = !ObjectUtils.isEmpty(patterns);
        boolean hasChannels = !ObjectUtils.isEmpty(channels);
        if (!hasPatterns && !hasChannels) {
            throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to.");
        }

        Mono<ReactiveSubscription> subscription = this.getRequiredConnection().pubSubCommands().createSubscription(subscriptionListener);
        return this.doReceive(channelSerializer, messageSerializer, subscription, patterns, channels);
    }

    /**
     * Builds the message stream for a subscription: subscribes to the given patterns/channels,
     * tracks the subscriber in {@code subscriptions}, and deserializes every raw message.
     *
     * @param channelSerializer serialization pair whose reader decodes channel names.
     * @param messageSerializer serialization pair whose reader decodes message bodies.
     * @param subscription deferred subscription to receive from.
     * @param patterns encoded pattern names to subscribe to (may be empty).
     * @param channels encoded channel names to subscribe to (may be empty).
     * @return the stream of deserialized messages.
     */
    private <C, B> Flux<ReactiveSubscription.Message<C, B>> doReceive(RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, Mono<ReactiveSubscription> subscription, ByteBuffer[] patterns, ByteBuffer[] channels) {
        Flux messageStream = subscription.flatMapMany(it -> {
            Mono<Void> subscribe = ReactiveRedisMessageListenerContainer.subscribe(patterns, channels, it);
            // Signalled (empty or error) once cancelling the underlying subscription has finished.
            Sinks.One terminalSink = Sinks.one();
            // Merge the receive stream with the subscribe command; once subscribing completes,
            // the subscriber counter for this subscription is incremented.
            return it.receive().mergeWith((Publisher)subscribe.then(Mono.defer(() -> {
                this.getSubscribers((ReactiveSubscription)it).registered();
                return Mono.empty();
            }))).doOnCancel(() -> {
                // On downstream cancel: decrement the counter; only when unregister() reports the
                // last subscriber is the subscription dropped from the map and cancelled.
                Subscribers subscribers = this.getSubscribers((ReactiveSubscription)it);
                if (subscribers.unregister()) {
                    this.subscriptions.remove(it);
                    it.cancel().subscribe(v -> terminalSink.tryEmitEmpty(), arg_0 -> ((Sinks.One)terminalSink).tryEmitError(arg_0));
                }
            }).mergeWith((Publisher)terminalSink.asMono());
        });
        // Decode each raw ByteBuffer message with the caller-supplied readers.
        return messageStream.map(message -> this.readMessage(channelSerializer.getReader(), messageSerializer.getReader(), (ReactiveSubscription.Message<ByteBuffer, ByteBuffer>)message));
    }

    /**
     * Subscribes to the given topics and returns the message stream wrapped in a {@link Mono}.
     * Arguments are validated eagerly; creating the subscription is deferred until the returned
     * {@link Mono} is subscribed to.
     *
     * @param topics topics to subscribe to; must not be {@code null}.
     * @param channelSerializer serialization pair used to read channel names; must not be {@code null}.
     * @param messageSerializer serialization pair used to read message bodies; must not be {@code null}.
     * @return a {@link Mono} emitting the stream of received messages.
     * @throws IllegalStateException if the container has already been disposed.
     * @throws InvalidDataAccessApiUsageException if the topics contain neither a pattern topic nor a channel topic.
     */
    public <C, B> Mono<Flux<ReactiveSubscription.Message<C, B>>> receiveLater(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) {
        Assert.notNull(topics, "Topics must not be null!");
        Assert.notNull(channelSerializer, "Channel serializer must not be null!");
        Assert.notNull(messageSerializer, "Message serializer must not be null!");
        this.verifyConnection();

        // Split the topics by kind and encode their names.
        ByteBuffer[] patterns = this.getTargets(topics, PatternTopic.class);
        ByteBuffer[] channels = this.getTargets(topics, ChannelTopic.class);
        boolean hasPatterns = !ObjectUtils.isEmpty(patterns);
        boolean hasChannels = !ObjectUtils.isEmpty(channels);
        if (!hasPatterns && !hasChannels) {
            throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to.");
        }

        return Mono.defer(() -> this.lambda$receiveLater$10(topics, channelSerializer, messageSerializer, patterns, channels));
    }

    /**
     * Builds a {@link Mono} that first issues the subscribe commands and then emits the
     * (not yet consumed) stream of deserialized messages for that subscription.
     *
     * @param channelSerializer serialization pair whose reader decodes channel names.
     * @param messageSerializer serialization pair whose reader decodes message bodies.
     * @param subscription deferred subscription to receive from.
     * @param patterns encoded pattern names to subscribe to (may be empty).
     * @param channels encoded channel names to subscribe to (may be empty).
     * @return a {@link Mono} emitting the message stream once the subscribe commands have completed.
     */
    private <C, B> Mono<Flux<ReactiveSubscription.Message<C, B>>> doReceiveLater(RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, Mono<ReactiveSubscription> subscription, ByteBuffer[] patterns, ByteBuffer[] channels) {
        return subscription.flatMap(it -> {
            // Increment the subscriber counter once the subscribe commands have succeeded.
            Mono subscribe = ReactiveRedisMessageListenerContainer.subscribe(patterns, channels, it).doOnSuccess(v -> this.getSubscribers((ReactiveSubscription)it).registered());
            // Signalled (empty or error) once cancelling the underlying subscription has finished.
            Sinks.One terminalSink = Sinks.one();
            Flux receiver = it.receive().doOnCancel(() -> {
                // On downstream cancel: decrement the counter; only when unregister() reports the
                // last subscriber is the subscription dropped from the map and cancelled.
                Subscribers subscribers = this.getSubscribers((ReactiveSubscription)it);
                if (subscribers.unregister()) {
                    this.subscriptions.remove(it);
                    it.cancel().subscribe(v -> terminalSink.tryEmitEmpty(), arg_0 -> ((Sinks.One)terminalSink).tryEmitError(arg_0));
                }
            }).mergeWith((Publisher)terminalSink.asMono()).map(message -> this.readMessage(channelSerializer.getReader(), messageSerializer.getReader(), (ReactiveSubscription.Message<ByteBuffer, ByteBuffer>)message));
            // Emit the receiver only after the subscribe commands have completed.
            return subscribe.then(Mono.just((Object)receiver));
        });
    }

    /**
     * Issues the pattern and/or channel subscribe commands on the given subscription.
     *
     * @param patterns encoded pattern names; may be empty if {@code channels} is not.
     * @param channels encoded channel names; may be empty if {@code patterns} is not.
     * @param it subscription to subscribe through.
     * @return a {@link Mono} completing when all issued subscribe commands have completed.
     * @throws IllegalArgumentException if both {@code patterns} and {@code channels} are empty.
     */
    private static Mono<Void> subscribe(ByteBuffer[] patterns, ByteBuffer[] channels, ReactiveSubscription it) {
        boolean hasPatterns = !ObjectUtils.isEmpty(patterns);
        boolean hasChannels = !ObjectUtils.isEmpty(channels);
        Assert.isTrue(hasChannels || hasPatterns, "Must provide either channels or patterns!");

        // Typed as Mono<Void> (not Object) so the results can be combined with and() and returned directly.
        Mono<Void> subscribe = null;
        if (hasPatterns) {
            subscribe = it.pSubscribe(patterns);
        }
        if (hasChannels) {
            Mono<Void> channelsSubscribe = it.subscribe(channels);
            subscribe = subscribe == null ? channelsSubscribe : subscribe.and(channelsSubscribe);
        }
        // Unreachable after the assertion above, but kept as a safe fallback.
        return subscribe == null ? Mono.empty() : subscribe;
    }

    /**
     * @return {@code true} while the container still holds a connection.
     */
    private boolean isActive() {
        ReactiveRedisConnection current = this.connection;
        return current != null;
    }

    /**
     * Guards entry points against use after disposal.
     *
     * @throws IllegalStateException if the container no longer holds a connection.
     */
    private void verifyConnection() {
        if (this.isActive()) {
            return;
        }
        throw new IllegalStateException("ReactiveRedisMessageListenerContainer is already disposed!");
    }

    /**
     * Returns the subscriber counter for the given subscription, creating and storing a fresh
     * one if the subscription is not tracked yet.
     *
     * @param it subscription to look up.
     * @return the counter associated with {@code it}.
     */
    private Subscribers getSubscribers(ReactiveSubscription it) {
        return this.subscriptions.computeIfAbsent(it, subscription -> new Subscribers());
    }

    /**
     * Selects the topics that are instances of {@code classFilter} and encodes their names
     * with the String serialization pair.
     *
     * @param topics topics to filter.
     * @param classFilter topic type to keep.
     * @return the encoded names of the matching topics, in iteration order; empty if none match.
     */
    private ByteBuffer[] getTargets(Iterable<? extends Topic> topics, Class<?> classFilter) {
        return StreamSupport.stream(topics.spliterator(), false)
                .filter(classFilter::isInstance)
                .map(topic -> this.stringSerializationPair.write(topic.getTopic()))
                .toArray(ByteBuffer[]::new);
    }

    /**
     * Converts a raw {@link ByteBuffer} message into its deserialized counterpart. Pattern
     * messages stay pattern messages (with the pattern decoded as a String); everything else
     * becomes a channel message.
     *
     * @param channelSerializer reader for the channel name.
     * @param messageSerializer reader for the message body.
     * @param message raw message to convert.
     * @return the deserialized message.
     */
    private <C, B> ReactiveSubscription.Message<C, B> readMessage(RedisElementReader<C> channelSerializer, RedisElementReader<B> messageSerializer, ReactiveSubscription.Message<ByteBuffer, ByteBuffer> message) {
        if (!(message instanceof ReactiveSubscription.PatternMessage)) {
            C channel = read(channelSerializer, message.getChannel());
            B body = read(messageSerializer, message.getMessage());
            return new ReactiveSubscription.ChannelMessage<C, B>(channel, body);
        }

        @SuppressWarnings("unchecked")
        ReactiveSubscription.PatternMessage<ByteBuffer, ByteBuffer, ByteBuffer> patternMessage = (ReactiveSubscription.PatternMessage<ByteBuffer, ByteBuffer, ByteBuffer>) message;
        // The pattern is always decoded as a String, independent of the caller's channel serializer.
        String pattern = read(this.stringSerializationPair.getReader(), patternMessage.getPattern());
        C channel = read(channelSerializer, patternMessage.getChannel());
        B body = read(messageSerializer, patternMessage.getMessage());
        return new ReactiveSubscription.PatternMessage<String, C, B>(pattern, channel, body);
    }

    /**
     * Returns the current connection, failing if the container no longer holds one.
     *
     * @return the non-null connection.
     * @throws IllegalStateException if the connection has been cleared.
     */
    private ReactiveRedisConnection getRequiredConnection() {
        // Single read of the volatile field so the null check and the return see the same value.
        ReactiveRedisConnection current = this.connection;
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("Connection no longer available");
    }

    /**
     * Reads a value from the buffer and then resets the buffer to the position it had on entry,
     * whether or not reading succeeded.
     *
     * @param reader reader used to decode the buffer content.
     * @param buffer buffer to read from.
     * @return the decoded value.
     */
    private static <C> C read(RedisElementReader<C> reader, ByteBuffer buffer) {
        try {
            // Remember the current position so the finally block can restore it.
            buffer.mark();
            return reader.read(buffer);
        } finally {
            buffer.reset();
        }
    }

    /**
     * Deferred body of {@code receiveLater}: creates a subscription with a readiness listener and
     * delays emitting the message stream until that listener's trigger has completed.
     *
     * @param topics topics the readiness listener waits for.
     * @param channelSerializer serialization pair used to read channel names.
     * @param messageSerializer serialization pair used to read message bodies.
     * @param patterns encoded pattern names to subscribe to.
     * @param channels encoded channel names to subscribe to.
     * @return a {@link Mono} emitting the message stream once the readiness trigger has completed.
     */
    private <C, B> Mono<Flux<ReactiveSubscription.Message<C, B>>> lambda$receiveLater$10(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, ByteBuffer[] patterns, ByteBuffer[] channels) {
        SubscriptionReadyListener readyListener = SubscriptionReadyListener.create(topics, this.stringSerializationPair);
        Mono<ReactiveSubscription> subscription = this.getRequiredConnection().pubSubCommands().createSubscription(readyListener);
        return this.doReceiveLater(channelSerializer, messageSerializer, subscription, patterns, channels).delayUntil(it -> readyListener.getTrigger());
    }

    /**
     * Lock-free counter of the subscribers registered for one subscription.
     */
    static class Subscribers {
        private static final AtomicLongFieldUpdater<Subscribers> SUBSCRIBERS = AtomicLongFieldUpdater.newUpdater(Subscribers.class, "subscribers");
        // Only accessed through SUBSCRIBERS.
        private volatile long subscribers;

        Subscribers() {
        }

        /**
         * Records one more subscriber.
         */
        void registered() {
            SUBSCRIBERS.incrementAndGet(this);
        }

        /**
         * @return {@code true} if at least one subscriber is currently registered.
         */
        boolean hasRegistration() {
            return SUBSCRIBERS.get(this) > 0L;
        }

        /**
         * Removes one subscriber, if any is registered.
         *
         * @return {@code true} only if this call removed the last subscriber (count went from 1 to 0);
         *         {@code false} if other subscribers remain or none was registered.
         */
        boolean unregister() {
            long current;
            // Retry on contention: a single failed compareAndSet must not drop this decrement,
            // otherwise the count never reaches zero and the last-subscriber signal is lost.
            do {
                current = SUBSCRIBERS.get(this);
                if (current <= 0L) {
                    return false;
                }
            } while (!SUBSCRIBERS.compareAndSet(this, current, current - 1L));
            return current == 1L;
        }
    }

    /**
     * Subscription listener that completes a trigger once every expected channel and pattern
     * has been reported as subscribed. The inherited boolean state makes sure the trigger is
     * emitted at most once.
     */
    static class SubscriptionReadyListener
    extends AtomicBoolean
    implements SubscriptionListener {
        // Encoded names still awaiting a subscription confirmation; guarded by its own monitor.
        private final Set<ByteArrayWrapper> toSubscribe;
        private final Sinks.Empty<Void> sink = Sinks.empty();

        private SubscriptionReadyListener(Set<ByteArrayWrapper> topics) {
            this.toSubscribe = topics;
        }

        /**
         * Creates a listener waiting for all given topics.
         *
         * @param topics topics whose subscription confirmations are awaited.
         * @param serializationPair pair whose writer encodes the topic names.
         * @return a new listener tracking the encoded topic names.
         */
        public static SubscriptionReadyListener create(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<String> serializationPair) {
            Set<ByteArrayWrapper> pending = new HashSet<>();
            for (Topic topic : topics) {
                ByteBuffer encoded = serializationPair.getWriter().write(topic.getTopic());
                pending.add(new ByteArrayWrapper(ByteUtils.getBytes(encoded)));
            }
            return new SubscriptionReadyListener(pending);
        }

        @Override
        public void onChannelSubscribed(byte[] channel, long count) {
            this.removeRemaining(channel);
        }

        @Override
        public void onPatternSubscribed(byte[] pattern, long count) {
            this.removeRemaining(pattern);
        }

        /**
         * Marks the given channel/pattern as subscribed and completes the trigger when nothing is left.
         */
        private void removeRemaining(byte[] channel) {
            ByteArrayWrapper confirmed = new ByteArrayWrapper(channel);
            boolean allSubscribed;
            synchronized (this.toSubscribe) {
                this.toSubscribe.remove(confirmed);
                allSubscribed = this.toSubscribe.isEmpty();
            }
            if (!allSubscribed) {
                return;
            }
            // The flag flips exactly once, so the sink is never emitted to twice.
            if (this.compareAndSet(false, true)) {
                this.sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            }
        }

        /**
         * @return a {@link Mono} that completes once all expected subscriptions were confirmed.
         */
        public Mono<Void> getTrigger() {
            return this.sink.asMono();
        }
    }
}

