/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.ListTransactionsOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.TransactionListing;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.TransactionState;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AdminApiHandler;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.Node;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.ListTransactionsRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.protocol.Errors;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.AbstractResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.ListTransactionsRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.ListTransactionsResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.LogContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;

public class ListTransactionsHandler
extends AdminApiHandler.Batched<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> {
    private final Logger log;
    private final ListTransactionsOptions options;
    private final AllBrokersStrategy lookupStrategy;

    public ListTransactionsHandler(ListTransactionsOptions options, LogContext logContext) {
        this.options = options;
        this.log = logContext.logger(ListTransactionsHandler.class);
        this.lookupStrategy = new AllBrokersStrategy(logContext);
    }

    public static AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> newFuture() {
        return new AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>>();
    }

    @Override
    public String apiName() {
        return "listTransactions";
    }

    @Override
    public AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> lookupStrategy() {
        return this.lookupStrategy;
    }

    public ListTransactionsRequest.Builder buildBatchedRequest(int brokerId, Set<AllBrokersStrategy.BrokerKey> keys) {
        ListTransactionsRequestData request = new ListTransactionsRequestData();
        request.setProducerIdFilters(new ArrayList<Long>(this.options.filteredProducerIds()));
        request.setStateFilters(this.options.filteredStates().stream().map(TransactionState::toString).collect(Collectors.toList()));
        return new ListTransactionsRequest.Builder(request);
    }

    @Override
    public AdminApiHandler.ApiResult<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> handleResponse(Node broker, Set<AllBrokersStrategy.BrokerKey> keys, AbstractResponse abstractResponse) {
        int brokerId = broker.id();
        AllBrokersStrategy.BrokerKey key = this.requireSingleton(keys, brokerId);
        ListTransactionsResponse response = (ListTransactionsResponse)abstractResponse;
        Errors error = Errors.forCode(response.data().errorCode());
        if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            this.log.debug("The `ListTransactions` request sent to broker {} failed because the coordinator is still loading state. Will try again after backing off", (Object)brokerId);
            return AdminApiHandler.ApiResult.empty();
        }
        if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
            this.log.debug("The `ListTransactions` request sent to broker {} failed because the coordinator is shutting down", (Object)brokerId);
            return AdminApiHandler.ApiResult.failed(key, new CoordinatorNotAvailableException("ListTransactions request sent to broker " + brokerId + " failed because the coordinator is shutting down"));
        }
        if (error != Errors.NONE) {
            this.log.error("The `ListTransactions` request sent to broker {} failed because of an unexpected error {}", (Object)brokerId, (Object)error);
            return AdminApiHandler.ApiResult.failed(key, error.exception("ListTransactions request sent to broker " + brokerId + " failed with an unexpected exception"));
        }
        List listings = response.data().transactionStates().stream().map(transactionState -> new TransactionListing(transactionState.transactionalId(), transactionState.producerId(), TransactionState.parse(transactionState.transactionState()))).collect(Collectors.toList());
        return AdminApiHandler.ApiResult.completed(key, listings);
    }

    private AllBrokersStrategy.BrokerKey requireSingleton(Set<AllBrokersStrategy.BrokerKey> keys, int brokerId) {
        if (keys.size() != 1) {
            throw new IllegalArgumentException("Unexpected key set: " + keys);
        }
        AllBrokersStrategy.BrokerKey key = keys.iterator().next();
        if (!key.brokerId.isPresent() || key.brokerId.getAsInt() != brokerId) {
            throw new IllegalArgumentException("Unexpected broker key: " + key);
        }
        return key;
    }
}

