package org.springframework.kafka.requestreply;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;

/* loaded from: input_file:WEB-INF/lib/spring-kafka-2.1.4.RELEASE.jar:org/springframework/kafka/requestreply/ReplyingKafkaTemplate.class */
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R> {
    private static final long DEFAULT_REPLY_TIMEOUT = 5000;
    private final GenericMessageListenerContainer<K, R> replyContainer;
    private final ConcurrentMap<CorrelationKey, RequestReplyFuture<K, V, R>> futures;
    private TaskScheduler scheduler;
    private int phase;
    private boolean autoStartup;
    private long replyTimeout;
    private volatile boolean schedulerSet;
    private volatile boolean running;

    /* loaded from: input_file:WEB-INF/lib/spring-kafka-2.1.4.RELEASE.jar:org/springframework/kafka/requestreply/ReplyingKafkaTemplate$TemplateRequestReplyFuture.class */
    public static class TemplateRequestReplyFuture<K, V, R> extends RequestReplyFuture<K, V, R> {
        TemplateRequestReplyFuture() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.springframework.kafka.requestreply.RequestReplyFuture
        public void setSendFuture(ListenableFuture<SendResult<K, V>> listenableFuture) {
            super.setSendFuture(listenableFuture);
        }
    }

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> genericMessageListenerContainer) {
        this(producerFactory, genericMessageListenerContainer, false);
    }

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> genericMessageListenerContainer, boolean z) {
        super(producerFactory, z);
        this.futures = new ConcurrentHashMap();
        this.scheduler = new ThreadPoolTaskScheduler();
        this.autoStartup = true;
        this.replyTimeout = 5000L;
        Assert.notNull(genericMessageListenerContainer, "'replyContainer' cannot be null");
        this.replyContainer = genericMessageListenerContainer;
        this.replyContainer.setupMessageListener(this);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        Assert.notNull(taskScheduler, "'scheduler' cannot be null");
        this.scheduler = taskScheduler;
        this.schedulerSet = true;
    }

    public void setReplyTimeout(long j) {
        Assert.isTrue(j >= 0, "'replyTimeout' must be >= 0");
        this.replyTimeout = j;
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int i) {
        this.phase = i;
    }

    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
        return this.replyContainer.getAssignedPartitions();
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() throws Exception {
        if (this.schedulerSet) {
            return;
        }
        ((ThreadPoolTaskScheduler) this.scheduler).initialize();
    }

    @Override // org.springframework.context.Lifecycle
    public synchronized void start() {
        if (this.running) {
            return;
        }
        try {
            afterPropertiesSet();
            this.replyContainer.start();
            this.running = true;
        } catch (Exception e) {
            throw new KafkaException("Failed to initialize", e);
        }
    }

    @Override // org.springframework.context.Lifecycle
    public synchronized void stop() {
        if (this.running) {
            this.running = false;
            this.replyContainer.stop();
            this.futures.clear();
        }
    }

    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    @Override // org.springframework.kafka.requestreply.ReplyingKafkaOperations
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> producerRecord) {
        Assert.state(this.running, "Template has not been start()ed");
        CorrelationKey createCorrelationId = createCorrelationId(producerRecord);
        Assert.notNull(createCorrelationId, "the created 'correlationId' cannot be null");
        producerRecord.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, createCorrelationId.getCorrelationId()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Sending: " + producerRecord + " with correlationId: " + createCorrelationId);
        }
        TemplateRequestReplyFuture templateRequestReplyFuture = new TemplateRequestReplyFuture();
        this.futures.put(createCorrelationId, templateRequestReplyFuture);
        try {
            templateRequestReplyFuture.setSendFuture(send(producerRecord));
            this.scheduler.schedule(() -> {
                RequestReplyFuture<K, V, R> remove = this.futures.remove(createCorrelationId);
                if (remove != null) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Reply timed out for: " + producerRecord + " with correlationId: " + createCorrelationId);
                    }
                    remove.setException(new KafkaException("Reply timed out"));
                }
            }, Instant.now().plusMillis(this.replyTimeout));
            return templateRequestReplyFuture;
        } catch (Exception e) {
            this.futures.remove(createCorrelationId);
            throw new KafkaException("Send failed", e);
        }
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() throws Exception {
        if (this.schedulerSet) {
            return;
        }
        ((ThreadPoolTaskScheduler) this.scheduler).destroy();
    }

    protected CorrelationKey createCorrelationId(ProducerRecord<K, V> producerRecord) {
        UUID randomUUID = UUID.randomUUID();
        byte[] bArr = new byte[16];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        wrap.putLong(randomUUID.getMostSignificantBits());
        wrap.putLong(randomUUID.getLeastSignificantBits());
        return new CorrelationKey(bArr);
    }

    @Override // org.springframework.kafka.listener.GenericMessageListener
    public void onMessage(List<ConsumerRecord<K, R>> list) {
        list.forEach(consumerRecord -> {
            Iterator<Header> it = consumerRecord.headers().iterator();
            CorrelationKey correlationKey = null;
            while (correlationKey == null && it.hasNext()) {
                Header next = it.next();
                if (next.key().equals(KafkaHeaders.CORRELATION_ID)) {
                    correlationKey = new CorrelationKey(next.value());
                }
            }
            if (correlationKey == null) {
                this.logger.error("No correlationId found in reply: " + consumerRecord + " - to use request/reply semantics, the responding server must return the correlation id  in the '" + KafkaHeaders.CORRELATION_ID + "' header");
                return;
            }
            RequestReplyFuture<K, V, R> remove = this.futures.remove(correlationKey);
            if (remove == null) {
                this.logger.error("No pending reply: " + consumerRecord + " with correlationId: " + correlationKey + ", perhaps timed out");
                return;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + consumerRecord + " with correlationId: " + correlationKey);
            }
            remove.set(consumerRecord);
        });
    }
}
