package org.apache.hudi.table.catalog;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/catalog/HoodieCatalog.class */
public class HoodieCatalog extends AbstractCatalog {
    /** Class-scoped SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalog.class);
    /** Hadoop configuration derived from the Flink configuration passed to the constructor. */
    private final Configuration hadoopConf;
    /** Raw value of the catalog path option; database and table paths are built beneath it. */
    private final String catalogPathStr;
    /** Options copied into a new table's options unless the table already defines the key. */
    private final Map<String, String> tableCommonOptions;
    /** {@link Path} form of {@code catalogPathStr}; assigned in {@code open()}. */
    private Path catalogPath;
    /** File system for the catalog path; assigned in {@code open()} and closed in {@code close()}. */
    private FileSystem fs;

    /**
     * Creates the catalog from its name and Flink configuration.
     *
     * <p>The default database is read from {@code CatalogOptions.DEFAULT_DATABASE}. The catalog
     * path option must be present; the Hadoop configuration and the common table options are
     * both derived from {@code configuration}.
     *
     * @param str catalog name, passed to the super constructor
     * @param configuration Flink configuration holding the catalog options
     * @throws ValidationException if the catalog path option is not set
     */
    public HoodieCatalog(String str, org.apache.flink.configuration.Configuration configuration) {
        super(str, (String) configuration.get(CatalogOptions.DEFAULT_DATABASE));
        // Fail fast when no catalog path has been configured.
        if (!configuration.getOptional(CatalogOptions.CATALOG_PATH).isPresent()) {
            throw new ValidationException("Option [catalog.path] should not be empty.");
        }
        this.catalogPathStr = (String) configuration.get(CatalogOptions.CATALOG_PATH);
        this.hadoopConf = HadoopConfigurations.getHadoopConf(configuration);
        this.tableCommonOptions = CatalogOptions.tableCommonOptions(configuration);
    }

    /**
     * Opens the catalog: obtains the file system for the catalog path and verifies that the
     * path exists.
     *
     * @throws CatalogException if the path does not exist or the existence check fails with an
     *     {@link IOException}
     */
    public void open() throws CatalogException {
        this.fs = FSUtils.getFs(this.catalogPathStr, this.hadoopConf);
        this.catalogPath = new Path(this.catalogPathStr);
        boolean pathExists;
        try {
            pathExists = this.fs.exists(this.catalogPath);
        } catch (IOException e) {
            throw new CatalogException(String.format("Checking catalog path %s exists exception.", this.catalogPathStr), e);
        }
        if (!pathExists) {
            throw new CatalogException(String.format("Catalog %s path %s does not exist.", getName(), this.catalogPathStr));
        }
    }

    /**
     * Closes the file system obtained in {@code open()}.
     *
     * <p>Does nothing when the catalog was never opened (the file system field is still
     * {@code null}), so calling {@code close()} on an unopened catalog no longer fails with a
     * {@link NullPointerException}.
     *
     * @throws CatalogException if closing the file system fails with an {@link IOException}
     */
    public void close() throws CatalogException {
        if (this.fs == null) {
            // open() was never called or failed before the file system was assigned.
            return;
        }
        try {
            this.fs.close();
        } catch (IOException e) {
            throw new CatalogException("Closing FileSystem exception.", e);
        }
    }

    /**
     * Lists the databases of this catalog: the names of all directories found directly under
     * the catalog path.
     *
     * @return directory names under the catalog path
     * @throws CatalogException if listing the catalog path fails with an {@link IOException}
     */
    public List<String> listDatabases() throws CatalogException {
        try {
            return Arrays.stream(this.fs.listStatus(this.catalogPath))
                    .filter(status -> status.isDirectory())
                    .map(status -> status.getPath().getName())
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new CatalogException("Listing database exception.", e);
        }
    }

    /**
     * Returns a database descriptor with no properties and a {@code null} comment for an
     * existing database.
     *
     * @param str database name
     * @return an empty {@link CatalogDatabaseImpl}
     * @throws DatabaseNotExistException if the database does not exist
     */
    public CatalogDatabase getDatabase(String str) throws DatabaseNotExistException, CatalogException {
        if (!databaseExists(str)) {
            throw new DatabaseNotExistException(getName(), str);
        }
        return new CatalogDatabaseImpl(Collections.emptyMap(), (String) null);
    }

    /**
     * Tells whether a database with the given name is among {@code listDatabases()}.
     *
     * @param str database name; must be neither {@code null} nor empty
     * @return {@code true} if the name is listed
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
     *     when the name is null or empty
     */
    public boolean databaseExists(String str) throws CatalogException {
        boolean hasName = !StringUtils.isNullOrEmpty(str);
        Preconditions.checkArgument(hasName);
        List<String> databases = listDatabases();
        return databases.contains(str);
    }

    /**
     * Creates a database by making a directory named after it under the catalog path.
     *
     * @param str database name
     * @param catalogDatabase database descriptor; its properties must be null or empty
     * @param z when {@code true}, an already existing database is silently accepted
     * @throws DatabaseAlreadyExistException if the database exists and {@code z} is {@code false}
     * @throws CatalogException if properties were supplied or the directory creation fails
     */
    public void createDatabase(String str, CatalogDatabase catalogDatabase, boolean z) throws DatabaseAlreadyExistException, CatalogException {
        if (databaseExists(str)) {
            if (z) {
                return;
            }
            throw new DatabaseAlreadyExistException(getName(), str);
        }
        // Database-level options cannot be stored, so reject them.
        if (!CollectionUtil.isNullOrEmpty(catalogDatabase.getProperties())) {
            throw new CatalogException("Hudi catalog doesn't support to create database with options.");
        }
        Path databasePath = new Path(this.catalogPath, str);
        try {
            this.fs.mkdirs(databasePath);
        } catch (IOException e) {
            throw new CatalogException(String.format("Creating database %s exception.", str), e);
        }
    }

    /**
     * Drops a database by recursively deleting its directory under the catalog path.
     *
     * @param str database name
     * @param z when {@code true}, a missing database is silently ignored
     * @param z2 when {@code true}, a database that still contains tables may be dropped
     * @throws DatabaseNotExistException if the database is missing and {@code z} is {@code false}
     * @throws DatabaseNotEmptyException if the database has tables and {@code z2} is {@code false}
     * @throws IllegalArgumentException if the database is the catalog's default database
     * @throws CatalogException if the deletion fails with an {@link IOException}
     */
    public void dropDatabase(String str, boolean z, boolean z2) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        if (!databaseExists(str)) {
            if (z) {
                return;
            }
            throw new DatabaseNotExistException(getName(), str);
        }
        boolean hasTables = !listTables(str).isEmpty();
        if (hasTables && !z2) {
            throw new DatabaseNotEmptyException(getName(), str);
        }
        if (str.equals(getDefaultDatabase())) {
            throw new IllegalArgumentException("Hudi catalog doesn't support to drop the default database.");
        }
        Path databasePath = new Path(this.catalogPath, str);
        try {
            this.fs.delete(databasePath, true);
        } catch (IOException e) {
            throw new CatalogException(String.format("Dropping database %s exception.", str), e);
        }
    }

    /**
     * Not implemented: altering a database always fails.
     *
     * @param str database name (unused)
     * @param catalogDatabase new database descriptor (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterDatabase(String str, CatalogDatabase catalogDatabase, boolean z) throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException("Altering database is not implemented.");
    }

    /**
     * Lists the tables of a database: the names of all directories found directly under the
     * database directory.
     *
     * @param str database name
     * @return directory names under the database path
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException if listing the database path fails with an {@link IOException}
     */
    public List<String> listTables(String str) throws DatabaseNotExistException, CatalogException {
        if (!databaseExists(str)) {
            throw new DatabaseNotExistException(getName(), str);
        }
        Path databasePath = new Path(this.catalogPath, str);
        try {
            return Arrays.stream(this.fs.listStatus(databasePath))
                    .filter(status -> status.isDirectory())
                    .map(status -> status.getPath().getName())
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new CatalogException(String.format("Listing table in database %s exception.", databasePath), e);
        }
    }

    /**
     * Loads a table definition from the table's stored option properties and its latest Avro
     * schema.
     *
     * <p>The Avro schema is converted to a Flink row data type with the primary key columns
     * forced non-nullable. A primary key is declared on the Flink schema when the stored
     * properties carry a constraint name or a non-empty list of key columns.
     *
     * <p>The Avro and Flink schema types share the simple name {@code Schema}, so both are
     * written fully qualified here to keep the references unambiguous.
     *
     * @param objectPath database and table name
     * @return the catalog table built from the stored schema, comment, partition columns and
     *     table options
     * @throws TableNotExistException if the table does not exist or its schema cannot be
     *     resolved
     */
    public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        if (!tableExists(objectPath)) {
            throw new TableNotExistException(getName(), objectPath);
        }
        String inferTablePath = inferTablePath(this.catalogPathStr, objectPath);
        Map<String, String> loadFromProperties = TableOptionProperties.loadFromProperties(inferTablePath, this.hadoopConf);
        org.apache.hudi.org.apache.avro.Schema latestTableSchema = getLatestTableSchema(inferTablePath);
        if (latestTableSchema == null) {
            // Schema resolution failed (or the table vanished); reported as a missing table.
            throw new TableNotExistException(getName(), objectPath);
        }
        List<String> pkColumns = TableOptionProperties.getPkColumns(loadFromProperties);
        org.apache.flink.table.api.Schema.Builder schemaBuilder = org.apache.flink.table.api.Schema.newBuilder()
                .fromRowDataType(DataTypeUtils.ensureColumnsAsNonNullable(AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns));
        String pkConstraintName = TableOptionProperties.getPkConstraintName(loadFromProperties);
        if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
            schemaBuilder.primaryKeyNamed(pkConstraintName, pkColumns);
        } else if (!CollectionUtils.isNullOrEmpty(pkColumns)) {
            schemaBuilder.primaryKey(pkColumns);
        }
        return CatalogTable.of(
                schemaBuilder.build(),
                TableOptionProperties.getComment(loadFromProperties),
                TableOptionProperties.getPartitionColumns(loadFromProperties),
                TableOptionProperties.getTableOptions(loadFromProperties));
    }

    /**
     * Creates a Hudi table under the catalog path and persists its option properties.
     *
     * <p>Steps, in order: validate database/table existence, reject views, derive the table
     * path and effective options, require a primary key, record the Avro schema, record key and
     * primary key metadata, validate the pre-combine field, record partition columns, then
     * initialize the table and write the option properties.
     *
     * @param objectPath database and table name
     * @param catalogBaseTable table definition; cast to {@link ResolvedCatalogTable}
     * @param z when {@code true}, an already existing table is silently accepted
     * @throws TableAlreadyExistException if the table exists and {@code z} is {@code false}
     * @throws DatabaseNotExistException if the database does not exist
     * @throws UnsupportedOperationException if {@code catalogBaseTable} is a view
     * @throws HoodieValidationException if the pre-combine field is invalid for the schema
     * @throws CatalogException if the primary key is missing or initialization fails with an
     *     {@link IOException}
     */
    public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean z) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        String databaseName = objectPath.getDatabaseName();
        if (!databaseExists(databaseName)) {
            throw new DatabaseNotExistException(getName(), databaseName);
        }
        if (tableExists(objectPath)) {
            if (z) {
                return;
            }
            throw new TableAlreadyExistException(getName(), objectPath);
        }
        if (catalogBaseTable instanceof CatalogView) {
            throw new UnsupportedOperationException("Hudi catalog doesn't support to CREATE VIEW.");
        }

        ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) catalogBaseTable;
        String tablePath = inferTablePath(this.catalogPathStr, objectPath);
        Map<String, String> tableOptions = applyOptionsHook(tablePath, catalogBaseTable.getOptions());
        org.apache.flink.configuration.Configuration conf = org.apache.flink.configuration.Configuration.fromMap(tableOptions);
        conf.setString(FlinkOptions.PATH, tablePath);

        ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
        if (!resolvedSchema.getPrimaryKey().isPresent()) {
            throw new CatalogException("Primary key definition is missing");
        }
        String avroSchema = AvroSchemaConverter.convertToSchema(
                resolvedSchema.toPhysicalRowDataType().getLogicalType(),
                AvroSchemaUtils.getAvroRecordQualifiedName(objectPath.getObjectName())).toString();
        conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

        // The primary key columns double as the Hudi record key.
        UniqueConstraint primaryKey = (UniqueConstraint) resolvedSchema.getPrimaryKey().get();
        String recordKeys = String.join(",", primaryKey.getColumns());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKeys);
        tableOptions.put(TableOptionProperties.PK_CONSTRAINT_NAME, primaryKey.getName());
        tableOptions.put(TableOptionProperties.PK_COLUMNS, recordKeys);

        String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
        boolean preCombineInSchema = resolvedSchema.getColumnNames().contains(preCombineField);
        if (!preCombineInSchema) {
            if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
                throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
            }
            if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
                // The default field is absent from the schema: switch pre-combining off.
                conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
            } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
                throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema.Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
            }
        }

        if (resolvedTable.isPartitioned()) {
            String partitionColumns = String.join(",", resolvedTable.getPartitionKeys());
            conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionColumns);
            tableOptions.put(TableOptionProperties.PARTITION_COLUMNS, partitionColumns);
        }
        conf.setString(FlinkOptions.TABLE_NAME, objectPath.getObjectName());

        try {
            StreamerUtil.initTableIfNotExists(conf);
            String comment = resolvedTable.getComment();
            if (!StringUtils.isNullOrEmpty(comment)) {
                tableOptions.put("comment", resolvedTable.getComment());
            }
            TableOptionProperties.createProperties(tablePath, this.hadoopConf, tableOptions);
        } catch (IOException e) {
            throw new CatalogException(String.format("Initialize table path %s exception.", tablePath), e);
        }
    }

    /**
     * Tells whether a table exists at the path inferred for {@code objectPath}, as decided by
     * {@code StreamerUtil.tableExists}.
     *
     * @param objectPath database and table name
     * @return the result of the existence check on the inferred table path
     */
    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
        String tablePath = inferTablePath(this.catalogPathStr, objectPath);
        return StreamerUtil.tableExists(tablePath, this.hadoopConf);
    }

    /**
     * Drops a table by recursively deleting its directory.
     *
     * @param objectPath database and table name
     * @param z when {@code true}, a missing table is silently ignored
     * @throws TableNotExistException if the table is missing and {@code z} is {@code false}
     * @throws CatalogException if the deletion fails with an {@link IOException}
     */
    public void dropTable(ObjectPath objectPath, boolean z) throws TableNotExistException, CatalogException {
        if (!tableExists(objectPath)) {
            if (z) {
                return;
            }
            throw new TableNotExistException(getName(), objectPath);
        }
        try {
            Path tablePath = new Path(inferTablePath(this.catalogPathStr, objectPath));
            this.fs.delete(tablePath, true);
        } catch (IOException e) {
            throw new CatalogException(String.format("Dropping table %s exception.", objectPath), e);
        }
    }

    /**
     * Not implemented: renaming a table always fails.
     *
     * @param objectPath current database and table name (unused)
     * @param str new table name (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void renameTable(ObjectPath objectPath, String str, boolean z) throws TableNotExistException, TableAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException("renameTable is not implemented.");
    }

    /**
     * Not implemented: altering a table always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogBaseTable new table definition (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean z) throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterTable is not implemented.");
    }

    /**
     * Always returns an empty list; the database name is not validated here, so the declared
     * {@link DatabaseNotExistException} is never thrown by this implementation.
     *
     * @param str database name (unused)
     * @return an empty list
     */
    public List<String> listViews(String str) throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Always returns an empty list; neither the table nor its partitions are inspected.
     *
     * @param objectPath database and table name (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Always returns an empty list; the partial partition spec is not evaluated.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partial partition spec (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Always returns an empty list; the filter expressions are not evaluated.
     *
     * @param objectPath database and table name (unused)
     * @param list filter expressions (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath objectPath, List<Expression> list) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Never returns a partition: every lookup is reported as a missing partition, without
     * checking the table or the file system.
     *
     * @param objectPath database and table name
     * @param catalogPartitionSpec partition spec being looked up
     * @throws PartitionNotExistException always
     */
    public CatalogPartition getPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, CatalogException {
        throw new PartitionNotExistException(getName(), objectPath, catalogPartitionSpec);
    }

    /**
     * Tells whether the partition described by {@code catalogPartitionSpec} exists for the
     * table.
     *
     * <p>The partition path is inferred from the spec and the table's stored hive-style
     * partitioning option, then checked with {@code StreamerUtil.partitionExists}.
     *
     * @param objectPath database and table name
     * @param catalogPartitionSpec partition spec to look up
     * @return {@code false} if the table does not exist, otherwise the result of the partition
     *     check
     */
    public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
        if (!tableExists(objectPath)) {
            return false;
        }
        String tablePath = inferTablePath(this.catalogPathStr, objectPath);
        Map<String, String> tableProps = TableOptionProperties.loadFromProperties(inferTablePath(this.catalogPathStr, objectPath), this.hadoopConf);
        // NOTE(review): the fallback comes from an unrelated reader constant; presumably its
        // value is "false" (hive-style partitioning off by default) -- confirm.
        String hiveStyleValue = tableProps.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), HoodieRealtimeRecordReader.DEFAULT_REALTIME_SKIP_MERGE);
        boolean hiveStylePartitioning = Boolean.parseBoolean(hiveStyleValue);
        String partitionPath = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
        return StreamerUtil.partitionExists(tablePath, partitionPath, this.hadoopConf);
    }

    /**
     * Not implemented: creating a partition always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @param catalogPartition partition definition (unused)
     * @param z ignore-if-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void createPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean z) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
        throw new UnsupportedOperationException("createPartition is not implemented.");
    }

    /**
     * Drops a partition: issues a delete-partitions commit through a Hudi write client, then
     * recursively deletes the partition directory.
     *
     * <p>The write client is managed by try-with-resources so it is closed on every path,
     * including when the delete-partitions call or the directory deletion throws.
     *
     * @param objectPath database and table name
     * @param catalogPartitionSpec spec of the partition to drop
     * @param z when {@code true}, a missing table or partition is silently ignored
     * @throws PartitionNotExistException if the table or the partition is missing and
     *     {@code z} is {@code false}
     * @throws CatalogException if any step of the drop fails; the original exception is the
     *     cause
     */
    public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, boolean z) throws PartitionNotExistException, CatalogException {
        if (!tableExists(objectPath)) {
            if (!z) {
                throw new PartitionNotExistException(getName(), objectPath, catalogPartitionSpec);
            }
            return;
        }
        String inferTablePath = inferTablePath(this.catalogPathStr, objectPath);
        Map<String, String> loadFromProperties = TableOptionProperties.loadFromProperties(inferTablePath, this.hadoopConf);
        // NOTE(review): the fallback comes from an unrelated reader constant; presumably its
        // value is "false" (hive-style partitioning off by default) -- confirm.
        String inferPartitionPath = HoodieCatalogUtil.inferPartitionPath(Boolean.parseBoolean(loadFromProperties.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), HoodieRealtimeRecordReader.DEFAULT_REALTIME_SKIP_MERGE)), catalogPartitionSpec);
        if (!StreamerUtil.partitionExists(inferTablePath, inferPartitionPath, this.hadoopConf)) {
            if (!z) {
                throw new PartitionNotExistException(getName(), objectPath, catalogPartitionSpec);
            }
            return;
        }
        // Enable auto-commit on the write config used for the delete-partitions call.
        loadFromProperties.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
        try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(loadFromProperties, inferTablePath, objectPath)) {
            writeClient.deletePartitions(Collections.singletonList(inferPartitionPath), HoodieActiveTimeline.createNewInstantTime()).forEach(writeStatus -> {
                if (writeStatus.hasErrors()) {
                    throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
                }
            });
            this.fs.delete(new Path(inferTablePath, inferPartitionPath), true);
        } catch (Exception e) {
            throw new CatalogException(String.format("Dropping partition %s of table %s exception.", inferPartitionPath, objectPath), e);
        }
    }

    /**
     * Not implemented: altering a partition always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @param catalogPartition new partition definition (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean z) throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterPartition is not implemented.");
    }

    /**
     * Always returns an empty list; the database name is not validated here.
     *
     * @param str database name (unused)
     * @return an empty list
     */
    public List<String> listFunctions(String str) throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Never returns a function: every lookup is reported as a missing function.
     *
     * @param objectPath database and function name
     * @throws FunctionNotExistException always
     */
    public CatalogFunction getFunction(ObjectPath objectPath) throws FunctionNotExistException, CatalogException {
        throw new FunctionNotExistException(getName(), objectPath);
    }

    /**
     * Always reports that the function does not exist.
     *
     * @param objectPath database and function name (unused)
     * @return {@code false}
     */
    public boolean functionExists(ObjectPath objectPath) throws CatalogException {
        return false;
    }

    /**
     * Not implemented: creating a function always fails.
     *
     * @param objectPath database and function name (unused)
     * @param catalogFunction function definition (unused)
     * @param z ignore-if-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void createFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean z) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException("createFunction is not implemented.");
    }

    /**
     * Not implemented: altering a function always fails.
     *
     * @param objectPath database and function name (unused)
     * @param catalogFunction new function definition (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean z) throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterFunction is not implemented.");
    }

    /**
     * Not implemented: dropping a function always fails.
     *
     * @param objectPath database and function name (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void dropFunction(ObjectPath objectPath, boolean z) throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException("dropFunction is not implemented.");
    }

    /**
     * Always returns {@code CatalogTableStatistics.UNKNOWN}; the table is not inspected.
     *
     * @param objectPath database and table name (unused)
     * @return {@code CatalogTableStatistics.UNKNOWN}
     */
    public CatalogTableStatistics getTableStatistics(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /**
     * Always returns {@code CatalogColumnStatistics.UNKNOWN}; the table is not inspected.
     *
     * @param objectPath database and table name (unused)
     * @return {@code CatalogColumnStatistics.UNKNOWN}
     */
    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /**
     * Always returns {@code CatalogTableStatistics.UNKNOWN}; the partition is not inspected.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @return {@code CatalogTableStatistics.UNKNOWN}
     */
    public CatalogTableStatistics getPartitionStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /**
     * Always returns {@code CatalogColumnStatistics.UNKNOWN}; the partition is not inspected.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @return {@code CatalogColumnStatistics.UNKNOWN}
     */
    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /**
     * Not implemented: updating table statistics always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogTableStatistics new statistics (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterTableStatistics(ObjectPath objectPath, CatalogTableStatistics catalogTableStatistics, boolean z) throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterTableStatistics is not implemented.");
    }

    /**
     * Not implemented: updating table column statistics always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogColumnStatistics new column statistics (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterTableColumnStatistics(ObjectPath objectPath, CatalogColumnStatistics catalogColumnStatistics, boolean z) throws TableNotExistException, CatalogException, TablePartitionedException {
        throw new UnsupportedOperationException("alterTableColumnStatistics is not implemented.");
    }

    /**
     * Not implemented: updating partition statistics always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @param catalogTableStatistics new statistics (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterPartitionStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogTableStatistics catalogTableStatistics, boolean z) throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterPartitionStatistics is not implemented.");
    }

    /**
     * Not implemented: updating partition column statistics always fails.
     *
     * @param objectPath database and table name (unused)
     * @param catalogPartitionSpec partition spec (unused)
     * @param catalogColumnStatistics new column statistics (unused)
     * @param z ignore-if-not-exists flag (unused)
     * @throws UnsupportedOperationException always
     */
    public void alterPartitionColumnStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogColumnStatistics catalogColumnStatistics, boolean z) throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException("alterPartitionColumnStatistics is not implemented.");
    }

    /**
     * Resolves the table's Avro schema through a {@link TableSchemaResolver}.
     *
     * <p>Resolution is best effort: any failure is logged at WARN level and reported as
     * {@code null} rather than propagated.
     *
     * @param str table base path; may be {@code null}
     * @return the resolved Avro schema, or {@code null} if the path is {@code null}, no table
     *     exists there, or resolution fails
     */
    @Nullable
    private org.apache.hudi.org.apache.avro.Schema getLatestTableSchema(String str) {
        boolean resolvable = str != null && StreamerUtil.tableExists(str, this.hadoopConf);
        if (!resolvable) {
            return null;
        }
        try {
            TableSchemaResolver schemaResolver = new TableSchemaResolver(StreamerUtil.createMetaClient(str, this.hadoopConf));
            return schemaResolver.getTableAvroSchema(false);
        } catch (Throwable th) {
            LOG.warn("Error while resolving the latest table schema.", th);
            return null;
        }
    }

    /**
     * Builds the effective table options from the user-supplied ones.
     *
     * <p>The result is a copy of {@code map} in which the {@code connector} option is forced to
     * {@code hudi}, the path option falls back to {@code str} when absent, and every catalog
     * common option is added unless the key is already present. The input map is not modified.
     *
     * @param str table path used when the options carry no path
     * @param map table options supplied by the caller
     * @return a new mutable map holding the effective options
     */
    private Map<String, String> applyOptionsHook(String str, Map<String, String> map) {
        Map<String, String> options = new HashMap<>(map);
        options.put("connector", "hudi");
        options.computeIfAbsent(FlinkOptions.PATH.key(), key -> str);
        // Catalog-wide defaults never override what the table already defines.
        this.tableCommonOptions.forEach(options::putIfAbsent);
        return options;
    }

    /**
     * Creates a Flink write client for an existing table.
     *
     * <p>The configuration is built from {@code map}, then overridden with the table name from
     * {@code objectPath} and the table-create schema read from the table config at {@code str}.
     *
     * @param map table options used as the base configuration
     * @param str table base path
     * @param objectPath database and table name
     * @return the write client produced by {@code FlinkWriteClients.createWriteClientV2}
     */
    private HoodieFlinkWriteClient<?> createWriteClient(Map<String, String> map, String str, ObjectPath objectPath) {
        org.apache.flink.configuration.Configuration conf = org.apache.flink.configuration.Configuration.fromMap(map);
        conf.set(FlinkOptions.TABLE_NAME, objectPath.getObjectName());
        // NOTE(review): get() is called without a presence check -- presumably the create
        // schema is always stored for tables made by this catalog; confirm.
        String tableCreateSchema = StreamerUtil.createMetaClient(str, this.hadoopConf).getTableConfig().getTableCreateSchema().get().toString();
        conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, tableCreateSchema);
        return FlinkWriteClients.createWriteClientV2(conf);
    }

    /**
     * Builds the table path as {@code <catalog path>/<database name>/<table name>}.
     *
     * @param str catalog path
     * @param objectPath database and table name
     * @return the three parts joined with {@code /}
     */
    @VisibleForTesting
    protected String inferTablePath(String str, ObjectPath objectPath) {
        String databaseName = objectPath.getDatabaseName();
        String tableName = objectPath.getObjectName();
        return str + "/" + databaseName + "/" + tableName;
    }
}
