/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.ttl;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator;
import org.apache.flink.runtime.state.ttl.MockTtlStateTest;
import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
import org.apache.flink.runtime.state.ttl.StateBackendTestContext;
import org.apache.flink.runtime.state.ttl.TtlAggregatingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlFixedLenElemListStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStateAllEntriesTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStatePerElementTestContext;
import org.apache.flink.runtime.state.ttl.TtlMapStatePerNullElementTestContext;
import org.apache.flink.runtime.state.ttl.TtlMergingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlNonFixedLenElemListStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlReducingStateTestContext;
import org.apache.flink.runtime.state.ttl.TtlStateTestContextBase;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValueStateTestContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public abstract class TtlStateTestBase {
    protected static final long TTL = 100L;
    private static final int INC_CLEANUP_ALL_KEYS = 970;
    protected MockTtlTimeProvider timeProvider;
    protected StateBackendTestContext sbetc;
    protected static final String UNEXPIRED_AVAIL = "Unexpired state should be available";
    protected static final String UPDATED_UNEXPIRED_AVAIL = "Unexpired state should be available after update";
    protected static final String EXPIRED_UNAVAIL = "Expired state should be unavailable";
    private static final String EXPIRED_AVAIL = "Expired state should be available";
    private StateTtlConfig ttlConfig;
    @Parameterized.Parameter
    public TtlStateTestContextBase<?, ?, ?> ctx;

    @Before
    public void setup() {
        this.timeProvider = new MockTtlTimeProvider();
        this.sbetc = this.createStateBackendTestContext(this.timeProvider);
    }

    protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider var1);

    @Parameterized.Parameters(name="{0}")
    public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
        return Arrays.asList(new TtlValueStateTestContext(), new TtlFixedLenElemListStateTestContext(), new TtlNonFixedLenElemListStateTestContext(), new TtlMapStateAllEntriesTestContext(), new TtlMapStatePerElementTestContext(), new TtlMapStatePerNullElementTestContext(), new TtlAggregatingStateTestContext(), new TtlReducingStateTestContext());
    }

    public boolean fullSnapshot() {
        return true;
    }

    protected <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
        return this.ctx;
    }

    private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
        return (TtlMergingStateTestContext)this.ctx;
    }

    private void initTest() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility visibility) throws Exception {
        this.initTest(updateType, visibility, 100L);
    }

    private void initTest(StateTtlConfig.UpdateType updateType, StateTtlConfig.StateVisibility visibility, long ttl) throws Exception {
        this.initTest(TtlStateTestBase.getConfBuilder(ttl).setUpdateType(updateType).setStateVisibility(visibility).disableCleanupInBackground().build());
    }

    protected static StateTtlConfig.Builder getConfBuilder(long ttl) {
        return StateTtlConfig.newBuilder((Time)Time.milliseconds((long)ttl));
    }

    protected <S extends State> StateDescriptor<S, Object> initTest(StateTtlConfig ttlConfig) throws Exception {
        this.ttlConfig = ttlConfig;
        this.sbetc.createAndRestoreKeyedStateBackend(null);
        this.sbetc.setCurrentKey("defaultKey");
        StateDescriptor<S, Object> stateDesc = this.createState();
        this.ctx().initTestValues();
        return stateDesc;
    }

    private <S extends State> StateDescriptor<S, Object> createState() throws Exception {
        StateDescriptor stateDescriptor = this.ctx().createStateDescriptor();
        stateDescriptor.enableTimeToLive(this.ttlConfig);
        String defaultNamespace = "defaultNamespace";
        this.ctx().ttlState = (InternalKvState)this.sbetc.createState(stateDescriptor, defaultNamespace);
        this.ctx().setCurrentNamespace(defaultNamespace);
        return stateDescriptor;
    }

    private void takeAndRestoreSnapshot() throws Exception {
        this.restoreSnapshot(this.sbetc.takeSnapshot(), 10);
    }

    protected void takeAndRestoreSnapshot(int numberOfKeyGroupsAfterRestore) throws Exception {
        this.restoreSnapshot(this.sbetc.takeSnapshot(), numberOfKeyGroupsAfterRestore);
    }

    private void restoreSnapshot(KeyedStateHandle snapshot, int numberOfKeyGroups) throws Exception {
        this.sbetc.createAndRestoreKeyedStateBackend(numberOfKeyGroups, snapshot);
        this.sbetc.setCurrentKey("defaultKey");
        this.createState();
    }

    protected boolean incrementalCleanupSupported() {
        return false;
    }

    @Test
    public void testNonExistentValue() throws Exception {
        this.initTest();
        Assert.assertEquals((String)"Non-existing state should be empty", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExactExpirationOnWrite() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 20L;
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        this.ctx().update(this.ctx().updateUnexpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)UPDATED_UNEXPIRED_AVAIL, this.ctx().getUnexpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        this.ctx().update(this.ctx().updateExpired);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 220L;
        Assert.assertEquals((String)UPDATED_UNEXPIRED_AVAIL, this.ctx().getUpdateExpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 300L;
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
        Assert.assertTrue((String)"Original state should be cleared on access", (boolean)this.ctx().isOriginalEmptyValue());
    }

    @Test
    public void testRelaxedExpirationOnWrite() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)EXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        Assert.assertTrue((String)"Original state should be cleared on access", (boolean)this.ctx().isOriginalEmptyValue());
        Assert.assertEquals((String)"Expired state should be cleared on access", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExactExpirationOnRead() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        Assert.assertEquals((String)"Unexpired state should be available after read", this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 250L;
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
        Assert.assertTrue((String)"Original state should be cleared on access", (boolean)this.ctx().isOriginalEmptyValue());
    }

    @Test
    public void testRelaxedExpirationOnRead() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnReadAndWrite, StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp);
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        Assert.assertEquals((String)EXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        Assert.assertEquals((String)"Expired state should be cleared on access", this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testExpirationTimestampOverflow() throws Exception {
        this.initTest(StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired, Long.MAX_VALUE);
        this.timeProvider.time = 10L;
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
    }

    @Test
    public void testMergeNamespaces() throws Exception {
        Assume.assumeThat(this.ctx, (Matcher)CoreMatchers.instanceOf(TtlMergingStateTestContext.class));
        this.initTest();
        this.timeProvider.time = 0L;
        List expiredUpdatesToMerge = this.mctx().generateExpiredUpdatesToMerge();
        this.mctx().applyStateUpdates(expiredUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 120L;
        List unexpiredUpdatesToMerge = this.mctx().generateUnexpiredUpdatesToMerge();
        this.mctx().applyStateUpdates(unexpiredUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 150L;
        List finalUpdatesToMerge = this.mctx().generateFinalUpdatesToMerge();
        this.mctx().applyStateUpdates(finalUpdatesToMerge);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 230L;
        ((InternalMergingState)this.mctx().ttlState).mergeNamespaces((Object)"targetNamespace", TtlMergingStateTestContext.NAMESPACES);
        ((InternalMergingState)this.mctx().ttlState).setCurrentNamespace((Object)"targetNamespace");
        Assert.assertEquals((String)"Unexpected result of merge operation", this.mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), this.mctx().get());
    }

    @Test
    public void testMultipleKeys() throws Exception {
        this.initTest();
        this.testMultipleStateIds(id -> this.sbetc.setCurrentKey((String)id), false);
    }

    @Test
    public void testMultipleKeysWithSnapshotCleanup() throws Exception {
        Assume.assumeTrue((String)"full snapshot strategy", (boolean)this.fullSnapshot());
        this.initTest(TtlStateTestBase.getConfBuilder(100L).cleanupFullSnapshot().build());
        this.testMultipleStateIds(id -> this.sbetc.setCurrentKey((String)id), true);
    }

    @Test
    public void testMultipleNamespaces() throws Exception {
        this.initTest();
        this.testMultipleStateIds(id -> this.ctx().ttlState.setCurrentNamespace(id), false);
    }

    @Test
    public void testMultipleNamespacesWithSnapshotCleanup() throws Exception {
        Assume.assumeTrue((String)"full snapshot strategy", (boolean)this.fullSnapshot());
        this.initTest(TtlStateTestBase.getConfBuilder(100L).cleanupFullSnapshot().build());
        this.testMultipleStateIds(id -> this.ctx().ttlState.setCurrentNamespace(id), true);
    }

    private void testMultipleStateIds(Consumer<String> idChanger, boolean timeBackAfterRestore) throws Exception {
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 0L;
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateEmpty);
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 50L;
        idChanger.accept("id1");
        this.ctx().update(this.ctx().updateEmpty);
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateUnexpired);
        this.timeProvider.time = 120L;
        this.takeAndRestoreSnapshot();
        idChanger.accept("id1");
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)UPDATED_UNEXPIRED_AVAIL, this.ctx().getUnexpired, this.ctx().get());
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = 170L;
        idChanger.accept("id2");
        this.ctx().update(this.ctx().updateExpired);
        this.timeProvider.time = 230L;
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = timeBackAfterRestore ? 170L : this.timeProvider.time;
        idChanger.accept("id1");
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)UPDATED_UNEXPIRED_AVAIL, this.ctx().getUpdateExpired, this.ctx().get());
        this.timeProvider.time = 300L;
        this.takeAndRestoreSnapshot();
        this.timeProvider.time = timeBackAfterRestore ? 230L : this.timeProvider.time;
        idChanger.accept("id1");
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
        idChanger.accept("id2");
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
    }

    @Test
    public void testSnapshotChangeRestore() throws Exception {
        this.initTest();
        this.timeProvider.time = 0L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateEmpty);
        this.timeProvider.time = 50L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateUnexpired);
        this.timeProvider.time = 100L;
        this.sbetc.setCurrentKey("k2");
        this.ctx().update(this.ctx().updateEmpty);
        KeyedStateHandle snapshot = this.sbetc.takeSnapshot();
        this.timeProvider.time = 170L;
        this.sbetc.setCurrentKey("k1");
        this.ctx().update(this.ctx().updateExpired);
        this.sbetc.setCurrentKey("k2");
        this.ctx().update(this.ctx().updateUnexpired);
        this.restoreSnapshot(snapshot, 10);
        this.timeProvider.time = 180L;
        this.sbetc.setCurrentKey("k1");
        Assert.assertEquals((String)EXPIRED_UNAVAIL, this.ctx().emptyValue, this.ctx().get());
        this.sbetc.setCurrentKey("k2");
        Assert.assertEquals((String)UNEXPIRED_AVAIL, this.ctx().getUpdateEmpty, this.ctx().get());
    }

    @Test(expected=StateMigrationException.class)
    public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
        Assume.assumeThat((Object)this, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.instanceOf(MockTtlStateTest.class)));
        this.initTest();
        this.timeProvider.time = 0L;
        this.ctx().update(this.ctx().updateEmpty);
        KeyedStateHandle snapshot = this.sbetc.takeSnapshot();
        this.sbetc.createAndRestoreKeyedStateBackend(snapshot);
        this.sbetc.setCurrentKey("defaultKey");
        this.sbetc.createState(this.ctx().createStateDescriptor(), "");
    }

    @Test
    public void testIncrementalCleanupWholeState() throws Exception {
        int i;
        Assume.assumeTrue((boolean)this.incrementalCleanupSupported());
        this.initTest(TtlStateTestBase.getConfBuilder(100L).cleanupIncrementally(5, true).build());
        this.timeProvider.time = 0L;
        this.updateKeys(0, 970, this.ctx().updateEmpty);
        this.timeProvider.time = 120L;
        for (i = 0; i < 970; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
        }
        for (i = 0; i < 970; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            Assert.assertTrue((String)"Original state should be cleared", (boolean)this.isOriginalCleared());
        }
    }

    @Test
    public void testIncrementalCleanup() throws Exception {
        Assume.assumeTrue((boolean)this.incrementalCleanupSupported());
        this.initTest(TtlStateTestBase.getConfBuilder(100L).cleanupIncrementally(5, true).build());
        int keysToUpdate = 160;
        this.timeProvider.time = 0L;
        this.updateKeys(0, 970, this.ctx().updateEmpty);
        this.timeProvider.time = 50L;
        this.updateKeys(0, 160, this.ctx().updateUnexpired);
        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture = this.sbetc.triggerSnapshot();
        this.updateKeys(160, 320, this.ctx().updateUnexpired);
        this.timeProvider.time = 120L;
        this.triggerMoreIncrementalCleanupByOtherOps();
        this.checkExpiredKeys(320, 970);
        KeyedStateHandle snapshot = (KeyedStateHandle)((SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot();
        this.timeProvider.time = 50L;
        this.restoreSnapshot(snapshot, 10);
        this.checkUnexpiredKeys(160, 970, this.ctx().getUpdateEmpty);
        this.timeProvider.time = 120L;
        for (int i = 80; i < 200; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            this.ctx().ttlState.clear();
        }
        this.checkUnexpiredKeys(0, 80, this.ctx().getUnexpired);
        this.triggerMoreIncrementalCleanupByOtherOps();
        this.checkExpiredKeys(160, 320);
        this.timeProvider.time = 170L;
        this.checkExpiredKeys(80, 970);
        this.checkExpiredKeys(0, 80);
    }

    private <T> void updateKeys(int startKey, int endKey, T value) throws Exception {
        for (int i = startKey; i < endKey; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            this.ctx().update(value);
        }
    }

    private <T> void checkUnexpiredKeys(int startKey, int endKey, T value) throws Exception {
        for (int i = startKey; i < endKey; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            Assert.assertEquals((String)UNEXPIRED_AVAIL, value, this.ctx().get());
        }
    }

    private void checkExpiredKeys(int startKey, int endKey) throws Exception {
        for (int i = startKey; i < endKey; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            Assert.assertTrue((String)"Original state should be cleared", (boolean)this.ctx().isOriginalEmptyValue());
        }
    }

    private void triggerMoreIncrementalCleanupByOtherOps() throws Exception {
        for (int i = 970; i < 1940; ++i) {
            this.sbetc.setCurrentKey(Integer.toString(i));
            if (i % 2 == 0) {
                this.ctx().get();
                continue;
            }
            this.ctx().update(this.ctx().updateEmpty);
        }
    }

    private boolean isOriginalCleared() {
        InternalKvState original = (InternalKvState)((AbstractTtlDecorator)this.ctx.ttlState).original;
        Preconditions.checkState((boolean)(original instanceof AbstractHeapState));
        return ((AbstractHeapState)original).getStateTable().get((Object)this.ctx.currentNamespace) == null;
    }

    @After
    public void tearDown() throws Exception {
        this.sbetc.dispose();
    }
}

