package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactionCommitSink.class */
public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);
    private final Configuration conf;
    private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
    private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
    private transient HoodieFlinkTable<?> table;

    public CompactionCommitSink(Configuration configuration) {
        super(configuration);
        this.conf = configuration;
    }

    @Override // org.apache.hudi.sink.CleanFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        if (this.writeClient == null) {
            this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
        }
        this.commitBuffer = new HashMap();
        this.compactionPlanCache = new HashMap();
        this.table = this.writeClient.getHoodieTable();
    }

    public void invoke(CompactionCommitEvent compactionCommitEvent, SinkFunction.Context context) throws Exception {
        String instant = compactionCommitEvent.getInstant();
        this.commitBuffer.computeIfAbsent(instant, str -> {
            return new HashMap();
        }).put(compactionCommitEvent.getFileId(), compactionCommitEvent);
        commitIfNecessary(instant, this.commitBuffer.get(instant).values());
    }

    private void commitIfNecessary(String str, Collection<CompactionCommitEvent> collection) throws IOException {
        if (this.compactionPlanCache.computeIfAbsent(str, str2 -> {
            try {
                return CompactionUtils.getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(), str);
            } catch (IOException e) {
                throw new HoodieException(e);
            }
        }).getOperations().size() == collection.size()) {
            try {
                if (collection.stream().anyMatch((v0) -> {
                    return v0.isFailed();
                })) {
                    try {
                        CompactionUtil.rollbackCompaction(this.table, str);
                        reset(str);
                        return;
                    } finally {
                        reset(str);
                    }
                }
                try {
                    doCommit(str, collection);
                    reset(str);
                } catch (Throwable th) {
                    LOG.error("Error while committing compaction instant: " + str, th);
                    reset(str);
                }
            } catch (Throwable th2) {
                throw th2;
            }
        }
    }

    private void doCommit(String str, Collection<CompactionCommitEvent> collection) throws IOException {
        this.writeClient.commitCompaction(str, CompactHelpers.getInstance().createCompactionMetadata(this.table, str, HoodieListData.eager((List) collection.stream().map((v0) -> {
            return v0.getWriteStatuses();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList())), this.writeClient.getConfig().getSchema()), Option.empty());
        if (this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            return;
        }
        this.writeClient.clean();
    }

    private void reset(String str) {
        this.commitBuffer.remove(str);
        this.compactionPlanCache.remove(str);
    }
}
