/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.LettuceStrings;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslConnectionBuilder;
import io.lettuce.core.StatefulRedisConnectionImpl;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterDistributionChannelWriter;
import io.lettuce.core.cluster.ClusterNodeEndpoint;
import io.lettuce.core.cluster.ClusterPubSubConnectionProvider;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshScheduler;
import io.lettuce.core.cluster.PartitionsConsensus;
import io.lettuce.core.cluster.PooledClusterConnectionProvider;
import io.lettuce.core.cluster.PubSubClusterEndpoint;
import io.lettuce.core.cluster.ReconnectEventListener;
import io.lettuce.core.cluster.RedisClusterURIUtil;
import io.lettuce.core.cluster.RoundRobinSocketAddressSupplier;
import io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl;
import io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.topology.ClusterTopologyRefresh;
import io.lettuce.core.cluster.topology.NodeConnectionFactory;
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.DefaultEndpoint;
import io.lettuce.core.pubsub.PubSubCommandHandler;
import io.lettuce.core.pubsub.PubSubEndpoint;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnectionImpl;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.SocketAddressResolver;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Client that creates connections to a Redis Cluster.
 *
 * <p>The client is seeded with one or more {@link RedisURI}s, loads a {@link Partitions} topology view
 * from them on first use, and hands out cluster and cluster Pub/Sub connections that share that view.
 * If the configured {@link ClusterClientOptions} enable periodic refresh, a
 * {@link ClusterTopologyRefreshScheduler} is scheduled on the generic worker pool.
 *
 * <p>The {@code partitions} field is a plain (non-volatile) field that is lazily initialized without
 * synchronization.
 */
public class RedisClusterClient
extends AbstractRedisClient {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterClient.class);

    /** Set to {@code true} once the periodic topology refresh task has been scheduled. */
    protected final AtomicBoolean clusterTopologyRefreshActivated = new AtomicBoolean(false);

    /** Handle of the scheduled periodic refresh task; {@code null} while nothing is scheduled. */
    protected final AtomicReference<ScheduledFuture<?>> clusterTopologyRefreshFuture = new AtomicReference<>();

    private final ClusterTopologyRefresh refresh = new ClusterTopologyRefresh(new NodeConnectionFactoryImpl(), this.getResources());
    private final ClusterTopologyRefreshScheduler clusterTopologyRefreshScheduler = new ClusterTopologyRefreshScheduler(this, this.getResources());

    /** Immutable seed URIs this client was created with. */
    private final Iterable<RedisURI> initialUris;

    /** Current topology view; {@code null} until first loaded or explicitly set. */
    private Partitions partitions;

    /**
     * Creates a client without client resources and without seed URIs.
     */
    protected RedisClusterClient() {
        super(null);
        this.initialUris = Collections.emptyList();
    }

    /**
     * Creates a client from the given seed URIs.
     *
     * <p>The default timeout is taken from the first URI and default {@link ClusterClientOptions} are
     * applied.
     *
     * @param clientResources client resources handed to the superclass; may be {@code null}
     * @param redisURIs seed URIs; must not be {@code null} or empty and must agree on SSL, StartTLS and
     *        VerifyPeer settings
     * @throws IllegalArgumentException if the seed URIs have inconsistent SSL/StartTLS/VerifyPeer settings
     */
    protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {
        super(clientResources);
        RedisClusterClient.assertNotEmpty(redisURIs);
        RedisClusterClient.assertSameOptions(redisURIs);
        this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
        this.setDefaultTimeout(this.getFirstUri().getTimeout(), this.getFirstUri().getUnit());
        this.setOptions(ClusterClientOptions.builder().build());
    }

    /**
     * Verifies that every URI uses the same SSL, StartTLS and VerifyPeer settings as the first one.
     *
     * @param redisURIs URIs to compare
     * @throws IllegalArgumentException on the first URI whose settings differ from the first URI's
     */
    private static void assertSameOptions(Iterable<RedisURI> redisURIs) {
        // The reference values are captured from the first URI encountered.
        Boolean ssl = null;
        Boolean startTls = null;
        Boolean verifyPeer = null;
        for (RedisURI redisURI : redisURIs) {
            if (ssl == null) {
                ssl = redisURI.isSsl();
            }
            if (startTls == null) {
                startTls = redisURI.isStartTls();
            }
            if (verifyPeer == null) {
                verifyPeer = redisURI.isVerifyPeer();
            }
            if (ssl.booleanValue() != redisURI.isSsl()) {
                throw new IllegalArgumentException("RedisURI " + redisURI + " SSL is not consistent with the other seed URI SSL settings");
            }
            if (startTls.booleanValue() != redisURI.isStartTls()) {
                throw new IllegalArgumentException("RedisURI " + redisURI + " StartTLS is not consistent with the other seed URI StartTLS settings");
            }
            if (verifyPeer.booleanValue() != redisURI.isVerifyPeer()) {
                throw new IllegalArgumentException("RedisURI " + redisURI + " VerifyPeer is not consistent with the other seed URI VerifyPeer settings");
            }
        }
    }

    /**
     * Creates a cluster client seeded with a single URI.
     *
     * @param redisURI seed URI; must not be {@code null}
     * @return a new client
     */
    public static RedisClusterClient create(RedisURI redisURI) {
        RedisClusterClient.assertNotNull(redisURI);
        return RedisClusterClient.create(Collections.singleton(redisURI));
    }

    /**
     * Creates a cluster client seeded with several URIs.
     *
     * @param redisURIs seed URIs; must not be {@code null} or empty and must have consistent
     *        SSL/StartTLS/VerifyPeer settings
     * @return a new client
     */
    public static RedisClusterClient create(Iterable<RedisURI> redisURIs) {
        RedisClusterClient.assertNotEmpty(redisURIs);
        RedisClusterClient.assertSameOptions(redisURIs);
        return new RedisClusterClient(null, redisURIs);
    }

    /**
     * Creates a cluster client from a URI string, converted to seed URIs by
     * {@link RedisClusterURIUtil#toRedisURIs(URI)}.
     *
     * @param uri URI string; must not be empty
     * @return a new client
     */
    public static RedisClusterClient create(String uri) {
        LettuceAssert.notEmpty(uri, "URI must not be empty");
        return RedisClusterClient.create(RedisClusterURIUtil.toRedisURIs(URI.create(uri)));
    }

    /**
     * Creates a cluster client with the given resources, seeded with a single URI.
     *
     * @param clientResources client resources; must not be {@code null}
     * @param redisURI seed URI; must not be {@code null}
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, RedisURI redisURI) {
        RedisClusterClient.assertNotNull(clientResources);
        RedisClusterClient.assertNotNull(redisURI);
        return RedisClusterClient.create(clientResources, Collections.singleton(redisURI));
    }

    /**
     * Creates a cluster client with the given resources from a URI string.
     *
     * @param clientResources client resources; must not be {@code null}
     * @param uri URI string; must not be empty
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, String uri) {
        RedisClusterClient.assertNotNull(clientResources);
        LettuceAssert.notEmpty(uri, "URI must not be empty");
        return RedisClusterClient.create(clientResources, RedisClusterURIUtil.toRedisURIs(URI.create(uri)));
    }

    /**
     * Creates a cluster client with the given resources, seeded with several URIs.
     *
     * @param clientResources client resources; must not be {@code null}
     * @param redisURIs seed URIs; must not be {@code null} or empty and must have consistent
     *        SSL/StartTLS/VerifyPeer settings
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, Iterable<RedisURI> redisURIs) {
        RedisClusterClient.assertNotNull(clientResources);
        RedisClusterClient.assertNotEmpty(redisURIs);
        RedisClusterClient.assertSameOptions(redisURIs);
        return new RedisClusterClient(clientResources, redisURIs);
    }

    /**
     * Opens a cluster connection using the codec returned by {@link #newStringStringCodec()}.
     *
     * @return a new cluster connection
     */
    public StatefulRedisClusterConnection<String, String> connect() {
        return this.connect(this.newStringStringCodec());
    }

    /**
     * Opens a cluster connection using the given codec.
     *
     * @param codec codec used for keys and values
     * @return a new cluster connection
     */
    public <K, V> StatefulRedisClusterConnection<K, V> connect(RedisCodec<K, V> codec) {
        return this.connectClusterImpl(codec);
    }

    /**
     * Opens a cluster Pub/Sub connection using the codec returned by {@link #newStringStringCodec()}.
     *
     * @return a new cluster Pub/Sub connection
     */
    public StatefulRedisClusterPubSubConnection<String, String> connectPubSub() {
        return this.connectPubSub(this.newStringStringCodec());
    }

    /**
     * Opens a cluster Pub/Sub connection using the given codec.
     *
     * @param codec codec used for keys and values
     * @return a new cluster Pub/Sub connection
     */
    public <K, V> StatefulRedisClusterPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
        return this.connectClusterPubSubImpl(codec);
    }

    /**
     * Connects to a single node at the given address. The address' string form is used as the node id
     * and no cluster writer is attached.
     *
     * @param socketAddress address of the node
     * @return a connection to that node
     */
    protected StatefulRedisConnection<String, String> connectToNode(SocketAddress socketAddress) {
        return this.connectToNode(this.newStringStringCodec(), socketAddress.toString(), null, () -> socketAddress);
    }

    /**
     * Synchronous counterpart of {@link #connectToNodeAsync}: starts the asynchronous connect and
     * resolves the resulting future through the inherited {@code getConnection}.
     *
     * @param codec codec used for keys and values
     * @param nodeId node id, used for logging
     * @param clusterWriter writer passed to the node endpoint; may be {@code null}
     * @param socketAddressSupplier supplies the address to connect to
     * @return a connection to the node
     */
    <K, V> StatefulRedisConnection<K, V> connectToNode(RedisCodec<K, V> codec, String nodeId, RedisChannelWriter clusterWriter, Supplier<SocketAddress> socketAddressSupplier) {
        return this.getConnection(this.connectToNodeAsync(codec, nodeId, clusterWriter, socketAddressSupplier));
    }

    /**
     * Asynchronously connects to a single node. Connection settings are taken from the first seed URI.
     * If the connect attempt completes exceptionally, the half-built connection is closed.
     *
     * @param codec codec used for keys and values; must not be {@code null}
     * @param nodeId node id, used for logging
     * @param clusterWriter writer passed to the {@link ClusterNodeEndpoint}; may be {@code null}
     * @param socketAddressSupplier supplies the address to connect to; must not be {@code null}
     * @return future completing with the node connection
     */
    <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisCodec<K, V> codec, String nodeId, RedisChannelWriter clusterWriter, Supplier<SocketAddress> socketAddressSupplier) {
        RedisClusterClient.assertNotNull(codec);
        RedisClusterClient.assertNotEmpty(this.initialUris);
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
        // Resolved once here for the log line only; the supplier itself is handed to the builder.
        SocketAddress socketAddress = socketAddressSupplier.get();
        logger.debug(String.format("connectToNodeAsync(%s at %s)", nodeId, socketAddress));
        ClusterNodeEndpoint endpoint = new ClusterNodeEndpoint(this.clientOptions, this.getResources(), clusterWriter);
        StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl(endpoint, codec, this.timeout, this.unit);
        ConnectionFuture connectionFuture = this.connectStatefulAsync(connection, endpoint, this.getFirstUri(), socketAddressSupplier, () -> new CommandHandler(this.clientResources, endpoint));
        return connectionFuture.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                connection.close();
            }
        });
    }

    /**
     * Synchronous counterpart of {@link #connectPubSubToNodeAsync}.
     *
     * @param codec codec used for keys and values
     * @param nodeId node id, used for logging
     * @param socketAddressSupplier supplies the address to connect to
     * @return a Pub/Sub connection to the node
     */
    <K, V> StatefulRedisPubSubConnection<K, V> connectPubSubToNode(RedisCodec<K, V> codec, String nodeId, Supplier<SocketAddress> socketAddressSupplier) {
        return this.getConnection(this.connectPubSubToNodeAsync(codec, nodeId, socketAddressSupplier));
    }

    /**
     * Asynchronously opens a Pub/Sub connection to a single node. Connection settings are taken from
     * the first seed URI. If the connect attempt completes exceptionally, the half-built connection is
     * closed.
     *
     * @param codec codec used for keys and values; must not be {@code null}
     * @param nodeId node id, used for logging
     * @param socketAddressSupplier supplies the address to connect to; must not be {@code null}
     * @return future completing with the Pub/Sub connection
     */
    <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNodeAsync(RedisCodec<K, V> codec, String nodeId, Supplier<SocketAddress> socketAddressSupplier) {
        RedisClusterClient.assertNotNull(codec);
        RedisClusterClient.assertNotEmpty(this.initialUris);
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
        logger.debug("connectPubSubToNode(" + nodeId + ")");
        PubSubEndpoint endpoint = new PubSubEndpoint(this.clientOptions);
        StatefulRedisPubSubConnectionImpl connection = new StatefulRedisPubSubConnectionImpl(endpoint, endpoint, codec, this.timeout, this.unit);
        ConnectionFuture connectionFuture = this.connectStatefulAsync(connection, endpoint, this.getFirstUri(), socketAddressSupplier, () -> new PubSubCommandHandler(this.clientResources, codec, endpoint));
        return connectionFuture.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                connection.close();
            }
        });
    }

    /**
     * Creates a cluster connection.
     *
     * <p>Loads the partitions if they are not yet known, activates periodic topology refresh if
     * configured, and then tries to connect up to {@code max(1, partitions.size())} times, asking the
     * socket address supplier for an address on each attempt. If every attempt fails, the connection is
     * closed and the last {@link RedisException} is rethrown.
     *
     * @param codec codec used for keys and values
     * @return the connected cluster connection, with read preference {@link ReadFrom#MASTER}
     * @throws RedisException if no attempt succeeded
     */
    <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
        if (this.partitions == null) {
            this.initializePartitions();
        }
        this.activateTopologyRefreshIfNeeded();
        logger.debug("connectCluster(" + this.initialUris + ")");
        Supplier<SocketAddress> socketAddressSupplier = this.getSocketAddressSupplier(TopologyComparators::sortByClientCount);
        DefaultEndpoint endpoint = new DefaultEndpoint(this.clientOptions);
        ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(this.clientOptions, endpoint, this.clusterTopologyRefreshScheduler);
        PooledClusterConnectionProvider<K, V> pooledClusterConnectionProvider = new PooledClusterConnectionProvider<K, V>(this, clusterWriter, codec);
        clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);
        StatefulRedisClusterConnectionImpl<K, V> connection = new StatefulRedisClusterConnectionImpl<K, V>(clusterWriter, codec, this.timeout, this.unit);
        connection.setReadFrom(ReadFrom.MASTER);
        connection.setPartitions(this.partitions);
        boolean connected = false;
        RedisException causingException = null;
        int connectionAttempts = Math.max(1, this.partitions.size());
        for (int i = 0; i < connectionAttempts; ++i) {
            try {
                this.connectStateful(connection, endpoint, this.getFirstUri(), socketAddressSupplier, () -> new CommandHandler(this.clientResources, endpoint));
                connected = true;
                break;
            }
            catch (RedisException e) {
                // Remember the failure and let the next iteration try again.
                logger.warn(e.getMessage());
                causingException = e;
            }
        }
        if (!connected) {
            connection.close();
            if (causingException != null) {
                throw causingException;
            }
        }
        connection.registerCloseables(this.closeableResources, clusterWriter, pooledClusterConnectionProvider);
        return connection;
    }

    /**
     * Creates a cluster Pub/Sub connection.
     *
     * <p>Follows the same sequence as {@link #connectClusterImpl}: load partitions if needed, activate
     * topology refresh if configured, then attempt to connect up to {@code max(1, partitions.size())}
     * times. If every attempt fails, the connection is closed and the last {@link RedisException} is
     * rethrown.
     *
     * @param codec codec used for keys and values
     * @return the connected cluster Pub/Sub connection
     * @throws RedisException if no attempt succeeded
     */
    <K, V> StatefulRedisClusterPubSubConnection<K, V> connectClusterPubSubImpl(RedisCodec<K, V> codec) {
        if (this.partitions == null) {
            this.initializePartitions();
        }
        this.activateTopologyRefreshIfNeeded();
        logger.debug("connectClusterPubSub(" + this.initialUris + ")");
        Supplier<SocketAddress> socketAddressSupplier = this.getSocketAddressSupplier(TopologyComparators::sortByClientCount);
        PubSubClusterEndpoint endpoint = new PubSubClusterEndpoint(this.clientOptions);
        ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(this.clientOptions, endpoint, this.clusterTopologyRefreshScheduler);
        StatefulRedisClusterPubSubConnectionImpl connection = new StatefulRedisClusterPubSubConnectionImpl(endpoint, (RedisChannelWriter)clusterWriter, codec, this.timeout, this.unit);
        ClusterPubSubConnectionProvider<K, V> pooledClusterConnectionProvider = new ClusterPubSubConnectionProvider<K, V>(this, clusterWriter, codec, connection.getUpstreamListener());
        clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);
        connection.setPartitions(this.partitions);
        boolean connected = false;
        RedisException causingException = null;
        int connectionAttempts = Math.max(1, this.partitions.size());
        for (int i = 0; i < connectionAttempts; ++i) {
            try {
                this.connectStateful(connection, endpoint, this.getFirstUri(), socketAddressSupplier, () -> new PubSubCommandHandler(this.clientResources, codec, endpoint));
                connected = true;
                break;
            }
            catch (RedisException e) {
                // Remember the failure and let the next iteration try again.
                logger.warn(e.getMessage());
                causingException = e;
            }
        }
        if (!connected) {
            connection.close();
            if (causingException != null) {
                throw causingException;
            }
        }
        connection.registerCloseables(this.closeableResources, clusterWriter, pooledClusterConnectionProvider);
        return connection;
    }

    /**
     * Connects a node-level connection: runs {@link #connectStatefulAsync} and resolves the future via
     * the inherited {@code getConnection}.
     */
    private <K, V> void connectStateful(StatefulRedisConnectionImpl<K, V> connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Supplier<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        this.getConnection(this.connectStatefulAsync(connection, endpoint, connectionSettings, socketAddressSupplier, commandHandlerSupplier));
    }

    /**
     * Connects a cluster-level connection: runs {@link #connectStatefulAsync} and resolves the future
     * via the inherited {@code getConnection}.
     */
    private <K, V> void connectStateful(StatefulRedisClusterConnectionImpl<K, V> connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Supplier<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        this.getConnection(this.connectStatefulAsync(connection, endpoint, connectionSettings, socketAddressSupplier, commandHandlerSupplier));
    }

    /**
     * Builds and initializes the channel for {@code connection} and applies post-connect settings.
     *
     * <ul>
     * <li>If the client options request a ping before activation, the builder is told to ping (or to
     * authenticate and ping when the settings carry a password).</li>
     * <li>Otherwise, if a password is present, {@code AUTH} is issued after channel initialization on
     * the client resources' event executor group.</li>
     * <li>If the settings carry a non-empty client name, it is set on the connection.</li>
     * </ul>
     *
     * @param connection connection being set up
     * @param endpoint endpoint of the connection
     * @param connectionSettings URI providing SSL, password and client name settings
     * @param socketAddressSupplier supplies the address to connect to
     * @param commandHandlerSupplier supplies the command handler for the channel
     * @return future completing with {@code connection}
     */
    private <K, V, T extends RedisChannelHandler<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Supplier<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder connectionBuilder = this.createConnectionBuilder(connection, endpoint, connectionSettings, socketAddressSupplier, commandHandlerSupplier);
        if (this.clientOptions.isPingBeforeActivateConnection()) {
            if (this.hasPassword(connectionSettings)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }
        // NOTE(review): `future` is a raw CompletionStage in this decompiled form although the method
        // returns ConnectionFuture<S>; confirm against the upstream source before tightening the types.
        CompletionStage future = this.initializeChannelAsync(connectionBuilder);
        if (!this.clientOptions.isPingBeforeActivateConnection() && this.hasPassword(connectionSettings)) {
            future = future.thenApplyAsync(channelHandler -> {
                if (connection instanceof StatefulRedisClusterConnectionImpl) {
                    ((StatefulRedisClusterConnectionImpl)connection).async().auth(new String(connectionSettings.getPassword()));
                }
                if (connection instanceof StatefulRedisConnectionImpl) {
                    ((StatefulRedisConnectionImpl)connection).async().auth(new String(connectionSettings.getPassword()));
                }
                return channelHandler;
            }, (Executor)this.clientResources.eventExecutorGroup());
        }
        if (LettuceStrings.isNotEmpty(connectionSettings.getClientName())) {
            future = future.thenApply(channelHandler -> {
                if (connection instanceof StatefulRedisClusterConnectionImpl) {
                    ((StatefulRedisClusterConnectionImpl)connection).setClientName(connectionSettings.getClientName());
                }
                if (connection instanceof StatefulRedisConnectionImpl) {
                    ((StatefulRedisConnectionImpl)connection).setClientName(connectionSettings.getClientName());
                }
                return channelHandler;
            });
        }
        return future.thenApply(channelHandler -> connection);
    }

    /**
     * @return {@code true} if the settings carry a non-null, non-empty password
     */
    private boolean hasPassword(RedisURI connectionSettings) {
        return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0;
    }

    /**
     * Creates a {@link ConnectionBuilder} (an {@link SslConnectionBuilder} when the settings request
     * SSL) and wires the reconnect listener, client options, connection, resources, endpoint, command
     * handler supplier, socket address supplier and channel type into it.
     *
     * @return the configured builder
     */
    private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K, V> connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Supplier<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder connectionBuilder;
        if (connectionSettings.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(connectionSettings);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }
        connectionBuilder.reconnectionListener(new ReconnectEventListener(this.clusterTopologyRefreshScheduler));
        connectionBuilder.clientOptions(this.clientOptions);
        connectionBuilder.connection(connection);
        connectionBuilder.clientResources(this.clientResources);
        connectionBuilder.endpoint(endpoint);
        connectionBuilder.commandHandler(commandHandlerSupplier);
        this.connectionBuilder(socketAddressSupplier, connectionBuilder, connectionSettings);
        this.channelType(connectionBuilder, connectionSettings);
        return connectionBuilder;
    }

    /**
     * Reloads the cluster topology and pushes it to all open cluster connections.
     *
     * <p>If no partitions are known yet they are initialized. Otherwise a fresh view is loaded; when
     * {@link TopologyComparators#isChanged} reports a change, a {@link ClusterTopologyChangedEvent} is
     * published on the event bus. The existing {@link Partitions} instance is then reloaded in place
     * with the freshly loaded nodes.
     */
    public void reloadPartitions() {
        if (this.partitions == null) {
            this.initializePartitions();
            this.partitions.updateCache();
        } else {
            Partitions loadedPartitions = this.loadPartitions();
            if (TopologyComparators.isChanged(this.getPartitions(), loadedPartitions)) {
                logger.debug("Using a new cluster topology");
                ArrayList<RedisClusterNode> before = new ArrayList<RedisClusterNode>(this.getPartitions());
                ArrayList<RedisClusterNode> after = new ArrayList<RedisClusterNode>(loadedPartitions);
                this.getResources().eventBus().publish(new ClusterTopologyChangedEvent(before, after));
            }
            this.partitions.reload(loadedPartitions.getPartitions());
        }
        this.updatePartitionsInConnections();
    }

    /**
     * Sets the current partitions on every registered cluster and cluster Pub/Sub connection.
     */
    protected void updatePartitionsInConnections() {
        this.forEachClusterConnection(input -> input.setPartitions(this.partitions));
        this.forEachClusterPubSubConnection(input -> input.setPartitions(this.partitions));
    }

    /**
     * Loads the partitions and stores them as the current topology view.
     */
    protected void initializePartitions() {
        this.partitions = this.loadPartitions();
    }

    /**
     * Returns the current partitions, loading them first if they are not yet known.
     *
     * @return the current topology view
     */
    public Partitions getPartitions() {
        if (this.partitions == null) {
            this.initializePartitions();
        }
        return this.partitions;
    }

    /**
     * Loads topology views from the refresh source and selects one via {@link #determinePartitions}.
     *
     * <p>If the URI that produced the selected view can be determined, its connection settings are
     * applied to the URI of every node in the result. Periodic topology refresh is activated if
     * configured.
     *
     * @return the loaded partitions
     * @throws RedisException if no views could be loaded or a {@link RedisConnectionException} occurred
     */
    protected Partitions loadPartitions() {
        Iterable<RedisURI> topologyRefreshSource = this.getTopologyRefreshSource();
        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            Map<RedisURI, Partitions> partitions = this.refresh.loadViews(topologyRefreshSource, this.useDynamicRefreshSources());
            if (partitions.isEmpty()) {
                throw new RedisException(message);
            }
            Partitions loadedPartitions = this.determinePartitions(this.partitions, partitions);
            RedisURI viewedBy = this.refresh.getViewedBy(partitions, loadedPartitions);
            for (RedisClusterNode partition : loadedPartitions) {
                if (viewedBy == null) continue;
                RedisURI uri = partition.getUri();
                RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri);
            }
            this.activateTopologyRefreshIfNeeded();
            return loadedPartitions;
        }
        catch (RedisConnectionException e) {
            throw new RedisException(message, e);
        }
    }

    /**
     * Selects one topology view out of the views obtained from the cluster nodes.
     *
     * @param current the currently known partitions, or {@code null} if none are known yet
     * @param topologyViews views keyed by the URI they were obtained from
     * @return the result of {@link PartitionsConsensus#HEALTHY_MAJORITY} when {@code current} is
     *         {@code null}, otherwise of {@link PartitionsConsensus#KNOWN_MAJORITY}
     */
    protected Partitions determinePartitions(Partitions current, Map<RedisURI, Partitions> topologyViews) {
        if (current == null) {
            return PartitionsConsensus.HEALTHY_MAJORITY.getPartitions(null, topologyViews);
        }
        return PartitionsConsensus.KNOWN_MAJORITY.getPartitions(current, topologyViews);
    }

    /**
     * Schedules the periodic topology refresh task once, if the options are
     * {@link ClusterClientOptions} with periodic refresh enabled and it has not been activated yet.
     */
    private void activateTopologyRefreshIfNeeded() {
        if (this.getOptions() instanceof ClusterClientOptions) {
            ClusterClientOptions options = (ClusterClientOptions)this.getOptions();
            ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
            if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || this.clusterTopologyRefreshActivated.get()) {
                return;
            }
            // compareAndSet makes sure only one caller schedules the task.
            if (this.clusterTopologyRefreshActivated.compareAndSet(false, true)) {
                ScheduledFuture<?> scheduledFuture = this.genericWorkerPool.scheduleAtFixedRate((Runnable)this.clusterTopologyRefreshScheduler, options.getRefreshPeriod(), options.getRefreshPeriod(), options.getRefreshPeriodUnit());
                this.clusterTopologyRefreshFuture.set(scheduledFuture);
            }
        }
    }

    /**
     * Replaces the current topology view.
     *
     * @param partitions the partitions to use from now on
     */
    public void setPartitions(Partitions partitions) {
        this.partitions = partitions;
    }

    /**
     * @return the client resources used by this client
     */
    public ClientResources getResources() {
        return this.clientResources;
    }

    /**
     * Cancels the periodic topology refresh task (if one was activated) and then shuts the client down
     * through the superclass.
     *
     * <p>A failure to cancel the task is logged at debug level and does not prevent the shutdown.
     *
     * @param quietPeriod quiet period passed to the superclass
     * @param timeout timeout passed to the superclass
     * @param timeUnit unit of {@code quietPeriod} and {@code timeout}
     */
    @Override
    public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {
        if (this.clusterTopologyRefreshActivated.compareAndSet(true, false)) {
            // The activation flag is flipped before the future is stored, so the reference may still
            // be null here; take and clear it atomically and only cancel when it is present.
            ScheduledFuture<?> scheduledFuture = this.clusterTopologyRefreshFuture.getAndSet(null);
            if (scheduledFuture != null) {
                try {
                    scheduledFuture.cancel(false);
                }
                catch (Exception e) {
                    logger.debug("Could not unschedule Cluster topology refresh", (Throwable)e);
                }
            }
        }
        super.shutdown(quietPeriod, timeout, timeUnit);
    }

    /**
     * Sets the cluster client options.
     *
     * @param clientOptions options to apply
     */
    public void setOptions(ClusterClientOptions clientOptions) {
        super.setOptions(clientOptions);
    }

    /**
     * @return the first seed URI; asserts that at least one seed URI exists
     */
    protected RedisURI getFirstUri() {
        RedisClusterClient.assertNotEmpty(this.initialUris);
        Iterator<RedisURI> iterator = this.initialUris.iterator();
        return iterator.next();
    }

    /**
     * Creates a supplier of socket addresses for new cluster connections.
     *
     * <p>While the current partitions are empty the supplier resolves the first seed URI; otherwise it
     * delegates to a {@link RoundRobinSocketAddressSupplier} built over the partitions known at the
     * time this method is called.
     *
     * @param sortFunction orders the nodes for the round-robin supplier; must not be {@code null}
     * @return the socket address supplier
     */
    protected Supplier<SocketAddress> getSocketAddressSupplier(Function<Partitions, Collection<RedisClusterNode>> sortFunction) {
        LettuceAssert.notNull(sortFunction, "Sort function must not be null");
        RoundRobinSocketAddressSupplier socketAddressSupplier = new RoundRobinSocketAddressSupplier(this.partitions, sortFunction, this.clientResources);
        return () -> {
            if (this.partitions.isEmpty()) {
                SocketAddress socketAddress = SocketAddressResolver.resolve(this.getFirstUri(), this.clientResources.dnsResolver());
                logger.debug("Resolved SocketAddress {} using {}", (Object)socketAddress, (Object)this.getFirstUri());
                return socketAddress;
            }
            return socketAddressSupplier.get();
        };
    }

    /**
     * @return the seed URIs this client was created with
     */
    protected Iterable<RedisURI> getInitialUris() {
        return this.initialUris;
    }

    /**
     * Applies {@code function} to every registered closeable that is a
     * {@link StatefulRedisClusterConnectionImpl}.
     *
     * @param function action to apply
     */
    protected void forEachClusterConnection(Consumer<StatefulRedisClusterConnectionImpl<?, ?>> function) {
        this.forEachCloseable(input -> input instanceof StatefulRedisClusterConnectionImpl, function);
    }

    /**
     * Applies {@code function} to every registered closeable that is a
     * {@link StatefulRedisClusterPubSubConnectionImpl}.
     *
     * @param function action to apply
     */
    protected void forEachClusterPubSubConnection(Consumer<StatefulRedisClusterPubSubConnectionImpl<?, ?>> function) {
        this.forEachCloseable(input -> input instanceof StatefulRedisClusterPubSubConnectionImpl, function);
    }

    /**
     * Applies {@code function} to every registered closeable accepted by {@code selector}.
     *
     * @param selector decides which closeables are passed on; it must only accept instances of
     *        {@code T}, otherwise the consumer fails with a {@link ClassCastException}
     * @param function action to apply to each selected closeable
     * @param <T> type of closeable the consumer handles
     */
    @SuppressWarnings("unchecked")
    protected <T extends Closeable> void forEachCloseable(Predicate<? super Closeable> selector, Consumer<T> function) {
        for (Closeable c : this.closeableResources) {
            if (selector.test(c)) {
                // Unchecked by necessity: the selector vouches for the runtime type.
                function.accept((T) c);
            }
        }
    }

    /**
     * Determines the URIs used to query the cluster topology.
     *
     * @return the seed URIs when dynamic refresh sources are disabled or no partitions are known yet;
     *         otherwise the URIs of the currently known nodes, ordered by
     *         {@link TopologyComparators#sortByUri}
     */
    protected Iterable<RedisURI> getTopologyRefreshSource() {
        boolean initialSeedNodes = !this.useDynamicRefreshSources();
        if (initialSeedNodes || this.partitions == null || this.partitions.isEmpty()) {
            return this.initialUris;
        }
        ArrayList<RedisURI> uris = new ArrayList<RedisURI>();
        for (RedisClusterNode partition : TopologyComparators.sortByUri(this.partitions)) {
            uris.add(partition.getUri());
        }
        return uris;
    }

    /**
     * @return the dynamic-refresh-sources setting of the topology refresh options, or {@code true} when
     *         the client options are not {@link ClusterClientOptions}
     */
    protected boolean useDynamicRefreshSources() {
        if (this.getClusterClientOptions() != null) {
            ClusterTopologyRefreshOptions topologyRefreshOptions = this.getClusterClientOptions().getTopologyRefreshOptions();
            return topologyRefreshOptions.useDynamicRefreshSources();
        }
        return true;
    }

    /**
     * @return the codec used by the no-argument connect methods ({@link StringCodec#UTF8})
     */
    protected RedisCodec<String, String> newStringStringCodec() {
        return StringCodec.UTF8;
    }

    /**
     * @return the client options as {@link ClusterClientOptions}, or {@code null} if they are of a
     *         different type
     */
    ClusterClientOptions getClusterClientOptions() {
        if (this.getOptions() instanceof ClusterClientOptions) {
            return (ClusterClientOptions)this.getOptions();
        }
        return null;
    }

    /**
     * @return {@code true} if no {@link ClusterClientOptions} are set or they request closing stale
     *         connections
     */
    boolean expireStaleConnections() {
        return this.getClusterClientOptions() == null || this.getClusterClientOptions().isCloseStaleConnections();
    }

    /** Asserts that the codec is not {@code null}. */
    private static <K, V> void assertNotNull(RedisCodec<K, V> codec) {
        LettuceAssert.notNull(codec, "RedisCodec must not be null");
    }

    /** Asserts that the URIs are not {@code null} and contain at least one element. */
    private static void assertNotEmpty(Iterable<RedisURI> redisURIs) {
        LettuceAssert.notNull(redisURIs, "RedisURIs must not be null");
        LettuceAssert.isTrue(redisURIs.iterator().hasNext(), "RedisURIs must not be empty");
    }

    /**
     * Asserts that the URI is not {@code null}.
     *
     * @return the given URI
     */
    private static RedisURI assertNotNull(RedisURI redisURI) {
        LettuceAssert.notNull(redisURI, "RedisURI must not be null");
        return redisURI;
    }

    /** Asserts that the client resources are not {@code null}. */
    private static void assertNotNull(ClientResources clientResources) {
        LettuceAssert.notNull(clientResources, "ClientResources must not be null");
    }

    /**
     * {@link NodeConnectionFactory} handed to {@link ClusterTopologyRefresh}; it opens node connections
     * through the enclosing client, using the address' string form as node id and no cluster writer.
     */
    private class NodeConnectionFactoryImpl
    implements NodeConnectionFactory {
        private NodeConnectionFactoryImpl() {
        }

        @Override
        public <K, V> StatefulRedisConnection<K, V> connectToNode(RedisCodec<K, V> codec, SocketAddress socketAddress) {
            return RedisClusterClient.this.connectToNode(codec, socketAddress.toString(), null, () -> socketAddress);
        }

        @Override
        public <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisCodec<K, V> codec, SocketAddress socketAddress) {
            return RedisClusterClient.this.connectToNodeAsync(codec, socketAddress.toString(), null, () -> socketAddress);
        }
    }
}

