/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory;
import org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBKeyedStateBackendBuilder<K>
extends AbstractKeyedStateBackendBuilder<K> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class);
    static final String DB_INSTANCE_DIR_STRING = "db";
    private final String operatorIdentifier;
    private final RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final DBOptions dbOptions;
    private final File instanceBasePath;
    private final File instanceRocksDBPath;
    private final MetricGroup metricGroup;
    private boolean enableIncrementalCheckpointing;
    private boolean enableTtlCompactionFilter;
    private RocksDBNativeMetricOptions nativeMetricOptions;
    private int numberOfTransferingThreads;
    private RocksDB injectedTestDB;
    private ColumnFamilyHandle injectedDefaultColumnFamilyHandle;

    /**
     * Creates a builder for a {@link RocksDBKeyedStateBackend}.
     *
     * <p>The RocksDB instance directory is the {@code "db"} child of {@code instanceBasePath}.
     * Incremental checkpointing starts out disabled, native metric options start out as a freshly
     * constructed {@link RocksDBNativeMetricOptions}, and the number of transfer threads starts at
     * the default value of {@link RocksDBOptions#CHECKPOINT_TRANSFER_THREAD_NUM}. These optional
     * settings can be changed through the package-private setters before calling {@code build()}.
     *
     * @param operatorIdentifier identifier handed to the incremental restore operation
     * @param userCodeClassLoader forwarded to the base builder
     * @param instanceBasePath base directory of this backend instance
     * @param dbOptions RocksDB options used when opening the database
     * @param columnFamilyOptionsFactory factory for column family options; checked with
     *     {@code Preconditions.checkNotNull}
     * @param kvStateRegistry forwarded to the base builder
     * @param keySerializer forwarded to the base builder
     * @param numberOfKeyGroups forwarded to the base builder
     * @param keyGroupRange forwarded to the base builder
     * @param executionConfig forwarded to the base builder
     * @param localRecoveryConfig configuration handed to the snapshot strategies
     * @param priorityQueueStateType selects the heap or RocksDB priority queue factory
     * @param ttlTimeProvider forwarded to the base builder
     * @param metricGroup metric group used for native RocksDB metrics
     * @param stateHandles state handles to restore from, forwarded to the base builder
     * @param keyGroupCompressionDecorator forwarded to the base builder
     * @param cancelStreamRegistry forwarded to the base builder
     */
    public RocksDBKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            File instanceBasePath,
            DBOptions dbOptions,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            CloseableRegistry cancelStreamRegistry) {
        super(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                ttlTimeProvider,
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);

        // Required collaborators supplied by the caller.
        this.operatorIdentifier = operatorIdentifier;
        this.priorityQueueStateType = priorityQueueStateType;
        this.localRecoveryConfig = localRecoveryConfig;
        this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.dbOptions = dbOptions;
        this.metricGroup = metricGroup;

        // Directory layout: the RocksDB files live in a fixed child of the base path.
        this.instanceBasePath = instanceBasePath;
        this.instanceRocksDBPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING);

        // Defaults for the optional settings.
        this.enableIncrementalCheckpointing = false;
        this.nativeMetricOptions = new RocksDBNativeMetricOptions();
        this.numberOfTransferingThreads = RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue();
    }

    /**
     * Test-only constructor that additionally injects an already opened RocksDB instance and its
     * default column family handle.
     *
     * <p>All other arguments are passed to the public constructor unchanged. When
     * {@code injectedTestDB} is non-null, {@code build()} uses it directly instead of preparing
     * directories and running a restore operation.
     *
     * @param injectedTestDB database instance to use instead of opening one; may be {@code null}
     * @param injectedDefaultColumnFamilyHandle default column family handle belonging to
     *     {@code injectedTestDB}
     */
    @VisibleForTesting
    RocksDBKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            File instanceBasePath,
            DBOptions dbOptions,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            RocksDB injectedTestDB,
            ColumnFamilyHandle injectedDefaultColumnFamilyHandle,
            CloseableRegistry cancelStreamRegistry) {
        this(
                operatorIdentifier,
                userCodeClassLoader,
                instanceBasePath,
                dbOptions,
                columnFamilyOptionsFactory,
                kvStateRegistry,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                localRecoveryConfig,
                priorityQueueStateType,
                ttlTimeProvider,
                metricGroup,
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);

        // Remember the pre-opened database and its default column family for build().
        this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle;
        this.injectedTestDB = injectedTestDB;
    }

    /**
     * Sets whether the built backend takes incremental checkpoints.
     *
     * <p>When the flag is {@code true}, the checkpoint strategy created by this builder is a
     * {@link RocksIncrementalSnapshotStrategy}; otherwise the full snapshot strategy used for
     * savepoints is reused for checkpoints.
     *
     * @param enableIncrementalCheckpointing {@code true} to enable incremental checkpointing
     * @return this builder, for call chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        return this;
    }

    /**
     * Sets the flag that is handed to the {@link RocksDbTtlCompactFiltersManager} created when the
     * backend is built.
     *
     * @param enableTtlCompactionFilter value passed to the TTL compaction filter manager
     * @return this builder, for call chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter(boolean enableTtlCompactionFilter) {
        this.enableTtlCompactionFilter = enableTtlCompactionFilter;
        return this;
    }

    /**
     * Replaces the native metric options used by this builder.
     *
     * <p>The options are handed to the restore operation, and for an injected test database they
     * decide (via {@code isEnabled()}) whether a {@link RocksDBNativeMetricMonitor} is created.
     * No null check is performed here.
     *
     * @param nativeMetricOptions native metric options to use
     * @return this builder, for call chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        this.nativeMetricOptions = nativeMetricOptions;
        return this;
    }

    /**
     * Sets the thread count that is passed to the restore operation and to the incremental
     * snapshot strategy. The value is stored as given; no range validation happens here.
     *
     * @param numberOfTransferingThreads number of transfer threads to configure
     * @return this builder, for call chaining
     */
    RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int numberOfTransferingThreads) {
        this.numberOfTransferingThreads = numberOfTransferingThreads;
        return this;
    }

    /**
     * Ensures that {@code directory} exists and is a directory, creating it together with any
     * missing parent directories when it is absent.
     *
     * @param directory the directory to verify or create
     * @throws IOException if the path exists but is not a directory, or if the directory could
     *     not be created
     */
    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.isDirectory()) {
            return;
        }
        if (directory.exists()) {
            throw new IOException("Not a directory: " + directory);
        }
        // mkdirs() also returns false when somebody else created the directory between the
        // checks above and this call, so only fail if the directory is still missing afterwards.
        if (!directory.mkdirs() && !directory.isDirectory()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
    }

    /**
     * Builds the {@link RocksDBKeyedStateBackend} described by this builder.
     *
     * <p>If a test database was injected, it is used directly together with the injected default
     * column family handle. Otherwise the working directories are prepared and a restore operation
     * (chosen from the restore state handles) supplies the database, the default column family
     * handle and the native metric monitor. For an incremental restore the backend UID, the
     * restored SST files and the last completed checkpoint id are taken from the restore result.
     *
     * <p>If any step fails, all resources acquired so far are closed quietly, the registered TTL
     * compaction filter factories are disposed, and the instance base path is deleted. Note that
     * this cleanup also closes the {@link DBOptions} handed to this builder.
     *
     * @return the fully initialized keyed state backend
     * @throws BackendBuildingException if building fails; a {@code BackendBuildingException}
     *     raised during the build is rethrown as-is, anything else is wrapped
     */
    public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
        PriorityQueueSetFactory priorityQueueFactory;
        SnapshotStrategy<K> snapshotStrategy;
        RocksDBSerializedCompositeKeyBuilder sharedRocksKeyBuilder;
        RocksDBWriteBatchWrapper writeBatchWrapper = null;
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        RocksDBNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
        WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>();
        RocksDB db = null;
        AbstractRocksDBRestoreOperation<K> restoreOperation = null;
        RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = new RocksDbTtlCompactFiltersManager(this.enableTtlCompactionFilter, this.ttlTimeProvider);
        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
        int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.numberOfKeyGroups);
        try {
            // Defaults for a fresh backend; overwritten below after an incremental restore.
            UUID backendUID = UUID.randomUUID();
            // Typed to the interface: the value is replaced by the restore result and consumed
            // as a SortedMap by the snapshot strategy initialization.
            SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
            long lastCompletedCheckpointId = -1L;
            if (this.injectedTestDB != null) {
                // Test path: use the injected database, no directory preparation or restore.
                db = this.injectedTestDB;
                defaultColumnFamilyHandle = this.injectedDefaultColumnFamilyHandle;
                nativeMetricMonitor = this.nativeMetricOptions.isEnabled() ? new RocksDBNativeMetricMonitor(this.nativeMetricOptions, this.metricGroup, db) : null;
            } else {
                this.prepareDirectories();
                restoreOperation = this.getRocksDBRestoreOperation(keyGroupPrefixBytes, this.cancelStreamRegistry, kvStateInformation, ttlCompactFiltersManager);
                RocksDBRestoreResult restoreResult = restoreOperation.restore();
                db = restoreResult.getDb();
                defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
                nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
                if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
                    backendUID = restoreResult.getBackendUID();
                    materializedSstFiles = restoreResult.getRestoredSstFiles();
                    lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
                }
            }
            writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
            sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder(this.keySerializerProvider.currentSchemaSerializer(), keyGroupPrefixBytes, 32);
            snapshotStrategy = this.initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard, kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
            priorityQueueFactory = this.initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, writeBatchWrapper, nativeMetricMonitor);
        }
        catch (Throwable e) {
            // Best-effort cleanup of everything acquired so far. The column family options are
            // collected while the handles are closed and are themselves closed after the DB.
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(kvStateInformation.size());
            IOUtils.closeQuietly((AutoCloseable)cancelStreamRegistryForBackend);
            IOUtils.closeQuietly(writeBatchWrapper);
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, defaultColumnFamilyHandle);
            IOUtils.closeQuietly((AutoCloseable)defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            for (RocksDBKeyedStateBackend.RocksDbKvStateInfo kvStateInfo : kvStateInformation.values()) {
                RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, kvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly((AutoCloseable)kvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly((AutoCloseable)db);
            IOUtils.closeQuietly(restoreOperation);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            IOUtils.closeQuietly((AutoCloseable)writeOptions);
            ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            kvStateInformation.clear();
            try {
                FileUtils.deleteDirectory(this.instanceBasePath);
            }
            catch (Exception ex) {
                // Failing to remove the directory must not mask the original build failure.
                LOG.warn("Failed to delete base path for RocksDB: " + this.instanceBasePath, (Throwable)ex);
            }
            if (e instanceof BackendBuildingException) {
                throw (BackendBuildingException)e;
            }
            String errMsg = "Caught unexpected exception.";
            LOG.error(errMsg, e);
            throw new BackendBuildingException(errMsg, e);
        }
        InternalKeyContextImpl keyContext = new InternalKeyContextImpl(this.keyGroupRange, this.numberOfKeyGroups);
        return new RocksDBKeyedStateBackend(this.userCodeClassLoader, this.instanceBasePath, this.dbOptions, this.columnFamilyOptionsFactory, this.kvStateRegistry, this.keySerializerProvider.currentSchemaSerializer(), this.executionConfig, this.ttlTimeProvider, db, kvStateInformation, keyGroupPrefixBytes, cancelStreamRegistryForBackend, this.keyGroupCompressionDecorator, rocksDBResourceGuard, snapshotStrategy.checkpointSnapshotStrategy, snapshotStrategy.savepointSnapshotStrategy, writeBatchWrapper, defaultColumnFamilyHandle, nativeMetricMonitor, sharedRocksKeyBuilder, priorityQueueFactory, ttlCompactFiltersManager, keyContext);
    }

    /**
     * Selects the restore operation matching the state handles this builder was given.
     *
     * <ul>
     *   <li>no state handles: {@link RocksDBNoneRestoreOperation}</li>
     *   <li>first handle is an {@link IncrementalKeyedStateHandle}:
     *       {@link RocksDBIncrementalRestoreOperation}</li>
     *   <li>anything else: {@link RocksDBFullRestoreOperation}</li>
     * </ul>
     *
     * <p>Only the first handle is inspected to make the decision.
     *
     * @param keyGroupPrefixBytes number of bytes used for the key-group prefix
     * @param cancelStreamRegistry registry handed to the restore operation
     * @param kvStateInformation map handed to the restore operation for registered state info
     * @param ttlCompactFiltersManager TTL compaction filter manager handed to the restore operation
     * @return the restore operation to run
     */
    private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
            int keyGroupPrefixBytes,
            CloseableRegistry cancelStreamRegistry,
            LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
        final AbstractRocksDBRestoreOperation<K> operation;
        if (this.restoreStateHandles.isEmpty()) {
            // Nothing to restore: open a fresh database.
            operation = new RocksDBNoneRestoreOperation<>(
                    this.keyGroupRange,
                    keyGroupPrefixBytes,
                    this.numberOfTransferingThreads,
                    cancelStreamRegistry,
                    this.userCodeClassLoader,
                    kvStateInformation,
                    this.keySerializerProvider,
                    this.instanceBasePath,
                    this.instanceRocksDBPath,
                    this.dbOptions,
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    this.restoreStateHandles,
                    ttlCompactFiltersManager);
        } else {
            KeyedStateHandle head = (KeyedStateHandle) this.restoreStateHandles.iterator().next();
            if (head instanceof IncrementalKeyedStateHandle) {
                operation = new RocksDBIncrementalRestoreOperation<>(
                        this.operatorIdentifier,
                        this.keyGroupRange,
                        keyGroupPrefixBytes,
                        this.numberOfTransferingThreads,
                        cancelStreamRegistry,
                        this.userCodeClassLoader,
                        kvStateInformation,
                        this.keySerializerProvider,
                        this.instanceBasePath,
                        this.instanceRocksDBPath,
                        this.dbOptions,
                        this.columnFamilyOptionsFactory,
                        this.nativeMetricOptions,
                        this.metricGroup,
                        this.restoreStateHandles,
                        ttlCompactFiltersManager);
            } else {
                operation = new RocksDBFullRestoreOperation<>(
                        this.keyGroupRange,
                        keyGroupPrefixBytes,
                        this.numberOfTransferingThreads,
                        cancelStreamRegistry,
                        this.userCodeClassLoader,
                        kvStateInformation,
                        this.keySerializerProvider,
                        this.instanceBasePath,
                        this.instanceRocksDBPath,
                        this.dbOptions,
                        this.columnFamilyOptionsFactory,
                        this.nativeMetricOptions,
                        this.metricGroup,
                        this.restoreStateHandles,
                        ttlCompactFiltersManager);
            }
        }
        return operation;
    }

    /**
     * Creates the snapshot strategies for savepoints and checkpoints.
     *
     * <p>Savepoints always use a {@link RocksFullSnapshotStrategy}. Checkpoints use a
     * {@link RocksIncrementalSnapshotStrategy} when incremental checkpointing is enabled and
     * otherwise share the very same full strategy instance that serves savepoints.
     *
     * @param cancelStreamRegistry registry handed to the strategies
     * @param rocksDBResourceGuard resource guard handed to the strategies
     * @param kvStateInformation registered state information handed to the strategies
     * @param keyGroupPrefixBytes number of bytes used for the key-group prefix
     * @param db the RocksDB instance to snapshot
     * @param backendUID backend identifier used by the incremental strategy
     * @param materializedSstFiles SST files per checkpoint id, used by the incremental strategy
     * @param lastCompletedCheckpointId last completed checkpoint id, used by the incremental strategy
     * @return holder containing the checkpoint and the savepoint strategy
     */
    private SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies(
            CloseableRegistry cancelStreamRegistry,
            ResourceGuard rocksDBResourceGuard,
            LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            int keyGroupPrefixBytes,
            RocksDB db,
            UUID backendUID,
            SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
            long lastCompletedCheckpointId) {
        RocksDBSnapshotStrategyBase<K> forSavepoints = new RocksFullSnapshotStrategy<>(
                db,
                rocksDBResourceGuard,
                this.keySerializerProvider.currentSchemaSerializer(),
                kvStateInformation,
                this.keyGroupRange,
                keyGroupPrefixBytes,
                this.localRecoveryConfig,
                cancelStreamRegistry,
                this.keyGroupCompressionDecorator);

        RocksDBSnapshotStrategyBase<K> forCheckpoints;
        if (this.enableIncrementalCheckpointing) {
            forCheckpoints = new RocksIncrementalSnapshotStrategy<>(
                    db,
                    rocksDBResourceGuard,
                    this.keySerializerProvider.currentSchemaSerializer(),
                    kvStateInformation,
                    this.keyGroupRange,
                    keyGroupPrefixBytes,
                    this.localRecoveryConfig,
                    cancelStreamRegistry,
                    this.instanceBasePath,
                    backendUID,
                    materializedSstFiles,
                    lastCompletedCheckpointId,
                    this.numberOfTransferingThreads);
        } else {
            // Without incremental checkpointing both roles share one full strategy.
            forCheckpoints = forSavepoints;
        }
        return new SnapshotStrategy<>(forCheckpoints, forSavepoints);
    }

    /**
     * Creates the priority queue set factory selected by the configured priority queue state type.
     *
     * <p>{@code HEAP} yields a {@link HeapPriorityQueueSetFactory}; {@code ROCKSDB} yields a
     * {@link RocksDBPriorityQueueSetFactory} backed by the given database and write batch wrapper.
     *
     * @param keyGroupPrefixBytes number of bytes used for the key-group prefix
     * @param kvStateInformation registered state information, used by the RocksDB factory
     * @param db the RocksDB instance, used by the RocksDB factory
     * @param writeBatchWrapper write batch wrapper, used by the RocksDB factory
     * @param nativeMetricMonitor native metric monitor, used by the RocksDB factory
     * @return the factory matching the configured priority queue state type
     * @throws IllegalArgumentException if the configured type is neither {@code HEAP} nor
     *     {@code ROCKSDB}
     */
    private PriorityQueueSetFactory initPriorityQueueFactory(int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) {
        // Return straight from each case so the result keeps its factory type instead of being
        // funnelled through an untyped local.
        switch (this.priorityQueueStateType) {
            case HEAP: {
                return new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
            }
            case ROCKSDB: {
                return new RocksDBPriorityQueueSetFactory(this.keyGroupRange, keyGroupPrefixBytes, this.numberOfKeyGroups, kvStateInformation, db, writeBatchWrapper, nativeMetricMonitor, this.columnFamilyOptionsFactory);
            }
            default: {
                throw new IllegalArgumentException("Unknown priority queue state type: " + this.priorityQueueStateType);
            }
        }
    }

    /**
     * Prepares the working directories before a database is opened or restored.
     *
     * <p>First makes sure the instance base path exists as a directory. If the RocksDB instance
     * directory is already present, the instance base path is then deleted.
     *
     * <p>NOTE(review): the deletion targets the whole base path, not only the RocksDB
     * subdirectory, and nothing here re-creates it afterwards — confirm that later steps are
     * expected to do so.
     *
     * @throws IOException if the base path cannot be created or the deletion fails
     */
    private void prepareDirectories() throws IOException {
        checkAndCreateDirectory(this.instanceBasePath);

        if (!this.instanceRocksDBPath.exists()) {
            // No leftover RocksDB directory, nothing to clean up.
            return;
        }
        FileUtils.deleteDirectory(this.instanceBasePath);
    }

    /**
     * Immutable pair of the snapshot strategy used for checkpoints and the one used for
     * savepoints. Both fields may reference the same instance.
     *
     * @param <K> key type of the snapshot strategies
     */
    static final class SnapshotStrategy<K> {
        /** Strategy used when taking checkpoints. */
        final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
        /** Strategy used when taking savepoints. */
        final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;

        /**
         * Stores the two strategies as given.
         *
         * @param checkpointSnapshotStrategy strategy for checkpoints
         * @param savepointSnapshotStrategy strategy for savepoints
         */
        SnapshotStrategy(RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy, RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy) {
            this.checkpointSnapshotStrategy = checkpointSnapshotStrategy;
            this.savepointSnapshotStrategy = savepointSnapshotStrategy;
        }
    }
}

