package org.springframework.ai.mcp.spec;

import com.fasterxml.jackson.core.type.TypeReference;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.mcp.spec.McpSchema;
import org.springframework.ai.mcp.util.Assert;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:org/springframework/ai/mcp/spec/DefaultMcpSession.class */
public class DefaultMcpSession implements McpSession {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMcpSession.class);
    private final Duration requestTimeout;
    private final McpTransport transport;
    private final ConcurrentHashMap<Object, MonoSink<McpSchema.JSONRPCResponse>> pendingResponses = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, RequestHandler> requestHandlers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, NotificationHandler> notificationHandlers = new ConcurrentHashMap<>();
    private final String sessionPrefix = UUID.randomUUID().toString().substring(0, 8);
    private final AtomicLong requestCounter = new AtomicLong(0);
    private final Disposable connection;

    @FunctionalInterface
    /* loaded from: input_file:org/springframework/ai/mcp/spec/DefaultMcpSession$NotificationHandler.class */
    public interface NotificationHandler {
        Mono<Void> handle(Object obj);
    }

    @FunctionalInterface
    /* loaded from: input_file:org/springframework/ai/mcp/spec/DefaultMcpSession$RequestHandler.class */
    public interface RequestHandler {
        Mono<Object> handle(Object obj);
    }

    public DefaultMcpSession(Duration duration, McpTransport mcpTransport, Map<String, RequestHandler> map, Map<String, NotificationHandler> map2) {
        Assert.notNull(duration, "The requstTimeout can not be null");
        Assert.notNull(mcpTransport, "The transport can not be null");
        Assert.notNull(map, "The requestHandlers can not be null");
        Assert.notNull(map2, "The notificationHandlers can not be null");
        this.requestTimeout = duration;
        this.transport = mcpTransport;
        this.requestHandlers.putAll(map);
        this.notificationHandlers.putAll(map2);
        this.connection = this.transport.connect(mono -> {
            return mono.doOnNext(jSONRPCMessage -> {
                if (jSONRPCMessage instanceof McpSchema.JSONRPCResponse) {
                    McpSchema.JSONRPCResponse jSONRPCResponse = (McpSchema.JSONRPCResponse) jSONRPCMessage;
                    logger.info("Received Response: {}", jSONRPCResponse);
                    MonoSink<McpSchema.JSONRPCResponse> remove = this.pendingResponses.remove(jSONRPCResponse.id());
                    if (remove == null) {
                        logger.warn("Unexpected response for unkown id {}", jSONRPCResponse.id());
                        return;
                    } else {
                        remove.success(jSONRPCResponse);
                        return;
                    }
                }
                if (jSONRPCMessage instanceof McpSchema.JSONRPCRequest) {
                    McpSchema.JSONRPCRequest jSONRPCRequest = (McpSchema.JSONRPCRequest) jSONRPCMessage;
                    logger.info("Received request: {}", jSONRPCRequest);
                    handleIncomingRequest(jSONRPCRequest).subscribe(jSONRPCResponse2 -> {
                        mcpTransport.sendMessage(jSONRPCResponse2).subscribe();
                    }, th -> {
                        mcpTransport.sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jSONRPCRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, th.getMessage(), null))).subscribe();
                    });
                } else if (jSONRPCMessage instanceof McpSchema.JSONRPCNotification) {
                    McpSchema.JSONRPCNotification jSONRPCNotification = (McpSchema.JSONRPCNotification) jSONRPCMessage;
                    logger.info("Received notification: {}", jSONRPCNotification);
                    handleIncomingNotification(jSONRPCNotification).subscribe((Consumer) null, th2 -> {
                        logger.error("Error handling notification: {}", th2.getMessage());
                    });
                }
            });
        }).subscribe();
    }

    private Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCRequest jSONRPCRequest) {
        return Mono.defer(() -> {
            RequestHandler requestHandler = this.requestHandlers.get(jSONRPCRequest.method());
            return requestHandler == null ? Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jSONRPCRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND, "Method not found: " + jSONRPCRequest.method(), null))) : requestHandler.handle(jSONRPCRequest.params()).map(obj -> {
                return new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jSONRPCRequest.id(), obj, null);
            }).onErrorResume(th -> {
                return Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jSONRPCRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, th.getMessage(), null)));
            });
        });
    }

    private Mono<Void> handleIncomingNotification(McpSchema.JSONRPCNotification jSONRPCNotification) {
        return Mono.defer(() -> {
            NotificationHandler notificationHandler = this.notificationHandlers.get(jSONRPCNotification.method());
            if (notificationHandler != null) {
                return notificationHandler.handle(jSONRPCNotification.params());
            }
            logger.error("No handler registered for notification method: {}", jSONRPCNotification.method());
            return Mono.empty();
        });
    }

    private String generateRequestId() {
        return this.sessionPrefix + "-" + this.requestCounter.getAndIncrement();
    }

    @Override // org.springframework.ai.mcp.spec.McpSession
    public <T> Mono<T> sendRequest(String str, Object obj, TypeReference<T> typeReference) {
        String generateRequestId = generateRequestId();
        return Mono.create(monoSink -> {
            this.pendingResponses.put(generateRequestId, monoSink);
            this.transport.sendMessage(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, str, generateRequestId, obj)).subscribe(r1 -> {
            }, th -> {
                this.pendingResponses.remove(generateRequestId);
                monoSink.error(th);
            });
        }).timeout(this.requestTimeout).handle((jSONRPCResponse, synchronousSink) -> {
            if (jSONRPCResponse.error() != null) {
                synchronousSink.error(new McpError(jSONRPCResponse.error()));
            } else if (typeReference.getType().equals(Void.class)) {
                synchronousSink.complete();
            } else {
                synchronousSink.next(this.transport.unmarshalFrom(jSONRPCResponse.result(), typeReference));
            }
        });
    }

    @Override // org.springframework.ai.mcp.spec.McpSession
    public Mono<Void> sendNotification(String str, Map<String, Object> map) {
        return this.transport.sendMessage(new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, str, map));
    }

    @Override // org.springframework.ai.mcp.spec.McpSession
    public Mono<Void> closeGracefully() {
        this.connection.dispose();
        return this.transport.closeGracefully();
    }

    @Override // org.springframework.ai.mcp.spec.McpSession
    public void close() {
        this.connection.dispose();
        this.transport.close();
    }
}
