package com.starrocks.connector.flink.table.sink;

import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGenerator;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoader;
import com.starrocks.data.load.stream.TransactionStatus;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/sink/LingeringTransactionAborter.class */
public class LingeringTransactionAborter {
    private static final Logger LOG = LoggerFactory.getLogger(LingeringTransactionAborter.class);
    private final String currentLabelPrefix;
    private final long restoredCheckpointId;
    private final int subtaskIndex;
    private final int checkNumTxns;
    private final List<Tuple2<String, String>> dbTables;
    private final List<ExactlyOnceLabelGeneratorSnapshot> snapshots;
    private final StreamLoader streamLoader;

    public LingeringTransactionAborter(String str, long j, int i, int i2, List<Tuple2<String, String>> list, List<ExactlyOnceLabelGeneratorSnapshot> list2, StreamLoader streamLoader) {
        this.currentLabelPrefix = str;
        this.restoredCheckpointId = j;
        this.subtaskIndex = i;
        this.checkNumTxns = i2;
        this.dbTables = list;
        this.snapshots = list2;
        this.streamLoader = streamLoader;
        LOG.info("Create lingering transaction aborter, currentLabelPrefix: {}, restoredCheckpointId: {}, subtaskIndex: {}, checkNumTxns: {}, dbTables: {}, snapshots: {}", new Object[]{str, Long.valueOf(j), Integer.valueOf(i), Integer.valueOf(i2), list, list2});
    }

    public void execute() throws Exception {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        HashSet hashSet = new HashSet();
        for (ExactlyOnceLabelGeneratorSnapshot exactlyOnceLabelGeneratorSnapshot : this.snapshots) {
            if (exactlyOnceLabelGeneratorSnapshot.getCheckpointId() != this.restoredCheckpointId) {
                LOG.warn("The checkpoint id for snapshot is not equal to the restored id, restoredCheckpointId: {}, snapshot: {}", Long.valueOf(this.restoredCheckpointId), exactlyOnceLabelGeneratorSnapshot);
            } else {
                ExactlyOnceLabelGenerator.LabelDbTableSubtask createLabelDbTableSubtask = exactlyOnceLabelGeneratorSnapshot.createLabelDbTableSubtask();
                ExactlyOnceLabelGeneratorSnapshot exactlyOnceLabelGeneratorSnapshot2 = (ExactlyOnceLabelGeneratorSnapshot) linkedHashMap.get(createLabelDbTableSubtask);
                if (exactlyOnceLabelGeneratorSnapshot2 != null) {
                    LOG.warn("Find duplicate snapshot, old snapshot: {}, new snapshot: {}", exactlyOnceLabelGeneratorSnapshot2, exactlyOnceLabelGeneratorSnapshot);
                }
                linkedHashMap.put(createLabelDbTableSubtask, exactlyOnceLabelGeneratorSnapshot);
                hashSet.add(exactlyOnceLabelGeneratorSnapshot.getLabelPrefix());
            }
        }
        if (hashSet.size() > 1) {
            LOG.warn("There are multiple label prefix, {}", hashSet);
        }
        abortSnapshotLabelPrefix(new ArrayList(linkedHashMap.values()));
        if (this.currentLabelPrefix == null || hashSet.contains(this.currentLabelPrefix)) {
            return;
        }
        abortCurrentLabelPrefix();
    }

    private void abortSnapshotLabelPrefix(List<ExactlyOnceLabelGeneratorSnapshot> list) throws Exception {
        for (ExactlyOnceLabelGeneratorSnapshot exactlyOnceLabelGeneratorSnapshot : list) {
            long nextId = this.checkNumTxns < 0 ? Long.MAX_VALUE : exactlyOnceLabelGeneratorSnapshot.getNextId() + this.checkNumTxns;
            long nextId2 = exactlyOnceLabelGeneratorSnapshot.getNextId();
            while (true) {
                long j = nextId2;
                if (j < nextId) {
                    String genLabel = ExactlyOnceLabelGenerator.genLabel(exactlyOnceLabelGeneratorSnapshot.getLabelPrefix(), exactlyOnceLabelGeneratorSnapshot.getTable(), exactlyOnceLabelGeneratorSnapshot.getSubTaskIndex(), j);
                    try {
                        boolean tryAbortTransaction = tryAbortTransaction(exactlyOnceLabelGeneratorSnapshot.getDb(), exactlyOnceLabelGeneratorSnapshot.getTable(), genLabel);
                        if (tryAbortTransaction) {
                            LOG.info("Successful to abort transaction, label: {}, snapshot: {}", genLabel, exactlyOnceLabelGeneratorSnapshot);
                        } else {
                            LOG.info("Transaction does not need abort, label: {}, snapshot: {}, ", genLabel, exactlyOnceLabelGeneratorSnapshot);
                        }
                        if (this.checkNumTxns >= 0 || tryAbortTransaction) {
                            nextId2 = j + 1;
                        }
                    } catch (Exception e) {
                        String format = String.format("Failed to abort transactions with label %s, the snapshot is %s", genLabel, exactlyOnceLabelGeneratorSnapshot);
                        LOG.error("{}", format, e);
                        throw new Exception(format, e);
                    }
                }
            }
        }
    }

    private void abortCurrentLabelPrefix() throws Exception {
        for (Tuple2<String, String> tuple2 : this.dbTables) {
            long j = this.checkNumTxns < 0 ? Long.MAX_VALUE : this.restoredCheckpointId + 1 + this.checkNumTxns;
            long j2 = this.restoredCheckpointId;
            while (true) {
                long j3 = j2 + 1;
                if (j3 < j) {
                    String genLabel = ExactlyOnceLabelGenerator.genLabel(this.currentLabelPrefix, (String) tuple2.f1, this.subtaskIndex, j3);
                    try {
                        boolean tryAbortTransaction = tryAbortTransaction((String) tuple2.f0, (String) tuple2.f1, genLabel);
                        if (tryAbortTransaction) {
                            LOG.info("Successful to abort transaction, label: {}, db: {}, table: {}", new Object[]{genLabel, tuple2.f0, tuple2.f1});
                        } else {
                            LOG.info("Transaction does not need abort, label: {}, db: {}, table: {}", new Object[]{genLabel, tuple2.f0, tuple2.f1});
                        }
                        if (this.checkNumTxns >= 0 || tryAbortTransaction) {
                            j2 = j3;
                        }
                    } catch (Exception e) {
                        String format = String.format("Failed to abort transactions with label %s, db: %s, table: %s", genLabel, tuple2.f0, tuple2.f1);
                        LOG.error("{}", format, e);
                        throw new Exception(format, e);
                    }
                }
            }
        }
    }

    private boolean tryAbortTransaction(String str, String str2, String str3) throws Exception {
        try {
            TransactionStatus loadStatus = this.streamLoader.getLoadStatus(str, str2, str3);
            LOG.info("Transaction status for db: {}, table: {}, label: {}, status: {}", new Object[]{str, str2, str3, loadStatus});
            if (loadStatus == TransactionStatus.UNKNOWN || loadStatus == TransactionStatus.ABORTED) {
                return false;
            }
            if (loadStatus == TransactionStatus.COMMITTED || loadStatus == TransactionStatus.VISIBLE) {
                String format = String.format("Try to abort a finished transactions, db: %s, table: %s, label: %s, status: %s. The reason may be that you are restoring from an earlier checkpoint rather than the newest, and you can use a new sink.label-prefix, or report the issue", str, str2, str3, loadStatus);
                LOG.error(format);
                throw new Exception(format);
            }
            if (loadStatus != TransactionStatus.PREPARE && loadStatus != TransactionStatus.PREPARED) {
                String format2 = String.format("The status of the transaction is not supported when trying to abort lingering transactions, db: %s, table: %s, label: %s, status: %s", str, str2, str3, loadStatus);
                LOG.error(format2);
                throw new Exception(format2);
            }
            try {
                if (!this.streamLoader.rollback(new StreamLoadSnapshot.Transaction(str, str2, str3))) {
                    throw new Exception(String.format("Failed to abort transaction, and please find the reason in the taskmanager log, db: %s, table: %s, label: %s", str, str2, str3));
                }
                LOG.info("Successful to abort the lingering transaction, db: {}, table: {}, label: {}", new Object[]{str, str2, str3});
                return true;
            } catch (Exception e) {
                TransactionStatus transactionStatus = null;
                try {
                    transactionStatus = this.streamLoader.getLoadStatus(str, str2, str3);
                    LOG.info("Transaction status after trying to abort for db: {}, table: {}, label: {}, status: {}", new Object[]{str, str2, str3, transactionStatus});
                } catch (Exception e2) {
                    LOG.error("Fail to get new transaction status after failing to abort the transaction, db: {}, table: {}, label: {}", new Object[]{str, str2, str3, e2});
                }
                if (transactionStatus == TransactionStatus.UNKNOWN || transactionStatus == TransactionStatus.ABORTED) {
                    LOG.info("Successful to abort the lingering transaction, db: {}, table: {}, label: {}, new status: {}, but there is an exception when abort it", new Object[]{str, str2, str3, transactionStatus, e});
                    return true;
                }
                String format3 = String.format("Fail to abort lingering transaction, db: %s, table: %s, label: %s, status: %s", str, str2, str3, transactionStatus);
                LOG.error(format3, e);
                throw new Exception(format3, e);
            }
        } catch (Exception e3) {
            String format4 = String.format("Fail to get status of the label when trying to abort lingering transactions, db: %s, table: %s, label: %s", str, str2, str3);
            LOG.error(format4, e3);
            throw new Exception(format4, e3);
        }
    }
}
