/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.ai.mcp.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.mcp.spec.McpSchema;
import org.springframework.ai.mcp.spec.McpTransport;
import org.springframework.ai.mcp.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link McpTransport} implementation that exchanges newline-delimited JSON-RPC
 * messages over a pair of streams (by default {@code System.in} / {@code System.out}).
 *
 * <p>Inbound lines are read on a dedicated scheduler, deserialized and pushed into
 * an inbound sink whose elements are handed to the handler passed to
 * {@link #connect(Function)}. Outbound messages are queued in an outbound sink and
 * written, one JSON document per line, on a second dedicated scheduler.
 */
public class StdioServerTransport
implements McpTransport {
    private static final Logger logger = LoggerFactory.getLogger(StdioServerTransport.class);

    /** Buffer of messages read from the input stream, waiting for the handler. */
    private final Sinks.Many<McpSchema.JSONRPCMessage> inboundSink;

    /** Buffer of messages waiting to be written to the output stream. */
    private final Sinks.Many<McpSchema.JSONRPCMessage> outboundSink;

    private final ObjectMapper objectMapper;

    /** Scheduler running the blocking read loop (thread cap 1, queued-task cap 1). */
    private final Scheduler inboundScheduler;

    /** Scheduler on which outbound messages are serialized and written (thread cap 1, queued-task cap 1). */
    private final Scheduler outboundScheduler;

    /** Set once shutdown starts or either direction terminates; read by both worker threads. */
    private volatile boolean isClosing = false;

    private final InputStream inputStream;
    private final OutputStream outputStream;

    /** Signalled when the inbound read task has started. */
    private final Sinks.One<Void> inboundReady = Sinks.one();

    /** Signalled when the outbound pipeline has been subscribed. */
    private final Sinks.One<Void> outboundReady = Sinks.one();

    /**
     * Creates a transport over {@code System.in} / {@code System.out} with a fresh
     * {@link ObjectMapper}.
     */
    public StdioServerTransport() {
        this(new ObjectMapper(), System.in, System.out);
    }

    /**
     * Creates a transport over {@code System.in} / {@code System.out}.
     *
     * @param objectMapper mapper used to (de)serialize JSON-RPC messages; must not be null
     */
    public StdioServerTransport(ObjectMapper objectMapper) {
        this(objectMapper, System.in, System.out);
    }

    /**
     * Creates a transport over the given streams.
     *
     * @param objectMapper mapper used to (de)serialize JSON-RPC messages; must not be null
     * @param inputStream stream from which newline-delimited messages are read; must not be null
     * @param outputStream stream to which newline-delimited messages are written; must not be null
     */
    public StdioServerTransport(ObjectMapper objectMapper, InputStream inputStream, OutputStream outputStream) {
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");
        Assert.notNull(inputStream, "The InputStream can not be null");
        Assert.notNull(outputStream, "The OutputStream can not be null");
        this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.objectMapper = objectMapper;
        this.inputStream = inputStream;
        this.outputStream = outputStream;
        this.inboundScheduler = Schedulers.newBoundedElastic(1, 1, "inbound");
        this.outboundScheduler = Schedulers.newBoundedElastic(1, 1, "outbound");
    }

    /**
     * Wires the handler to the inbound sink and starts the inbound and outbound
     * processing. The setup itself runs on {@link Schedulers#boundedElastic()} when
     * the returned {@link Mono} is subscribed.
     *
     * @param handler function applied to each inbound message
     * @return a {@link Mono} that completes once processing has been started
     */
    @Override
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        return Mono.<Void>fromRunnable(() -> {
            this.handleIncomingMessages(handler);
            this.startInboundProcessing();
            this.startOutboundProcessing();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Subscribes to the inbound sink and applies the handler to every message,
     * adding an {@code "observation"} entry to the Reactor context of each call.
     */
    private void handleIncomingMessages(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> inboundMessageHandler) {
        this.inboundSink.asFlux()
            .flatMap(message -> Mono.just(message)
                .transform(inboundMessageHandler)
                .contextWrite(ctx -> ctx.put("observation", "myObservation")))
            .subscribe();
    }

    /**
     * Queues a message for writing once both processing directions have signalled
     * readiness.
     *
     * @param message the message to send
     * @return a {@link Mono} that completes when the message was enqueued, or fails
     *         with a {@link RuntimeException} if the outbound sink rejected it
     */
    @Override
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        return Mono.zip(this.inboundReady.asMono(), this.outboundReady.asMono()).then(Mono.defer(() -> {
            if (this.outboundSink.tryEmitNext(message).isSuccess()) {
                return Mono.empty();
            }
            return Mono.error(new RuntimeException("Failed to enqueue message"));
        }));
    }

    /**
     * Schedules the blocking read loop: reads one line at a time, deserializes it
     * and emits it to the inbound sink. The loop stops on end of stream, on the
     * first failure, or once closing has been requested; in every case the
     * transport is marked as closing and the inbound sink is completed.
     */
    private void startInboundProcessing() {
        this.inboundScheduler.schedule(() -> {
            this.inboundReady.tryEmitValue(null);
            // Decode explicitly as UTF-8 so reading matches the UTF-8 encoding used when
            // writing, instead of depending on the platform default charset.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.inputStream, StandardCharsets.UTF_8))) {
                String line;
                while (!this.isClosing && (line = reader.readLine()) != null) {
                    try {
                        McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
                        if (this.inboundSink.tryEmitNext(message).isSuccess()) {
                            continue;
                        }
                        if (!this.isClosing) {
                            logger.error("Failed to enqueue message");
                        }
                        break;
                    }
                    catch (Exception e) {
                        if (!this.isClosing) {
                            logger.error("Error processing inbound message", e);
                        }
                        break;
                    }
                }
            }
            catch (IOException e) {
                if (!this.isClosing) {
                    logger.error("Error reading from stdin", e);
                }
            }
            finally {
                this.isClosing = true;
                this.inboundSink.tryEmitComplete();
            }
        });
    }

    /**
     * Subscribes to the outbound sink and writes each message as a single line of
     * JSON followed by a newline, flushing after every message. Completion or
     * failure of the pipeline marks the transport as closing and completes the
     * outbound sink.
     */
    private void startOutboundProcessing() {
        Function<Flux<McpSchema.JSONRPCMessage>, Flux<McpSchema.JSONRPCMessage>> outboundConsumer = messages -> messages
            .doOnSubscribe(subscription -> this.outboundReady.tryEmitValue(null))
            .publishOn(this.outboundScheduler)
            .handle((message, sink) -> {
                if (message != null && !this.isClosing) {
                    try {
                        String jsonMessage = this.objectMapper.writeValueAsString(message);
                        // Keep each message on one physical line: the newline is the frame delimiter.
                        jsonMessage = jsonMessage.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");
                        synchronized (this.outputStream) {
                            this.outputStream.write(jsonMessage.getBytes(StandardCharsets.UTF_8));
                            this.outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
                            this.outputStream.flush();
                        }
                        sink.next(message);
                    }
                    catch (IOException e) {
                        if (!this.isClosing) {
                            logger.error("Error writing message", e);
                            sink.error(new RuntimeException(e));
                        }
                        // A failed write never falls through to the completion check below.
                        return;
                    }
                }
                if (this.isClosing) {
                    sink.complete();
                }
            })
            .doOnComplete(() -> {
                this.isClosing = true;
                this.outboundSink.tryEmitComplete();
            })
            .doOnError(e -> {
                if (!this.isClosing) {
                    logger.error("Error in outbound processing", e);
                    this.isClosing = true;
                    this.outboundSink.tryEmitComplete();
                }
            })
            .map(msg -> (McpSchema.JSONRPCMessage) msg);
        outboundConsumer.apply(this.outboundSink.asFlux()).subscribe();
    }

    /**
     * Marks the transport as closing and disposes both schedulers. Errors raised
     * while disposing are logged and not propagated. The work runs on
     * {@link Schedulers#boundedElastic()} when the returned {@link Mono} is subscribed.
     *
     * @return a {@link Mono} that completes when shutdown has been attempted
     */
    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            logger.debug("Initiating graceful shutdown");
        }).then(Mono.fromRunnable(() -> {
            try {
                this.inboundScheduler.dispose();
                this.outboundScheduler.dispose();
                // NOTE(review): dispose() has already been called, so these isDisposed()
                // checks likely always see true and the disposeGracefully() fallbacks may
                // never run - confirm against the Reactor Scheduler implementation in use.
                if (!this.inboundScheduler.isDisposed()) {
                    this.inboundScheduler.disposeGracefully().block(Duration.ofSeconds(5L));
                }
                if (!this.outboundScheduler.isDisposed()) {
                    this.outboundScheduler.disposeGracefully().block(Duration.ofSeconds(5L));
                }
                logger.info("Graceful shutdown completed");
            }
            catch (Exception e) {
                logger.error("Error during graceful shutdown", e);
            }
        })).then().subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Converts an already-parsed value into the requested type using the
     * transport's {@link ObjectMapper}.
     *
     * @param data the value to convert
     * @param typeRef the target type
     * @param <T> the target type
     * @return the converted value
     */
    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return this.objectMapper.convertValue(data, typeRef);
    }
}

