/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.RedisReactiveCommandsImpl;
import com.lambdaworks.redis.api.rx.Success;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.pubsub.PubSubOutput;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import com.lambdaworks.redis.pubsub.api.rx.ChannelMessage;
import com.lambdaworks.redis.pubsub.api.rx.PatternMessage;
import com.lambdaworks.redis.pubsub.api.rx.RedisPubSubReactiveCommands;
import rx.Observable;
import rx.Subscriber;

public class RedisPubSubReactiveCommandsImpl<K, V>
extends RedisReactiveCommandsImpl<K, V>
implements RedisPubSubReactiveCommands<K, V> {
    public RedisPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.connection = connection;
    }

    @Override
    public void addListener(RedisPubSubListener<K, V> listener) {
        this.getStatefulConnection().addListener(listener);
    }

    @Override
    public Observable<PatternMessage<K, V>> observePatterns() {
        SubscriptionPubSubListener listener = new SubscriptionPubSubListener<K, V, PatternMessage<K, V>>(){

            @Override
            public void message(K pattern, K channel, V message) {
                if (this.subscriber == null) {
                    return;
                }
                if (this.subscriber.isUnsubscribed()) {
                    this.subscriber.onCompleted();
                    RedisPubSubReactiveCommandsImpl.this.removeListener(this);
                    this.subscriber = null;
                    return;
                }
                this.subscriber.onNext(new PatternMessage(pattern, channel, message));
            }
        };
        return Observable.create(new PubSubObservable(listener));
    }

    @Override
    public Observable<ChannelMessage<K, V>> observeChannels() {
        SubscriptionPubSubListener listener = new SubscriptionPubSubListener<K, V, ChannelMessage<K, V>>(){

            @Override
            public void message(K channel, V message) {
                if (this.subscriber == null) {
                    return;
                }
                if (this.subscriber.isUnsubscribed()) {
                    this.subscriber.onCompleted();
                    RedisPubSubReactiveCommandsImpl.this.removeListener(this);
                    this.subscriber = null;
                    return;
                }
                this.subscriber.onNext(new ChannelMessage(channel, message));
            }
        };
        return Observable.create(new PubSubObservable(listener));
    }

    @Override
    public void removeListener(RedisPubSubListener<K, V> listener) {
        this.getStatefulConnection().removeListener(listener);
    }

    @Override
    public Observable<Success> psubscribe(K ... patterns) {
        return this.getSuccessObservable(this.createObservable(CommandType.PSUBSCRIBE, new PubSubOutput(this.codec), this.args(patterns)));
    }

    @Override
    public Observable<Success> punsubscribe(K ... patterns) {
        return this.getSuccessObservable(this.createObservable(CommandType.PUNSUBSCRIBE, new PubSubOutput(this.codec), this.args(patterns)));
    }

    @Override
    public Observable<Success> subscribe(K ... channels) {
        return this.getSuccessObservable(this.createObservable(CommandType.SUBSCRIBE, new PubSubOutput(this.codec), this.args(channels)));
    }

    @Override
    public Observable<Success> unsubscribe(K ... channels) {
        return this.getSuccessObservable(this.createObservable(CommandType.UNSUBSCRIBE, new PubSubOutput(this.codec), this.args(channels)));
    }

    private CommandArgs<K, V> args(K ... keys) {
        CommandArgs args = new CommandArgs(this.codec);
        args.addKeys(keys);
        return args;
    }

    @Override
    public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
        return (StatefulRedisPubSubConnection)super.getStatefulConnection();
    }

    private static class SubscriptionPubSubListener<K, V, T>
    extends RedisPubSubAdapter<K, V> {
        protected Subscriber<? super T> subscriber;

        private SubscriptionPubSubListener() {
        }

        public void activate(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }
    }

    private class PubSubObservable<T>
    implements Observable.OnSubscribe<T> {
        private SubscriptionPubSubListener<K, V, T> listener;

        public PubSubObservable(SubscriptionPubSubListener<K, V, T> listener) {
            this.listener = listener;
        }

        public void call(Subscriber<? super T> subscriber) {
            this.listener.activate(subscriber);
            subscriber.onStart();
            RedisPubSubReactiveCommandsImpl.this.addListener(this.listener);
        }
    }
}

