/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.Cancelable;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamPollTask;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.data.redis.stream.Task;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;

/**
 * Default {@link StreamMessageListenerContainer} implementation.
 *
 * <p>Every registration is wrapped in a {@link TaskSubscription} backed by a {@link Task}. While the
 * container is running, tasks are handed to the configured {@link Executor}; stopping the container
 * cancels all registered subscriptions. The {@code running} flag and the subscription list are only
 * touched while holding {@code lifecycleMonitor}.
 *
 * @param <K> stream key type
 * @param <V> record type delivered to listeners
 */
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>>
implements StreamMessageListenerContainer<K, V> {
    /** Monitor guarding {@link #running} and {@link #subscriptions}. */
    private final Object lifecycleMonitor = new Object();
    private final Executor taskExecutor;
    private final ErrorHandler errorHandler;
    private final StreamReadOptions readOptions;
    private final RedisTemplate<K, ?> template;
    private final StreamOperations<K, Object, Object> streamOperations;
    private final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> containerOptions;
    private final List<Subscription> subscriptions = new ArrayList<>();
    private boolean running = false;

    /**
     * Creates a container that reads through the given connection factory using the given options.
     *
     * @param connectionFactory factory used by the internally created {@link RedisTemplate}; must not be null
     * @param containerOptions executor, error handler, serializers and read settings; must not be null
     * @throws IllegalArgumentException presumably raised by {@code Assert.notNull} when an argument is null
     */
    DefaultStreamMessageListenerContainer(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> containerOptions) {
        Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
        Assert.notNull(containerOptions, "StreamMessageListenerContainerOptions must not be null!");
        this.taskExecutor = containerOptions.getExecutor();
        this.errorHandler = containerOptions.getErrorHandler();
        this.readOptions = DefaultStreamMessageListenerContainer.getStreamReadOptions(containerOptions);
        this.template = this.createRedisTemplate(connectionFactory, containerOptions);
        this.containerOptions = containerOptions;
        // With a hash mapper configured, the stream operations are created around that mapper.
        this.streamOperations = containerOptions.hasHashMapper() ? this.template.opsForStream(containerOptions.getRequiredHashMapper()) : this.template.opsForStream();
    }

    /**
     * Translates container options into {@link StreamReadOptions}: a batch size, when present,
     * becomes the read count, and a non-zero poll timeout becomes the block duration.
     *
     * @param options container options to read batch size and poll timeout from
     * @return the derived read options
     */
    private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainer.StreamMessageListenerContainerOptions<?, ?> options) {
        StreamReadOptions derived = StreamReadOptions.empty();
        if (options.getBatchSize().isPresent()) {
            derived = derived.count(options.getBatchSize().getAsInt());
        }
        if (!options.getPollTimeout().isZero()) {
            derived = derived.block(options.getPollTimeout());
        }
        return derived;
    }

    /**
     * Builds and initializes the {@link RedisTemplate} used for all stream reads.
     *
     * @param connectionFactory connection factory to attach to the template
     * @param containerOptions source of the serializers
     * @return an initialized template ({@code afterPropertiesSet()} has been called)
     */
    private RedisTemplate<K, V> createRedisTemplate(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> containerOptions) {
        RedisTemplate<K, V> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(containerOptions.getKeySerializer());
        // NOTE(review): the value serializer is fed from getKeySerializer(), unchanged from the
        // original; only key and hash serializers are read from the options in this class - confirm
        // that this is intended.
        redisTemplate.setValueSerializer(containerOptions.getKeySerializer());
        redisTemplate.setHashKeySerializer(containerOptions.getHashKeySerializer());
        redisTemplate.setHashValueSerializer(containerOptions.getHashValueSerializer());
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * @return always {@code false}
     */
    public boolean isAutoStartup() {
        return false;
    }

    /**
     * Stops the container via {@link #stop()} and then runs the callback.
     *
     * @param callback invoked after {@link #stop()} has returned
     */
    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    /**
     * Starts the container. Does nothing when already running; otherwise submits the task of every
     * registered {@link TaskSubscription} that is not active to the executor and marks the container
     * as running.
     */
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                return;
            }
            for (Subscription candidate : this.subscriptions) {
                if (!candidate.isActive() && candidate instanceof TaskSubscription) {
                    this.taskExecutor.execute(((TaskSubscription) candidate).getTask());
                }
            }
            this.running = true;
        }
    }

    /**
     * Stops the container. When running, cancels every registered subscription and clears the
     * running flag; otherwise does nothing.
     */
    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.subscriptions.forEach(Cancelable::cancel);
                this.running = false;
            }
        }
    }

    /**
     * @return whether the container is currently marked as running
     */
    public boolean isRunning() {
        synchronized (this.lifecycleMonitor) {
            return this.running;
        }
    }

    /**
     * @return always {@link Integer#MAX_VALUE}
     */
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /**
     * Registers a listener for the given read request. The resulting task is submitted to the
     * executor immediately when the container is already running.
     *
     * @param streamRequest describes the stream (and optionally the consumer) to read from
     * @param listener receives the records that are read
     * @return the subscription handle for the registration
     */
    @Override
    public Subscription register(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
        return this.doRegister(this.getReadTask(streamRequest, listener));
    }

    /**
     * Assembles the poll task for a request: the read function, the record deserializer and the
     * target type (the configured target type with a hash mapper, {@link MapRecord} otherwise).
     */
    private StreamPollTask<K, V> getReadTask(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
        Function<ReadOffset, List<ByteRecord>> readFunction = this.getReadFunction(streamRequest);
        Function<ByteRecord, V> deserializerToUse = this.getDeserializer();
        TypeDescriptor targetType = TypeDescriptor.valueOf(this.containerOptions.hasHashMapper() ? this.containerOptions.getTargetType() : MapRecord.class);
        return new StreamPollTask<K, V>(streamRequest, listener, this.errorHandler, targetType, readFunction, deserializerToUse);
    }

    /**
     * Returns the function that turns a raw {@link ByteRecord} into the listener's record type.
     *
     * <p>Without a hash mapper the deserialized {@link MapRecord} is handed out as-is; with one, the
     * map record is additionally mapped to the configured target type.
     *
     * <p>The casts to {@code V} are unchecked: the compiler cannot relate {@code V} to the concrete
     * record types produced here, so correctness relies on the container options being consistent
     * with {@code V}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Function<ByteRecord, V> getDeserializer() {
        Function<ByteRecord, MapRecord<K, Object, Object>> toMapRecord = this.streamOperations::deserializeRecord;
        if (this.containerOptions.getHashMapper() == null) {
            // Raw cast on purpose: a Function producing MapRecord is not a Function producing V
            // as far as the type system is concerned.
            return (Function) toMapRecord;
        }
        return source -> {
            MapRecord<K, Object, Object> intermediate = toMapRecord.apply(source);
            return (V) this.streamOperations.map(intermediate, this.containerOptions.getTargetType());
        };
    }

    /**
     * Returns the function that reads records starting at a given offset.
     *
     * <p>Consumer (group) requests use {@code xReadGroup}, with auto-acknowledge added to the read
     * options when the request asks for it; all other requests use {@code xRead}. The stream key is
     * serialized once, up front, with the template's key serializer.
     *
     * <p>NOTE(review): the serializer call and the {@code execute} lambdas carry no explicit target
     * types in this decompiled form; explicit casts may be required to compile against the real
     * template API - confirm.
     */
    private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest) {
        byte[] rawKey = this.template.getKeySerializer().serialize(streamRequest.getStreamOffset().getKey());
        if (streamRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest) {
            StreamMessageListenerContainer.ConsumerStreamReadRequest<K> consumerStreamRequest = (StreamMessageListenerContainer.ConsumerStreamReadRequest<K>) streamRequest;
            StreamReadOptions groupReadOptions = consumerStreamRequest.isAutoAcknowledge() ? this.readOptions.autoAcknowledge() : this.readOptions;
            Consumer consumer = consumerStreamRequest.getConsumer();
            return offset -> this.template.execute(connection -> connection.streamCommands().xReadGroup(consumer, groupReadOptions, StreamOffset.create(rawKey, offset)));
        }
        return offset -> this.template.execute(connection -> connection.streamCommands().xRead(this.readOptions, StreamOffset.create(rawKey, offset)));
    }

    /**
     * Wraps the task in a {@link TaskSubscription}, records it, and submits the task to the executor
     * right away when the container is running.
     *
     * @param task task to register
     * @return the subscription wrapping {@code task}
     */
    private Subscription doRegister(Task task) {
        TaskSubscription subscription = new TaskSubscription(task);
        synchronized (this.lifecycleMonitor) {
            this.subscriptions.add(subscription);
            if (this.running) {
                this.taskExecutor.execute(task);
            }
        }
        return subscription;
    }

    /**
     * Removes a subscription from the container. A registered subscription is cancelled first when
     * it is still active; unknown subscriptions are ignored.
     *
     * @param subscription subscription to remove
     */
    @Override
    public void remove(Subscription subscription) {
        synchronized (this.lifecycleMonitor) {
            if (!this.subscriptions.contains(subscription)) {
                return;
            }
            if (subscription.isActive()) {
                subscription.cancel();
            }
            this.subscriptions.remove(subscription);
        }
    }

    /**
     * {@link Subscription} that delegates every operation to a {@link Task}. Two instances are equal
     * when they are of the same class and wrap equal tasks.
     */
    static class TaskSubscription
    implements Subscription {
        private final Task task;

        protected TaskSubscription(Task task) {
            this.task = task;
        }

        /**
         * @return the wrapped task
         */
        Task getTask() {
            return this.task;
        }

        @Override
        public boolean isActive() {
            return this.task.isActive();
        }

        /**
         * Delegates to {@code Task.awaitStart(timeout)}.
         */
        @Override
        public boolean await(Duration timeout) throws InterruptedException {
            return this.task.awaitStart(timeout);
        }

        @Override
        public void cancel() throws DataAccessResourceFailureException {
            this.task.cancel();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TaskSubscription other = (TaskSubscription) o;
            return ObjectUtils.nullSafeEquals(this.task, other.task);
        }

        @Override
        public int hashCode() {
            return ObjectUtils.nullSafeHashCode(this.task);
        }
    }

    /**
     * Singleton {@link ErrorHandler} that logs the throwable at error level, when error logging is
     * enabled.
     */
    static enum LoggingErrorHandler implements ErrorHandler
    {
        INSTANCE;

        private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);

        public void handleError(Throwable t) {
            if (this.logger.isErrorEnabled()) {
                this.logger.error("Unexpected error occurred in scheduled task.", t);
            }
        }
    }
}

