/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorTestBase;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link SourceCoordinator}.
 *
 * <p>The coordinator under test ({@code sourceCoordinator}), its executor, its context, the
 * operator coordinator context and the split assignment tracker are inherited from {@link
 * SourceCoordinatorTestBase}.
 */
public class SourceCoordinatorTest extends SourceCoordinatorTestBase {

    /** Every call that requires a started coordinator must be rejected before {@code start()}. */
    @Test
    public void testThrowExceptionWhenNotStarted() {
        final String failureMessage =
                "Call should fail when source coordinator has not started yet.";
        final String expectedError = "The coordinator has not started yet.";

        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.notifyCheckpointComplete(100L),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.handleEventFromOperator(0, null),
                failureMessage,
                expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.subtaskFailed(0, null), failureMessage, expectedError);
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
                failureMessage,
                expectedError);
    }

    /** Resetting to a checkpoint is only allowed before the coordinator has been started. */
    @Test
    public void testRestCheckpointAfterCoordinatorStarted() throws Exception {
        sourceCoordinator.start();
        CoordinatorTestUtils.verifyException(
                () -> sourceCoordinator.resetToCheckpoint(0L, null),
                "Reset to checkpoint should fail after the coordinator has started",
                "The coordinator can only be reset if it was not yet started");
    }

    /** Starting the coordinator must start the enumerator. */
    @Test
    public void testStart() throws Exception {
        sourceCoordinator.start();
        waitForCoordinatorToProcessActions();
        Assert.assertTrue(getEnumerator().isStarted());
    }

    /** Closing a started coordinator must close the enumerator. */
    @Test
    public void testClosed() throws Exception {
        sourceCoordinator.start();
        sourceCoordinator.close();
        Assert.assertTrue(getEnumerator().isClosed());
    }

    /** A wrapped source event sent by an operator must reach the enumerator unchanged. */
    @Test
    public void testHandleSourceEvent() throws Exception {
        sourceReady();
        final SourceEvent sourceEvent = new SourceEvent() {};
        sourceCoordinator.handleEventFromOperator(0, new SourceEventWrapper(sourceEvent));
        waitForCoordinatorToProcessActions();

        Assert.assertEquals(1, getEnumerator().getHandledSourceEvent().size());
        Assert.assertEquals(sourceEvent, getEnumerator().getHandledSourceEvent().get(0));
    }

    /**
     * Takes a checkpoint after two of six splits were assigned and verifies that a fresh
     * coordinator restored from those bytes sees the four remaining splits and no readers.
     */
    @Test
    public void testCheckpointCoordinatorAndRestore() throws Exception {
        sourceReady();
        addTestingSplitSet(6);
        registerReader(0);
        getEnumerator().executeAssignOneSplit(0);
        getEnumerator().executeAssignOneSplit(0);

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
        final byte[] bytes = checkpointFuture.get();

        // Restore a brand-new coordinator from the checkpoint bytes.
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator =
                getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(100L, bytes);
        final TestingSplitEnumerator<MockSourceSplit> restoredEnumerator =
                (TestingSplitEnumerator<MockSourceSplit>) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext<MockSourceSplit> restoredContext =
                restoredCoordinator.getContext();

        Assert.assertEquals(
                "2 splits should have been assigned to reader 0",
                4,
                restoredEnumerator.getUnassignedSplits().size());
        Assert.assertTrue(restoredEnumerator.getContext().registeredReaders().isEmpty());
        Assert.assertEquals(
                "Registered readers should not be recovered by restoring",
                0,
                restoredContext.registeredReaders().size());
    }

    /**
     * Assigns splits across two unconfirmed checkpoints (100 and 101), then fails the subtask and
     * resets it to checkpoint 99, and verifies that all assignments are handed back.
     */
    @Test
    public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception {
        sourceReady();
        addTestingSplitSet(6);

        // Two splits are assigned before checkpoint 100.
        registerReader(0);
        getEnumerator().executeAssignOneSplit(0);
        getEnumerator().executeAssignOneSplit(0);
        sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());

        // One more split is added and one more is assigned before checkpoint 101.
        getEnumerator().addNewSplits(new MockSourceSplit(6));
        getEnumerator().executeAssignOneSplit(0);
        sourceCoordinator.checkpointCoordinator(101L, new CompletableFuture<>());

        // Check the tracked state before the failure.
        waitForCoordinatorToProcessActions();
        Assert.assertEquals(4, getEnumerator().getUnassignedSplits().size());
        Assert.assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
        CoordinatorTestUtils.verifyAssignment(
                Arrays.asList("0", "1"),
                splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L).get(0));
        CoordinatorTestUtils.verifyAssignment(
                Collections.singletonList("2"),
                splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));

        // Neither checkpoint was confirmed; fail the subtask and reset to checkpoint 99.
        sourceCoordinator.subtaskFailed(0, null);
        sourceCoordinator.subtaskReset(0, 99L);
        waitForCoordinatorToProcessActions();

        Assert.assertFalse(
                "Reader 0 should have been unregistered.",
                context.registeredReaders().containsKey(0));
        for (Map<Integer, ?> assignment :
                splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
            Assert.assertFalse(
                    "Assignment in uncompleted checkpoint should have been reverted.",
                    assignment.containsKey(0));
        }
        Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
        // All seven splits (six initial plus the one added later) are unassigned again.
        Assert.assertEquals(7, getEnumerator().getUnassignedSplits().size());
    }

    /**
     * Once checkpoint 100 has been confirmed, a subtask failure must not hand the two splits
     * assigned before that checkpoint back to the enumerator.
     */
    @Test
    public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception {
        sourceReady();
        addTestingSplitSet(6);

        registerReader(0);
        getEnumerator().executeAssignOneSplit(0);
        getEnumerator().executeAssignOneSplit(0);
        sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>());
        sourceCoordinator.notifyCheckpointComplete(100L);

        sourceCoordinator.subtaskFailed(0, null);
        waitForCoordinatorToProcessActions();

        // The explicit (long) cast is required: assertEquals(long, Long) would be ambiguous.
        Assert.assertEquals(100L, (long) getEnumerator().getSuccessfulCheckpoints().get(0));
        Assert.assertFalse(context.registeredReaders().containsKey(0));
        Assert.assertEquals(4, getEnumerator().getUnassignedSplits().size());
        Assert.assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
        Assert.assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
    }

    /** An exception thrown from the enumerator's {@code start()} must fail the job. */
    @Test
    public void testFailJobWhenExceptionThrownFromStart() throws Exception {
        final RuntimeException failureReason = new RuntimeException("Artificial Exception");
        final MockSplitEnumerator splitEnumerator =
                new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
                    @Override
                    public void start() {
                        throw failureReason;
                    }
                };
        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        coordinatorExecutor,
                        new EnumeratorCreatingSource<>(() -> splitEnumerator),
                        context);

        coordinator.start();
        CommonTestUtils.waitUtil(
                () -> operatorCoordinatorContext.isJobFailed(),
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason());
    }

    /** An {@link Error} thrown while the enumerator handles a source event must fail the job. */
    @Test
    public void testErrorThrownFromSplitEnumerator() throws Exception {
        final Error error = new Error("Test Error");
        final MockSplitEnumerator splitEnumerator =
                new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) {
                    @Override
                    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                        throw error;
                    }
                };
        final SourceCoordinator<?, ?> coordinator =
                new SourceCoordinator<>(
                        "TestOperator",
                        coordinatorExecutor,
                        new EnumeratorCreatingSource<>(() -> splitEnumerator),
                        context);

        coordinator.start();
        coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {}));
        CommonTestUtils.waitUtil(
                () -> operatorCoordinatorContext.isJobFailed(),
                Duration.ofSeconds(10L),
                "The job should have failed due to the artificial exception.");
        Assert.assertEquals(error, operatorCoordinatorContext.getJobFailureReason());
    }

    /**
     * The class loader handed to the coordinator context must be the thread context class loader
     * both when a new enumerator is constructed and when it is started.
     */
    @Test
    public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            final MockOperatorCoordinatorContext context =
                    new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            final EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source =
                    new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            final SourceCoordinatorProvider<?> provider =
                    new SourceCoordinatorProvider<>(
                            "testOperator", context.getOperatorId(), source, 1);

            final OperatorCoordinator coordinator = provider.getCoordinator(context);
            try {
                coordinator.start();

                final ClassLoaderTestEnumerator enumerator = source.createEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            } finally {
                // Close the coordinator even when an assertion above fails.
                coordinator.close();
            }
        }
    }

    /**
     * Same as {@link #testUserClassLoaderWhenCreatingNewEnumerator()}, but for an enumerator that
     * is restored from an (empty) checkpoint instead of being created from scratch.
     */
    @Test
    public void testUserClassLoaderWhenRestoringEnumerator() throws Exception {
        try (URLClassLoader testClassLoader = new URLClassLoader(new URL[0])) {
            final MockOperatorCoordinatorContext context =
                    new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader);
            final EnumeratorCreatingSource<?, ClassLoaderTestEnumerator> source =
                    new EnumeratorCreatingSource<>(ClassLoaderTestEnumerator::new);
            final SourceCoordinatorProvider<?> provider =
                    new SourceCoordinatorProvider<>(
                            "testOperator", context.getOperatorId(), source, 1);

            final OperatorCoordinator coordinator = provider.getCoordinator(context);
            try {
                coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
                coordinator.start();

                final ClassLoaderTestEnumerator enumerator = source.restoreEnumeratorFuture.get();
                Assert.assertSame(testClassLoader, enumerator.constructorClassLoader);
                Assert.assertSame(testClassLoader, enumerator.threadClassLoader.get());
            } finally {
                // Close the coordinator even when an assertion above fails.
                coordinator.close();
            }
        }
    }

    /**
     * A coordinator must be restorable from checkpoint bytes written in the layout produced by
     * {@link #createCheckpointDataWithSerdeV0(Set)}.
     */
    @Test
    public void testSerdeBackwardCompatibility() throws Exception {
        sourceReady();
        addTestingSplitSet(6);

        // Snapshot the enumerator state from within the enumerator thread.
        final TestingSplitEnumerator<MockSourceSplit> enumerator = getEnumerator();
        final Set<MockSourceSplit> splits = new HashSet<>();
        enumerator.runInEnumThreadAndSync(() -> splits.addAll(enumerator.snapshotState(1L)));

        final byte[] checkpointDataForV0Serde = createCheckpointDataWithSerdeV0(splits);

        // Restore from the V0 bytes and verify the enumerator state.
        final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> restoredCoordinator =
                getNewSourceCoordinator();
        restoredCoordinator.resetToCheckpoint(15213L, checkpointDataForV0Serde);
        final TestingSplitEnumerator<MockSourceSplit> restoredEnumerator =
                (TestingSplitEnumerator<MockSourceSplit>) restoredCoordinator.getEnumerator();
        final SourceCoordinatorContext<MockSourceSplit> restoredContext =
                restoredCoordinator.getContext();

        Assert.assertEquals(splits, restoredEnumerator.getUnassignedSplits());
        Assert.assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
        Assert.assertEquals(0, restoredContext.registeredReaders().size());
    }

    // ------------------------------------------------------------------------
    //  test helpers
    // ------------------------------------------------------------------------

    /**
     * Writes checkpoint bytes by hand: an int {@code 0}, the enumerator checkpoint serializer
     * version, the length-prefixed serialized enumerator checkpoint, and three trailing int
     * {@code 0}s.
     *
     * @param splits the enumerator checkpoint to embed
     * @return a copy of the written bytes
     */
    private byte[] createCheckpointDataWithSerdeV0(Set<MockSourceSplit> splits) throws Exception {
        final MockSplitEnumeratorCheckpointSerializer enumChkptSerializer =
                new MockSplitEnumeratorCheckpointSerializer();
        final DataOutputSerializer serializer = new DataOutputSerializer(32);

        // NOTE(review): the leading 0 is presumably the coordinator serde version - confirm.
        serializer.writeInt(0);

        // Version and length-prefixed payload of the enumerator checkpoint.
        serializer.writeInt(enumChkptSerializer.getVersion());
        final byte[] serializedEnumChkpt = enumChkptSerializer.serialize(splits);
        serializer.writeInt(serializedEnumChkpt.length);
        serializer.write(serializedEnumChkpt);

        // NOTE(review): the three trailing zeros presumably encode empty reader/assignment
        // sections of the old format - confirm against the coordinator's deserializer.
        serializer.writeInt(0);
        serializer.writeInt(0);
        serializer.writeInt(0);

        return serializer.getCopyOfBuffer();
    }

    /**
     * Runs the given action on the coordinator executor and waits for it to finish, turning any
     * failure into an {@link AssertionError} that carries the original exception as its cause.
     *
     * @param runnable the action to run on the coordinator executor
     */
    private void check(Runnable runnable) {
        try {
            coordinatorExecutor.submit(runnable).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new AssertionError("Test failed due to " + e, e);
        }
    }

    /** Serializes an empty enumerator checkpoint via {@link SourceCoordinator}. */
    private static byte[] createEmptyCheckpoint() throws Exception {
        return SourceCoordinator.writeCheckpointBytes(
                Collections.emptySet(), new MockSplitEnumeratorCheckpointSerializer());
    }

    // ------------------------------------------------------------------------
    //  test mocks
    // ------------------------------------------------------------------------

    /**
     * A source that obtains its enumerators from a factory and publishes every created or restored
     * enumerator through a future, so that tests can get hold of the instance.
     */
    private static final class EnumeratorCreatingSource<
                    T, EnumT extends SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>>
            implements Source<T, MockSourceSplit, Set<MockSourceSplit>> {

        /** Completed with the enumerator returned from {@link #createEnumerator}. */
        final CompletableFuture<EnumT> createEnumeratorFuture = new CompletableFuture<>();

        /** Completed with the enumerator returned from {@link #restoreEnumerator}. */
        final CompletableFuture<EnumT> restoreEnumeratorFuture = new CompletableFuture<>();

        private final Supplier<EnumT> enumeratorFactory;

        public EnumeratorCreatingSource(Supplier<EnumT> enumeratorFactory) {
            this.enumeratorFactory = enumeratorFactory;
        }

        @Override
        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }

        @Override
        public SourceReader<T, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            throw new UnsupportedOperationException();
        }

        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> createEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext) {
            final EnumT enumerator = enumeratorFactory.get();
            createEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        @Override
        public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> restoreEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext,
                Set<MockSourceSplit> checkpoint) {
            // The checkpoint contents are ignored; the factory always builds the enumerator.
            final EnumT enumerator = enumeratorFactory.get();
            restoreEnumeratorFuture.complete(enumerator);
            return enumerator;
        }

        @Override
        public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
            return new MockSourceSplitSerializer();
        }

        @Override
        public SimpleVersionedSerializer<Set<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new MockSplitEnumeratorCheckpointSerializer();
        }
    }

    /**
     * An enumerator that records the thread context class loader at construction time and again
     * when {@link #start()} is called. All other operations except {@link #close()} are
     * unsupported.
     */
    private static final class ClassLoaderTestEnumerator
            implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {

        /** Completed in {@link #start()} with the calling thread's context class loader. */
        final CompletableFuture<ClassLoader> threadClassLoader = new CompletableFuture<>();

        /** The context class loader of the thread that constructed this enumerator. */
        final ClassLoader constructorClassLoader = Thread.currentThread().getContextClassLoader();

        @Override
        public void start() {
            threadClassLoader.complete(Thread.currentThread().getContextClassLoader());
        }

        @Override
        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addReader(int subtaskId) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Set<MockSourceSplit> snapshotState(long checkpointId) throws Exception {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {}
    }
}

