/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieFlinkTableServiceClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkWriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.WriteStatMerger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkWriteClient<T>
extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
    private final Map<String, Path> bucketToHandles = new HashMap<String, Path>();

    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
        this.tableServiceClient = new HoodieFlinkTableServiceClient(context, writeConfig, this.getTimelineServer());
    }

    @Override
    protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
        return FlinkHoodieIndexFactory.createIndex((HoodieFlinkEngineContext)this.context, this.config);
    }

    @Override
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds, Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
        List writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
        List<HoodieWriteStat> merged = writeStats.stream().collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath())).values().stream().map(duplicates -> (HoodieWriteStat)duplicates.stream().reduce(WriteStatMerger::merge).get()).collect(Collectors.toList());
        return this.commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
    }

    @Override
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context);
    }

    @Override
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context, metaClient);
    }

    @Override
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        Timer.Context indexTimer = this.metrics.getIndexCtx();
        List recordsWithLocation = this.getIndex().tagLocation(HoodieListData.eager(hoodieRecords), this.context, (HoodieTable)table).collectAsList();
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
        return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
    }

    @Override
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
    }

    @Override
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).upsert(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    @Override
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
        Map<String, List<HoodieRecord>> preppedRecordsByFileId = ((Stream)preppedRecords.stream().parallel()).collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
        return ((Stream)preppedRecordsByFileId.values().stream().parallel()).map(records -> {
            HoodieWriteMetadata<List<WriteStatus>> result;
            try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
                result = ((HoodieFlinkTable)table).upsertPrepped(this.context, closeableHandle.getWriteHandle(), instantTime, records);
            }
            return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
        }).flatMap(Collection::stream).collect(Collectors.toList());
    }

    @Override
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).insert(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).insertOverwrite(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    public List<WriteStatus> insertOverwriteTable(List<HoodieRecord<T>> records, String instantTime) {
        HoodieWriteMetadata<List<WriteStatus>> result;
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
        try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table);){
            result = ((HoodieFlinkTable)table).insertOverwriteTable(this.context, closeableHandle.getWriteHandle(), instantTime, records);
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    @Override
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> delete(List<HoodieKey> keys2, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
        HoodieWriteMetadata result = table.delete(this.context, instantTime, keys2);
        return this.postWrite(result, instantTime, table);
    }

    public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
        HoodieWriteMetadata result = table.deletePartitions(this.context, instantTime, partitions);
        return this.postWrite(result, instantTime, table);
    }

    @Override
    public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        this.setOperationType(writeOperationType);
    }

    @Override
    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        this.tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
    }

    public void initMetadataTable() {
        ((HoodieFlinkTableServiceClient)this.tableServiceClient).initMetadataTable();
    }

    public void startAsyncCleaning() {
        this.tableServiceClient.startAsyncCleanerService(this);
    }

    public void waitForCleaningFinish() {
        if (this.tableServiceClient.asyncCleanerService != null) {
            LOG.info("Cleaner has been spawned already. Waiting for it to finish");
            this.tableServiceClient.asyncClean();
            LOG.info("Cleaner has finished");
        }
    }

    @Override
    protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable hoodieTable) {
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
        }
        return result.getWriteStatuses();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
        try {
            WriteMarkersFactory.get(this.config.getMarkersType(), this.createTable(this.config, this.hadoopConf), instantTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        }
        finally {
            this.heartbeatClient.stop(instantTime);
        }
    }

    @Override
    protected void mayBeCleanAndArchive(HoodieTable table) {
        this.autoArchiveOnCommit(table);
    }

    @Override
    public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        this.tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata);
    }

    @Override
    public void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        this.tableServiceClient.completeCompaction(metadata, table, compactionCommitTime);
    }

    @Override
    protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        return this.tableServiceClient.compact(compactionInstantTime, shouldComplete);
    }

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        throw new HoodieNotSupportedException("Clustering is not supported yet");
    }

    private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime) {
        ((HoodieFlinkTableServiceClient)this.tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
    }

    @Override
    protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
    }

    public void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String commitInstant) {
        switch (tableServiceType) {
            case CLUSTER: {
                this.completeClustering((HoodieReplaceCommitMetadata)metadata, table, commitInstant);
                break;
            }
            case COMPACT: {
                this.completeCompaction(metadata, table, commitInstant);
                break;
            }
            default: {
                throw new IllegalArgumentException("This table service is not valid " + (Object)((Object)tableServiceType));
            }
        }
    }

    public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
        new UpgradeDowngrade(metaClient, this.config, this.context, FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), instantTime);
    }

    public void cleanHandles() {
        this.bucketToHandles.clear();
    }

    @Override
    public void close() {
        super.close();
        this.cleanHandles();
    }

    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(HoodieRecord<T> record, HoodieWriteConfig config, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, Iterator<HoodieRecord<T>> recordItr) {
        FlinkWriteHandleFactory.Factory<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config);
        return writeHandleFactory.create(this.bucketToHandles, record, config, instantTime, table, recordItr);
    }

    public HoodieFlinkTable<T> getHoodieTable() {
        return HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
    }

    public Map<String, List<String>> getPartitionToReplacedFileIds(WriteOperationType writeOperationType, List<WriteStatus> writeStatuses) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        switch (writeOperationType) {
            case INSERT_OVERWRITE: {
                return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toMap(partition -> partition, partitionPath -> this.getAllExistingFileIds(table, (String)partitionPath)));
            }
            case INSERT_OVERWRITE_TABLE: {
                Map<String, List<String>> partitionToExistingFileIds = new HashMap<String, List<String>>();
                List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.context, this.config.getMetadataConfig(), table.getMetaClient().getBasePath());
                if (partitionPaths != null && partitionPaths.size() > 0) {
                    this.context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + this.config.getTableName());
                    partitionToExistingFileIds = ((Stream)partitionPaths.stream().parallel()).collect(Collectors.toMap(partition -> partition, partition -> this.getAllExistingFileIds(table, (String)partition)));
                }
                return partitionToExistingFileIds;
            }
        }
        throw new AssertionError();
    }

    private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
        return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
    }

    private final class AutoCloseableWriteHandle
    implements AutoCloseable {
        private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;

        AutoCloseableWriteHandle(List<HoodieRecord<T>> records, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
            this.writeHandle = HoodieFlinkWriteClient.this.getOrCreateWriteHandle(records.get(0), HoodieFlinkWriteClient.this.getConfig(), instantTime, table, records.listIterator());
        }

        HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
            return this.writeHandle;
        }

        @Override
        public void close() {
            ((MiniBatchHandle)((Object)this.writeHandle)).closeGracefully();
        }
    }
}

