/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactOperator
extends TableStreamOperator<CompactionCommitEvent>
implements OneInputStreamOperator<CompactionPlanEvent, CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactOperator.class);
    private final Configuration conf;
    private transient HoodieFlinkWriteClient<?> writeClient;
    private final boolean asyncCompaction;
    private int taskID;
    private transient NonThrownExecutor executor;
    private transient StreamRecordCollector<CompactionCommitEvent> collector;

    public CompactOperator(Configuration conf) {
        this.conf = conf;
        this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<CompactionCommitEvent>> output) {
        super.setup(containingTask, config, new MaskingOutputAdapter<CompactionCommitEvent>(output));
    }

    public void open() throws Exception {
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, (RuntimeContext)this.getRuntimeContext());
        if (this.asyncCompaction) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
        this.collector = new StreamRecordCollector(this.output);
    }

    public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
        CompactionPlanEvent event = (CompactionPlanEvent)record.getValue();
        String instantTime = event.getCompactionInstantTime();
        CompactionOperation compactionOperation = event.getOperation();
        if (this.asyncCompaction) {
            this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.doCompaction(instantTime, compactionOperation, (Collector<CompactionCommitEvent>)this.collector, this.reloadWriteConfig())), (errMsg, t) -> this.collector.collect((Object)new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), this.taskID)), "Execute compaction for instant %s from task %d", instantTime, this.taskID);
        } else {
            LOG.info("Execute compaction for instant {} from task {}", (Object)instantTime, (Object)this.taskID);
            this.doCompaction(instantTime, compactionOperation, (Collector<CompactionCommitEvent>)this.collector, this.writeClient.getConfig());
        }
    }

    private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector, HoodieWriteConfig writeConfig) throws IOException {
        HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
        HoodieTableMetaClient metaClient = this.writeClient.getHoodieTable().getMetaClient();
        String maxInstantTime = compactor.getMaxInstantTime(metaClient);
        List<WriteStatus> writeStatuses = compactor.compact(new HoodieFlinkCopyOnWriteTable(writeConfig, this.writeClient.getEngineContext(), metaClient), metaClient, this.writeClient.getConfig(), compactionOperation, instantTime, maxInstantTime, this.writeClient.getHoodieTable().getTaskContextSupplier());
        collector.collect((Object)new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, this.taskID));
    }

    private HoodieWriteConfig reloadWriteConfig() throws Exception {
        HoodieWriteConfig writeConfig = this.writeClient.getConfig();
        CompactionUtil.setAvroSchema(writeConfig, this.writeClient.getHoodieTable().getMetaClient());
        return writeConfig;
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor executor) {
        this.executor = executor;
    }

    public void close() throws Exception {
        if (null != this.executor) {
            this.executor.close();
        }
        if (null != this.writeClient) {
            this.writeClient.close();
            this.writeClient = null;
        }
    }
}

