/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;

final class TestValuesRuntimeFunctions {
    static final Object LOCK = TestValuesTableFactory.class;
    private static final Map<String, Map<Integer, List<String>>> globalRawResult = new HashMap<String, Map<Integer, List<String>>>();
    private static final Map<String, Map<Integer, Map<String, String>>> globalUpsertResult = new HashMap<String, Map<Integer, Map<String, String>>>();
    private static final Map<String, Map<Integer, List<String>>> globalRetractResult = new HashMap<String, Map<Integer, List<String>>>();
    private static final Map<String, List<org.apache.flink.api.common.eventtime.Watermark>> watermarkHistory = new HashMap<String, List<org.apache.flink.api.common.eventtime.Watermark>>();

    TestValuesRuntimeFunctions() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<String> getRawResults(String tableName) {
        ArrayList<String> result = new ArrayList<String>();
        Object object = LOCK;
        synchronized (object) {
            if (globalRawResult.containsKey(tableName)) {
                globalRawResult.get(tableName).values().forEach(result::addAll);
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<String> getOnlyRawResults() {
        ArrayList<String> result = new ArrayList<String>();
        Object object = LOCK;
        synchronized (object) {
            if (globalRawResult.size() != 1) {
                throw new IllegalStateException("Expected results for only one table to be present, but found " + globalRawResult.size());
            }
            globalRawResult.values().iterator().next().values().forEach(result::addAll);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks(String tableName) {
        Object object = LOCK;
        synchronized (object) {
            if (watermarkHistory.containsKey(tableName)) {
                return new ArrayList<org.apache.flink.api.common.eventtime.Watermark>((Collection)watermarkHistory.get(tableName));
            }
            return Collections.emptyList();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static List<String> getResults(String tableName) {
        ArrayList<String> result = new ArrayList<String>();
        Object object = LOCK;
        synchronized (object) {
            if (globalUpsertResult.containsKey(tableName)) {
                globalUpsertResult.get(tableName).values().forEach(map -> result.addAll(map.values()));
            } else if (globalRetractResult.containsKey(tableName)) {
                globalRetractResult.get(tableName).values().forEach(result::addAll);
            } else if (globalRawResult.containsKey(tableName)) {
                TestValuesRuntimeFunctions.getRawResults(tableName).stream().map(s -> s.substring(3, s.length() - 1)).forEach(result::add);
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void clearResults() {
        Object object = LOCK;
        synchronized (object) {
            globalRawResult.clear();
            globalUpsertResult.clear();
            globalRetractResult.clear();
            watermarkHistory.clear();
        }
    }

    public static class TestNoLookupUntilNthAccessAsyncLookupFunction
    extends AsyncTestValueLookupFunction {
        private static final long serialVersionUID = 1L;
        private static Collection<RowData> emptyResult = Collections.emptyList();
        private final int lookupThreshold;
        private transient Map<RowData, Integer> accessCounter;

        public TestNoLookupUntilNthAccessAsyncLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection, int lookupThreshold) {
            super(data, lookupIndices, producedRowType, converter, generatedProjection);
            this.lookupThreshold = lookupThreshold;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.accessCounter = new HashMap<RowData, Integer>();
        }

        protected int counter(RowData key) {
            int currentCnt = this.accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
            this.accessCounter.put(key, currentCnt);
            return currentCnt;
        }

        @Override
        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            int currentCnt = this.counter(keyRow);
            if (currentCnt <= this.lookupThreshold) {
                return CompletableFuture.supplyAsync(() -> emptyResult);
            }
            return super.asyncLookup(keyRow);
        }
    }

    public static class TestNoLookupUntilNthAccessLookupFunction
    extends TestValuesLookupFunction {
        private static final long serialVersionUID = 1L;
        private final int lookupThreshold;
        private transient Map<RowData, Integer> accessCounter;

        protected TestNoLookupUntilNthAccessLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection, int lookupThreshold) {
            super(data, lookupIndices, producedRowType, converter, generatedProjection);
            this.lookupThreshold = lookupThreshold;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.accessCounter = new HashMap<RowData, Integer>();
        }

        protected int counter(RowData key) {
            int currentCnt = this.accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
            this.accessCounter.put(key, currentCnt);
            return currentCnt;
        }

        @Override
        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            int currentCnt = this.counter(keyRow);
            if (currentCnt <= this.lookupThreshold) {
                return null;
            }
            return super.lookup(keyRow);
        }
    }

    public static class AsyncTestValueLookupFunction
    extends AsyncLookupFunction {
        private static final long serialVersionUID = 1L;
        private final List<Row> data;
        private final int[] lookupIndices;
        private final RowType producedRowType;
        private final DynamicTableSource.DataStructureConverter converter;
        private final GeneratedProjection generatedProjection;
        private final boolean projectable;
        private final Random random;
        private transient boolean isOpenCalled = false;
        private transient ExecutorService executor;
        private transient Map<RowData, List<RowData>> indexedData;
        private transient Projection<RowData, GenericRowData> projection;
        private transient TypeSerializer<RowData> rowSerializer;

        protected AsyncTestValueLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection) {
            this.data = data;
            this.lookupIndices = lookupIndices;
            this.producedRowType = producedRowType;
            this.converter = converter;
            this.projectable = generatedProjection.isPresent();
            this.generatedProjection = generatedProjection.orElse(null);
            this.random = new Random();
        }

        public void open(FunctionContext context) throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.incrementAndGet();
            if (this.projectable) {
                this.projection = (Projection)this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
            }
            this.rowSerializer = InternalSerializers.create((RowType)this.producedRowType);
            this.isOpenCalled = true;
            this.executor = Executors.newFixedThreadPool(2);
            this.indexDataByKey();
        }

        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            Preconditions.checkArgument((boolean)this.isOpenCalled, (Object)"open() is not called.");
            for (int i = 0; i < keyRow.getArity(); ++i) {
                Preconditions.checkNotNull((Object)((GenericRowData)keyRow).getField(i), (String)String.format("Lookup key %s contains null value, which should not happen.", keyRow));
            }
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(this.random.nextInt(5));
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return this.indexedData.get(keyRow);
            }, this.executor);
        }

        public void close() throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.decrementAndGet();
            if (this.executor != null && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
        }

        private void indexDataByKey() {
            this.indexedData = new HashMap<RowData, List<RowData>>();
            this.data.forEach(record -> {
                GenericRowData rowData = (GenericRowData)this.converter.toInternal(record);
                if (this.projectable) {
                    rowData = (GenericRowData)this.projection.apply((RowData)rowData);
                }
                Preconditions.checkNotNull((Object)rowData, (String)"Cannot convert record to internal GenericRowData type");
                GenericRowData key = GenericRowData.of((Object[])Arrays.stream(this.lookupIndices).mapToObj(arg_0 -> ((GenericRowData)rowData).getField(arg_0)).toArray());
                RowData copiedRow = (RowData)this.rowSerializer.copy((Object)rowData);
                List<RowData> list = this.indexedData.get(key);
                if (list != null) {
                    list.add(copiedRow);
                } else {
                    list = new ArrayList<RowData>();
                    list.add(copiedRow);
                    this.indexedData.put((RowData)key, list);
                }
            });
        }
    }

    public static class TestValuesLookupFunction
    extends LookupFunction {
        private static final long serialVersionUID = 1L;
        private final List<Row> data;
        private final int[] lookupIndices;
        private final RowType producedRowType;
        private final DynamicTableSource.DataStructureConverter converter;
        private final GeneratedProjection generatedProjection;
        private final boolean projectable;
        private transient Map<RowData, List<RowData>> indexedData;
        private transient boolean isOpenCalled = false;
        private transient Projection<RowData, GenericRowData> projection;
        private transient TypeSerializer<RowData> rowSerializer;

        protected TestValuesLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection) {
            this.data = data;
            this.lookupIndices = lookupIndices;
            this.producedRowType = producedRowType;
            this.converter = converter;
            this.projectable = generatedProjection.isPresent();
            this.generatedProjection = generatedProjection.orElse(null);
        }

        public void open(FunctionContext context) throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.incrementAndGet();
            this.isOpenCalled = true;
            if (this.projectable) {
                this.projection = (Projection)this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
            }
            this.rowSerializer = InternalSerializers.create((RowType)this.producedRowType);
            this.indexDataByKey();
        }

        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            Preconditions.checkArgument((boolean)this.isOpenCalled, (Object)"open() is not called.");
            for (int i = 0; i < keyRow.getArity(); ++i) {
                Preconditions.checkNotNull((Object)((GenericRowData)keyRow).getField(i), (String)String.format("Lookup key %s contains null value, which should not happen.", keyRow));
            }
            return this.indexedData.get(keyRow);
        }

        public void close() throws Exception {
            TestValuesTableFactory.RESOURCE_COUNTER.decrementAndGet();
        }

        private void indexDataByKey() {
            this.indexedData = new HashMap<RowData, List<RowData>>();
            this.data.forEach(record -> {
                GenericRowData rowData = (GenericRowData)this.converter.toInternal(record);
                if (this.projectable) {
                    rowData = (GenericRowData)this.projection.apply((RowData)rowData);
                }
                Preconditions.checkNotNull((Object)rowData, (String)"Cannot convert record to internal GenericRowData type");
                GenericRowData key = GenericRowData.of((Object[])Arrays.stream(this.lookupIndices).mapToObj(arg_0 -> ((GenericRowData)rowData).getField(arg_0)).toArray());
                RowData copiedRow = (RowData)this.rowSerializer.copy((Object)rowData);
                List<RowData> list = this.indexedData.get(key);
                if (list != null) {
                    list.add(copiedRow);
                } else {
                    list = new ArrayList<RowData>();
                    list.add(copiedRow);
                    this.indexedData.put((RowData)key, list);
                }
            });
        }
    }

    static class AppendingOutputFormat
    extends RichOutputFormat<RowData> {
        private static final long serialVersionUID = 1L;
        private final String tableName;
        private final DynamicTableSink.DataStructureConverter converter;
        protected transient List<String> localRawResult;

        protected AppendingOutputFormat(String tableName, DynamicTableSink.DataStructureConverter converter) {
            this.tableName = tableName;
            this.converter = converter;
        }

        public void configure(Configuration parameters) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(int taskNumber, int numTasks) throws IOException {
            this.localRawResult = new ArrayList<String>();
            Object object = LOCK;
            synchronized (object) {
                globalRawResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(taskNumber, this.localRawResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void writeRecord(RowData value) throws IOException {
            RowKind kind = value.getRowKind();
            if (value.getRowKind() == RowKind.INSERT) {
                Row row = (Row)this.converter.toExternal((Object)value);
                Assertions.assertThat((Object)row).isNotNull();
                Object object = LOCK;
                synchronized (object) {
                    this.localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
                }
            } else {
                throw new RuntimeException("AppendingOutputFormat received " + value.getRowKind() + " messages.");
            }
        }

        public void close() throws IOException {
        }
    }

    static class RetractingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final long serialVersionUID = 1L;
        private final DynamicTableSink.DataStructureConverter converter;
        protected transient ListState<String> retractResultState;
        protected transient List<String> localRetractResult;

        protected RetractingSinkFunction(String tableName, DynamicTableSink.DataStructureConverter converter) {
            super(tableName);
            this.converter = converter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);
            this.retractResultState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sink-retract-results", Types.STRING));
            this.localRetractResult = new ArrayList<String>();
            if (context.isRestored()) {
                for (String value : (Iterable)this.retractResultState.get()) {
                    this.localRetractResult.add(value);
                }
            }
            int taskId = this.getRuntimeContext().getIndexOfThisSubtask();
            Object object = LOCK;
            synchronized (object) {
                globalRetractResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(taskId, this.localRetractResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.retractResultState.clear();
            Object object = LOCK;
            synchronized (object) {
                this.retractResultState.addAll(this.localRetractResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            RowKind kind = value.getRowKind();
            Row row = (Row)this.converter.toExternal((Object)value);
            Assertions.assertThat((Object)row).isNotNull();
            Object object = LOCK;
            synchronized (object) {
                this.localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
                    row.setKind(RowKind.INSERT);
                    this.localRetractResult.add(row.toString());
                } else {
                    row.setKind(RowKind.INSERT);
                    boolean contains = this.localRetractResult.remove(row.toString());
                    if (!contains) {
                        throw new RuntimeException("Tried to retract a value that wasn't inserted first. This is probably an incorrectly implemented test.");
                    }
                }
            }
        }
    }

    static class KeyedUpsertingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final long serialVersionUID = 1L;
        private final DynamicTableSink.DataStructureConverter converter;
        private final int[] keyIndices;
        private final int expectedSize;
        private transient Map<String, String> localUpsertResult;
        private transient int receivedNum;

        protected KeyedUpsertingSinkFunction(String tableName, DynamicTableSink.DataStructureConverter converter, int[] keyIndices, int expectedSize) {
            super(tableName);
            this.converter = converter;
            this.keyIndices = keyIndices;
            this.expectedSize = expectedSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            super.initializeState(context);
            Object object = LOCK;
            synchronized (object) {
                this.localUpsertResult = globalUpsertResult.computeIfAbsent(this.tableName, k -> new HashMap()).computeIfAbsent(0, k -> new HashMap());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            RowKind kind = value.getRowKind();
            Row row = (Row)this.converter.toExternal((Object)value);
            Assertions.assertThat((Object)row).isNotNull();
            Object object = LOCK;
            synchronized (object) {
                if (RowUtils.USE_LEGACY_TO_STRING) {
                    this.localRawResult.add(kind.shortString() + "(" + row + ")");
                } else {
                    this.localRawResult.add(row.toString());
                }
                row.setKind(RowKind.INSERT);
                Row key = Row.project((Row)row, (int[])this.keyIndices);
                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
                    this.localUpsertResult.put(key.toString(), row.toString());
                } else {
                    String oldValue = this.localUpsertResult.remove(key.toString());
                    if (oldValue == null) {
                        throw new RuntimeException("Tried to delete a value that wasn't inserted first. This is probably an incorrectly implemented test.");
                    }
                }
                ++this.receivedNum;
                if (this.expectedSize != -1 && this.receivedNum == this.expectedSize) {
                    throw new SuccessException();
                }
            }
        }
    }

    static class AppendingSinkFunction
    extends AbstractExactlyOnceSink {
        private static final long serialVersionUID = 1L;
        private final DynamicTableSink.DataStructureConverter converter;
        private final int rowtimeIndex;

        protected AppendingSinkFunction(String tableName, DynamicTableSink.DataStructureConverter converter, int rowtimeIndex) {
            super(tableName);
            this.converter = converter;
            this.rowtimeIndex = rowtimeIndex;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            RowKind kind = value.getRowKind();
            if (value.getRowKind() == RowKind.INSERT) {
                Row row = (Row)this.converter.toExternal((Object)value);
                Assertions.assertThat((Object)row).isNotNull();
                if (this.rowtimeIndex >= 0) {
                    TimestampData rowtime = value.getTimestamp(this.rowtimeIndex, 3);
                    long mark = context.currentWatermark();
                    if (mark > rowtime.getMillisecond()) {
                        return;
                    }
                }
                Object object = LOCK;
                synchronized (object) {
                    this.localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
                }
            } else {
                throw new RuntimeException("AppendingSinkFunction received " + value.getRowKind() + " messages.");
            }
        }
    }

    private static abstract class AbstractExactlyOnceSink
    extends RichSinkFunction<RowData>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        protected final String tableName;
        protected transient ListState<String> rawResultState;
        protected transient List<String> localRawResult;

        protected AbstractExactlyOnceSink(String tableName) {
            this.tableName = tableName;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.rawResultState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sink-results", Types.STRING));
            this.localRawResult = new ArrayList<String>();
            if (context.isRestored()) {
                for (String value : (Iterable)this.rawResultState.get()) {
                    this.localRawResult.add(value);
                }
            }
            int taskId = this.getRuntimeContext().getIndexOfThisSubtask();
            Object object = LOCK;
            synchronized (object) {
                globalRawResult.computeIfAbsent(this.tableName, k -> new HashMap()).put(taskId, this.localRawResult);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.rawResultState.clear();
            Object object = LOCK;
            synchronized (object) {
                this.rawResultState.addAll(this.localRawResult);
            }
        }
    }

    public static class FromElementSourceFunctionWithWatermark
    implements SourceFunction<RowData> {
        private final TypeSerializer<RowData> serializer;
        private final byte[] elementsSerialized;
        private final int numElements;
        private volatile int numElementsEmitted;
        private final WatermarkStrategy<RowData> watermarkStrategy;
        private volatile boolean isRunning = true;
        private String tableName;

        public FromElementSourceFunctionWithWatermark(String tableName, TypeSerializer<RowData> serializer, Iterable<RowData> elements, WatermarkStrategy<RowData> watermarkStrategy) throws IOException {
            this.tableName = tableName;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper((OutputStream)baos);
            int count = 0;
            try {
                for (RowData element : elements) {
                    serializer.serialize((Object)element, (DataOutputView)wrapper);
                    ++count;
                }
            }
            catch (Exception e) {
                throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
            }
            this.numElements = count;
            this.elementsSerialized = baos.toByteArray();
            this.watermarkStrategy = watermarkStrategy;
            this.serializer = serializer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            ByteArrayInputStream bais = new ByteArrayInputStream(this.elementsSerialized);
            DataInputViewStreamWrapper input = new DataInputViewStreamWrapper((InputStream)bais);
            WatermarkGenerator generator = this.watermarkStrategy.createWatermarkGenerator(() -> null);
            TestValuesWatermarkOutput output = new TestValuesWatermarkOutput(ctx);
            Object lock = ctx.getCheckpointLock();
            while (this.isRunning && this.numElementsEmitted < this.numElements) {
                RowData next;
                try {
                    next = (RowData)this.serializer.deserialize((DataInputView)input);
                    generator.onEvent((Object)next, Long.MIN_VALUE, (WatermarkOutput)output);
                    generator.onPeriodicEmit((WatermarkOutput)output);
                }
                catch (Exception e) {
                    throw new IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + this.serializer, e);
                }
                Object object = lock;
                synchronized (object) {
                    ctx.collect((Object)next);
                    ++this.numElementsEmitted;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        private class TestValuesWatermarkOutput
        implements WatermarkOutput {
            SourceFunction.SourceContext<RowData> ctx;

            public TestValuesWatermarkOutput(SourceFunction.SourceContext<RowData> ctx) {
                this.ctx = ctx;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
                this.ctx.emitWatermark(new Watermark(watermark.getTimestamp()));
                Object object = LOCK;
                synchronized (object) {
                    watermarkHistory.computeIfAbsent(FromElementSourceFunctionWithWatermark.this.tableName, k -> new LinkedList()).add(watermark);
                }
            }

            public void markIdle() {
            }

            public void markActive() {
            }
        }
    }
}

