/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.tezplugins;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.LlapUmbilicalPolicyProvider;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapTaskCommunicator
extends TezTaskCommunicatorImpl {
    /** Logger shared by this class and its nested helpers. */
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
    // Evaluated once when the class is initialised; later log-level changes are not picked up by this flag.
    private static final boolean isInfoEnabled = LOG.isInfoEnabled();
    /** Serialized credentials per query identifier; populated lazily by constructSubmitWorkRequest. */
    private final ConcurrentMap<LlapDaemonProtocolProtos.QueryIdentifierProto, ByteBuffer> credentialMap;
    /** Bookkeeping of which containers / task attempts are associated with which LLAP node. */
    private final EntityTracker entityTracker = new EntityTracker();
    /** Receives vertex state updates and supplies per-fragment runtime info for submitted work. */
    private final SourceStateTracker sourceStateTracker;
    // Nodes that were handed work for the current DAG; cleared by dagComplete and resetCurrentDag.
    // NOTE(review): plain HashSet with no synchronization in this class - confirm callers are single-threaded.
    private final Set<LlapNodeId> nodesForQuery = new HashSet<LlapNodeId>();
    /** Client proxy used for all outbound requests to LLAP daemons; created in initialize(). */
    private LlapProtocolClientProxy communicator;
    /** Delete delay sent with query-complete requests; read in initialize() in units of seconds. */
    private long deleteDelayOnDagComplete;
    /** Umbilical implementation exposed through the RPC server started in startRpcServer(). */
    private final LlapTaskUmbilicalProtocol umbilical;
    /** LLAP token taken from the AM credentials; null when the credentials are absent or hold no such token. */
    private final Token<LlapTokenIdentifier> token;
    /** Attempt number of the application attempt, taken from the communicator context. */
    private final int appAttemptId;
    /** Value of the USER environment variable at construction time. */
    private final String user;
    /** Node -> timestamp (milliseconds derived from System.nanoTime()) recorded by registerKnownNode. */
    private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<LlapNodeId, Long>();
    /** Node -> ping bookkeeping maintained by registerPingingNode. */
    private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<LlapNodeId, PingingNodeInfo>();
    /** Registry client used by constructLogUrl to look up service instances by host. */
    private final LlapRegistryService serviceRegistry;
    /** Identifier of the DAG currently being tracked; null until the first task attempt is registered. */
    private volatile LlapDaemonProtocolProtos.QueryIdentifierProto currentQueryIdentifierProto;
    /** Timestamp (ms) of the last "ping from node without registered tasks" warning; throttles that log in nodePinged. */
    private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0L);

    /**
     * Builds the communicator from the Tez-provided context.
     *
     * <p>Looks up the LLAP token in the AM credentials (if any), verifies that a token is
     * present exactly when Hadoop security is enabled, and wires up the registry client,
     * the umbilical implementation and the source state tracker.
     *
     * @param taskCommunicatorContext context supplied by the Tez framework
     * @throws IllegalStateException if token presence does not match the security setting
     */
    public LlapTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        super(taskCommunicatorContext);
        Credentials amCredentials = taskCommunicatorContext.getAMCredentials();
        if (amCredentials == null) {
            this.token = null;
        } else {
            // Credentials only exposes a wildcard-typed token; the lookup is by the LLAP token kind.
            @SuppressWarnings("unchecked")
            Token<LlapTokenIdentifier> llapToken =
                    (Token<LlapTokenIdentifier>) amCredentials.getToken(LlapTokenIdentifier.KIND_NAME);
            this.token = llapToken;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Task communicator with a token " + this.token);
        }
        boolean tokenPresent = this.token != null;
        Preconditions.checkState(tokenPresent == UserGroupInformation.isSecurityEnabled());
        this.serviceRegistry = LlapRegistryService.getClient(this.conf);
        this.umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
        this.user = System.getenv(ApplicationConstants.Environment.USER.name());
        this.appAttemptId = taskCommunicatorContext.getApplicationAttemptId().getAttemptId();
        this.credentialMap = new ConcurrentHashMap<LlapDaemonProtocolProtos.QueryIdentifierProto, ByteBuffer>();
        this.sourceStateTracker = new SourceStateTracker(getContext(), this);
    }

    /**
     * Initializes the parent communicator, then creates and initializes the LLAP client proxy
     * using the configured thread count, and reads the file cleanup delay (in seconds).
     *
     * @throws Exception propagated from the parent initialization
     */
    public void initialize() throws Exception {
        super.initialize();
        final Configuration configuration = getConf();
        final int communicatorThreads =
                HiveConf.getIntVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
        this.communicator = createLlapProtocolClientProxy(communicatorThreads, configuration);
        this.deleteDelayOnDagComplete =
                HiveConf.getTimeVar(configuration, HiveConf.ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
        LOG.info("Running LlapTaskCommunicator with fileCleanupDelay=" + this.deleteDelayOnDagComplete
                + ", numCommunicatorThreads=" + communicatorThreads);
        this.communicator.init(configuration);
    }

    /** Starts the parent communicator first, then the LLAP client proxy created in initialize(). */
    public void start() {
        super.start();
        communicator.start();
    }

    /**
     * Shuts down the parent communicator and then stops the LLAP client proxy.
     * The proxy may be null if initialize() never ran, in which case there is nothing to stop.
     */
    public void shutdown() {
        super.shutdown();
        if (communicator == null) {
            return;
        }
        communicator.stop();
    }

    /**
     * Starts the RPC server that exposes the LLAP umbilical protocol.
     *
     * <p>The server binds to all interfaces on an ephemeral port, authenticates with the job
     * token, and applies the LLAP umbilical service ACLs when Hadoop authorization is enabled.
     *
     * @throws TezUncheckedException wrapping any IOException raised while building or starting the server
     */
    protected void startRpcServer() {
        final Configuration configuration = getConf();
        try {
            JobTokenSecretManager secretManager = new JobTokenSecretManager();
            secretManager.addTokenForJob(this.tokenIdentifier, this.sessionToken);

            final int handlerCount = configuration.getInt("tez.am.task.listener.thread-count", 30);
            RPC.Builder serverBuilder = new RPC.Builder(configuration)
                    .setProtocol(LlapTaskUmbilicalProtocol.class)
                    .setBindAddress("0.0.0.0")
                    .setPort(0)
                    .setInstance(this.umbilical)
                    .setNumHandlers(handlerCount)
                    .setSecretManager(secretManager);
            this.server = serverBuilder.build();

            boolean authorizationEnabled = configuration.getBoolean("hadoop.security.authorization", false);
            if (authorizationEnabled) {
                this.server.refreshServiceAcl(configuration, new LlapUmbilicalPolicyProvider());
            }

            this.server.start();
            this.address = NetUtils.getConnectAddress(this.server);
            LOG.info("Started LlapUmbilical: " + this.umbilical.getClass().getName() + " at address: "
                    + this.address + " with numHandlers=" + handlerCount);
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    /**
     * Factory for the LLAP client proxy; protected so that subclasses can supply their own.
     *
     * @param numThreads number of threads handed to the proxy
     * @param conf configuration handed to the proxy
     * @return a new proxy carrying this communicator's LLAP token (which may be null)
     */
    protected LlapProtocolClientProxy createLlapProtocolClientProxy(int numThreads, Configuration conf) {
        LlapProtocolClientProxy proxy = new LlapProtocolClientProxy(numThreads, conf, token);
        return proxy;
    }

    /**
     * Registers a running container with the parent communicator and records its node
     * in the entity tracker.
     *
     * @param containerId the container being registered
     * @param hostname host the container runs on
     * @param port port of the node the container runs on
     */
    public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
        super.registerRunningContainer(containerId, hostname, port);
        entityTracker.registerContainer(containerId, hostname, port);
    }

    /**
     * Handles the end of a container. For internal preemption, a terminate request is sent for
     * the task attempt still mapped to the container (if any) before the container is dropped
     * from the entity tracker.
     *
     * @param containerId the container that ended
     * @param endReason why the container ended
     * @param diagnostics diagnostics forwarded to the parent implementation
     */
    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
        super.registerContainerEnd(containerId, endReason, diagnostics);
        boolean internallyPreempted = endReason == ContainerEndReason.INTERNAL_PREEMPTION;
        if (internallyPreempted) {
            LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId);
            TezTaskAttemptID runningAttempt = entityTracker.getTaskAttemptIdForContainer(containerId);
            if (runningAttempt != null) {
                sendTaskTerminated(runningAttempt, true);
            }
        }
        // Must happen after the lookup above, which relies on the container mapping.
        entityTracker.unregisterContainer(containerId);
    }

    /**
     * Registers a task attempt and submits it to the LLAP daemon hosting its container.
     *
     * <p>Steps: register with the parent, switch the tracked DAG if the attempt belongs to a
     * different one, resolve the container's host/port, record the attempt in the local
     * trackers, build the submit request, notify the context that the task started remotely,
     * and finally send the request asynchronously.
     *
     * <p>The asynchronous callback reports the outcome to the context: a REJECTED submission
     * kills the attempt with EXECUTOR_BUSY, an IOException kills it with COMMUNICATION_ERROR,
     * and any other error (including RemoteException) fails it as NON_FATAL.
     *
     * @param containerId container the attempt runs in; must already be registered
     * @param taskSpec specification of the task attempt
     * @param additionalResources forwarded to the parent implementation
     * @param credentials forwarded to the parent implementation
     * @param credentialsChanged forwarded to the parent implementation
     * @param priority priority used when computing the fragment runtime info
     * @throws RuntimeException if no container info exists for {@code containerId} or the
     *         submit request cannot be constructed
     */
    public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority) {
        super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);

        int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
        if (this.currentQueryIdentifierProto == null || dagId != this.currentQueryIdentifierProto.getDagIdentifier()) {
            this.resetCurrentDag(dagId);
        }

        TezTaskCommunicatorImpl.ContainerInfo containerInfo = this.getContainerInfo(containerId);
        if (containerInfo == null) {
            throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID());
        }
        final String host;
        final int port;
        synchronized (containerInfo) {
            host = containerInfo.host;
            port = containerInfo.port;
        }

        LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
        this.registerKnownNode(nodeId);
        this.entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
        this.nodesForQuery.add(nodeId);
        this.sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
        LlapDaemonProtocolProtos.FragmentRuntimeInfo fragmentRuntimeInfo = this.sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);

        final LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto;
        try {
            requestProto = this.constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo);
        } catch (IOException e) {
            throw new RuntimeException("Failed to construct request", e);
        }

        // The context is told the task started before the request is actually sent.
        this.getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
        this.communicator.sendSubmitWork(requestProto, host, port, new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {

            public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
                if (!response.hasSubmissionState()) {
                    throw new RuntimeException("SubmissionState in response is expected!");
                }
                LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
                if (ss == LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED) {
                    LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Service Busy");
                    LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
                    return;
                }
                LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
            }

            public void indicateError(Throwable t) {
                // Unwrap the protobuf wrapper only when it actually carries a cause; a cause-less
                // ServiceException previously turned 't' into null and caused an NPE below.
                Throwable error = t;
                if (t instanceof ServiceException && t.getCause() != null) {
                    error = t.getCause();
                }
                if (error instanceof RemoteException) {
                    LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, error);
                    LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, error.toString());
                } else if (error instanceof IOException) {
                    LOG.info("Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId + ", Communication Error");
                    LlapTaskCommunicator.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
                } else {
                    LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, error);
                    LlapTaskCommunicator.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, error.getMessage());
                }
            }
        });
    }

    /**
     * Unregisters a task attempt. When the attempt ended because of internal preemption, a
     * terminate request is sent to its node before the attempt is dropped from the tracker.
     *
     * @param taskAttemptId the attempt that ended
     * @param endReason why the attempt ended
     * @param diagnostics diagnostics forwarded to the parent implementation
     */
    public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason endReason, String diagnostics) {
        super.unregisterRunningTaskAttempt(taskAttemptId, endReason, diagnostics);
        boolean internallyPreempted = endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION;
        if (internallyPreempted) {
            LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId);
            sendTaskTerminated(taskAttemptId, false);
        }
        // The node lookup in sendTaskTerminated needs the mapping, so remove it last.
        entityTracker.unregisterTaskAttempt(taskAttemptId);
    }

    /**
     * Sends a terminate-fragment request for the given attempt to the node it is registered on.
     * Nothing is sent when the attempt's node is no longer known to the entity tracker.
     *
     * @param taskAttemptId attempt to terminate
     * @param invokedByContainerEnd true when triggered by a container end, false when triggered
     *        by a task end; only affects the log message
     */
    private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, boolean invokedByContainerEnd) {
        LOG.info("Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
        LlapNodeId nodeId = this.entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
        if (nodeId == null) {
            LOG.info("Not sending terminate request for fragment {} since it's node is not known. Already unregistered", taskAttemptId.toString());
            return;
        }
        LlapDaemonProtocolProtos.TerminateFragmentRequestProto request = LlapDaemonProtocolProtos.TerminateFragmentRequestProto.newBuilder()
                .setQueryIdentifier(this.currentQueryIdentifierProto)
                .setFragmentIdentifierString(taskAttemptId.toString())
                .build();
        this.communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto>() {

            public void setResponse(LlapDaemonProtocolProtos.TerminateFragmentResponseProto response) {
                // Nothing to do on success.
            }

            public void indicateError(Throwable t) {
                // Best effort: the failure is only logged. The throwable is passed as the trailing
                // argument so SLF4J records the cause, which was previously discarded.
                LOG.warn("Failed to send terminate fragment request for {}", taskAttemptId.toString(), t);
            }
        });
    }

    /**
     * Notifies every node that received work for the current query that the DAG has completed,
     * then forgets those nodes. Notification failures are logged and otherwise ignored.
     *
     * @param dagIdentifier identifier of the DAG that completed
     */
    public void dagComplete(final int dagIdentifier) {
        LlapDaemonProtocolProtos.QueryCompleteRequestProto request = LlapDaemonProtocolProtos.QueryCompleteRequestProto.newBuilder()
                .setQueryIdentifier(this.constructQueryIdentifierProto(dagIdentifier))
                .setDeleteDelay(this.deleteDelayOnDagComplete)
                .build();
        for (final LlapNodeId llapNodeId : this.nodesForQuery) {
            LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
            this.communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {

                public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
                    // Nothing to do on success.
                }

                public void indicateError(Throwable t) {
                    // Best effort: log only. The trailing throwable makes SLF4J record the cause,
                    // which was previously discarded.
                    LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId, t);
                }
            });
        }
        this.nodesForQuery.clear();
    }

    /**
     * Forwards a vertex state change to the source state tracker.
     *
     * @param vertexStateUpdate the update carrying the vertex name and its new state
     */
    public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        String vertexName = vertexStateUpdate.getVertexName();
        sourceStateTracker.sourceStateUpdated(vertexName, vertexStateUpdate.getVertexState());
    }

    /**
     * Sends a source-state update to a node. If the send fails, every task attempt currently
     * tracked on that node is reported to the context as killed with NODE_FAILED.
     *
     * @param nodeId target node
     * @param request the state update to deliver
     */
    public void sendStateUpdate(final LlapNodeId nodeId, final LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) {
        communicator.sendSourceStateUpdate(request, nodeId, new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto>() {

            public void setResponse(LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto response) {
                // Nothing to do on success.
            }

            public void indicateError(Throwable t) {
                LOG.error("Failed to send state update to node: {}, Killing all attempts running on node. Attempted StateUpdate={}", nodeId, request, t);
                BiMap<ContainerId, TezTaskAttemptID> attemptsOnNode = entityTracker.getContainerAttemptMapForNode(nodeId);
                if (attemptsOnNode == null) {
                    return;
                }
                // The bimap itself is the lock used by the entity tracker for its contents.
                synchronized (attemptsOnNode) {
                    for (Map.Entry<ContainerId, TezTaskAttemptID> mapping : attemptsOnNode.entrySet()) {
                        TezTaskAttemptID attempt = mapping.getValue();
                        LOG.info("Sending a kill for attempt {}, due to a communication failure while sending a finishable state update", attempt);
                        getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED, "Failed to send finishable state update to node " + nodeId);
                    }
                }
            }
        });
    }

    /**
     * Returns the log URL for an attempt that is still running; the attempt id is not used.
     *
     * @param attemptID ignored
     * @param containerNodeId node whose LLAP instance should serve the logs
     * @return the log URL, or null when no matching instance is found
     */
    public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
        String logUrl = constructLogUrl(containerNodeId);
        return logUrl;
    }

    /**
     * Returns the log URL for a completed attempt; the attempt id is not used.
     *
     * @param attemptID ignored
     * @param containerNodeId node whose LLAP instance should serve the logs
     * @return the log URL, or null when no matching instance is found
     */
    public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
        String logUrl = constructLogUrl(containerNodeId);
        return logUrl;
    }

    /**
     * Looks up the LLAP service instance on the given node's host whose RPC port matches the
     * node's port, and builds its log URL.
     *
     * @param containerNodeId node to resolve
     * @return the log URL of the matching instance, or null if the registry lookup fails or no
     *         instance on that host uses the node's port
     */
    private String constructLogUrl(NodeId containerNodeId) {
        // Typed set instead of a raw Set, so the loop below is checked by the compiler.
        Set<ServiceInstance> instanceSet;
        try {
            instanceSet = this.serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
        } catch (IOException e) {
            LOG.warn("Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", containerNodeId, e.getMessage());
            return null;
        }
        if (instanceSet == null) {
            return null;
        }
        for (ServiceInstance instance : instanceSet) {
            if (instance.getRpcPort() == containerNodeId.getPort()) {
                return this.constructLlapLogUrl(instance);
            }
        }
        return null;
    }

    /**
     * Builds the log URL for an instance by appending "/logs" to its services address.
     *
     * @param serviceInstance instance to build the URL for
     * @return the services address followed by "/logs"
     */
    private String constructLlapLogUrl(ServiceInstance serviceInstance) {
        String servicesAddress = serviceInstance.getServicesAddress();
        return servicesAddress + "/logs";
    }

    /**
     * Records a node as known, keeping the timestamp of its first registration. Logs at info
     * level only when the node was not known before.
     *
     * @param nodeId node to record
     */
    public void registerKnownNode(LlapNodeId nodeId) {
        long nowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        Long previous = knownNodeMap.putIfAbsent(nodeId, nowMillis);
        boolean newlyAdded = previous == null;
        if (newlyAdded && isInfoEnabled) {
            LOG.info("Added new known node: {}", nodeId);
        }
    }

    /**
     * Records a ping from a node and warns, at most once every 5 seconds per node, when the
     * pinging node is not in the known-node map.
     *
     * <p>The per-node info is inserted with {@code putIfAbsent} so the same instance survives
     * across pings. Replacing it on every ping (as before) discarded the ping count and the
     * log-throttle timestamp, so the count never advanced and throttling compared against the
     * previous ping instead of the previous log.
     *
     * @param nodeId node that sent the ping
     */
    public void registerPingingNode(LlapNodeId nodeId) {
        long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        PingingNodeInfo ni = new PingingNodeInfo(currentTs);
        PingingNodeInfo old = this.pingedNodeMap.putIfAbsent(nodeId, ni);
        if (old == null) {
            if (isInfoEnabled) {
                LOG.info("Added new pinging node: [{}]", nodeId);
            }
        } else {
            old.pingCount.incrementAndGet();
        }

        if (this.knownNodeMap.containsKey(nodeId)) {
            return;
        }
        if (old == null) {
            // First ping from this node: always log.
            LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get());
        } else if (currentTs > old.logTimestamp.get() + 5000L) {
            // Seen before: log again only after 5 seconds have passed since the last log.
            LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get());
            old.logTimestamp.set(currentTs);
        }
    }

    /**
     * Handles a heartbeat from a node: records the ping and marks every container and task
     * attempt tracked on that node as alive. When nothing is tracked for the node, a warning
     * is logged at most once every 5 seconds (across all nodes).
     *
     * @param hostname host of the pinging node
     * @param port port of the pinging node
     */
    void nodePinged(String hostname, int port) {
        LlapNodeId pingingNode = LlapNodeId.getInstance(hostname, port);
        registerPingingNode(pingingNode);

        BiMap<ContainerId, TezTaskAttemptID> tracked = entityTracker.getContainerAttemptMapForNode(pingingNode);
        if (tracked == null) {
            long nowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
            if (nowMillis > nodeNotFoundLogTime.get() + 5000L) {
                LOG.warn("Received ping from node without any registered tasks or containers: " + hostname + ":" + port + ". Could be caused by pre-emption by the AM," + " or a mismatched hostname. Enable debug logging for mismatched host names");
                nodeNotFoundLogTime.set(nowMillis);
            }
            return;
        }

        // The bimap itself is the lock used by the entity tracker for its contents.
        synchronized (tracked) {
            for (Map.Entry<ContainerId, TezTaskAttemptID> mapping : tracked.entrySet()) {
                getContext().taskAlive(mapping.getValue());
                getContext().containerAlive(mapping.getKey());
            }
        }
    }

    /**
     * Switches tracking to a new DAG: replaces the current query identifier, resets the source
     * state tracker and forgets the nodes used by the previous query.
     *
     * @param newDagId identifier of the DAG that is now current
     */
    private void resetCurrentDag(int newDagId) {
        currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
        sourceStateTracker.resetState(newDagId);
        nodesForQuery.clear();
        String dagName = getContext().getCurrentDagInfo().getName();
        LOG.info("CurrentDagId set to: " + newDagId + ", name=" + dagName);
    }

    /**
     * Builds the submit-work request for a task attempt.
     *
     * <p>Credentials for the current query are serialized once and cached; each request gets
     * its own duplicate of the cached buffer so that reading it does not disturb the cached
     * copy's position.
     *
     * @param containerId container the attempt runs in
     * @param taskSpec specification of the attempt
     * @param fragmentRuntimeInfo runtime info to attach to the request
     * @return the fully populated request
     * @throws IOException if the credentials cannot be serialized
     * @throws IllegalStateException if the attempt's DAG is not the currently tracked one
     */
    private LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, LlapDaemonProtocolProtos.FragmentRuntimeInfo fragmentRuntimeInfo) throws IOException {
        LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder request =
                LlapDaemonProtocolProtos.SubmitWorkRequestProto.newBuilder();
        request.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId())
                .setAttemptNumber(taskSpec.getTaskAttemptID().getId())
                .setContainerIdString(containerId.toString())
                .setAmHost(getAddress().getHostName())
                .setAmPort(getAddress().getPort());

        int taskDagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
        Preconditions.checkState(this.currentQueryIdentifierProto.getDagIdentifier() == taskDagId);

        ByteBuffer cached = this.credentialMap.get(this.currentQueryIdentifierProto);
        ByteBuffer credentialsBinary;
        if (cached != null) {
            credentialsBinary = cached.duplicate();
        } else {
            credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
            this.credentialMap.putIfAbsent(this.currentQueryIdentifierProto, credentialsBinary.duplicate());
        }
        request.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));

        LlapDaemonProtocolProtos.VertexOrBinary workSpec = LlapDaemonProtocolProtos.VertexOrBinary.newBuilder()
                .setVertex(Converters.convertTaskSpecToProto(taskSpec, this.appAttemptId, getTokenIdentifier(), null, this.user))
                .build();
        request.setWorkSpec(workSpec);
        request.setFragmentRuntimeInfo(fragmentRuntimeInfo);
        return request.build();
    }

    /**
     * Serializes a copy of the given credentials into token-storage format.
     *
     * @param credentials credentials to serialize
     * @return a buffer wrapping the written bytes (limited to the written length)
     * @throws IOException if writing the token storage fails
     */
    private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
        Credentials copy = new Credentials();
        copy.addAll(credentials);

        DataOutputBuffer out = new DataOutputBuffer();
        copy.writeTokenStorageToStream(out);

        // getData() exposes the whole backing array; only the first getLength() bytes are valid.
        byte[] backing = out.getData();
        int validLength = out.getLength();
        return ByteBuffer.wrap(backing, 0, validLength);
    }

    /**
     * Builds a query identifier from the current application identifier and the given DAG id.
     *
     * @param dagIdentifier identifier of the DAG
     * @return the query identifier proto
     */
    private LlapDaemonProtocolProtos.QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
        LlapDaemonProtocolProtos.QueryIdentifierProto.Builder identifier =
                LlapDaemonProtocolProtos.QueryIdentifierProto.newBuilder();
        identifier.setAppIdentifier(getContext().getCurrentAppIdentifier());
        identifier.setDagIdentifier(dagIdentifier);
        return identifier.build();
    }

    /**
     * Tracks which containers and task attempts are associated with which LLAP node.
     *
     * <p>Three maps are kept: attempt -> node, container -> node, and node -> bimap of
     * (container <-> attempt). The outer maps are concurrent; each per-node bimap is guarded
     * by synchronizing on the bimap instance itself, and callers that iterate a bimap obtained
     * from {@link #getContainerAttemptMapForNode} must do the same.
     */
    @VisibleForTesting
    static final class EntityTracker {
        @VisibleForTesting
        final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<TezTaskAttemptID, LlapNodeId>();
        @VisibleForTesting
        final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<ContainerId, LlapNodeId>();
        @VisibleForTesting
        final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>>();

        EntityTracker() {
        }

        /**
         * Associates a task attempt with a container and with the node at host:port.
         * Existing attempt -> node and container -> node mappings are left untouched.
         */
        void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
            }
            LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
            this.attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
            this.registerContainer(containerId, host, port);

            BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
            BiMap<ContainerId, TezTaskAttemptID> old = this.nodeMap.putIfAbsent(llapNodeId, tmpMap);
            BiMap<ContainerId, TezTaskAttemptID> usedInstance = old == null ? tmpMap : old;
            synchronized (usedInstance) {
                usedInstance.put(containerId, taskAttemptId);
            }
            // NOTE(review): presumably re-publishes the bimap in case a concurrent unregister
            // removed it from nodeMap while it was momentarily empty - confirm.
            this.nodeMap.putIfAbsent(llapNodeId, usedInstance);
        }

        /**
         * Removes a task attempt, the container it was mapped to (if any), and the node's
         * bimap once that becomes empty. Unknown attempts are ignored.
         */
        void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
            LlapNodeId llapNodeId = this.attemptToNodeMap.remove(attemptId);
            if (llapNodeId == null) {
                return;
            }
            BiMap<ContainerId, TezTaskAttemptID> bMap = this.nodeMap.get(llapNodeId);
            ContainerId matched = null;
            if (bMap != null) {
                synchronized (bMap) {
                    matched = bMap.inverse().remove(attemptId);
                    // Checked under the same lock as every other access to the bimap.
                    if (bMap.isEmpty()) {
                        this.nodeMap.remove(llapNodeId);
                    }
                }
            }
            if (matched != null) {
                this.containerToNodeMap.remove(matched);
            }
        }

        /** Records the node a container runs on; an existing mapping is kept. */
        void registerContainer(ContainerId containerId, String hostname, int port) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
            }
            this.containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
        }

        /** @return the node the container is registered on, or null if unknown */
        LlapNodeId getNodeIdForContainer(ContainerId containerId) {
            return this.containerToNodeMap.get(containerId);
        }

        /** @return the node the attempt is registered on, or null if unknown */
        LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
            return this.attemptToNodeMap.get(taskAttemptId);
        }

        /**
         * @return the container the attempt is mapped to, or null when the attempt, its node,
         *         or the mapping is unknown
         */
        ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
            LlapNodeId llapNodeId = this.getNodeIdForTaskAttempt(taskAttemptId);
            if (llapNodeId == null) {
                return null;
            }
            // Null-check the bimap itself: the node entry may already have been removed, and
            // inverse() never returns null, so checking the inverse view could not catch that.
            BiMap<ContainerId, TezTaskAttemptID> bMap = this.nodeMap.get(llapNodeId);
            if (bMap == null) {
                return null;
            }
            // Lock on the bimap (not its inverse view) so this excludes the other accessors.
            synchronized (bMap) {
                return bMap.inverse().get(taskAttemptId);
            }
        }

        /**
         * @return the attempt mapped to the container, or null when the container, its node,
         *         or the mapping is unknown
         */
        TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
            LlapNodeId llapNodeId = this.getNodeIdForContainer(containerId);
            if (llapNodeId == null) {
                return null;
            }
            BiMap<ContainerId, TezTaskAttemptID> bMap = this.nodeMap.get(llapNodeId);
            if (bMap == null) {
                return null;
            }
            synchronized (bMap) {
                return bMap.get(containerId);
            }
        }

        /**
         * Removes a container, the attempt it was mapped to (if any), and the node's bimap
         * once that becomes empty. Unknown containers are ignored.
         */
        void unregisterContainer(ContainerId containerId) {
            LlapNodeId llapNodeId = this.containerToNodeMap.remove(containerId);
            if (llapNodeId == null) {
                return;
            }
            BiMap<ContainerId, TezTaskAttemptID> bMap = this.nodeMap.get(llapNodeId);
            TezTaskAttemptID matched = null;
            if (bMap != null) {
                synchronized (bMap) {
                    matched = bMap.remove(containerId);
                    // Checked under the same lock as every other access to the bimap.
                    if (bMap.isEmpty()) {
                        this.nodeMap.remove(llapNodeId);
                    }
                }
            }
            if (matched != null) {
                this.attemptToNodeMap.remove(matched);
            }
        }

        /**
         * @return the live (container <-> attempt) bimap for the node, or null if none exists;
         *         callers must synchronize on the returned instance while reading it
         */
        BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
            return this.nodeMap.get(llapNodeId);
        }
    }

    /**
     * Umbilical endpoint served to LLAP daemons. Task-level calls are delegated to the Tez
     * umbilical; node heartbeats and kill notifications are handled by the enclosing
     * communicator.
     */
    protected class LlapTaskUmbilicalProtocolImpl
    implements LlapTaskUmbilicalProtocol {
        private final TezTaskUmbilicalProtocol tezUmbilical;

        /**
         * @param tezUmbilical the Tez umbilical that commit and heartbeat calls are delegated to
         */
        public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
            this.tezUmbilical = tezUmbilical;
        }

        /** Delegates the commit check to the Tez umbilical. */
        public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
            boolean allowed = tezUmbilical.canCommit(taskid);
            return allowed;
        }

        /** Delegates the task heartbeat to the Tez umbilical. */
        public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException {
            TezHeartbeatResponse response = tezUmbilical.heartbeat(request);
            return response;
        }

        /** Records a node-level heartbeat with the enclosing communicator. */
        public void nodeHeartbeat(Text hostname, int port) throws IOException {
            nodePinged(hostname.toString(), port);
            if (!LOG.isDebugEnabled()) {
                return;
            }
            LOG.debug("Received heartbeat from [" + hostname + ":" + port + "]");
        }

        /** Reports an externally preempted attempt to the context and stops tracking it. */
        public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
            getContext().taskKilled(taskAttemptId, TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
            entityTracker.unregisterTaskAttempt(taskAttemptId);
        }

        /** Always reports protocol version 1. */
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return 1L;
        }

        /** Computes the protocol signature via the standard Hadoop helper. */
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
        }
    }

    /**
     * Per-node ping bookkeeping: when a warning was last logged for the node and how many
     * pings have been counted. A new instance starts with a count of one.
     */
    private static class PingingNodeInfo {
        final AtomicLong logTimestamp;
        final AtomicInteger pingCount = new AtomicInteger(1);

        /**
         * @param currentTs initial value of the log timestamp
         */
        PingingNodeInfo(long currentTs) {
            logTimestamp = new AtomicLong(currentTs);
        }
    }
}

