/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ClientIdAndBroker;
import kafka.common.KafkaException;
import kafka.server.AbstractFetcherThread$;
import kafka.server.FetcherLagStats;
import kafka.server.FetcherStats;
import kafka.server.PartitionFetchState;
import kafka.utils.CoreUtils$;
import kafka.utils.DelayedItem;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.EpochEndOffset;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\rg!B\u0001\u0003\u0003\u00039!!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0006\u0003\u0007\u0011\taa]3sm\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\tQ!\u001e;jYNL!!\u0004\u0006\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u0005\n\u001f\u0001\u0011\t\u0011)A\u0005!u\tAA\\1nKB\u0011\u0011C\u0007\b\u0003%a\u0001\"a\u0005\f\u000e\u0003QQ!!\u0006\u0004\u0002\rq\u0012xn\u001c;?\u0015\u00059\u0012!B:dC2\f\u0017BA\r\u0017\u0003\u0019\u0001&/\u001a3fM&\u00111\u0004\b\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005e1\u0012BA\b\r\u0011!y\u0002A!A!\u0002\u0013\u0001\u0012\u0001C2mS\u0016tG/\u00133\t\u0011\u0005\u0002!Q1A\u0005\u0002\t\nAb]8ve\u000e,'I]8lKJ,\u0012a\t\t\u0003I\u001dj\u0011!\n\u0006\u0003M\u0011\tqa\u00197vgR,'/\u0003\u0002)K\tq!I]8lKJ,e\u000e\u001a)pS:$\b\u0002\u0003\u0016\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\u0002\u001bM|WO]2f\u0005J|7.\u001a:!\u0011!a\u0003A!A!\u0002\u0013i\u0013A\u00044fi\u000eD')Y2l\u001f\u001a4Wj\u001d\t\u0003]=j\u0011AF\u0005\u0003aY\u00111!\u00138u\u0011%\u0011\u0004A!A!\u0002\u0013\u0019d'A\bjg&sG/\u001a:skB$\u0018N\u00197f!\tqC'\u0003\u00026-\t9!i\\8mK\u0006t\u0017B\u0001\u001a\r\u0011!A\u0004A!A!\u0002\u0013\u0019\u0014\u0001F5oG2,H-\u001a'pOR\u0013XO\\2bi&|g\u000eC\u0003;\u0001\u0011\u00051(\u0001\u0004=S:LGO\u0010\u000b\byyz\u0004)\u0011\"D!\ti\u0004!D\u0001\u0003\u0011\u0015y\u0011\b1\u0001\u0011\u0011\u0015y\u0012\b1\u0001\u0011\u0011\u0015\t\u0013\b1\u0001$\u0011\u001da\u0013\b%AA\u00025BqAM\u001d\u0011\u0002\u0003\u00071\u0007C\u00039s\u0001\u00071\u0007B\u0003F\u0001\t\u0005aIA\u0002S\u000bF\u000b\"a\u0012&\u0011\u00059B\u0015BA%\u0017\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"a\u0013,\u000f\u0005ubu!B'\u0003\u0011\u0003q\u0015!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\t\u0003{=3Q!\u0001\u0002\t\u0002A\u001b\"aT)\u0011\u00059\u0012\u0016BA*\u0017\u00
05\u0019\te.\u001f*fM\")!h\u0014C\u0001+R\taJB\u0004X\u001fB\u0005\u0019\u0013\u0001-\u0003\u0019\u0019+Go\u00195SKF,Xm\u001d;\u0014\u0005Y\u000b\u0006\"\u0002.W\r\u0003Y\u0016aB5t\u000b6\u0004H/_\u000b\u0002g!)QL\u0016D\u0001=\u00061qN\u001a4tKR$\"a\u00182\u0011\u00059\u0002\u0017BA1\u0017\u0005\u0011auN\\4\t\u000b\rd\u0006\u0019\u00013\u0002\u001dQ|\u0007/[2QCJ$\u0018\u000e^5p]B\u0011Q-\\\u0007\u0002M*\u0011q\r[\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015I'B\u00016l\u0003\u0019\t\u0007/Y2iK*\tA.A\u0002pe\u001eL!A\u001c4\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u001a9\u0001o\u0014I\u0001$\u0003\t(!\u0004)beRLG/[8o\t\u0006$\u0018m\u0005\u0002p#\")1o\u001cD\u0001i\u0006)QM\u001d:peV\tQ\u000f\u0005\u0002ws6\tqO\u0003\u0002yM\u0006A\u0001O]8u_\u000e|G.\u0003\u0002{o\n1QI\u001d:peNDQ\u0001`8\u0007\u0002u\f\u0011\"\u001a=dKB$\u0018n\u001c8\u0016\u0003y\u0004BAL@\u0002\u0004%\u0019\u0011\u0011\u0001\f\u0003\r=\u0003H/[8o!\u0011\t)!a\u0004\u000f\t\u0005\u001d\u00111\u0002\b\u0004'\u0005%\u0011\"A\f\n\u0007\u00055a#A\u0004qC\u000e\\\u0017mZ3\n\t\u0005E\u00111\u0003\u0002\n)\"\u0014xn^1cY\u0016T1!!\u0004\u0017\u0011\u001d\t9b\u001cD\u0001\u00033\t\u0011\u0002^8SK\u000e|'\u000fZ:\u0016\u0005\u0005m\u0001\u0003BA\u000f\u0003Gi!!a\b\u000b\u0007\u0005\u0005b-\u0001\u0004sK\u000e|'\u000fZ\u0005\u0005\u0003K\tyBA\u0007NK6|'/\u001f*fG>\u0014Hm\u001d\u0005\b\u0003Syg\u0011AA\u0016\u00035A\u0017n\u001a5XCR,'/\\1sWV\tq\fC\u0005\u00020=\u000b\n\u0011\"\u0001\u00022\u0005YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIQ*\"!a\r+\u00075\n)d\u000b\u0002\u00028A!\u0011\u0011HA\"\u001b\t\tYD\u0003\u0003\u0002>\u0005}\u0012!C;oG\",7m[3e\u0015\r\t\tEF\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA#\u0003w\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011%\tIeTI\u0001\n\u0003\tY%A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H%N\u000b\u0003\u0003\u001bR3aMA\u001b\t\u001d\t\t\u0006\u0001B\u0001\u0003'\u0012!\u0001\u0015#\u0012\u0007\u001d\
u000b)\u0006\u0005\u0002L_\"Q\u0011\u0011\f\u0001C\u0002\u0013\u0005!!a\u0017\u0002\u001fA\f'\u000f^5uS>t7\u000b^1uKN,\"!!\u0018\u0011\r\u0005}\u0013QMA5\u001b\t\t\tGC\u0002\u0002d\u0019\f\u0011\"\u001b8uKJt\u0017\r\\:\n\t\u0005\u001d\u0014\u0011\r\u0002\u0010!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;fgB\u0019Q(a\u001b\n\u0007\u00055$AA\nQCJ$\u0018\u000e^5p]\u001a+Go\u00195Ti\u0006$X\r\u0003\u0005\u0002r\u0001\u0001\u000b\u0011BA/\u0003A\u0001\u0018M\u001d;ji&|gn\u0015;bi\u0016\u001c\b\u0005C\u0005\u0002v\u0001\u0011\r\u0011\"\u0003\u0002x\u0005\u0001\u0002/\u0019:uSRLwN\\'ba2{7m[\u000b\u0003\u0003s\u0002B!a\u001f\u0002\u000e6\u0011\u0011Q\u0010\u0006\u0005\u0003\u007f\n\t)A\u0003m_\u000e\\7O\u0003\u0003\u0002\u0004\u0006\u0015\u0015AC2p]\u000e,(O]3oi*!\u0011qQAE\u0003\u0011)H/\u001b7\u000b\u0005\u0005-\u0015\u0001\u00026bm\u0006LA!a$\u0002~\ti!+Z3oiJ\fg\u000e\u001e'pG.D\u0001\"a%\u0001A\u0003%\u0011\u0011P\u0001\u0012a\u0006\u0014H/\u001b;j_:l\u0015\r\u001d'pG.\u0004\u0003\"CAL\u0001\t\u0007I\u0011BAM\u0003A\u0001\u0018M\u001d;ji&|g.T1q\u0007>tG-\u0006\u0002\u0002\u001cB!\u00111PAO\u0013\u0011\ty*! 
\u0003\u0013\r{g\u000eZ5uS>t\u0007\u0002CAR\u0001\u0001\u0006I!a'\u0002#A\f'\u000f^5uS>tW*\u00199D_:$\u0007\u0005C\u0005\u0002(\u0002\u0011\r\u0011\"\u0003\u0002*\u0006AQ.\u001a;sS\u000eLE-\u0006\u0002\u0002,B!\u0011QVAY\u001b\t\tyK\u0003\u0002h\t%!\u00111WAX\u0005E\u0019E.[3oi&#\u0017I\u001c3Ce>\\WM\u001d\u0005\t\u0003o\u0003\u0001\u0015!\u0003\u0002,\u0006IQ.\u001a;sS\u000eLE\r\t\u0005\n\u0003w\u0003!\u0019!C\u0001\u0003{\u000bABZ3uG\",'o\u0015;biN,\"!a0\u0011\u0007u\n\t-C\u0002\u0002D\n\u0011ABR3uG\",'o\u0015;biND\u0001\"a2\u0001A\u0003%\u0011qX\u0001\u000eM\u0016$8\r[3s'R\fGo\u001d\u0011\t\u0013\u0005-\u0007A1A\u0005\u0002\u00055\u0017a\u00044fi\u000eDWM\u001d'bON#\u0018\r^:\u0016\u0005\u0005=\u0007cA\u001f\u0002R&\u0019\u00111\u001b\u0002\u0003\u001f\u0019+Go\u00195fe2\u000bwm\u0015;biND\u0001\"a6\u0001A\u0003%\u0011qZ\u0001\u0011M\u0016$8\r[3s\u0019\u0006<7\u000b^1ug\u0002Bq!a7\u0001\r#\ti.\u0001\u000bqe>\u001cWm]:QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u000b\t\u0003?\f)/a:\u0002lB\u0019a&!9\n\u0007\u0005\rhC\u0001\u0003V]&$\bBB2\u0002Z\u0002\u0007A\rC\u0004\u0002j\u0006e\u0007\u0019A0\u0002\u0017\u0019,Go\u00195PM\u001a\u001cX\r\u001e\u0005\t\u0003[\fI\u000e1\u0001\u0002p\u0006i\u0001/\u0019:uSRLwN\u001c#bi\u0006\u0004B!!=\u0002P5\t\u0001\u0001C\u0004\u0002v\u00021\t\"a>\u0002-!\fg\u000e\u001a7f\u001f\u001a47/\u001a;PkR|eMU1oO\u0016$2aXA}\u0011\u0019\u0019\u00171\u001fa\u0001I\"9\u0011Q 
\u0001\u0007\u0012\u0005}\u0018A\u00075b]\u0012dW\rU1si&$\u0018n\u001c8t/&$\b.\u0012:s_J\u001cH\u0003BAp\u0005\u0003A\u0001Ba\u0001\u0002|\u0002\u0007!QA\u0001\u000ba\u0006\u0014H/\u001b;j_:\u001c\b#BA\u0003\u0005\u000f!\u0017\u0002\u0002B\u0005\u0003'\u0011\u0001\"\u0013;fe\u0006\u0014G.\u001a\u0005\b\u0005\u001b\u0001a\u0011\u0003B\b\u0003]\u0011W/\u001b7e\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH\u000f\u0006\u0003\u0003\u0012\tu\u0001C\u0002B\n\u00053!W&\u0004\u0002\u0003\u0016)\u0019!q\u0003\f\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0003\u001c\tU!aA'ba\"A!q\u0004B\u0006\u0001\u0004\u0011\t#A\u0007bY2\u0004\u0016M\u001d;ji&|gn\u001d\t\u0007\u0003\u000b\u0011\u0019Ca\n\n\t\t\u0015\u00121\u0003\u0002\u0004'\u0016\f\bC\u0002\u0018\u0003*\u0011\fI'C\u0002\u0003,Y\u0011a\u0001V;qY\u0016\u0014\u0004b\u0002B\u0018\u0001\u0019E!\u0011G\u0001\u0016M\u0016$8\r[#q_\u000eD7O\u0012:p[2+\u0017\rZ3s)\u0011\u0011\u0019D!\u0011\u0011\u000f\tM!\u0011\u00043\u00036A!!q\u0007B\u001f\u001b\t\u0011IDC\u0002\u0003<\u0019\f\u0001B]3rk\u0016\u001cHo]\u0005\u0005\u0005\u007f\u0011ID\u0001\bFa>\u001c\u0007.\u00128e\u001f\u001a47/\u001a;\t\u0011\t\r!Q\u0006a\u0001\u0005#AqA!\u0012\u0001\r#\u00119%A\u0007nCf\u0014W\r\u0016:v]\u000e\fG/\u001a\u000b\u0005\u0005\u0013\u0012Y\u0005\u0005\u0004\u0003\u0014\teAm\u0018\u0005\t\u0005\u001b\u0012\u0019\u00051\u0001\u00034\u0005ia-\u001a;dQ\u0016$W\t]8dQNDqA!\u0015\u0001\r#\u0011\u0019&A\tck&dGMR3uG\"\u0014V-];fgR$BA!\u0016\u0003XA\u0019\u0011\u0011\u001f#\t\u0011\te#q\na\u0001\u0005C\tA\u0002]1si&$\u0018n\u001c8NCBDqA!\u0018\u0001\r#\u0011y&A\u0003gKR\u001c\u0007\u000e\u0006\u0003\u0003b\t\u0015\u0004CBA\u0003\u0005G\u0011\u0019\u0007\u0005\u0004/\u0005S!\u0017q\u001e\u0005\t\u0005O\u0012Y\u00061\u0001\u0003V\u0005aa-\u001a;dQJ+\u0017/^3ti\"9!1\u000e\u0001\u0005B\t5\u0014\u0001C:ikR$wn\u001e8\u0015\u0005\u0005}\u0007b\u0002B9\u0001\u0011%!1O\u0001\u0007gR\fG/Z:\u0015\u0005\tU\u0004C\u0002B<\u0005{\u00129#\u0004\u0002\u0003z)!!1\u001
0B\u000b\u0003\u001diW\u000f^1cY\u0016LAAa \u0003z\t1!)\u001e4gKJDqAa!\u0001\t\u0003\u0012i'\u0001\u0004e_^{'o\u001b\u0005\b\u0005\u000b\u0002A\u0011\u0001B7\u0011\u001d\u0011I\t\u0001C\u0005\u0005\u0017\u000b1\u0003\u001d:pG\u0016\u001c8OR3uG\"\u0014V-];fgR$B!a8\u0003\u000e\"A!q\rBD\u0001\u0004\u0011)\u0006C\u0004\u0003\u0012\u0002!\tAa%\u0002\u001b\u0005$G\rU1si&$\u0018n\u001c8t)\u0011\tyN!&\t\u0011\t]%q\u0012a\u0001\u0005\u0013\n1\u0003]1si&$\u0018n\u001c8B]\u0012|eMZ:fiNDqAa'\u0001\t\u0013\u0011i*\u0001\fnCJ\\GK];oG\u0006$\u0018n\u001c8D_6\u0004H.\u001a;f)\u0011\tyNa(\t\u0011\t\r!\u0011\u0014a\u0001\u0005\u0013BqAa)\u0001\t\u0003\u0011)+A\beK2\f\u0017\u0010U1si&$\u0018n\u001c8t)\u0019\tyNa*\u0003*\"A!1\u0001BQ\u0001\u0004\u0011)\u0001C\u0004\u0003,\n\u0005\u0006\u0019A0\u0002\u000b\u0011,G.Y=\t\u000f\t=\u0006\u0001\"\u0001\u00032\u0006\u0001\"/Z7pm\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0005\u0003?\u0014\u0019\f\u0003\u0005\u00036\n5\u0006\u0019\u0001B\\\u0003=!x\u000e]5d!\u0006\u0014H/\u001b;j_:\u001c\b#\u0002B\n\u0005s#\u0017\u0002\u0002B^\u0005+\u00111aU3u\u0011\u001d\u0011y\f\u0001C\u0001\u0005\u0003\fa\u0002]1si&$\u0018n\u001c8D_VtG\u000fF\u0001.\u0001")
public abstract class AbstractFetcherThread
extends ShutdownableThread {
    /** Broker endpoint passed to the constructor; its id appears in log messages and its host/port feed {@code metricId}. */
    private final BrokerEndPoint sourceBroker;
    /** Time in milliseconds to wait on {@code partitionMapCond} when there is nothing to fetch or a fetch attempt failed. */
    private final int fetchBackOffMs;
    /** Passed as the second constructor argument of every {@code PartitionFetchState} created in {@code addPartitions}. */
    private final boolean includeLogTruncation;
    /** Per-partition fetch state; within this class it is read and updated while {@code partitionMapLock} is held. */
    private final PartitionStates<PartitionFetchState> partitionStates;
    /** Lock taken around accesses to {@code partitionStates} in this class. */
    private final ReentrantLock partitionMapLock;
    /** Condition created from {@code partitionMapLock}; awaited for back-off, signalled by shutdown/addPartitions/delayPartitions. */
    private final Condition partitionMapCond;
    /** Built from the client id plus the source broker's host and port; handed to both stats objects. */
    private final ClientIdAndBroker metricId;
    /** Stats object whose {@code requestRate()} and {@code byteRate()} meters are marked while processing fetches. */
    private final FetcherStats fetcherStats;
    /** Stats object holding the lag value recorded per topic/partition. */
    private final FetcherLagStats fetcherLagStats;

    /**
     * Forwards to the companion object's accessor of the same name.
     * NOTE(review): by Scala's naming scheme this is presumably the default for the fifth
     * constructor parameter ({@code isInterruptible}) - confirm against the companion.
     */
    public static boolean $lessinit$greater$default$5() {
        final boolean companionDefault = AbstractFetcherThread$.MODULE$.$lessinit$greater$default$5();
        return companionDefault;
    }

    /**
     * Forwards to the companion object's accessor of the same name.
     * NOTE(review): by Scala's naming scheme this is presumably the default for the fourth
     * constructor parameter ({@code fetchBackOffMs}) - confirm against the companion.
     */
    public static int $lessinit$greater$default$4() {
        final int companionDefault = AbstractFetcherThread$.MODULE$.$lessinit$greater$default$4();
        return companionDefault;
    }

    /** @return the broker endpoint supplied to the constructor */
    public BrokerEndPoint sourceBroker() {
        return sourceBroker;
    }

    /** @return the per-partition fetch-state container owned by this thread */
    public PartitionStates<PartitionFetchState> partitionStates() {
        return partitionStates;
    }

    /** @return the lock this class takes around accesses to the partition states */
    private ReentrantLock partitionMapLock() {
        return partitionMapLock;
    }

    /** @return the condition created from the partition map lock */
    private Condition partitionMapCond() {
        return partitionMapCond;
    }

    /** @return the client-id/broker identifier used when creating the stats objects */
    private ClientIdAndBroker metricId() {
        return metricId;
    }

    /** @return the stats object carrying the request-rate and byte-rate meters */
    public FetcherStats fetcherStats() {
        return fetcherStats;
    }

    /** @return the stats object carrying the per-partition lag values */
    public FetcherLagStats fetcherLagStats() {
        return fetcherLagStats;
    }

    /**
     * Hook that consumes the data fetched for one partition.
     * This class calls it while holding the partition map lock, for a partition whose response
     * error is {@code Errors.NONE} and whose requested offset still equals its current fetch offset.
     * A {@code CorruptRecordException} thrown here marks the partition as errored; any other
     * throwable is wrapped in a {@code KafkaException} by the caller.
     *
     * @param var1 the partition the data belongs to
     * @param var2 the fetch offset the data was requested at
     * @param var4 the data returned for the partition
     */
    public abstract void processPartitionData(TopicPartition var1, long var2, PartitionData var4);

    /**
     * Hook that resolves a new fetch offset for a partition.
     * This class calls it when a partition is added with a negative offset and when a fetch
     * response reports {@code Errors.OFFSET_OUT_OF_RANGE}.
     *
     * @param var1 the affected partition
     * @return the offset stored in the partition's new {@code PartitionFetchState}
     */
    public abstract long handleOffsetOutOfRange(TopicPartition var1);

    /**
     * Hook invoked at the end of a fetch round when at least one partition was marked as errored.
     *
     * @param var1 the partitions collected as errored during that round
     */
    public abstract void handlePartitionsWithErrors(Iterable<TopicPartition> var1);

    /**
     * Hook that builds the leader-epoch request from the current partition states.
     * This class calls it under the partition map lock; an empty result makes the no-argument
     * {@code maybeTruncate()} return without contacting the leader.
     *
     * @param var1 snapshot of (partition, fetch state) pairs
     * @return the request map handed to {@code fetchEpochsFromLeader}
     */
    public abstract Map<TopicPartition, Object> buildLeaderEpochRequest(Seq<Tuple2<TopicPartition, PartitionFetchState>> var1);

    /**
     * Hook that obtains epoch end offsets for the requested partitions.
     * This class calls it without holding the partition map lock.
     *
     * @param var1 the non-empty request map produced by {@code buildLeaderEpochRequest}
     * @return epoch end offsets keyed by partition
     */
    public abstract Map<TopicPartition, EpochEndOffset> fetchEpochsFromLeader(Map<TopicPartition, Object> var1);

    /**
     * Hook that acts on the fetched epoch end offsets.
     * This class calls it under the partition map lock, with the fetched entries already filtered
     * down to partitions still present in the partition states.
     *
     * @param var1 epoch end offsets for the still-tracked partitions
     * @return offsets keyed by partition; each becomes that partition's new fetch offset
     */
    public abstract Map<TopicPartition, Object> maybeTruncate(Map<TopicPartition, EpochEndOffset> var1);

    /**
     * Hook that builds the fetch request from the current partition states.
     * This class calls it under the partition map lock at the start of each work iteration.
     *
     * @param var1 snapshot of (partition, fetch state) pairs
     * @return the request to send; when {@code isEmpty()} is true the thread backs off instead
     */
    public abstract FetchRequest buildFetchRequest(Seq<Tuple2<TopicPartition, PartitionFetchState>> var1);

    /**
     * Hook that executes a fetch request.
     * Any throwable it raises is caught by the caller, which (while the thread is running) logs a
     * warning, marks every tracked partition as errored and backs off.
     *
     * @param var1 the non-empty request built by {@code buildFetchRequest}
     * @return (partition, data) pairs, processed in order by this class
     */
    public abstract Seq<Tuple2<TopicPartition, PartitionData>> fetch(FetchRequest var1);

    /**
     * Stops the thread: initiates shutdown, signals every waiter on the partition map condition
     * (so a pending back-off wait returns), waits for the thread to finish and finally
     * unregisters both stats objects.
     */
    @Override
    public void shutdown() {
        this.initiateShutdown();
        final ReentrantLock mapLock = this.partitionMapLock();
        CoreUtils$.MODULE$.inLock(mapLock, (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.partitionMapCond().signalAll();
        });
        this.awaitShutdown();
        final FetcherStats stats = this.fetcherStats();
        stats.unregister();
        final FetcherLagStats lagStats = this.fetcherLagStats();
        lagStats.unregister();
    }

    /**
     * Copies the current partition states into a Scala buffer of (partition, fetch state) pairs,
     * in the iteration order of the underlying state list.
     */
    private Buffer<Tuple2<TopicPartition, PartitionFetchState>> states() {
        Object scalaStates = JavaConverters$.MODULE$.asScalaBufferConverter(this.partitionStates().partitionStates()).asScala();
        Function1 toPair = (Function1 & Serializable & scala.Serializable)state -> {
            // Same pair that `topicPartition -> value` builds in Scala.
            return new Tuple2((Object)state.topicPartition(), state.value());
        };
        return (Buffer)((TraversableLike)scalaStates).map(toPair, Buffer$.MODULE$.canBuildFrom());
    }

    @Override
    public void doWork() {
        block0: {
            this.maybeTruncate();
            FetchRequest fetchRequest = (FetchRequest)CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (Function0 & Serializable & scala.Serializable)() -> {
                void var1_1;
                Object object;
                FetchRequest fetchRequest = this.buildFetchRequest((Seq<Tuple2<TopicPartition, PartitionFetchState>>)this.states());
                if (fetchRequest.isEmpty()) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("There are no active partitions. Back off for %d ms before sending a fetch request")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.fetchBackOffMs)})));
                    object = BoxesRunTime.boxToBoolean((boolean)this.partitionMapCond().await($this.fetchBackOffMs, TimeUnit.MILLISECONDS));
                } else {
                    object = BoxedUnit.UNIT;
                }
                return var1_1;
            });
            if (fetchRequest.isEmpty()) break block0;
            this.processFetchRequest(fetchRequest);
        }
    }

    /**
     * Truncation step run at the start of every work iteration.
     * Builds the leader-epoch request under the lock and returns immediately if it is empty.
     * Otherwise it fetches the epochs outside the lock, then re-takes the lock to drop entries for
     * partitions no longer tracked, hand the rest to {@code maybeTruncate(Map)} and record the
     * returned offsets via {@code markTruncationComplete}.
     */
    public void maybeTruncate() {
        Map pendingEpochRequests = (Map)CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (Function0 & Serializable & scala.Serializable)() -> this.buildLeaderEpochRequest((Seq<Tuple2<TopicPartition, PartitionFetchState>>)this.states()));
        if (pendingEpochRequests.isEmpty()) {
            return;
        }
        Map<TopicPartition, EpochEndOffset> fetchedEpochs = this.fetchEpochsFromLeader((Map<TopicPartition, Object>)pendingEpochRequests);
        CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            Function1 stillTracked = (Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)AbstractFetcherThread.$anonfun$maybeTruncate$3(this, x0$1));
            Map trackedEpochs = (Map)fetchedEpochs.filter(stillTracked);
            Map<TopicPartition, Object> truncatedTo = this.maybeTruncate((Map<TopicPartition, EpochEndOffset>)trackedEpochs);
            this.markTruncationComplete(truncatedTo);
        });
    }

    /**
     * Sends one fetch request and applies the response.
     * <ul>
     *   <li>If {@code fetch} throws while the thread is running: logs a warning, then under the
     *       lock marks every tracked partition as errored and waits up to {@code fetchBackOffMs} ms.
     *       If the thread is no longer running the failure is ignored.</li>
     *   <li>Marks the request-rate meter whether or not the fetch succeeded.</li>
     *   <li>If the response is non-empty, handles each (partition, data) pair under the lock.</li>
     *   <li>If any partition was marked as errored, passes the set to
     *       {@code handlePartitionsWithErrors}.</li>
     * </ul>
     *
     * @param fetchRequest the non-empty request to send
     */
    private void processFetchRequest(FetchRequest fetchRequest) {
        scala.collection.mutable.Set partitionsWithError = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        // Holder object so the lambdas below can read the response assigned inside the try block.
        ObjectRef responseData = ObjectRef.create((Object)((Seq)Seq$.MODULE$.empty()));
        try {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Issuing fetch to broker ", ", request: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.sourceBroker().id()), fetchRequest})));
            responseData.elem = this.fetch(fetchRequest);
        }
        catch (Throwable t) {
            if (this.isRunning().get()) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error in fetch to broker ", ", request ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.sourceBroker().id()), fetchRequest})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> t);
                CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                    ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(this.partitionStates().partitionSet()).asScala()).foreach((Function1 & Serializable & scala.Serializable)partition -> {
                        this.updatePartitionsWithError$1((TopicPartition)partition, partitionsWithError);
                        return BoxedUnit.UNIT;
                    });
                    // Back off before the next attempt; the wait ends early if the condition is signalled.
                    return this.partitionMapCond().await(this.fetchBackOffMs, TimeUnit.MILLISECONDS);
                });
            }
        }
        this.fetcherStats().requestRate().mark();
        if (((Seq)responseData.elem).nonEmpty()) {
            CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> ((Seq)responseData.elem).foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                AbstractFetcherThread.$anonfun$processFetchRequest$7(this, fetchRequest, partitionsWithError, (Tuple2)x0$2);
                return BoxedUnit.UNIT;
            }));
        }
        if (partitionsWithError.nonEmpty()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("handling partitions with error for %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{partitionsWithError})));
            this.handlePartitionsWithErrors((Iterable<TopicPartition>)partitionsWithError);
        }
    }

    /**
     * Starts tracking the given partitions.
     * Partitions already present in the partition states are skipped. For each remaining entry a
     * {@code PartitionFetchState} is created; a negative offset is first replaced by the result of
     * {@code handleOffsetOutOfRange}. The new states are merged with the existing ones and waiters
     * on the partition map condition are signalled. All of this runs under the partition map lock.
     *
     * @param partitionAndOffsets initial fetch offset per partition (values are longs)
     */
    public void addPartitions(Map<TopicPartition, Object> partitionAndOffsets) {
        this.partitionMapLock().lockInterruptibly();
        try {
            Map newPartitionToState = (Map)((TraversableLike)partitionAndOffsets.filter((Function1 & Serializable & scala.Serializable)x0$3 -> BoxesRunTime.boxToBoolean((boolean)AbstractFetcherThread.$anonfun$addPartitions$1(this, (Tuple2)x0$3)))).map((Function1 & Serializable & scala.Serializable)x0$4 -> {
                Tuple2 tuple2 = (Tuple2)x0$4;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                TopicPartition tp = (TopicPartition)tuple2._1();
                long offset = tuple2._2$mcJ$sp();
                // A negative offset means "not known yet": ask the subclass for a starting offset.
                PartitionFetchState fetchState = offset < 0L ? new PartitionFetchState(this.handleOffsetOutOfRange(tp), this.includeLogTruncation) : new PartitionFetchState(offset, this.includeLogTruncation);
                return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)fetchState);
            }, Map$.MODULE$.canBuildFrom());
            scala.collection.immutable.Map existingPartitionToState = this.states().toMap(Predef$.MODULE$.$conforms());
            this.partitionStates().set((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)existingPartitionToState.$plus$plus((GenTraversableOnce)newPartitionToState)).asJava());
            this.partitionMapCond().signalAll();
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    /**
     * Rebuilds the partition states after the truncation step.
     * A partition with an entry in {@code partitions} gets a new state with that offset, its
     * existing delay and a {@code false} third argument; every other partition keeps its state.
     *
     * @param partitions offset per partition, as returned by {@code maybeTruncate(Map)}
     */
    private void markTruncationComplete(Map<TopicPartition, Object> partitions) {
        Object currentStates = JavaConverters$.MODULE$.asScalaBufferConverter(this.partitionStates().partitionStates()).asScala();
        Object rebuiltPairs = ((TraversableLike)currentStates).map((Function1 & Serializable & scala.Serializable)state -> {
            Option truncationOffset = partitions.get((Object)state.topicPartition());
            PartitionFetchState updatedState;
            if (truncationOffset instanceof Some) {
                long offset = BoxesRunTime.unboxToLong((Object)((Some)truncationOffset).value());
                updatedState = new PartitionFetchState(offset, ((PartitionFetchState)state.value()).delay(), false);
            } else if (None$.MODULE$.equals(truncationOffset)) {
                updatedState = (PartitionFetchState)state.value();
            } else {
                throw new MatchError((Object)truncationOffset);
            }
            return new Tuple2((Object)state.topicPartition(), (Object)updatedState);
        }, Buffer$.MODULE$.canBuildFrom());
        scala.collection.immutable.Map newStates = ((TraversableOnce)rebuiltPairs).toMap(Predef$.MODULE$.$conforms());
        java.util.Map javaStates = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)newStates).asJava();
        this.partitionStates().set(javaStates);
    }

    /**
     * Applies a fetch delay to the given partitions under the partition map lock, then signals
     * waiters on the partition map condition. Partitions that are untracked or already delayed
     * are left unchanged.
     *
     * @param partitions partitions to delay
     * @param delay      delay value passed to {@code DelayedItem}
     */
    public void delayPartitions(Iterable<TopicPartition> partitions, long delay) {
        final ReentrantLock mapLock = this.partitionMapLock();
        mapLock.lockInterruptibly();
        try {
            Function1 applyDelay = (Function1 & Serializable & scala.Serializable)partition -> {
                AbstractFetcherThread.$anonfun$delayPartitions$1(this, delay, (TopicPartition)partition);
                return BoxedUnit.UNIT;
            };
            partitions.foreach(applyDelay);
            this.partitionMapCond().signalAll();
        }
        finally {
            mapLock.unlock();
        }
    }

    /**
     * Stops tracking the given partitions: under the partition map lock, removes each one from
     * the partition states and unregisters its lag metric.
     *
     * @param topicPartitions partitions to remove
     */
    public void removePartitions(Set<TopicPartition> topicPartitions) {
        final ReentrantLock mapLock = this.partitionMapLock();
        mapLock.lockInterruptibly();
        try {
            Function1 dropPartition = (Function1 & Serializable & scala.Serializable)topicPartition -> {
                AbstractFetcherThread.$anonfun$removePartitions$1(this, (TopicPartition)topicPartition);
                return BoxedUnit.UNIT;
            };
            topicPartitions.foreach(dropPartition);
        }
        finally {
            mapLock.unlock();
        }
    }

    /**
     * @return the number of partitions currently held in the partition states, read under the
     *         partition map lock
     */
    public int partitionCount() {
        final ReentrantLock mapLock = this.partitionMapLock();
        mapLock.lockInterruptibly();
        try {
            return this.partitionStates().size();
        }
        finally {
            mapLock.unlock();
        }
    }

    /**
     * Filter predicate used by the truncation step: keeps an entry only if its partition is
     * still present in the partition states.
     *
     * @throws MatchError if the entry is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$maybeTruncate$3(AbstractFetcherThread $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        TopicPartition partition = (TopicPartition)x0$1._1();
        return $this.partitionStates().contains(partition);
    }

    /**
     * Records a partition as errored for the current fetch round and moves it to the end of the
     * partition states.
     */
    private final void updatePartitionsWithError$1(TopicPartition partition, scala.collection.mutable.Set partitionsWithError$1) {
        final scala.collection.mutable.Set erroredPartitions = partitionsWithError$1;
        erroredPartitions.$plus$eq((Object)partition);
        partitionStates().moveToEnd(partition);
    }

    /**
     * Applies the fetched data of one partition against that partition's current fetch state.
     * The response is ignored unless the offset requested for the partition equals the
     * partition's current fetch offset. Otherwise the handling depends on the response error:
     * <ul>
     *   <li>{@code NONE}: update lag, pass the data to {@code processPartitionData}, and advance
     *       the fetch offset when the records contain valid bytes.</li>
     *   <li>{@code OFFSET_OUT_OF_RANGE}: obtain a new offset from {@code handleOffsetOutOfRange}
     *       and store it.</li>
     *   <li>any other error: log it and mark the partition as errored, but only while the
     *       thread is running.</li>
     * </ul>
     */
    public static final /* synthetic */ void $anonfun$processFetchRequest$8(AbstractFetcherThread $this, FetchRequest fetchRequest$1, scala.collection.mutable.Set partitionsWithError$1, TopicPartition topicPartition$1, PartitionData partitionData$1, String topic$1, int partitionId$1, PartitionFetchState currentPartitionFetchState) {
        block14: {
            // Skip responses whose requested offset no longer matches the partition's current fetch offset.
            if (fetchRequest$1.offset(topicPartition$1) != currentPartitionFetchState.fetchOffset()) break block14;
            Errors errors = partitionData$1.error();
            if (Errors.NONE.equals(errors)) {
                BoxedUnit boxedUnit;
                try {
                    MemoryRecords records = partitionData$1.toRecords();
                    // Next offset = nextOffset() of the last batch, or the current fetch offset when there are no batches.
                    long newOffset = BoxesRunTime.unboxToLong((Object)((TraversableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records.batches()).asScala()).lastOption().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1.nextOffset())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> currentPartitionFetchState.fetchOffset()));
                    // Lag is the distance from the new offset to the high watermark, floored at zero.
                    $this.fetcherLagStats().getAndMaybePut(topic$1, partitionId$1).lag_$eq(Math.max(0L, partitionData$1.highWatermark() - newOffset));
                    $this.processPartitionData(topicPartition$1, currentPartitionFetchState.fetchOffset(), partitionData$1);
                    int validBytes = records.validBytes();
                    if (validBytes > 0) {
                        // Only advance the stored offset (and count bytes) when valid bytes were received.
                        $this.partitionStates().updateAndMoveToEnd(topicPartition$1, (Object)new PartitionFetchState(newOffset));
                        $this.fetcherStats().byteRate().mark((long)validBytes);
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                }
                catch (CorruptRecordException ime) {
                    // Corrupt records: log, mark the partition as errored and continue with the other partitions.
                    $this.logger().error((Object)("Found invalid messages during fetch for partition [" + topic$1 + "," + partitionId$1 + "] offset " + currentPartitionFetchState.fetchOffset() + " error " + ime.getMessage()));
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                }
                catch (Throwable e) {
                    // Any other failure is rethrown, wrapped with the partition and offset for context.
                    throw new KafkaException(new StringOps(Predef$.MODULE$.augmentString("error processing data for partition [%s,%d] offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToLong((long)currentPartitionFetchState.fetchOffset())})), e);
                }
                BoxedUnit boxedUnit2 = boxedUnit;
            } else if (Errors.OFFSET_OUT_OF_RANGE.equals(errors)) {
                BoxedUnit boxedUnit;
                try {
                    long newOffset = $this.handleOffsetOutOfRange(topicPartition$1);
                    $this.partitionStates().updateAndMoveToEnd(topicPartition$1, (Object)new PartitionFetchState(newOffset));
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Current offset %d for partition [%s,%d] out of range; reset offset to %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)currentPartitionFetchState.fetchOffset()), topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToLong((long)newOffset)})));
                    boxedUnit = BoxedUnit.UNIT;
                }
                catch (FatalExitError e) {
                    // Fatal exit requests must propagate untouched.
                    throw e;
                }
                catch (Throwable e) {
                    // Could not resolve a new offset: log and mark the partition as errored.
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Error getting offset for partition [%s,%d] to broker %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToInteger((int)$this.sourceBroker().id())})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit3 = boxedUnit;
            } else {
                BoxedUnit boxedUnit;
                // Any other error code is only reported while the thread is still running.
                if ($this.isRunning().get()) {
                    // NOTE(review): exception().get() assumes the Option is defined for every non-NONE error - confirm.
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Error for partition [%s,%d] to broker %d:%s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToInteger((int)$this.sourceBroker().id()), partitionData$1.exception().get()})));
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit4 = boxedUnit;
            }
        }
    }

    /**
     * Handles one (partition, data) pair of a fetch response: looks up the partition's current
     * fetch state and, if one exists, delegates to {@code $anonfun$processFetchRequest$8}.
     * Pairs for partitions without a state are ignored.
     *
     * @throws MatchError if the pair is {@code null}
     */
    public static final /* synthetic */ void $anonfun$processFetchRequest$7(AbstractFetcherThread $this, FetchRequest fetchRequest$1, scala.collection.mutable.Set partitionsWithError$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        TopicPartition topicPartition = (TopicPartition)x0$2._1();
        PartitionData partitionData = (PartitionData)x0$2._2();
        String topic = topicPartition.topic();
        int partitionId = topicPartition.partition();
        // Option.apply turns a null state (untracked partition) into None, so foreach is a no-op then.
        Option currentState = Option$.MODULE$.apply($this.partitionStates().stateValue(topicPartition));
        currentState.foreach((Function1 & Serializable & scala.Serializable)currentPartitionFetchState -> {
            AbstractFetcherThread.$anonfun$processFetchRequest$8($this, fetchRequest$1, partitionsWithError$1, topicPartition, partitionData, topic, partitionId, (PartitionFetchState)currentPartitionFetchState);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Filter predicate used by {@code addPartitions}: keeps an entry only if its partition is
     * not yet present in the partition states.
     *
     * @throws MatchError if the entry is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$addPartitions$1(AbstractFetcherThread $this, Tuple2 x0$3) {
        if (x0$3 == null) {
            throw new MatchError((Object)x0$3);
        }
        TopicPartition partition = (TopicPartition)x0$3._1();
        boolean alreadyTracked = $this.partitionStates().contains(partition);
        return !alreadyTracked;
    }

    /**
     * Replaces the state of a partition that is not already delayed with a copy carrying a new
     * {@code DelayedItem}, keeping its fetch offset and truncating-log flag, and moves it to the
     * end of the partition states. Already-delayed partitions are left unchanged.
     */
    public static final /* synthetic */ void $anonfun$delayPartitions$2(AbstractFetcherThread $this, long delay$1, TopicPartition partition$1, PartitionFetchState currentPartitionFetchState) {
        if (!currentPartitionFetchState.isDelayed()) {
            PartitionFetchState delayedState = new PartitionFetchState(currentPartitionFetchState.fetchOffset(), new DelayedItem(delay$1), currentPartitionFetchState.truncatingLog());
            $this.partitionStates().updateAndMoveToEnd(partition$1, (Object)delayedState);
        }
    }

    /**
     * Looks up the current state of one partition and, if it exists, applies the delay via
     * {@code $anonfun$delayPartitions$2}. Untracked partitions are ignored.
     */
    public static final /* synthetic */ void $anonfun$delayPartitions$1(AbstractFetcherThread $this, long delay$1, TopicPartition partition) {
        Option currentState = Option$.MODULE$.apply($this.partitionStates().stateValue(partition));
        currentState.foreach((Function1 & Serializable & scala.Serializable)currentPartitionFetchState -> {
            AbstractFetcherThread.$anonfun$delayPartitions$2($this, delay$1, partition, (PartitionFetchState)currentPartitionFetchState);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Removes one partition from the partition states and then unregisters its lag metric.
     */
    public static final /* synthetic */ void $anonfun$removePartitions$1(AbstractFetcherThread $this, TopicPartition topicPartition) {
        $this.partitionStates().remove(topicPartition);
        final String topic = topicPartition.topic();
        final int partitionId = topicPartition.partition();
        $this.fetcherLagStats().unregister(topic, partitionId);
    }

    /**
     * Creates the fetcher thread state: stores the constructor arguments, creates an empty
     * partition-state container, the partition map lock with its condition, and the two stats
     * objects keyed by client id plus the source broker's host and port.
     *
     * @param name                 thread name, passed to the superclass
     * @param clientId             client id used to build the metric identifier
     * @param sourceBroker         broker endpoint this thread is associated with
     * @param fetchBackOffMs       back-off wait in milliseconds
     * @param isInterruptible      passed to the superclass
     * @param includeLogTruncation flag passed to every {@code PartitionFetchState} created in
     *                             {@code addPartitions}
     */
    public AbstractFetcherThread(String name, String clientId, BrokerEndPoint sourceBroker, int fetchBackOffMs, boolean isInterruptible, boolean includeLogTruncation) {
        // The explicit superclass constructor call has to be the first statement of a constructor.
        // NOTE(review): assumes the ShutdownableThread constructor does not read these fields
        // through an overridden method - confirm.
        super(name, isInterruptible);
        this.sourceBroker = sourceBroker;
        this.fetchBackOffMs = fetchBackOffMs;
        this.includeLogTruncation = includeLogTruncation;
        this.partitionStates = new PartitionStates();
        this.partitionMapLock = new ReentrantLock();
        this.partitionMapCond = this.partitionMapLock().newCondition();
        this.metricId = new ClientIdAndBroker(clientId, sourceBroker.host(), sourceBroker.port());
        this.fetcherStats = new FetcherStats(this.metricId());
        this.fetcherLagStats = new FetcherLagStats(this.metricId());
    }

    public static interface FetchRequest {
        public boolean isEmpty();

        public long offset(TopicPartition var1);
    }

    public static interface PartitionData {
        public Errors error();

        public Option<Throwable> exception();

        public MemoryRecords toRecords();

        public long highWatermark();
    }
}

