/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.RunnableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBAggregatingState;
import org.apache.flink.contrib.streaming.state.RocksDBFoldingState;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBListState;
import org.apache.flink.contrib.streaming.state.RocksDBMapState;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBReducingState;
import org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBSnapshotTransformFactoryAdaptor;
import org.apache.flink.contrib.streaming.state.RocksDBValueState;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
    public static final String MERGE_OPERATOR_NAME = "stringappendtest";
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, RocksDBValueState::create), Tuple2.of(ListStateDescriptor.class, RocksDBListState::create), Tuple2.of(MapStateDescriptor.class, RocksDBMapState::create), Tuple2.of(AggregatingStateDescriptor.class, RocksDBAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, RocksDBReducingState::create), Tuple2.of(FoldingStateDescriptor.class, RocksDBFoldingState::create)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final DBOptions dbOptions;
    private final File instanceBasePath;
    private final ResourceGuard rocksDBResourceGuard;
    private final WriteOptions writeOptions;
    private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
    private final int keyGroupPrefixBytes;
    private final ColumnFamilyHandle defaultColumnFamily;
    private final RocksDBWriteBatchWrapper writeBatchWrapper;
    private final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
    private final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;
    private final RocksDBNativeMetricMonitor nativeMetricMonitor;
    private final PriorityQueueSetFactory priorityQueueFactory;
    private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
    protected final RocksDB db;
    private boolean disposed = false;
    private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;

    public RocksDBKeyedStateBackend(ClassLoader userCodeClassLoader, File instanceBasePath, DBOptions dbOptions, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, RocksDB db, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, StreamCompressionDecorator keyGroupCompressionDecorator, ResourceGuard rocksDBResourceGuard, RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy, RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy, RocksDBWriteBatchWrapper writeBatchWrapper, ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder, PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, executionConfig, ttlTimeProvider, cancelStreamRegistry, keyGroupCompressionDecorator, keyContext);
        this.ttlCompactFiltersManager = ttlCompactFiltersManager;
        this.columnFamilyOptionsFactory = (Function)Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.dbOptions = (DBOptions)Preconditions.checkNotNull((Object)dbOptions);
        this.instanceBasePath = (File)Preconditions.checkNotNull((Object)instanceBasePath);
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.kvStateInformation = kvStateInformation;
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        this.db = db;
        this.rocksDBResourceGuard = rocksDBResourceGuard;
        this.checkpointSnapshotStrategy = checkpointSnapshotStrategy;
        this.savepointSnapshotStrategy = savepointSnapshotStrategy;
        this.writeBatchWrapper = writeBatchWrapper;
        this.defaultColumnFamily = defaultColumnFamilyHandle;
        this.nativeMetricMonitor = nativeMetricMonitor;
        this.sharedRocksKeyBuilder = sharedRocksKeyBuilder;
        this.priorityQueueFactory = priorityQueueFactory;
    }

    public <N> Stream<K> getKeys(String state, N namespace) {
        byte[] nameSpaceBytes;
        RocksDbKvStateInfo columnInfo = this.kvStateInformation.get(state);
        if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
            return Stream.empty();
        }
        RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo = (RegisteredKeyValueStateBackendMetaInfo)columnInfo.metaInfo;
        TypeSerializer namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
        DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
        boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(this.getKeySerializer(), namespaceSerializer);
        try {
            RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible);
            nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
        }
        catch (IOException ex) {
            throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", (Throwable)ex);
        }
        RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(this.db, columnInfo.columnFamilyHandle);
        iterator.seekToFirst();
        RocksStateKeysIterator iteratorWrapper = new RocksStateKeysIterator(iterator, state, this.getKeySerializer(), this.keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);
        Stream targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 16), false);
        return (Stream)targetStream.onClose(iteratorWrapper::close);
    }

    @VisibleForTesting
    ColumnFamilyHandle getColumnFamilyHandle(String state) {
        RocksDbKvStateInfo columnInfo = this.kvStateInformation.get(state);
        return columnInfo != null ? columnInfo.columnFamilyHandle : null;
    }

    public void setCurrentKey(K newKey) {
        super.setCurrentKey(newKey);
        this.sharedRocksKeyBuilder.setKeyAndKeyGroup(this.getCurrentKey(), this.getCurrentKeyGroupIndex());
    }

    public void dispose() {
        if (this.disposed) {
            return;
        }
        super.dispose();
        this.rocksDBResourceGuard.close();
        if (this.db != null) {
            IOUtils.closeQuietly((AutoCloseable)this.writeBatchWrapper);
            if (this.nativeMetricMonitor != null) {
                this.nativeMetricMonitor.close();
            }
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(this.kvStateInformation.values().size());
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, this.defaultColumnFamily);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamily);
            for (RocksDbKvStateInfo kvStateInfo : this.kvStateInformation.values()) {
                RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, kvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly((AutoCloseable)kvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly((AutoCloseable)this.db);
            columnFamilyOptions.forEach(IOUtils::closeQuietly);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            IOUtils.closeQuietly((AutoCloseable)this.writeOptions);
            this.ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            this.kvStateInformation.clear();
            this.cleanInstanceBasePath();
        }
        this.disposed = true;
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
    }

    private void cleanInstanceBasePath() {
        LOG.info("Deleting existing instance base directory {}.", (Object)this.instanceBasePath);
        try {
            FileUtils.deleteDirectory((File)this.instanceBasePath);
        }
        catch (IOException ex) {
            LOG.warn("Could not delete instance base path for RocksDB: " + this.instanceBasePath, (Throwable)ex);
        }
    }

    public int getKeyGroupPrefixBytes() {
        return this.keyGroupPrefixBytes;
    }

    @VisibleForTesting
    PriorityQueueSetFactory getPriorityQueueFactory() {
        return this.priorityQueueFactory;
    }

    public WriteOptions getWriteOptions() {
        return this.writeOptions;
    }

    RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
        return this.sharedRocksKeyBuilder;
    }

    @VisibleForTesting
    boolean isDisposed() {
        return this.disposed;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        long startTime = System.currentTimeMillis();
        this.writeBatchWrapper.flush();
        RocksDBSnapshotStrategyBase<K> chosenSnapshotStrategy = checkpointOptions.getCheckpointType().isSavepoint() ? this.savepointSnapshotStrategy : this.checkpointSnapshotStrategy;
        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner = chosenSnapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        chosenSnapshotStrategy.logSyncCompleted(streamFactory, startTime);
        return snapshotRunner;
    }

    public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
        if (this.checkpointSnapshotStrategy != null) {
            this.checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
        }
        if (this.savepointSnapshotStrategy != null) {
            this.savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
        }
    }

    private <N, S extends State, SV, SEV> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> tryRegisterKvStateInformation(StateDescriptor<S, SV> stateDesc, TypeSerializer<N> namespaceSerializer, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        RocksDbKvStateInfo newRocksStateInfo;
        RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
        RocksDbKvStateInfo oldStateInfo = this.kvStateInformation.get(stateDesc.getName());
        TypeSerializer stateSerializer = stateDesc.getSerializer();
        if (oldStateInfo != null) {
            RegisteredKeyValueStateBackendMetaInfo castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo)oldStateInfo.metaInfo;
            newMetaInfo = this.updateRestoredStateMetaInfo(Tuple2.of((Object)oldStateInfo.columnFamilyHandle, (Object)castedMetaInfo), stateDesc, namespaceSerializer, stateSerializer);
            newRocksStateInfo = new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, (RegisteredStateMetaInfoBase)newMetaInfo);
            this.kvStateInformation.put(stateDesc.getName(), newRocksStateInfo);
        } else {
            newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<N, SV>(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateSerializer, StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());
            newRocksStateInfo = RocksDBOperationUtils.createStateInfo(newMetaInfo, this.db, this.columnFamilyOptionsFactory, this.ttlCompactFiltersManager);
            RocksDBOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor, stateDesc.getName(), newRocksStateInfo);
        }
        StateSnapshotTransformer.StateSnapshotTransformFactory<SV> wrappedSnapshotTransformFactory = RocksDBSnapshotTransformFactoryAdaptor.wrapStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory, newMetaInfo.getStateSerializer());
        newMetaInfo.updateSnapshotTransformFactory(wrappedSnapshotTransformFactory);
        this.ttlCompactFiltersManager.configCompactFilter(stateDesc, newMetaInfo.getStateSerializer());
        return Tuple2.of((Object)newRocksStateInfo.columnFamilyHandle, newMetaInfo);
    }

    private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo, StateDescriptor<S, SV> stateDesc, TypeSerializer<N> namespaceSerializer, TypeSerializer<SV> stateSerializer) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo restoredKvStateMetaInfo = (RegisteredKeyValueStateBackendMetaInfo)oldStateInfo.f1;
        TypeSerializerSchemaCompatibility s = restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
        if (s.isCompatibleAfterMigration() || s.isIncompatible()) {
            throw new StateMigrationException("The new namespace serializer must be compatible.");
        }
        restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc);
        TypeSerializerSchemaCompatibility newStateSerializerCompatibility = restoredKvStateMetaInfo.updateStateSerializer(stateSerializer);
        if (newStateSerializerCompatibility.isCompatibleAfterMigration()) {
            this.migrateStateValues(stateDesc, oldStateInfo);
        } else if (newStateSerializerCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new state serializer cannot be incompatible.");
        }
        return restoredKvStateMetaInfo;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <N, S extends State, SV> void migrateStateValues(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception {
        if (stateDesc.getType() == StateDescriptor.Type.MAP) {
            TypeSerializerSnapshot previousSerializerSnapshot = ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getPreviousStateSerializerSnapshot();
            Preconditions.checkState((previousSerializerSnapshot != null ? 1 : 0) != 0, (Object)"the previous serializer snapshot should exist.");
            Preconditions.checkState((boolean)(previousSerializerSnapshot instanceof MapSerializerSnapshot), (Object)"previous serializer snapshot should be a MapSerializerSnapshot.");
            TypeSerializer newSerializer = ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getStateSerializer();
            Preconditions.checkState((boolean)(newSerializer instanceof MapSerializer), (Object)"new serializer should be a MapSerializer.");
            MapSerializer mapSerializer = (MapSerializer)newSerializer;
            MapSerializerSnapshot mapSerializerSnapshot = (MapSerializerSnapshot)previousSerializerSnapshot;
            if (!RocksDBKeyedStateBackend.checkMapStateKeySchemaCompatibility(mapSerializerSnapshot, mapSerializer)) {
                throw new StateMigrationException("The new serializer for a MapState requires state migration in order for the job to proceed, since the key schema has changed. However, migration for MapState currently only allows value schema evolutions.");
            }
        }
        LOG.info("Performing state migration for state {} because the state serializer's schema, i.e. serialization format, has changed.", stateDesc);
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), ((Object)((Object)this)).getClass());
            throw new FlinkRuntimeException(message);
        }
        Object state = stateFactory.createState(stateDesc, stateMetaInfo, this);
        if (!(state instanceof AbstractRocksDBState)) {
            throw new FlinkRuntimeException("State should be an AbstractRocksDBState but is " + state);
        }
        AbstractRocksDBState rocksDBState = (AbstractRocksDBState)state;
        Snapshot rocksDBSnapshot = this.db.getSnapshot();
        try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(this.db, (ColumnFamilyHandle)stateMetaInfo.f0);
             RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(this.db, this.getWriteOptions());){
            iterator.seekToFirst();
            DataInputDeserializer serializedValueInput = new DataInputDeserializer();
            DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512);
            while (iterator.isValid()) {
                serializedValueInput.setBuffer(iterator.value());
                rocksDBState.migrateSerializedValue(serializedValueInput, migratedSerializedValueOutput, ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getPreviousStateSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)stateMetaInfo.f1).getStateSerializer());
                batchWriter.put((ColumnFamilyHandle)stateMetaInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer());
                migratedSerializedValueOutput.clear();
                iterator.next();
            }
        }
        finally {
            this.db.releaseSnapshot(rocksDBSnapshot);
            rocksDBSnapshot.close();
        }
    }

    private static <UK> boolean checkMapStateKeySchemaCompatibility(MapSerializerSnapshot<?, ?> mapStateSerializerSnapshot, MapSerializer<?, ?> newMapStateSerializer) {
        TypeSerializerSnapshot previousKeySerializerSnapshot = mapStateSerializerSnapshot.getKeySerializerSnapshot();
        TypeSerializer newUserKeySerializer = newMapStateSerializer.getKeySerializer();
        TypeSerializerSchemaCompatibility keyCompatibility = previousKeySerializerSnapshot.resolveSchemaCompatibility(newUserKeySerializer);
        return keyCompatibility.isCompatibleAsIs();
    }

    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), ((Object)((Object)this)).getClass());
            throw new FlinkRuntimeException(message);
        }
        Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer, snapshotTransformFactory);
        return stateFactory.createState(stateDesc, registerResult, this);
    }

    File getInstanceBasePath() {
        return this.instanceBasePath;
    }

    public boolean supportsAsynchronousSnapshots() {
        return true;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries() {
        int count = 0;
        for (RocksDbKvStateInfo metaInfo : this.kvStateInformation.values()) {
            RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(this.db, metaInfo.columnFamilyHandle);
            Throwable throwable = null;
            try {
                rocksIterator.seekToFirst();
                while (rocksIterator.isValid()) {
                    ++count;
                    rocksIterator.next();
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (rocksIterator == null) continue;
                if (throwable != null) {
                    try {
                        rocksIterator.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                rocksIterator.close();
            }
        }
        return count;
    }

    public boolean requiresLegacySynchronousTimerSnapshots() {
        return this.priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
    }

    @VisibleForTesting
    public void compactState(StateDescriptor<?, ?> stateDesc) throws RocksDBException {
        RocksDbKvStateInfo kvStateInfo = this.kvStateInformation.get(stateDesc.getName());
        this.db.compactRange(kvStateInfo.columnFamilyHandle);
    }

    public static class RocksDbKvStateInfo
    implements AutoCloseable {
        public final ColumnFamilyHandle columnFamilyHandle;
        public final RegisteredStateMetaInfoBase metaInfo;

        public RocksDbKvStateInfo(ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo) {
            this.columnFamilyHandle = columnFamilyHandle;
            this.metaInfo = metaInfo;
        }

        @Override
        public void close() throws Exception {
            this.columnFamilyHandle.close();
        }
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> var2, RocksDBKeyedStateBackend<K> var3) throws Exception;
    }
}

