/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ZooKeeperCompletedCheckpointStoreTest
extends TestLogger {
    @ClassRule
    public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
    private static final ZooKeeperCheckpointStoreUtil zooKeeperCheckpointStoreUtil = ZooKeeperCheckpointStoreUtil.INSTANCE;

    @Test
    public void testPathConversion() {
        long checkpointId = 42L;
        String path = zooKeeperCheckpointStoreUtil.checkpointIDToName(42L);
        Assert.assertEquals((long)42L, (long)zooKeeperCheckpointStoreUtil.nameToCheckpointID(path));
    }

    @Test(expected=ExpectedTestException.class)
    public void testRecoverFailsIfDownloadFails() throws Exception {
        this.testDownloadInternal((TriConsumer<CompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry>)((TriConsumer)(store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                checkpointsInZk.add(this.createHandle(1L, id -> {
                    throw new ExpectedTestException();
                }));
                store.recover();
            }
            catch (Exception exception) {
                ExceptionUtils.findThrowable((Throwable)exception, ExpectedTestException.class).ifPresent(ExceptionUtils::rethrow);
                ExceptionUtils.rethrow((Throwable)exception);
            }
        }));
    }

    @Test
    public void testNoDownloadIfCheckpointsNotChanged() throws Exception {
        this.testDownloadInternal((TriConsumer<CompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry>)((TriConsumer)(store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                checkpointsInZk.add(this.createHandle(1L, id -> {
                    throw new AssertionError((Object)("retrieveState was attempted for checkpoint " + id));
                }));
                store.addCheckpoint((CompletedCheckpoint)CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry), new CheckpointsCleaner(), () -> {});
                store.recover();
            }
            catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        }));
    }

    @Test
    public void testDownloadIfCheckpointsChanged() throws Exception {
        this.testDownloadInternal((TriConsumer<CompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry>)((TriConsumer)(store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                int lastInZk = 10;
                IntStream.range(0, lastInZk + 1).forEach(i -> checkpointsInZk.add(this.createHandle(i, id -> CompletedCheckpointStoreTest.createCheckpoint(id, sharedStateRegistry))));
                store.addCheckpoint((CompletedCheckpoint)CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry), new CheckpointsCleaner(), () -> {});
                store.addCheckpoint((CompletedCheckpoint)CompletedCheckpointStoreTest.createCheckpoint(5L, sharedStateRegistry), new CheckpointsCleaner(), () -> {});
                store.recover();
                Assert.assertEquals((long)lastInZk, (long)store.getLatestCheckpoint(false).getCheckpointID());
            }
            catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDownloadInternal(TriConsumer<CompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry> test) throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        final ArrayList checkpointsInZk = new ArrayList();
        ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper = new ZooKeeperStateHandleStore<CompletedCheckpoint>(ZooKeeperUtils.startCuratorFramework((Configuration)configuration), new TestingRetrievableStateStorageHelper()){

            public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> getAllAndLock() {
                return checkpointsInZk;
            }
        };
        DefaultCompletedCheckpointStore store = new DefaultCompletedCheckpointStore(10, (StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)zooKeeperCheckpointStoreUtil, Executors.directExecutor());
        try {
            test.accept((Object)store, checkpointsInZk, (Object)sharedStateRegistry);
        }
        finally {
            store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
            sharedStateRegistry.close();
        }
    }

    private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle(long id, Function<Long, CompletedCheckpoint> checkpointSupplier) {
        return Tuple2.of((Object)new CheckpointStateHandle(checkpointSupplier, id), (Object)zooKeeperCheckpointStoreUtil.checkpointIDToName(id));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDiscardingSubsumedCheckpoints() throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        CuratorFramework client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
        CompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(client);
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            checkpointStore.addCheckpoint((CompletedCheckpoint)checkpoint1, new CheckpointsCleaner(), () -> {});
            Assert.assertThat((Object)checkpointStore.getAllCheckpoints(), (Matcher)Matchers.contains((Object[])new CompletedCheckpoint[]{checkpoint1}));
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry);
            checkpointStore.addCheckpoint((CompletedCheckpoint)checkpoint2, new CheckpointsCleaner(), () -> {});
            List allCheckpoints = checkpointStore.getAllCheckpoints();
            Assert.assertThat((Object)allCheckpoints, (Matcher)Matchers.contains((Object[])new CompletedCheckpoint[]{checkpoint2}));
            Assert.assertThat((Object)allCheckpoints, (Matcher)Matchers.not((Matcher)Matchers.contains((Object[])new CompletedCheckpoint[]{checkpoint1})));
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        }
        finally {
            client.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDiscardingCheckpointsAtShutDown() throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        CuratorFramework client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
        CompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(client);
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            checkpointStore.addCheckpoint((CompletedCheckpoint)checkpoint1, new CheckpointsCleaner(), () -> {});
            Assert.assertThat((Object)checkpointStore.getAllCheckpoints(), (Matcher)Matchers.contains((Object[])new CompletedCheckpoint[]{checkpoint1}));
            checkpointStore.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        }
        finally {
            client.close();
        }
    }

    @Nonnull
    private CompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
        ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore((CuratorFramework)client, (String)"/checkpoints", new TestingRetrievableStateStorageHelper());
        return new DefaultCompletedCheckpointStore(1, (StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)zooKeeperCheckpointStoreUtil, Executors.directExecutor());
    }

    @Test
    public void testAddCheckpointWithFailedRemove() throws Exception {
        boolean numCheckpointsToRetain = true;
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        CuratorFramework client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
        CompletedCheckpointStore store = this.createZooKeeperCheckpointStore(client);
        CountDownLatch discardAttempted = new CountDownLatch(1);
        for (long i = 0L; i < 2L; ++i) {
            CompletedCheckpoint checkpointToAdd = new CompletedCheckpoint(new JobID(), i, i, i, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation());
            store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {
                discardAttempted.countDown();
                throw new RuntimeException();
            });
        }
        discardAttempted.await();
    }

    private static class CheckpointStateHandle
    implements RetrievableStateHandle<CompletedCheckpoint> {
        private static final long serialVersionUID = 1L;
        private final Function<Long, CompletedCheckpoint> checkpointSupplier;
        private final long id;

        CheckpointStateHandle(Function<Long, CompletedCheckpoint> checkpointSupplier, long id) {
            this.checkpointSupplier = checkpointSupplier;
            this.id = id;
        }

        public CompletedCheckpoint retrieveState() {
            return this.checkpointSupplier.apply(this.id);
        }

        public void discardState() {
        }

        public long getStateSize() {
            return 0L;
        }
    }
}

