/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageFactory;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class CheckpointStorageLoaderTest {
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    private final ClassLoader cl = this.getClass().getClassLoader();

    @Test
    public void testNoCheckpointStorageDefined() throws Exception {
        Assert.assertFalse((boolean)CheckpointStorageLoader.fromConfig((ReadableConfig)new Configuration(), (ClassLoader)this.cl, null).isPresent());
    }

    @Test
    public void testLegacyStateBackendTakesPrecedence() throws Exception {
        LegacyStateBackend legacy = new LegacyStateBackend();
        MockStorage storage = new MockStorage();
        CheckpointStorage configured = CheckpointStorageLoader.load((CheckpointStorage)storage, null, (StateBackend)legacy, (Configuration)new Configuration(), (ClassLoader)this.cl, null);
        Assert.assertEquals((String)"Legacy state backends should always take precedence", (Object)legacy, (Object)configured);
    }

    @Test
    public void testModernStateBackendDoesNotTakePrecedence() throws Exception {
        ModernStateBackend modern = new ModernStateBackend();
        MockStorage storage = new MockStorage();
        CheckpointStorage configured = CheckpointStorageLoader.load((CheckpointStorage)storage, null, (StateBackend)modern, (Configuration)new Configuration(), (ClassLoader)this.cl, null);
        Assert.assertEquals((String)"Modern state backends should never take precedence", (Object)storage, (Object)configured);
    }

    @Test
    public void testLoadingFromFactory() throws Exception {
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)WorkingFactory.class.getName());
        CheckpointStorage storage = CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)storage, (Matcher)Matchers.instanceOf(MockStorage.class));
    }

    @Test
    public void testDefaultCheckpointStorage() throws Exception {
        CheckpointStorage storage1 = CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)new Configuration(), (ClassLoader)this.cl, null);
        Assert.assertThat((Object)storage1, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir);
        CheckpointStorage storage2 = CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)storage2, (Matcher)Matchers.instanceOf(FileSystemCheckpointStorage.class));
    }

    @Test
    public void testLoadingFails() throws Exception {
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"does.not.exist");
        try {
            CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with exception");
        }
        catch (DynamicCodeLoadingException dynamicCodeLoadingException) {
            // empty catch block
        }
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)File.class.getName());
        try {
            CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with exception");
        }
        catch (DynamicCodeLoadingException dynamicCodeLoadingException) {
            // empty catch block
        }
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)FailingFactory.class.getName());
        try {
            CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
            Assert.fail((String)"should fail with exception");
        }
        catch (IllegalConfigurationException illegalConfigurationException) {
            // empty catch block
        }
    }

    @Test
    public void testLoadJobManagerStorageNoParameters() throws Exception {
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"jobmanager");
        CheckpointStorage storage = (CheckpointStorage)CheckpointStorageLoader.fromConfig((ReadableConfig)config, (ClassLoader)this.cl, null).get();
        Assert.assertThat((Object)storage, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
    }

    @Test
    public void testLoadJobManagerStorageWithParameters() throws Exception {
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedSavepointPath = new Path(savepointDir);
        Configuration config1 = new Configuration();
        config1.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"jobmanager");
        config1.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDir);
        CheckpointStorage storage1 = (CheckpointStorage)CheckpointStorageLoader.fromConfig((ReadableConfig)config1, (ClassLoader)this.cl, null).get();
        Assert.assertThat((Object)storage1, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        Assert.assertEquals((Object)expectedSavepointPath, (Object)((JobManagerCheckpointStorage)storage1).getSavepointPath());
    }

    @Test
    public void testConfigureJobManagerStorage() throws Exception {
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedSavepointPath = new Path(savepointDir);
        int maxSize = 100;
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"filesystem");
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDir);
        CheckpointStorage storage = CheckpointStorageLoader.load((CheckpointStorage)new JobManagerCheckpointStorage(100), null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)storage, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage)storage;
        Assert.assertThat((Object)jmStorage.getSavepointPath(), CheckpointStorageLoaderTest.normalizedPath(expectedSavepointPath));
        Assert.assertEquals((long)100L, (long)jmStorage.getMaxStateSize());
    }

    @Test
    public void testConfigureJobManagerStorageWithParameters() throws Exception {
        String savepointDirConfig = new Path(this.tmp.newFolder().toURI()).toString();
        Path savepointDirJob = new Path(this.tmp.newFolder().toURI());
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDirConfig);
        CheckpointStorage storage = CheckpointStorageLoader.load((CheckpointStorage)new JobManagerCheckpointStorage(), (Path)savepointDirJob, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)storage, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage)storage;
        Assert.assertThat((Object)jmStorage.getSavepointPath(), CheckpointStorageLoaderTest.normalizedPath(savepointDirJob));
    }

    @Test
    public void testLoadFileSystemCheckpointStorage() throws Exception {
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedCheckpointsPath = new Path(checkpointDir);
        Path expectedSavepointsPath = new Path(savepointDir);
        MemorySize threshold = MemorySize.parse((String)"900kb");
        int minWriteBufferSize = 1024;
        Configuration config1 = new Configuration();
        config1.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"filesystem");
        config1.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir);
        config1.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDir);
        config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)threshold);
        config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 1024);
        CheckpointStorage storage1 = (CheckpointStorage)CheckpointStorageLoader.fromConfig((ReadableConfig)config1, (ClassLoader)this.cl, null).get();
        Assert.assertThat((Object)storage1, (Matcher)Matchers.instanceOf(FileSystemCheckpointStorage.class));
        FileSystemCheckpointStorage fs1 = (FileSystemCheckpointStorage)storage1;
        Assert.assertThat((Object)fs1.getCheckpointPath(), CheckpointStorageLoaderTest.normalizedPath(expectedCheckpointsPath));
        Assert.assertThat((Object)fs1.getSavepointPath(), CheckpointStorageLoaderTest.normalizedPath(expectedSavepointsPath));
        Assert.assertEquals((long)threshold.getBytes(), (long)fs1.getMinFileSizeThreshold());
        Assert.assertEquals((long)Math.max(threshold.getBytes(), 1024L), (long)fs1.getWriteBufferSize());
    }

    @Test
    public void testLoadFileSystemCheckpointStorageMixed() throws Exception {
        Path appCheckpointDir = new Path(this.tmp.newFolder().toURI());
        String checkpointDir = new Path(this.tmp.newFolder().toURI()).toString();
        String savepointDir = new Path(this.tmp.newFolder().toURI()).toString();
        Path expectedSavepointsPath = new Path(savepointDir);
        int threshold = 1000000;
        int writeBufferSize = 4000000;
        FileSystemCheckpointStorage storage = new FileSystemCheckpointStorage(appCheckpointDir, 1000000, 4000000);
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"jobmanager");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir);
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDir);
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.parse((String)"20"));
        config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000);
        CheckpointStorage loadedStorage = CheckpointStorageLoader.load((CheckpointStorage)storage, null, (StateBackend)new ModernStateBackend(), (Configuration)config, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)loadedStorage, (Matcher)Matchers.instanceOf(FileSystemCheckpointStorage.class));
        FileSystemCheckpointStorage fs = (FileSystemCheckpointStorage)loadedStorage;
        Assert.assertThat((Object)fs.getCheckpointPath(), CheckpointStorageLoaderTest.normalizedPath(appCheckpointDir));
        Assert.assertThat((Object)fs.getSavepointPath(), CheckpointStorageLoaderTest.normalizedPath(expectedSavepointsPath));
        Assert.assertEquals((long)1000000L, (long)fs.getMinFileSizeThreshold());
        Assert.assertEquals((long)4000000L, (long)fs.getWriteBufferSize());
    }

    @Test
    public void testHighAvailabilityDefault() throws Exception {
        String haPersistenceDir = new Path(this.tmp.newFolder().toURI()).toString();
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);
        Path checkpointPath = new Path(this.tmp.newFolder().toURI().toString());
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    @Test
    public void testHighAvailabilityDefaultLocalPaths() throws Exception {
        String haPersistenceDir = new Path(this.tmp.newFolder().getAbsolutePath()).toString();
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);
        Path checkpointPath = new Path(this.tmp.newFolder().toURI().toString()).makeQualified(FileSystem.getLocalFileSystem());
        this.testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    private void testMemoryBackendHighAvailabilityDefault(String haPersistenceDir, Path checkpointPath) throws Exception {
        Configuration config1 = new Configuration();
        config1.set(HighAvailabilityOptions.HA_MODE, (Object)"zookeeper");
        config1.set(HighAvailabilityOptions.HA_CLUSTER_ID, (Object)"myCluster");
        config1.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)haPersistenceDir);
        Configuration config2 = new Configuration();
        config2.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"jobmanager");
        config2.set(HighAvailabilityOptions.HA_MODE, (Object)"zookeeper");
        config2.set(HighAvailabilityOptions.HA_CLUSTER_ID, (Object)"myCluster");
        config2.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)haPersistenceDir);
        if (checkpointPath != null) {
            config1.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointPath.toUri().toString());
            config2.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointPath.toUri().toString());
        }
        JobManagerCheckpointStorage storage = new JobManagerCheckpointStorage();
        CheckpointStorage loaded1 = CheckpointStorageLoader.load((CheckpointStorage)storage, null, (StateBackend)new ModernStateBackend(), (Configuration)config1, (ClassLoader)this.cl, null);
        CheckpointStorage loaded2 = CheckpointStorageLoader.load(null, null, (StateBackend)new ModernStateBackend(), (Configuration)config2, (ClassLoader)this.cl, null);
        Assert.assertThat((Object)loaded1, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        Assert.assertThat((Object)loaded2, (Matcher)Matchers.instanceOf(JobManagerCheckpointStorage.class));
        JobManagerCheckpointStorage memStorage1 = (JobManagerCheckpointStorage)loaded1;
        JobManagerCheckpointStorage memStorage2 = (JobManagerCheckpointStorage)loaded2;
        Assert.assertNull((Object)memStorage1.getSavepointPath());
        Assert.assertNull((Object)memStorage2.getSavepointPath());
        if (checkpointPath != null) {
            Assert.assertThat((Object)memStorage1.getCheckpointPath(), CheckpointStorageLoaderTest.normalizedPath(checkpointPath));
            Assert.assertThat((Object)memStorage2.getCheckpointPath(), CheckpointStorageLoaderTest.normalizedPath(checkpointPath));
        } else {
            Assert.assertNull((Object)memStorage1.getCheckpointPath());
            Assert.assertNull((Object)memStorage2.getCheckpointPath());
        }
    }

    private static Matcher<Path> normalizedPath(Path expected) {
        return new NormalizedPathMatcher(expected);
    }

    private static class NormalizedPathMatcher
    extends TypeSafeMatcher<Path> {
        private final Path reNormalizedExpected;

        private NormalizedPathMatcher(Path expected) {
            this.reNormalizedExpected = expected == null ? null : new Path(expected.toString());
        }

        protected boolean matchesSafely(Path actual) {
            if (this.reNormalizedExpected == null) {
                return actual == null;
            }
            Path reNormalizedActual = new Path(actual.toString());
            return this.reNormalizedExpected.equals((Object)reNormalizedActual);
        }

        public void describeTo(Description description) {
            description.appendValue((Object)this.reNormalizedExpected);
        }
    }

    static final class FailingFactory
    implements CheckpointStorageFactory<CheckpointStorage> {
        FailingFactory() {
        }

        public CheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
            throw new IllegalConfigurationException("fail!");
        }
    }

    static final class WorkingFactory
    implements CheckpointStorageFactory<MockStorage> {
        WorkingFactory() {
        }

        public MockStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
            return new MockStorage();
        }
    }

    static final class MockStorage
    implements CheckpointStorage {
        MockStorage() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return null;
        }
    }

    static final class ModernStateBackend
    implements StateBackend {
        ModernStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            return null;
        }
    }

    static final class LegacyStateBackend
    implements StateBackend,
    CheckpointStorage {
        LegacyStateBackend() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return null;
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            return null;
        }
    }
}

