/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Nullable;

public class TestUpdateDeleteTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    public static final String IDENTIFIER = "test-update-delete";
    private static final ConfigOption<String> DATA_ID = ConfigOptions.key((String)"data-id").stringType().noDefaultValue().withDescription("The data id used to read the rows.");
    private static final ConfigOption<Boolean> ONLY_ACCEPT_EQUAL_PREDICATE = ConfigOptions.key((String)"only-accept-equal-predicate").booleanType().defaultValue((Object)false).withDescription("Whether only accept when the all predicates in filter is equal expression for delete statement.");
    private static final ConfigOption<Boolean> SUPPORT_DELETE_PUSH_DOWN = ConfigOptions.key((String)"support-delete-push-down").booleanType().defaultValue((Object)true).withDescription("Whether the table supports delete push down.");
    private static final ConfigOption<Boolean> MIX_DELETE = ConfigOptions.key((String)"mix-delete").booleanType().defaultValue((Object)false).withDescription("Whether the table support both delete push down and row-level delete. Note: for supporting delete push down, only the filter pushed is empty, can the filter be accepted.");
    private static final ConfigOption<SupportsRowLevelDelete.RowLevelDeleteMode> DELETE_MODE = ConfigOptions.key((String)"delete-mode").enumType(SupportsRowLevelDelete.RowLevelDeleteMode.class).defaultValue((Object)SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS).withDescription("The delete mode for row level delete.");
    private static final ConfigOption<SupportsRowLevelUpdate.RowLevelUpdateMode> UPDATE_MODE = ConfigOptions.key((String)"update-mode").enumType(SupportsRowLevelUpdate.RowLevelUpdateMode.class).defaultValue((Object)SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS).withDescription("The update mode for row level update.");
    private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_DELETE = ConfigOptions.key((String)"required-columns-for-delete").stringType().asList().noDefaultValue().withDescription("The columns' name for the required columns in row-level delete.");
    private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_UPDATE = ConfigOptions.key((String)"required-columns-for-update").stringType().asList().noDefaultValue().withDescription("The name for the required columns in row-level update.");
    private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE = ConfigOptions.key((String)"only-require-updated-columns-for-update").booleanType().defaultValue((Object)false).withDescription("Whether to only require the updated columns for update statement, require all columns by default.");
    private static final List<Column.MetadataColumn> META_COLUMNS = Arrays.asList(Column.metadata((String)"g", (DataType)DataTypes.STRING(), null, (boolean)true), Column.metadata((String)"meta_f1", (DataType)((DataType)DataTypes.INT().notNull()), null, (boolean)false), Column.metadata((String)"meta_f2", (DataType)((DataType)DataTypes.STRING().notNull()), (String)"meta_k2", (boolean)false));
    private static final AtomicInteger idCounter = new AtomicInteger(0);
    private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<String, Collection<RowData>>();

    public static String registerRowData(Collection<RowData> data) {
        String id = String.valueOf(idCounter.incrementAndGet());
        registeredRowData.put(id, data);
        return id;
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        String dataId = helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
        SupportsRowLevelDelete.RowLevelDeleteMode deleteMode = (SupportsRowLevelDelete.RowLevelDeleteMode)helper.getOptions().get(DELETE_MODE);
        SupportsRowLevelUpdate.RowLevelUpdateMode updateMode = (SupportsRowLevelUpdate.RowLevelUpdateMode)helper.getOptions().get(UPDATE_MODE);
        List requireColsForDelete = (List)helper.getOptions().get(REQUIRED_COLUMNS_FOR_DELETE);
        List requireColsForUpdate = (List)helper.getOptions().get(REQUIRED_COLUMNS_FOR_UPDATE);
        boolean onlyRequireUpdatedColumns = (Boolean)helper.getOptions().get(ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE);
        if (((Boolean)helper.getOptions().get(MIX_DELETE)).booleanValue()) {
            return new SupportsDeleteSink(context.getObjectIdentifier(), context.getCatalogTable(), deleteMode, updateMode, dataId, requireColsForDelete, requireColsForUpdate, onlyRequireUpdatedColumns);
        }
        if (((Boolean)helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)).booleanValue()) {
            return new SupportsDeletePushDownSink(context.getObjectIdentifier(), context.getCatalogTable(), updateMode, dataId, requireColsForUpdate, onlyRequireUpdatedColumns, (Boolean)helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE));
        }
        return new SupportsRowLevelModificationSink(context.getObjectIdentifier(), context.getCatalogTable(), deleteMode, updateMode, dataId, requireColsForDelete, requireColsForUpdate, onlyRequireUpdatedColumns);
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        String dataId = helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
        return new TestTableSource(dataId, context.getObjectIdentifier());
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet(Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, SUPPORT_DELETE_PUSH_DOWN, MIX_DELETE, DELETE_MODE, UPDATE_MODE, REQUIRED_COLUMNS_FOR_DELETE, REQUIRED_COLUMNS_FOR_UPDATE, ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE));
    }

    private static Optional<List<Tuple2<String, Object>>> getEqualPredicates(List<ResolvedExpression> filters) {
        ArrayList<Tuple2> equalPredicates = new ArrayList<Tuple2>();
        for (ResolvedExpression expression : filters) {
            if (!(expression instanceof CallExpression)) {
                return Optional.empty();
            }
            CallExpression callExpression = (CallExpression)expression;
            if (callExpression.getFunctionDefinition() != BuiltInFunctionDefinitions.EQUALS) {
                return Optional.empty();
            }
            String colName = TestUpdateDeleteTableFactory.getColumnName(callExpression);
            Object value = TestUpdateDeleteTableFactory.getColumnValue(callExpression);
            equalPredicates.add(Tuple2.of((Object)colName, (Object)value));
        }
        return Optional.of(equalPredicates);
    }

    private static String getColumnName(CallExpression comp) {
        return ((FieldReferenceExpression)comp.getChildren().get(0)).getName();
    }

    private static Object getColumnValue(CallExpression comp) {
        ValueLiteralExpression valueLiteralExpression = (ValueLiteralExpression)comp.getChildren().get(1);
        return valueLiteralExpression.getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass()).get();
    }

    private static boolean satisfyEqualPredicate(List<Tuple2<String, Object>> equalPredicates, RowData rowData, RowData.FieldGetter[] fieldGetters, List<String> columns) {
        for (Tuple2<String, Object> equalPredicate : equalPredicates) {
            Object value = equalPredicate.f1;
            String colName = (String)equalPredicate.f0;
            int colIndex = columns.indexOf(colName);
            if (Objects.equals(value, fieldGetters[colIndex].getFieldOrNull(rowData))) continue;
            return false;
        }
        return true;
    }

    private static void checkScanContext(RowLevelModificationScanContext context, ObjectIdentifier tableIdentifier) {
        Preconditions.checkArgument((boolean)(context instanceof TestScanContext));
        TestScanContext scanContext = (TestScanContext)context;
        Preconditions.checkArgument((boolean)scanContext.scanTables.contains(tableIdentifier), (Object)"The scan context should contains the object identifier for row-level modification.");
    }

    private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(ResolvedSchema resolvedSchema) {
        int[] indexes = resolvedSchema.getPrimaryKeyIndexes();
        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[indexes.length];
        List dataTypes = resolvedSchema.getColumnDataTypes();
        for (int i = 0; i < fieldGetters.length; ++i) {
            int colIndex = indexes[i];
            fieldGetters[i] = RowData.createFieldGetter((LogicalType)((DataType)dataTypes.get(colIndex)).getLogicalType(), (int)colIndex);
        }
        return fieldGetters;
    }

    private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
        List dataTypes = resolvedSchema.getColumnDataTypes();
        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()];
        for (int i = 0; i < dataTypes.size(); ++i) {
            fieldGetters[i] = RowData.createFieldGetter((LogicalType)((DataType)dataTypes.get(i)).getLogicalType(), (int)i);
        }
        return fieldGetters;
    }

    private static boolean equal(RowData value1, RowData value2, RowData.FieldGetter[] fieldGetters) {
        for (RowData.FieldGetter fieldGetter : fieldGetters) {
            if (Objects.equals(fieldGetter.getFieldOrNull(value1), fieldGetter.getFieldOrNull(value2))) continue;
            return false;
        }
        return true;
    }

    private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] fieldGetters) {
        Object[] values = new Object[fieldGetters.length];
        for (int i = 0; i < fieldGetters.length; ++i) {
            values[i] = fieldGetters[i].getFieldOrNull(rowData);
        }
        return GenericRowData.of((Object[])values);
    }

    private static List<Column> getRequiredColumns(List<String> requiredColName, ResolvedSchema schema) {
        ArrayList<Column> requiredCols = new ArrayList<Column>();
        for (String colName : requiredColName) {
            Optional optionalColumn = schema.getColumn(colName);
            if (optionalColumn.isPresent()) {
                requiredCols.add((Column)optionalColumn.get());
                continue;
            }
            Column.MetadataColumn metaCol = null;
            for (Column.MetadataColumn metadataColumn : META_COLUMNS) {
                String metaColName = metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
                if (!metaColName.equals(colName)) continue;
                metaCol = metadataColumn;
                break;
            }
            if (metaCol == null) {
                throw new TableException(String.format("Can't find the required column: `%s`.", colName));
            }
            requiredCols.add((Column)metaCol);
        }
        return requiredCols;
    }

    private static class UpdateDataSinkFunction
    extends RichSinkFunction<RowData> {
        private final String dataId;
        private final RowData.FieldGetter[] primaryKeyFieldGetters;
        private final RowData.FieldGetter[] allFieldGetters;
        private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
        private transient RowData[] oldRows;
        private transient List<Tuple2<Integer, RowData>> updatedRows;
        private transient List<RowData> allNewRows;

        public UpdateDataSinkFunction(String dataId, RowData.FieldGetter[] primaryKeyFieldGetters, RowData.FieldGetter[] allFieldGetters, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
            this.dataId = dataId;
            this.primaryKeyFieldGetters = primaryKeyFieldGetters;
            this.updateMode = updateMode;
            this.allFieldGetters = allFieldGetters;
        }

        public void open(Configuration parameters) {
            this.oldRows = ((Collection)registeredRowData.get(this.dataId)).toArray(new RowData[0]);
            this.updatedRows = new ArrayList<Tuple2<Integer, RowData>>();
            this.allNewRows = new ArrayList<RowData>();
        }

        public void invoke(RowData value, SinkFunction.Context context) {
            if (this.updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS) {
                this.consumeUpdatedRows(value);
            } else if (this.updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS) {
                this.consumeAllRows(value);
            } else {
                throw new TableException("Unknown update mode " + this.updateMode);
            }
        }

        private void consumeUpdatedRows(RowData updatedRow) {
            Preconditions.checkArgument((updatedRow.getRowKind() == RowKind.UPDATE_AFTER ? 1 : 0) != 0, (Object)("The RowKind for the updated rows should be " + RowKind.UPDATE_AFTER));
            for (int i = 0; i < this.oldRows.length; ++i) {
                if (!TestUpdateDeleteTableFactory.equal(this.oldRows[i], updatedRow, this.primaryKeyFieldGetters)) continue;
                this.updatedRows.add((Tuple2<Integer, RowData>)new Tuple2((Object)i, (Object)TestUpdateDeleteTableFactory.copyRowData(updatedRow, this.allFieldGetters)));
            }
        }

        private void consumeAllRows(RowData rowData) {
            Preconditions.checkArgument((rowData.getRowKind() == RowKind.INSERT ? 1 : 0) != 0, (Object)("The RowKind for the updated rows should be " + RowKind.INSERT));
            this.allNewRows.add(TestUpdateDeleteTableFactory.copyRowData(rowData, this.allFieldGetters));
        }

        public void finish() throws Exception {
            if (this.updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS) {
                this.commitForUpdatedRows();
            } else if (this.updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS) {
                this.commitForAllRows();
            } else {
                throw new TableException("Unknown update mode " + this.updateMode);
            }
        }

        private void commitForUpdatedRows() {
            List<RowData> newRows = Arrays.asList(this.oldRows);
            for (Tuple2<Integer, RowData> updatedRow : this.updatedRows) {
                newRows.set((Integer)updatedRow.f0, (RowData)updatedRow.f1);
            }
            registeredRowData.put(this.dataId, newRows);
        }

        private void commitForAllRows() {
            registeredRowData.put(this.dataId, this.allNewRows);
        }
    }

    private static class SupportsDeleteSink
    extends SupportsRowLevelModificationSink
    implements SupportsDeletePushDown {
        public SupportsDeleteSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelDelete.RowLevelDeleteMode deleteMode, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForDelete, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns) {
            super(tableIdentifier, resolvedCatalogTable, deleteMode, updateMode, dataId, requireColumnsForDelete, requireColumnsForUpdate, onlyRequireUpdatedColumns);
        }

        public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
            return filters.isEmpty();
        }

        public Optional<Long> executeDeletion() {
            Collection oldRows = (Collection)registeredRowData.get(this.dataId);
            if (oldRows != null) {
                registeredRowData.put(this.dataId, new ArrayList());
                return Optional.of(Long.valueOf(oldRows.size()));
            }
            return Optional.empty();
        }
    }

    public static class SupportsDeletePushDownSink
    extends SupportsRowLevelUpdateSink
    implements SupportsDeletePushDown {
        private final String dataId;
        private final boolean onlyAcceptEqualPredicate;
        private final ResolvedCatalogTable resolvedCatalogTable;
        private final RowData.FieldGetter[] fieldGetters;
        private final List<String> columns;
        private List<Tuple2<String, Object>> equalPredicates;

        public SupportsDeletePushDownSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns, boolean onlyAcceptEqualPredicate) {
            super(tableIdentifier, resolvedCatalogTable, updateMode, dataId, requireColumnsForUpdate, onlyRequireUpdatedColumns);
            this.dataId = dataId;
            this.onlyAcceptEqualPredicate = onlyAcceptEqualPredicate;
            this.resolvedCatalogTable = resolvedCatalogTable;
            this.fieldGetters = TestUpdateDeleteTableFactory.getAllFieldGetter(resolvedCatalogTable.getResolvedSchema());
            this.columns = resolvedCatalogTable.getResolvedSchema().getColumnNames();
        }

        @Override
        public DynamicTableSink copy() {
            return new SupportsDeletePushDownSink(this.tableIdentifier, this.resolvedCatalogTable, this.updateMode, this.dataId, this.requireColumnsForUpdate, this.onlyRequireUpdatedColumns, this.onlyAcceptEqualPredicate);
        }

        @Override
        public String asSummaryString() {
            return "SupportDeletePushDownSink";
        }

        public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
            if (this.onlyAcceptEqualPredicate) {
                Optional optionalEqualPredicates = TestUpdateDeleteTableFactory.getEqualPredicates(filters);
                if (optionalEqualPredicates.isPresent()) {
                    this.equalPredicates = (List)optionalEqualPredicates.get();
                    return true;
                }
                return false;
            }
            return true;
        }

        public Optional<Long> executeDeletion() {
            if (this.onlyAcceptEqualPredicate) {
                Collection existingRows = (Collection)registeredRowData.get(this.dataId);
                long rowsBefore = existingRows.size();
                existingRows.removeIf(rowData -> TestUpdateDeleteTableFactory.satisfyEqualPredicate(this.equalPredicates, rowData, this.fieldGetters, this.columns));
                return Optional.of(rowsBefore - (long)existingRows.size());
            }
            return Optional.empty();
        }
    }

    private static class DeleteDataSinkFunction
    extends RichSinkFunction<RowData> {
        private final String dataId;
        private final RowData.FieldGetter[] fieldGetters;
        private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
        private transient Collection<RowData> data;
        private transient List<RowData> newData;

        DeleteDataSinkFunction(String dataId, RowData.FieldGetter[] fieldGetters, SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) {
            this.dataId = dataId;
            this.fieldGetters = fieldGetters;
            this.deleteMode = deleteMode;
        }

        public void open(Configuration parameters) {
            this.data = (Collection)registeredRowData.get(this.dataId);
            this.newData = new ArrayList<RowData>();
        }

        public void invoke(RowData value, SinkFunction.Context context) {
            if (this.deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
                this.consumeDeletedRows(value);
            } else if (this.deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
                this.consumeRemainingRows(value);
            } else {
                throw new TableException(String.format("Unknown delete mode: %s.", this.deleteMode));
            }
        }

        private void consumeDeletedRows(RowData deletedRow) {
            Preconditions.checkState((deletedRow.getRowKind() == RowKind.DELETE ? 1 : 0) != 0, (Object)String.format("The RowKind for the coming rows should be %s in delete mode %s.", RowKind.DELETE, DELETE_MODE));
            this.data.removeIf(rowData -> TestUpdateDeleteTableFactory.equal(rowData, deletedRow, this.fieldGetters));
        }

        private void consumeRemainingRows(RowData remainingRow) {
            Preconditions.checkState((remainingRow.getRowKind() == RowKind.INSERT ? 1 : 0) != 0, (Object)String.format("The RowKind for the coming rows should be %s in delete mode %s.", RowKind.INSERT, DELETE_MODE));
            this.newData.add(TestUpdateDeleteTableFactory.copyRowData(remainingRow, this.fieldGetters));
        }

        public void finish() {
            if (this.deleteMode == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
                registeredRowData.put(this.dataId, this.newData);
            }
        }
    }

    private static class SupportsRowLevelModificationSink
    extends SupportsRowLevelUpdateSink
    implements SupportsRowLevelDelete {
        private final ObjectIdentifier tableIdentifier;
        private final ResolvedCatalogTable resolvedCatalogTable;
        private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
        protected final String dataId;
        private final List<String> requireColumnsForDelete;
        private boolean isDelete;

        public SupportsRowLevelModificationSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelDelete.RowLevelDeleteMode deleteMode, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForDelete, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns) {
            this(tableIdentifier, resolvedCatalogTable, deleteMode, updateMode, dataId, requireColumnsForDelete, requireColumnsForUpdate, onlyRequireUpdatedColumns, false, false);
        }

        public SupportsRowLevelModificationSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelDelete.RowLevelDeleteMode deleteMode, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForDelete, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns, boolean isDelete, boolean isUpdate) {
            super(tableIdentifier, resolvedCatalogTable, updateMode, dataId, requireColumnsForUpdate, onlyRequireUpdatedColumns, isUpdate);
            this.tableIdentifier = tableIdentifier;
            this.resolvedCatalogTable = resolvedCatalogTable;
            this.deleteMode = deleteMode;
            this.dataId = dataId;
            this.requireColumnsForDelete = requireColumnsForDelete;
            this.isDelete = isDelete;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.all();
        }

        @Override
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            if (this.isUpdate) {
                return super.getSinkRuntimeProvider(context);
            }
            return new DataStreamSinkProvider(){

                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    if (isDelete) {
                        return dataStream.addSink((SinkFunction)new DeleteDataSinkFunction(dataId, TestUpdateDeleteTableFactory.getAllFieldGetter(resolvedCatalogTable.getResolvedSchema()), deleteMode)).setParallelism(1);
                    }
                    return dataStream.addSink((SinkFunction)new DiscardingSink());
                }
            };
        }

        @Override
        public DynamicTableSink copy() {
            return new SupportsRowLevelModificationSink(this.tableIdentifier, this.resolvedCatalogTable, this.deleteMode, this.updateMode, this.dataId, this.requireColumnsForDelete, this.requireColumnsForUpdate, this.onlyRequireUpdatedColumns, this.isDelete, this.isUpdate);
        }

        @Override
        public String asSummaryString() {
            return "SupportsRowLevelModificationSink";
        }

        public SupportsRowLevelDelete.RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
            TestUpdateDeleteTableFactory.checkScanContext(context, this.tableIdentifier);
            this.isDelete = true;
            return new SupportsRowLevelDelete.RowLevelDeleteInfo(){

                public Optional<List<Column>> requiredColumns() {
                    List requiredCols = null;
                    if (requireColumnsForDelete != null) {
                        requiredCols = TestUpdateDeleteTableFactory.getRequiredColumns(requireColumnsForDelete, resolvedCatalogTable.getResolvedSchema());
                    }
                    return Optional.ofNullable(requiredCols);
                }

                public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
                    return deleteMode;
                }
            };
        }
    }

    private static class SupportsRowLevelUpdateSink
    implements DynamicTableSink,
    SupportsRowLevelUpdate {
        protected final ObjectIdentifier tableIdentifier;
        protected final ResolvedCatalogTable resolvedCatalogTable;
        protected final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
        protected final List<String> requireColumnsForUpdate;
        protected final boolean onlyRequireUpdatedColumns;
        protected final String dataId;
        protected boolean isUpdate;

        public SupportsRowLevelUpdateSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns) {
            this(tableIdentifier, resolvedCatalogTable, updateMode, dataId, requireColumnsForUpdate, onlyRequireUpdatedColumns, false);
        }

        public SupportsRowLevelUpdateSink(ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SupportsRowLevelUpdate.RowLevelUpdateMode updateMode, String dataId, List<String> requireColumnsForUpdate, boolean onlyRequireUpdatedColumns, boolean isUpdate) {
            this.tableIdentifier = tableIdentifier;
            this.resolvedCatalogTable = resolvedCatalogTable;
            this.updateMode = updateMode;
            this.dataId = dataId;
            this.requireColumnsForUpdate = requireColumnsForUpdate;
            this.onlyRequireUpdatedColumns = onlyRequireUpdatedColumns;
            this.isUpdate = isUpdate;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.upsert();
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            return new DataStreamSinkProvider(){

                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return dataStream.addSink((SinkFunction)new UpdateDataSinkFunction(dataId, TestUpdateDeleteTableFactory.getPrimaryKeyFieldGetter(resolvedCatalogTable.getResolvedSchema()), TestUpdateDeleteTableFactory.getAllFieldGetter(resolvedCatalogTable.getResolvedSchema()), updateMode)).setParallelism(1);
                }
            };
        }

        public DynamicTableSink copy() {
            return new SupportsRowLevelUpdateSink(this.tableIdentifier, this.resolvedCatalogTable, this.updateMode, this.dataId, this.requireColumnsForUpdate, this.onlyRequireUpdatedColumns, this.isUpdate);
        }

        public String asSummaryString() {
            return "SupportsRowLevelUpdateSink";
        }

        public SupportsRowLevelUpdate.RowLevelUpdateInfo applyRowLevelUpdate(final List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
            TestUpdateDeleteTableFactory.checkScanContext(context, this.tableIdentifier);
            this.isUpdate = true;
            return new SupportsRowLevelUpdate.RowLevelUpdateInfo(){

                public Optional<List<Column>> requiredColumns() {
                    List requiredCols = null;
                    if (onlyRequireUpdatedColumns) {
                        requiredCols = updatedColumns;
                    } else if (requireColumnsForUpdate != null) {
                        requiredCols = TestUpdateDeleteTableFactory.getRequiredColumns(requireColumnsForUpdate, resolvedCatalogTable.getResolvedSchema());
                    }
                    return Optional.ofNullable(requiredCols);
                }

                public SupportsRowLevelUpdate.RowLevelUpdateMode getRowLevelUpdateMode() {
                    return updateMode;
                }
            };
        }
    }

    private static class TestScanContext
    implements RowLevelModificationScanContext {
        private final Set<ObjectIdentifier> scanTables = new HashSet<ObjectIdentifier>();

        private TestScanContext() {
        }
    }

    private static class TestTableSource
    implements ScanTableSource,
    SupportsReadingMetadata,
    SupportsRowLevelModificationScan {
        private final String dataId;
        private final ObjectIdentifier tableIdentifier;

        public TestTableSource(String dataId, ObjectIdentifier tableIdentifier) {
            this.dataId = dataId;
            this.tableIdentifier = tableIdentifier;
        }

        public DynamicTableSource copy() {
            return new TestTableSource(this.dataId, this.tableIdentifier);
        }

        public String asSummaryString() {
            return "TestTableSource";
        }

        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            return new SourceFunctionProvider(){

                public SourceFunction<RowData> createSourceFunction() {
                    Collection rows = (Collection)registeredRowData.get(dataId);
                    if (rows != null) {
                        return new FromElementsFunction((Iterable)rows);
                    }
                    return new FromElementsFunction((Object[])new RowData[0]);
                }

                public boolean isBounded() {
                    return true;
                }
            };
        }

        public Map<String, DataType> listReadableMetadata() {
            HashMap<String, DataType> metaData = new HashMap<String, DataType>();
            META_COLUMNS.forEach(column -> metaData.put(column.getMetadataKey().orElse(column.getName()), column.getDataType()));
            return metaData;
        }

        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        }

        public RowLevelModificationScanContext applyRowLevelModificationScan(SupportsRowLevelModificationScan.RowLevelModificationType rowLevelModificationType, @Nullable RowLevelModificationScanContext previousContext) {
            TestScanContext scanContext = previousContext == null ? new TestScanContext() : (TestScanContext)previousContext;
            scanContext.scanTables.add(this.tableIdentifier);
            return scanContext;
        }
    }
}

