/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.ClientResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.GroupRebalanceConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.Heartbeat;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.KafkaException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.Node;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.AuthenticationException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.DisconnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.FencedInstanceIdException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.GroupAuthorizationException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.IllegalGenerationException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.InterruptException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.MemberIdRequiredException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.RebalanceInProgressException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.RetriableException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.UnknownMemberIdException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.FindCoordinatorRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.FindCoordinatorResponseData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.HeartbeatRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.JoinGroupRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.JoinGroupResponseData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.LeaveGroupRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.LeaveGroupResponseData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.message.SyncGroupRequestData;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.Measurable;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.Metrics;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.Sensor;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.Avg;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.CumulativeCount;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.CumulativeSum;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.Max;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.Meter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.Rate;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.metrics.stats.WindowedCount;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.protocol.ApiKeys;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.protocol.Errors;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.AbstractResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.FindCoordinatorRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.FindCoordinatorResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.HeartbeatRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.HeartbeatResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.JoinGroupRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.JoinGroupResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.LeaveGroupRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.LeaveGroupResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.SyncGroupRequest;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.requests.SyncGroupResponse;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.KafkaThread;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.LogContext;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Timer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public abstract class AbstractCoordinator
implements Closeable {
    public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread";
    public static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000;
    private final Logger log;
    private final Heartbeat heartbeat;
    private final GroupCoordinatorMetrics sensors;
    private final GroupRebalanceConfig rebalanceConfig;
    protected final Time time;
    protected final ConsumerNetworkClient client;
    private Node coordinator = null;
    private String rejoinReason = "";
    private boolean rejoinNeeded = true;
    private boolean needsJoinPrepare = true;
    private HeartbeatThread heartbeatThread = null;
    private RequestFuture<ByteBuffer> joinFuture = null;
    private RequestFuture<Void> findCoordinatorFuture = null;
    private volatile RuntimeException fatalFindCoordinatorException = null;
    private Generation generation = Generation.NO_GENERATION;
    private long lastRebalanceStartMs = -1L;
    private long lastRebalanceEndMs = -1L;
    private long lastTimeOfConnectionMs = -1L;
    protected MemberState state = MemberState.UNJOINED;

    public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, LogContext logContext, ConsumerNetworkClient client, Metrics metrics, String metricGrpPrefix, Time time) {
        Objects.requireNonNull(rebalanceConfig.groupId, "Expected a non-null group id for coordinator construction");
        this.rebalanceConfig = rebalanceConfig;
        this.log = logContext.logger(this.getClass());
        this.client = client;
        this.time = time;
        this.heartbeat = new Heartbeat(rebalanceConfig, time);
        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
    }

    protected abstract String protocolType();

    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata();

    protected abstract boolean onJoinPrepare(int var1, String var2);

    protected abstract Map<String, ByteBuffer> onLeaderElected(String var1, String var2, List<JoinGroupResponseData.JoinGroupResponseMember> var3, boolean var4);

    protected abstract void onJoinComplete(int var1, String var2, String var3, ByteBuffer var4);

    protected void onLeavePrepare() {
    }

    protected synchronized boolean ensureCoordinatorReady(Timer timer) {
        if (!this.coordinatorUnknown()) {
            return true;
        }
        do {
            if (this.fatalFindCoordinatorException != null) {
                RuntimeException fatalException = this.fatalFindCoordinatorException;
                this.fatalFindCoordinatorException = null;
                throw fatalException;
            }
            RequestFuture<Void> future = this.lookupCoordinator();
            this.client.poll(future, timer);
            if (!future.isDone()) break;
            RuntimeException fatalException = null;
            if (future.failed()) {
                if (future.isRetriable()) {
                    this.log.debug("Coordinator discovery failed, refreshing metadata", (Throwable)future.exception());
                    this.client.awaitMetadataUpdate(timer);
                } else {
                    fatalException = future.exception();
                    this.log.info("FindCoordinator request hit fatal exception", (Throwable)fatalException);
                }
            } else if (this.coordinator != null && this.client.isUnavailable(this.coordinator)) {
                this.markCoordinatorUnknown("coordinator unavailable");
                timer.sleep(this.rebalanceConfig.retryBackoffMs);
            }
            this.clearFindCoordinatorFuture();
            if (fatalException == null) continue;
            throw fatalException;
        } while (this.coordinatorUnknown() && timer.notExpired());
        return !this.coordinatorUnknown();
    }

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (this.findCoordinatorFuture == null) {
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                this.log.debug("No broker available to send FindCoordinator request");
                return RequestFuture.noBrokersAvailable();
            }
            this.findCoordinatorFuture = this.sendFindCoordinatorRequest(node);
        }
        return this.findCoordinatorFuture;
    }

    private synchronized void clearFindCoordinatorFuture() {
        this.findCoordinatorFuture = null;
    }

    protected synchronized boolean rejoinNeededOrPending() {
        return this.rejoinNeeded || this.joinFuture != null;
    }

    protected synchronized void pollHeartbeat(long now) {
        if (this.heartbeatThread != null) {
            if (this.heartbeatThread.hasFailed()) {
                RuntimeException cause = this.heartbeatThread.failureCause();
                this.heartbeatThread = null;
                throw cause;
            }
            if (this.heartbeat.shouldHeartbeat(now)) {
                this.notify();
            }
            this.heartbeat.poll(now);
        }
    }

    protected synchronized long timeToNextHeartbeat(long now) {
        if (this.state.hasNotJoinedGroup()) {
            return Long.MAX_VALUE;
        }
        return this.heartbeat.timeToNextHeartbeat(now);
    }

    public void ensureActiveGroup() {
        while (!this.ensureActiveGroup(this.time.timer(Long.MAX_VALUE))) {
            this.log.warn("still waiting to ensure active group");
        }
    }

    boolean ensureActiveGroup(Timer timer) {
        if (!this.ensureCoordinatorReady(timer)) {
            return false;
        }
        this.startHeartbeatThreadIfNeeded();
        return this.joinGroupIfNeeded(timer);
    }

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (this.heartbeatThread == null) {
            this.heartbeatThread = new HeartbeatThread();
            this.heartbeatThread.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeHeartbeatThread() {
        HeartbeatThread thread;
        AbstractCoordinator abstractCoordinator = this;
        synchronized (abstractCoordinator) {
            if (this.heartbeatThread == null) {
                return;
            }
            this.heartbeatThread.close();
            thread = this.heartbeatThread;
            this.heartbeatThread = null;
        }
        try {
            thread.join();
        }
        catch (InterruptedException e) {
            this.log.warn("Interrupted while waiting for consumer heartbeat thread to close");
            throw new InterruptException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean joinGroupIfNeeded(Timer timer) {
        while (this.rejoinNeededOrPending()) {
            if (!this.ensureCoordinatorReady(timer)) {
                return false;
            }
            if (this.needsJoinPrepare) {
                this.needsJoinPrepare = false;
                if (!this.onJoinPrepare(this.generation.generationId, this.generation.memberId)) {
                    this.needsJoinPrepare = true;
                    return false;
                }
            }
            RequestFuture<ByteBuffer> future = this.initiateJoinGroup();
            this.client.poll(future, timer);
            if (!future.isDone()) {
                return false;
            }
            if (future.succeeded()) {
                MemberState stateSnapshot;
                Generation generationSnapshot;
                AbstractCoordinator abstractCoordinator = this;
                synchronized (abstractCoordinator) {
                    generationSnapshot = this.generation;
                    stateSnapshot = this.state;
                }
                if (!this.hasGenerationReset(generationSnapshot) && stateSnapshot == MemberState.STABLE) {
                    ByteBuffer memberAssignment = future.value().duplicate();
                    this.onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
                    this.resetJoinGroupFuture();
                    this.needsJoinPrepare = true;
                    continue;
                }
                String reason = String.format("rebalance failed since the generation/state was modified by heartbeat thread to %s/%s before the rebalance callback triggered", new Object[]{generationSnapshot, stateSnapshot});
                this.resetStateAndRejoin(reason, true);
                this.resetJoinGroupFuture();
                continue;
            }
            RuntimeException exception = future.exception();
            this.resetJoinGroupFuture();
            AbstractCoordinator abstractCoordinator = this;
            synchronized (abstractCoordinator) {
                this.rejoinReason = String.format("rebalance failed due to '%s' (%s)", exception.getMessage(), exception.getClass().getSimpleName());
                this.rejoinNeeded = true;
            }
            if (exception instanceof UnknownMemberIdException || exception instanceof IllegalGenerationException || exception instanceof RebalanceInProgressException || exception instanceof MemberIdRequiredException) continue;
            if (!future.isRetriable()) {
                throw exception;
            }
            timer.sleep(this.rebalanceConfig.retryBackoffMs);
        }
        return true;
    }

    private synchronized void resetJoinGroupFuture() {
        this.joinFuture = null;
    }

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (this.joinFuture == null) {
            this.state = MemberState.PREPARING_REBALANCE;
            if (this.lastRebalanceStartMs == -1L) {
                this.lastRebalanceStartMs = this.time.milliseconds();
            }
            this.joinFuture = this.sendJoinGroupRequest();
            this.joinFuture.addListener(new RequestFutureListener<ByteBuffer>(){

                @Override
                public void onSuccess(ByteBuffer value) {
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onFailure(RuntimeException e) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        ((AbstractCoordinator)AbstractCoordinator.this).sensors.failedRebalanceSensor.record();
                    }
                }
            });
        }
        return this.joinFuture;
    }

    RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        this.log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(new JoinGroupRequestData().setGroupId(this.rebalanceConfig.groupId).setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs).setMemberId(this.generation.memberId).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setProtocolType(this.protocolType()).setProtocols(this.metadata()).setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs).setReason(this.rejoinReason));
        this.log.debug("Sending JoinGroup ({}) to coordinator {}", (Object)requestBuilder, (Object)this.coordinator);
        int joinGroupTimeoutMs = Math.max(this.client.defaultRequestTimeoutMs(), Math.max(this.rebalanceConfig.rebalanceTimeoutMs + 5000, this.rebalanceConfig.rebalanceTimeoutMs));
        return this.client.send(this.coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler(this.generation));
    }

    private RequestFuture<ByteBuffer> onJoinFollower() {
        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(new SyncGroupRequestData().setGroupId(this.rebalanceConfig.groupId).setMemberId(this.generation.memberId).setProtocolType(this.protocolType()).setProtocolName(this.generation.protocolName).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setGenerationId(this.generation.generationId).setAssignments(Collections.emptyList()));
        this.log.debug("Sending follower SyncGroup to coordinator {}: {}", (Object)this.coordinator, (Object)requestBuilder);
        return this.sendSyncGroupRequest(requestBuilder);
    }

    private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
        try {
            Map<String, ByteBuffer> groupAssignment = this.onLeaderElected(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members(), joinResponse.data().skipAssignment());
            ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment>();
            for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
                groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));
            }
            SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(new SyncGroupRequestData().setGroupId(this.rebalanceConfig.groupId).setMemberId(this.generation.memberId).setProtocolType(this.protocolType()).setProtocolName(this.generation.protocolName).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setGenerationId(this.generation.generationId).setAssignments(groupAssignmentList));
            this.log.debug("Sending leader SyncGroup to coordinator {}: {}", (Object)this.coordinator, (Object)requestBuilder);
            return this.sendSyncGroupRequest(requestBuilder);
        }
        catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return this.client.send(this.coordinator, requestBuilder).compose(new SyncGroupResponseHandler(this.generation));
    }

    private boolean hasGenerationReset(Generation gen) {
        return gen.generationId == Generation.NO_GENERATION.generationId && gen.protocolName == null;
    }

    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
        this.log.debug("Sending FindCoordinator request to broker {}", (Object)node);
        FindCoordinatorRequestData data = new FindCoordinatorRequestData().setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()).setKey(this.rebalanceConfig.groupId);
        FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
        return this.client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
    }

    public boolean coordinatorUnknown() {
        return this.checkAndGetCoordinator() == null;
    }

    protected synchronized Node checkAndGetCoordinator() {
        if (this.coordinator != null && this.client.isUnavailable(this.coordinator)) {
            this.markCoordinatorUnknown(true, "coordinator unavailable");
            return null;
        }
        return this.coordinator;
    }

    private synchronized Node coordinator() {
        return this.coordinator;
    }

    protected synchronized void markCoordinatorUnknown(Errors error) {
        this.markCoordinatorUnknown(false, "error response " + error.name());
    }

    protected synchronized void markCoordinatorUnknown(String cause) {
        this.markCoordinatorUnknown(false, cause);
    }

    protected synchronized void markCoordinatorUnknown(boolean isDisconnected, String cause) {
        if (this.coordinator != null) {
            this.log.info("Group coordinator {} is unavailable or invalid due to cause: {}.isDisconnected: {}. Rediscovery will be attempted.", new Object[]{this.coordinator, cause, isDisconnected});
            Node oldCoordinator = this.coordinator;
            this.coordinator = null;
            if (!isDisconnected) {
                this.log.info("Requesting disconnect from last known coordinator {}", (Object)oldCoordinator);
                this.client.disconnectAsync(oldCoordinator);
            }
            this.lastTimeOfConnectionMs = this.time.milliseconds();
        } else {
            long durationOfOngoingDisconnect = this.time.milliseconds() - this.lastTimeOfConnectionMs;
            if (durationOfOngoingDisconnect > (long)this.rebalanceConfig.rebalanceTimeoutMs) {
                this.log.warn("Consumer has been disconnected from the group coordinator for {}ms", (Object)durationOfOngoingDisconnect);
            }
        }
    }

    protected synchronized Generation generation() {
        return this.generation;
    }

    protected synchronized Generation generationIfStable() {
        if (this.state != MemberState.STABLE) {
            return null;
        }
        return this.generation;
    }

    protected synchronized boolean rebalanceInProgress() {
        return this.state == MemberState.PREPARING_REBALANCE || this.state == MemberState.COMPLETING_REBALANCE;
    }

    protected synchronized String memberId() {
        return this.generation.memberId;
    }

    private synchronized void resetStateAndGeneration(String reason, boolean shouldResetMemberId) {
        this.log.info("Resetting generation {}due to: {}", (Object)(shouldResetMemberId ? "and member id " : ""), (Object)reason);
        this.state = MemberState.UNJOINED;
        this.generation = shouldResetMemberId ? Generation.NO_GENERATION : new Generation(Generation.NO_GENERATION.generationId, this.generation.memberId, null);
    }

    private synchronized void resetStateAndRejoin(String reason, boolean shouldResetMemberId) {
        this.resetStateAndGeneration(reason, shouldResetMemberId);
        this.requestRejoin(reason);
        this.needsJoinPrepare = true;
    }

    synchronized void resetStateOnResponseError(ApiKeys api, Errors error, boolean shouldResetMemberId) {
        String reason = String.format("encountered %s from %s response", new Object[]{error, api});
        this.resetStateAndRejoin(reason, shouldResetMemberId);
    }

    synchronized void resetGenerationOnLeaveGroup() {
        this.resetStateAndRejoin("consumer pro-actively leaving the group", true);
    }

    public synchronized void requestRejoinIfNecessary(String shortReason, String fullReason) {
        if (!this.rejoinNeeded) {
            this.requestRejoin(shortReason, fullReason);
        }
    }

    public synchronized void requestRejoin(String shortReason) {
        this.requestRejoin(shortReason, shortReason);
    }

    public synchronized void requestRejoin(String shortReason, String fullReason) {
        this.log.info("Request joining group due to: {}", (Object)fullReason);
        this.rejoinReason = shortReason;
        this.rejoinNeeded = true;
    }

    private boolean isProtocolTypeInconsistent(String protocolType) {
        return protocolType != null && !protocolType.equals(this.protocolType());
    }

    @Override
    public final void close() {
        this.close(this.time.timer(0L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void close(Timer timer) {
        try {
            this.closeHeartbeatThread();
        }
        finally {
            AbstractCoordinator abstractCoordinator = this;
            synchronized (abstractCoordinator) {
                Node coordinator;
                if (this.rebalanceConfig.leaveGroupOnClose) {
                    this.onLeavePrepare();
                    this.maybeLeaveGroup("the consumer is being closed");
                }
                if ((coordinator = this.checkAndGetCoordinator()) != null && !this.client.awaitPendingRequests(coordinator, timer)) {
                    this.log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", (Object)this.client.pendingRequestCount(coordinator));
                }
            }
        }
    }

    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
        RequestFuture<Void> future = null;
        if (this.isDynamicMember() && !this.coordinatorUnknown() && this.state != MemberState.UNJOINED && this.generation.hasMemberId()) {
            this.log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", new Object[]{this.generation.memberId, this.coordinator, leaveReason});
            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(this.rebalanceConfig.groupId, Collections.singletonList(new LeaveGroupRequestData.MemberIdentity().setMemberId(this.generation.memberId).setReason(leaveReason)));
            future = this.client.send(this.coordinator, request).compose(new LeaveGroupResponseHandler(this.generation));
            this.client.pollNoWakeup();
        }
        this.resetGenerationOnLeaveGroup();
        return future;
    }

    protected boolean isDynamicMember() {
        return !this.rebalanceConfig.groupInstanceId.isPresent();
    }

    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        this.log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}", new Object[]{this.generation.generationId, this.generation.memberId, this.coordinator});
        HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId(this.rebalanceConfig.groupId).setMemberId(this.generation.memberId).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setGenerationId(this.generation.generationId));
        return this.client.send(this.coordinator, requestBuilder).compose(new HeartbeatResponseHandler(this.generation));
    }

    protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
        return new Meter(new WindowedCount(), metrics.metricName(baseName + "-rate", groupName, String.format("The number of %s per second", descriptiveName)), metrics.metricName(baseName + "-total", groupName, String.format("The total number of %s", descriptiveName)));
    }

    final Heartbeat heartbeat() {
        return this.heartbeat;
    }

    final String rejoinReason() {
        return this.rejoinReason;
    }

    final synchronized void setLastRebalanceTime(long timestamp) {
        this.lastRebalanceEndMs = timestamp;
    }

    final boolean hasMatchingGenerationId(int generationId) {
        return !this.generation.equals(Generation.NO_GENERATION) && this.generation.generationId == generationId;
    }

    final boolean hasUnknownGeneration() {
        return this.generation.equals(Generation.NO_GENERATION);
    }

    final boolean hasValidMemberId() {
        return !this.hasUnknownGeneration() && this.generation.hasMemberId();
    }

    final synchronized void setNewGeneration(Generation generation) {
        this.generation = generation;
    }

    final synchronized void setNewState(MemberState state) {
        this.state = state;
    }

    private static class UnjoinedGroupException
    extends RetriableException {
        private UnjoinedGroupException() {
        }
    }

    protected static class Generation {
        public static final Generation NO_GENERATION = new Generation(-1, "", null);
        public final int generationId;
        public final String memberId;
        public final String protocolName;

        public Generation(int generationId, String memberId, String protocolName) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocolName = protocolName;
        }

        public boolean hasMemberId() {
            return !this.memberId.isEmpty();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Generation that = (Generation)o;
            return this.generationId == that.generationId && Objects.equals(this.memberId, that.memberId) && Objects.equals(this.protocolName, that.protocolName);
        }

        public int hashCode() {
            return Objects.hash(this.generationId, this.memberId, this.protocolName);
        }

        public String toString() {
            return "Generation{generationId=" + this.generationId + ", memberId='" + this.memberId + '\'' + ", protocol='" + this.protocolName + '\'' + '}';
        }
    }

    private class HeartbeatThread
    extends KafkaThread
    implements AutoCloseable {
        private boolean enabled;
        private boolean closed;
        private final AtomicReference<RuntimeException> failed;

        private HeartbeatThread() {
            super(AbstractCoordinator.HEARTBEAT_THREAD_PREFIX + (((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId.isEmpty() ? "" : " | " + ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId), true);
            this.enabled = false;
            this.closed = false;
            this.failed = new AtomicReference<Object>(null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void enable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                AbstractCoordinator.this.log.debug("Enabling heartbeat thread");
                this.enabled = true;
                AbstractCoordinator.this.heartbeat.resetTimeouts();
                AbstractCoordinator.this.notify();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void disable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                AbstractCoordinator.this.log.debug("Disabling heartbeat thread");
                this.enabled = false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                this.closed = true;
                AbstractCoordinator.this.notify();
            }
        }

        private boolean hasFailed() {
            return this.failed.get() != null;
        }

        private RuntimeException failureCause() {
            return this.failed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                AbstractCoordinator.this.log.debug("Heartbeat thread started");
                while (true) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    // MONITORENTER : abstractCoordinator
                    if (this.closed) {
                        // MONITOREXIT : abstractCoordinator
                        return;
                    }
                    if (!this.enabled) {
                        AbstractCoordinator.this.wait();
                        // MONITOREXIT : abstractCoordinator
                        continue;
                    }
                    if (AbstractCoordinator.this.state.hasNotJoinedGroup() || this.hasFailed()) {
                        this.disable();
                        // MONITOREXIT : abstractCoordinator
                        continue;
                    }
                    AbstractCoordinator.this.client.pollNoWakeup();
                    long now = AbstractCoordinator.this.time.milliseconds();
                    if (AbstractCoordinator.this.coordinatorUnknown()) {
                        if (AbstractCoordinator.this.findCoordinatorFuture != null) {
                            AbstractCoordinator.this.clearFindCoordinatorFuture();
                            AbstractCoordinator.this.wait(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.retryBackoffMs);
                        } else {
                            AbstractCoordinator.this.lookupCoordinator();
                        }
                    } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                        AbstractCoordinator.this.markCoordinatorUnknown("session timed out without receiving a heartbeat response");
                    } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                        AbstractCoordinator.this.log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
                        AbstractCoordinator.this.maybeLeaveGroup("consumer poll timeout has expired.");
                    } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                        AbstractCoordinator.this.wait(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.retryBackoffMs);
                    } else {
                        AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                        RequestFuture<Void> heartbeatFuture = AbstractCoordinator.this.sendHeartbeatRequest();
                        heartbeatFuture.addListener(new RequestFutureListener<Void>(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void onSuccess(Void value) {
                                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                synchronized (abstractCoordinator) {
                                    AbstractCoordinator.this.heartbeat.receiveHeartbeat();
                                }
                            }

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void onFailure(RuntimeException e) {
                                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                synchronized (abstractCoordinator) {
                                    if (e instanceof RebalanceInProgressException) {
                                        AbstractCoordinator.this.heartbeat.receiveHeartbeat();
                                    } else if (e instanceof FencedInstanceIdException) {
                                        AbstractCoordinator.this.log.error("Caught fenced group.instance.id {} error in heartbeat thread", ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupInstanceId);
                                        AbstractCoordinator.this.heartbeatThread.failed.set(e);
                                    } else {
                                        AbstractCoordinator.this.heartbeat.failHeartbeat();
                                        AbstractCoordinator.this.notify();
                                    }
                                }
                            }
                        });
                    }
                    // MONITOREXIT : abstractCoordinator
                    continue;
                    break;
                }
            }
            catch (AuthenticationException e) {
                AbstractCoordinator.this.log.error("An authentication error occurred in the heartbeat thread", (Throwable)e);
                this.failed.set(e);
                return;
            }
            catch (GroupAuthorizationException e) {
                AbstractCoordinator.this.log.error("A group authorization error occurred in the heartbeat thread", (Throwable)e);
                this.failed.set(e);
                return;
            }
            catch (InterruptException | InterruptedException e) {
                Thread.interrupted();
                AbstractCoordinator.this.log.error("Unexpected interrupt received in heartbeat thread", (Throwable)e);
                this.failed.set(new RuntimeException(e));
                return;
            }
            catch (Throwable e) {
                AbstractCoordinator.this.log.error("Heartbeat thread failed due to unexpected error", e);
                if (e instanceof RuntimeException) {
                    this.failed.set((RuntimeException)e);
                    return;
                }
                this.failed.set(new RuntimeException(e));
                return;
            }
            finally {
                AbstractCoordinator.this.log.debug("Heartbeat thread has closed");
            }
        }
    }

    private class GroupCoordinatorMetrics {
        public final String metricGrpName;
        public final Sensor heartbeatSensor;
        public final Sensor joinSensor;
        public final Sensor syncSensor;
        public final Sensor successfulRebalanceSensor;
        public final Sensor failedRebalanceSensor;

        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
            this.heartbeatSensor = metrics.sensor("heartbeat-latency");
            this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a heartbeat request"), new Max());
            this.heartbeatSensor.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "heartbeat", "heartbeats"));
            this.joinSensor = metrics.sensor("join-latency");
            this.joinSensor.add(metrics.metricName("join-time-avg", this.metricGrpName, "The average time taken for a group rejoin"), new Avg());
            this.joinSensor.add(metrics.metricName("join-time-max", this.metricGrpName, "The max time taken for a group rejoin"), new Max());
            this.joinSensor.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "join", "group joins"));
            this.syncSensor = metrics.sensor("sync-latency");
            this.syncSensor.add(metrics.metricName("sync-time-avg", this.metricGrpName, "The average time taken for a group sync"), new Avg());
            this.syncSensor.add(metrics.metricName("sync-time-max", this.metricGrpName, "The max time taken for a group sync"), new Max());
            this.syncSensor.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "sync", "group syncs"));
            this.successfulRebalanceSensor = metrics.sensor("rebalance-latency");
            this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg", this.metricGrpName, "The average time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded"), new Avg());
            this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max", this.metricGrpName, "The max time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded"), new Max());
            this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-total", this.metricGrpName, "The total number of milliseconds this consumer has spent in successful rebalances since creation"), new CumulativeSum());
            this.successfulRebalanceSensor.add(metrics.metricName("rebalance-total", this.metricGrpName, "The total number of successful rebalance events, each event is composed of several failed re-trials until it succeeded"), new CumulativeCount());
            this.successfulRebalanceSensor.add(metrics.metricName("rebalance-rate-per-hour", this.metricGrpName, "The number of successful rebalance events per hour, each event is composed of several failed re-trials until it succeeded"), new Rate(TimeUnit.HOURS, new WindowedCount()));
            this.failedRebalanceSensor = metrics.sensor("failed-rebalance");
            this.failedRebalanceSensor.add(metrics.metricName("failed-rebalance-total", this.metricGrpName, "The total number of failed rebalance events"), new CumulativeCount());
            this.failedRebalanceSensor.add(metrics.metricName("failed-rebalance-rate-per-hour", this.metricGrpName, "The number of failed rebalance events per hour"), new Rate(TimeUnit.HOURS, new WindowedCount()));
            Measurable lastRebalance = (config, now) -> {
                if (AbstractCoordinator.this.lastRebalanceEndMs == -1L) {
                    return -1.0;
                }
                return TimeUnit.SECONDS.convert(now - AbstractCoordinator.this.lastRebalanceEndMs, TimeUnit.MILLISECONDS);
            };
            metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago", this.metricGrpName, "The number of seconds since the last successful rebalance event"), lastRebalance);
            Measurable lastHeartbeat = (config, now) -> {
                if (AbstractCoordinator.this.heartbeat.lastHeartbeatSend() == 0L) {
                    return -1.0;
                }
                return TimeUnit.SECONDS.convert(now - AbstractCoordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
            };
            metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last coordinator heartbeat was sent"), lastHeartbeat);
        }
    }

    protected abstract class CoordinatorResponseHandler<R, T>
    extends RequestFutureAdapter<ClientResponse, T> {
        final Generation sentGeneration;
        ClientResponse response;

        CoordinatorResponseHandler(Generation generation) {
            this.sentGeneration = generation;
        }

        public abstract void handle(R var1, RequestFuture<T> var2);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            if (e instanceof DisconnectException) {
                AbstractCoordinator.this.markCoordinatorUnknown(true, e.getMessage());
            }
            future.raise(e);
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            block2: {
                try {
                    this.response = clientResponse;
                    AbstractResponse responseObj = clientResponse.responseBody();
                    this.handle(responseObj, future);
                }
                catch (RuntimeException e) {
                    if (future.isDone()) break block2;
                    future.raise(e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        boolean generationUnchanged() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                return AbstractCoordinator.this.generation.equals(this.sentGeneration);
            }
        }
    }

    private class HeartbeatResponseHandler
    extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        private HeartbeatResponseHandler(Generation generation) {
            super(generation);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            ((AbstractCoordinator)AbstractCoordinator.this).sensors.heartbeatSensor.record(this.response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid", (Object)AbstractCoordinator.this.coordinator());
                AbstractCoordinator.this.markCoordinatorUnknown(error);
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    if (AbstractCoordinator.this.state == MemberState.STABLE) {
                        AbstractCoordinator.this.requestRejoin("group is already rebalancing");
                        future.raise(error);
                    } else {
                        AbstractCoordinator.this.log.debug("Ignoring heartbeat response with error {} during {} state", (Object)error, (Object)AbstractCoordinator.this.state);
                        future.complete(null);
                    }
                }
            } else if (error == Errors.ILLEGAL_GENERATION || error == Errors.UNKNOWN_MEMBER_ID || error == Errors.FENCED_INSTANCE_ID) {
                if (this.generationUnchanged()) {
                    AbstractCoordinator.this.log.info("Attempt to heartbeat with {} and group instance id {} failed due to {}, resetting generation", new Object[]{this.sentGeneration, ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupInstanceId, error});
                    AbstractCoordinator.this.resetStateOnResponseError(ApiKeys.HEARTBEAT, error, error != Errors.ILLEGAL_GENERATION);
                    future.raise(error);
                } else {
                    AbstractCoordinator.this.log.info("Attempt to heartbeat with stale {} and group instance id {} failed due to {}, ignoring the error", new Object[]{this.sentGeneration, ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupInstanceId, error});
                    future.complete(null);
                }
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(GroupAuthorizationException.forGroupId(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    private class LeaveGroupResponseHandler
    extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
        private LeaveGroupResponseHandler(Generation generation) {
            super(generation);
        }

        @Override
        public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
            Errors error;
            List<LeaveGroupResponseData.MemberResponse> members = leaveResponse.memberResponses();
            if (members.size() > 1) {
                future.raise(new IllegalStateException("The expected leave group response should only contain no more than one member info, however get " + members));
            }
            if ((error = leaveResponse.error()) == Errors.NONE) {
                AbstractCoordinator.this.log.debug("LeaveGroup response with {} returned successfully: {}", (Object)this.sentGeneration, (Object)this.response);
                future.complete(null);
            } else {
                AbstractCoordinator.this.log.error("LeaveGroup request with {} failed with error: {}", (Object)this.sentGeneration, (Object)error.message());
                future.raise(error);
            }
        }
    }

    private class FindCoordinatorResponseHandler
    extends RequestFutureAdapter<ClientResponse, Void> {
        private FindCoordinatorResponseHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
            FindCoordinatorResponseData.Coordinator coordinatorData;
            Errors error;
            AbstractCoordinator.this.log.debug("Received FindCoordinator response {}", (Object)resp);
            List<FindCoordinatorResponseData.Coordinator> coordinators = ((FindCoordinatorResponse)resp.responseBody()).coordinators();
            if (coordinators.size() != 1) {
                AbstractCoordinator.this.log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator");
                future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
            }
            if ((error = Errors.forCode((coordinatorData = coordinators.get(0)).errorCode())) == Errors.NONE) {
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();
                    AbstractCoordinator.this.coordinator = new Node(coordinatorConnectionId, coordinatorData.host(), coordinatorData.port());
                    AbstractCoordinator.this.log.info("Discovered group coordinator {}", (Object)AbstractCoordinator.this.coordinator);
                    AbstractCoordinator.this.client.tryConnect(AbstractCoordinator.this.coordinator);
                    AbstractCoordinator.this.heartbeat.resetSessionTimeout();
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(GroupAuthorizationException.forGroupId(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId));
            } else {
                AbstractCoordinator.this.log.debug("Group coordinator lookup failed: {}", (Object)coordinatorData.errorMessage());
                future.raise(error);
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
            AbstractCoordinator.this.log.debug("FindCoordinator request failed due to {}", (Object)e.toString());
            if (!(e instanceof RetriableException)) {
                AbstractCoordinator.this.fatalFindCoordinatorException = e;
            }
            super.onFailure(e, future);
        }
    }

    private class SyncGroupResponseHandler
    extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        private SyncGroupResponseHandler(Generation generation) {
            super(generation);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
            Errors error = syncResponse.error();
            if (error == Errors.NONE) {
                if (AbstractCoordinator.this.isProtocolTypeInconsistent(syncResponse.data().protocolType())) {
                    AbstractCoordinator.this.log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}", (Object)syncResponse.data().protocolType(), (Object)AbstractCoordinator.this.protocolType());
                    future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                } else {
                    AbstractCoordinator.this.log.debug("Received successful SyncGroup response: {}", (Object)syncResponse);
                    ((AbstractCoordinator)AbstractCoordinator.this).sensors.syncSensor.record(this.response.requestLatencyMs());
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        if (!AbstractCoordinator.this.hasGenerationReset(AbstractCoordinator.this.generation) && AbstractCoordinator.this.state == MemberState.COMPLETING_REBALANCE) {
                            boolean protocolNameInconsistent;
                            String protocolName = syncResponse.data().protocolName();
                            boolean bl = protocolNameInconsistent = protocolName != null && !protocolName.equals(((AbstractCoordinator)AbstractCoordinator.this).generation.protocolName);
                            if (protocolNameInconsistent) {
                                AbstractCoordinator.this.log.error("SyncGroup failed due to inconsistent Protocol Name, received {} but expected {}", (Object)protocolName, (Object)((AbstractCoordinator)AbstractCoordinator.this).generation.protocolName);
                                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                            } else {
                                AbstractCoordinator.this.log.info("Successfully synced group in generation {}", (Object)AbstractCoordinator.this.generation);
                                AbstractCoordinator.this.state = MemberState.STABLE;
                                AbstractCoordinator.this.rejoinReason = "";
                                AbstractCoordinator.this.rejoinNeeded = false;
                                AbstractCoordinator.this.lastRebalanceEndMs = AbstractCoordinator.this.time.milliseconds();
                                ((AbstractCoordinator)AbstractCoordinator.this).sensors.successfulRebalanceSensor.record(AbstractCoordinator.this.lastRebalanceEndMs - AbstractCoordinator.this.lastRebalanceStartMs);
                                AbstractCoordinator.this.lastRebalanceStartMs = -1L;
                                future.complete(ByteBuffer.wrap(syncResponse.data().assignment()));
                            }
                        } else {
                            AbstractCoordinator.this.log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before receiving SyncGroup response, marking this rebalance as failed and retry", (Object)AbstractCoordinator.this.generation, (Object)AbstractCoordinator.this.state);
                            future.raise(Errors.ILLEGAL_GENERATION);
                        }
                    }
                }
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(GroupAuthorizationException.forGroupId(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId));
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator.this.log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was {}", (Object)this.sentGeneration);
                future.raise(error);
            } else if (error == Errors.FENCED_INSTANCE_ID) {
                AbstractCoordinator.this.log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. Sent generation was {}", ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupInstanceId, (Object)this.sentGeneration);
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}", (Object)error.message(), (Object)this.sentGeneration);
                if (this.generationUnchanged()) {
                    AbstractCoordinator.this.resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, error != Errors.ILLEGAL_GENERATION);
                }
                future.raise(error);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}", (Object)error.message(), (Object)this.sentGeneration);
                AbstractCoordinator.this.markCoordinatorUnknown(error);
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
            }
        }
    }

    private class JoinGroupResponseHandler
    extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        private JoinGroupResponseHandler(Generation generation) {
            super(generation);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = joinResponse.error();
            if (error == Errors.NONE) {
                if (AbstractCoordinator.this.isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
                    AbstractCoordinator.this.log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}", (Object)joinResponse.data().protocolType(), (Object)AbstractCoordinator.this.protocolType());
                    future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                } else {
                    AbstractCoordinator.this.log.debug("Received successful JoinGroup response: {}", (Object)joinResponse);
                    ((AbstractCoordinator)AbstractCoordinator.this).sensors.joinSensor.record(this.response.requestLatencyMs());
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        if (AbstractCoordinator.this.state != MemberState.PREPARING_REBALANCE) {
                            future.raise(new UnjoinedGroupException());
                        } else {
                            AbstractCoordinator.this.state = MemberState.COMPLETING_REBALANCE;
                            if (AbstractCoordinator.this.heartbeatThread != null) {
                                AbstractCoordinator.this.heartbeatThread.enable();
                            }
                            AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
                            AbstractCoordinator.this.log.info("Successfully joined group with generation {}", (Object)AbstractCoordinator.this.generation);
                            if (joinResponse.isLeader()) {
                                AbstractCoordinator.this.onLeaderElected(joinResponse).chain(future);
                            } else {
                                AbstractCoordinator.this.onJoinFollower().chain(future);
                            }
                        }
                    }
                }
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                AbstractCoordinator.this.log.info("JoinGroup failed: Coordinator {} is loading the group.", (Object)AbstractCoordinator.this.coordinator());
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}", (Object)error.message(), (Object)this.sentGeneration);
                if (this.generationUnchanged()) {
                    AbstractCoordinator.this.resetStateOnResponseError(ApiKeys.JOIN_GROUP, error, true);
                }
                future.raise(error);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.markCoordinatorUnknown(error);
                AbstractCoordinator.this.log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}", (Object)error.message(), (Object)this.sentGeneration);
                future.raise(error);
            } else if (error == Errors.FENCED_INSTANCE_ID) {
                AbstractCoordinator.this.log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. Sent generation was {}", ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupInstanceId, (Object)this.sentGeneration);
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID || error == Errors.GROUP_AUTHORIZATION_FAILED || error == Errors.GROUP_MAX_SIZE_REACHED) {
                AbstractCoordinator.this.log.error("JoinGroup failed due to fatal error: {}", (Object)error.message());
                if (error == Errors.GROUP_MAX_SIZE_REACHED) {
                    future.raise(new GroupMaxSizeReachedException("Consumer group " + ((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId + " already has the configured maximum number of members."));
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(GroupAuthorizationException.forGroupId(((AbstractCoordinator)AbstractCoordinator.this).rebalanceConfig.groupId));
                } else {
                    future.raise(error);
                }
            } else if (error == Errors.UNSUPPORTED_VERSION) {
                AbstractCoordinator.this.log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id and retry to see if the problem resolves");
                future.raise(error);
            } else if (error == Errors.MEMBER_ID_REQUIRED) {
                String memberId = joinResponse.data().memberId();
                AbstractCoordinator.this.log.debug("JoinGroup failed due to non-fatal error: {}. Will set the member id as {} and then rejoin. Sent generation was {}", new Object[]{error, memberId, this.sentGeneration});
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    AbstractCoordinator.this.generation = new Generation(-1, memberId, null);
                }
                AbstractCoordinator.this.requestRejoin("need to re-join with the given member-id: " + memberId);
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator.this.log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, which could indicate a replication timeout on the broker. Will retry.");
                future.raise(error);
            } else {
                AbstractCoordinator.this.log.error("JoinGroup failed due to unexpected error: {}", (Object)error.message());
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }

    protected static enum MemberState {
        UNJOINED,
        PREPARING_REBALANCE,
        COMPLETING_REBALANCE,
        STABLE;


        public boolean hasNotJoinedGroup() {
            return this.equals((Object)UNJOINED) || this.equals((Object)PREPARING_REBALANCE);
        }
    }
}

